2
3
6
7
13#include <restinio/asio_include.hpp>
17#include <restinio/impl/include_fmtlib.hpp>
19#include <restinio/core.hpp>
20#include <restinio/impl/executor_wrapper.hpp>
21#include <restinio/impl/write_group_output_ctx.hpp>
22#include <restinio/websocket/message.hpp>
23#include <restinio/websocket/impl/ws_parser.hpp>
24#include <restinio/websocket/impl/ws_protocol_validator.hpp>
26#include <restinio/utils/impl/safe_uint_truncate.hpp>
28#include <restinio/compiler_features.hpp>
63 m_awaiting_write_groups.emplace( std::move( wg ) );
71 if( !m_awaiting_write_groups.empty() )
73 result = std::move( m_awaiting_write_groups.front() );
74 m_awaiting_write_groups.pop();
121 typename WS_Message_Handler >
147 restinio::impl::connection_settings_handle_t< Traits > settings,
152 : ws_connection_base_t{ conn_id }
154 , m_settings{ std::move( settings ) }
157 , m_timer_guard{ m_settings->create_timer_guard() }
158 , m_input{ websocket_header_max_size() }
160 , m_logger{ *( m_settings->m_logger ) }
166 "[connection:{}] move socket to [ws_connection:{}]" ),
174 "[ws_connection:{}] start connection with {}" ),
180 m_settings->call_state_listener( [
this]()
noexcept {
181 return connection_state::notice_t {
183 m_socket.remote_endpoint(),
184 connection_state::upgraded_to_websocket_t{}
202 "[ws_connection:{}] destructor called" ),
215 this->get_executor(),
216 [
this, ctx = shared_from_this() ]
227 "[ws_connection:{}] shutdown" ),
234 catch(
const std::exception & ex )
240 "[ws_connection:{}] shutdown operation error: {}" ),
253 this->get_executor(),
254 [
this, ctx = shared_from_this() ]
266 "[ws_connection:{}] kill" ),
275 catch(
const std::exception & ex )
281 "[ws_connection:{}] kill operation error: {}" ),
293 ws_weak_handle_t wswh{ wsh };
297 this->get_executor(),
298 [
this, ctx = shared_from_this(), wswh = std::move( wswh ) ]
305 m_prepared_weak_ctx = shared_from_this();
308 m_websocket_weak_handle = std::move( wswh );
312 catch(
const std::exception & ex )
314 trigger_error_and_close(
319 "[ws_connection:{}] unable to init read: {}" ),
331 bool is_close_frame )
override
335 this->get_executor(),
337 actual_wg = std::move( wg ),
338 ctx = shared_from_this(),
347 std::move( actual_wg ),
354 "[ws_connection:{}] cannot write to websocket: "
355 "write operations disabled" ),
360 catch(
const std::exception & ex )
362 trigger_error_and_close(
367 "[ws_connection:{}] unable to write data: {}" ),
377
378
379
389 "[ws_connection:{}] close socket" ),
397 "ws_connection.close_impl.socket.shutdown",
399 asio_ns::error_code ignored_ec;
401 asio_ns::ip::tcp::socket::shutdown_both,
407 "ws_connection.close_impl.socket.close",
437 writable_items_container_t bufs;
446 bufs.emplace_back( std::move( payload ) );
447 m_outgoing_data.append( write_group_t{ std::move( bufs ) } );
459 std::string desc = std::string{} )
466
467
468
469
470
471
472
473 template<
typename MSG_BUILDER >
477 MSG_BUILDER msg_builder )
noexcept
481 m_logger, std::move( msg_builder ) );
485 m_logger,
"ws_connection.call_close_handler_if_necessary",
501 "[ws_connection:{}] start reading header" ),
506 m_input.reset_parser_and_payload();
508 if( 0 == m_input.m_buf.length() )
515 consume_header_from_buffer(
516 m_input.m_buf.bytes(), m_input.m_buf.length() );
527 "[ws_connection:{}] continue reading message" ),
531 m_socket.async_read_some(
532 m_input.m_buf.make_asio_buffer(),
533 asio_ns::bind_executor(
534 this->get_executor(),
535 [
this, ctx = shared_from_this() ]
537 (
const asio_ns::error_code & ec, std::size_t length )
noexcept
541 after_read_header( ec, length );
543 catch(
const std::exception & ex )
545 trigger_error_and_close(
546 status_code_t::unexpected_condition,
550 "[ws_connection:{}] after read header "
551 "callback error: {}" ),
564 trigger_error_and_close(
578 const asio_ns::error_code & ec,
586 "[ws_connection:{}] received {} bytes" ),
587 this->connection_id(),
591 m_input.m_buf.obtained_bytes( length );
592 consume_header_from_buffer( m_input.m_buf.bytes(), length );
604 const auto nparsed = m_input.m_parser.parser_execute( data, length );
606 m_input.m_buf.consumed_bytes( nparsed );
608 if( m_input.m_parser.header_parsed() )
610 handle_parsed_header( m_input.m_parser.current_message() );
614 assert( nparsed == length );
626 "[ws_connection:{}] start handling {} ({:#x})" ),
629 static_cast<std::uint16_t>(md
.m_opcode) );
632 const auto validation_result =
633 m_protocol_validator.process_new_frame( md );
640 "[ws_connection:{}] invalid header" ),
671 const auto payload_length =
674 m_input.m_payload.resize( payload_length );
676 if( payload_length == 0 )
683 const auto payload_part_size =
684 std::min( m_input.m_buf.length(), payload_length );
687 &m_input.m_payload.front(),
688 m_input.m_buf.bytes(),
691 m_input.m_buf.consumed_bytes( payload_part_size );
693 const std::size_t length_remaining =
694 payload_length - payload_part_size;
696 if( validate_payload_part(
697 &m_input.m_payload.front(),
701 if( 0 == length_remaining )
710 &m_input.m_payload.front() + payload_part_size,
725 std::size_t length_remaining,
727 bool do_validate_payload_and_call_msg_handler =
true )
730 asio_ns::buffer( payload_data, length_remaining ),
731 asio_ns::bind_executor(
732 this->get_executor(),
734 ctx = shared_from_this(),
737 do_validate_payload_and_call_msg_handler ]
739 (
const asio_ns::error_code & ec, std::size_t length )
noexcept
748 do_validate_payload_and_call_msg_handler
);
750 catch(
const std::exception & ex )
752 trigger_error_and_close(
757 "[ws_connection:{}] after read payload "
758 "callback error: {}" ),
770 std::size_t length_remaining,
771 const asio_ns::error_code & ec,
773 bool do_validate_payload_and_call_msg_handler =
true )
780 "[ws_connection:{}] received {} bytes" ),
781 this->connection_id(),
785 assert( length <= length_remaining );
787 const std::size_t next_length_remaining =
788 length_remaining - length;
790 if( do_validate_payload_and_call_msg_handler )
794 if( 0 == next_length_remaining )
806 payload_data + length
,
807 next_length_remaining
,
808 do_validate_payload_and_call_msg_handler
);
816 if( 0 == next_length_remaining )
823 payload_data + length
,
824 length_remaining - length
,
825 do_validate_payload_and_call_msg_handler
);
839 if(
auto wsh = m_websocket_weak_handle.lock() )
845 std::move( close_frame )
);
847 catch(
const std::exception & ex )
852 "[ws_connection:{}] execute handler error: {}" ),
865 std::size_t next_length_remaining )
867 const auto validation_result =
868 m_protocol_validator.process_and_unmask_next_payload_part( payload_data, length );
881 m_protocol_validator.reset();
883 if( 0 == next_length_remaining )
890 const bool do_validate_payload_and_call_msg_handler =
false;
892 payload_data + length
,
893 next_length_remaining
,
894 do_validate_payload_and_call_msg_handler
);
910 "[ws_connection:{}] invalid paload" ),
960 auto & md = m_input.m_parser.current_message();
962 const auto validation_result = m_protocol_validator.finish_frame();
967 if(
opcode_t::connection_close_frame == md.m_opcode )
972 "[ws_connection:{}] got close frame from "
973 "peer, status: {}" ),
975 static_cast<std::uint16_t>(
976 status_code_from_bin( m_input.m_payload )) );
982 send_close_frame_to_peer( m_input.m_payload );
988 call_message_handler(
989 std::make_shared< message_t >(
990 md.m_final_flag ? final_frame : not_final_frame,
992 std::move( m_input.m_payload ) ) );
1001 if(
opcode_t::connection_close_frame == md.m_opcode )
1011 "[ws_connection:{}] expected close frame came" ),
1034 std::make_shared< message_t >(
1047 if( is_close_frame )
1052 "[ws_connection:{}] user sends close frame" ),
1066 m_outgoing_data.append( std::move( wg ) );
1075 "[ws_connection:{}] try to write while "
1076 "socket is closed" ),
1096 if( !m_write_output_ctx.transmitting() )
1108 auto next_write_group = m_outgoing_data.pop_ready_buffers();
1110 if( next_write_group )
1115 "[ws_connection:{}] start next write group, "
1117 this->connection_id(),
1118 next_write_group->items_count() );
1122 m_write_output_ctx.start_next_write_group(
1123 std::move( next_write_group ) );
1140 auto wo = m_write_output_ctx.extract_next_write_operation();
1153 throw exception_t{ "sendfile write operation not implemented" };
1156 catch(
const std::exception & ex )
1158 trigger_error_and_close(
1163 "[ws_connection:{}] handle_current_write_ctx failed: {}" ),
1174 auto & bufs = op.get_trivial_bufs();
1179 "[ws_connection:{}] sending data with "
1189 asio_ns::async_write(
1192 asio_ns::bind_executor(
1193 this->get_executor(),
1195 ctx = shared_from_this() ]
1197 (
const asio_ns::error_code & ec, std::size_t written )
noexcept
1206 "[ws_connection:{}] outgoing data was "
1215 catch(
const std::exception & ex )
1217 trigger_error_and_close(
1222 "[ws_connection:{}] after write "
1223 "callback error: {}" ),
1239 "[ws_connection:{}] finishing current write group" ),
1240 this->connection_id() );
1244 m_write_output_ctx.finish_write_group();
1261 trigger_error_and_close(
1266 "[ws_connection:{}] unable to write: {}" ),
1273 m_write_output_ctx.fail_write_group( ec );
1275 catch(
const std::exception & ex )
1280 "[ws_connection:{}] notificator error: {}" ),
1295
1296
1297
1298
1313 this->get_executor(),
1314 [ ctx = std::move( self ) ]
1318 auto & conn_object = cast_to_self( *ctx );
1323 conn_object.check_timeout_impl();
1325 catch(
const std::exception & x )
1327 conn_object.trigger_error_and_close(
1332 "[connection: {}] unexpected "
1333 "error during timeout handling: {}" ),
1334 conn_object.connection_id(),
1350 const auto now = std::chrono::steady_clock::now();
1351 if( m_write_output_ctx.transmitting() && now > m_write_operation_timeout_after )
1356 "[wd_connection:{}] write operation timed out" ),
1363 else if( now > m_close_frame_from_peer_timeout_after )
1368 "[wd_connection:{}] waiting for close-frame "
1369 "from peer timed out" ),
1384 m_timer_guard.schedule( m_prepared_weak_ctx );
1391 m_write_operation_timeout_after =
1392 std::chrono::steady_clock::now() + m_settings->m_write_http_response_timelimit;
1398 m_close_frame_from_peer_timeout_after =
1399 std::chrono::steady_clock::now() + m_settings->m_read_next_http_message_timelimit;
1455 template <
typename Action >
Exception class for all exceptions thrown by RESTinio.
exception_t(const char *err)
Wrapper for an executor (strand) used by connections.
Helper class for reading bytes and feeding them to parser.
Write operation using sendfile.
auto size() const noexcept
The size of data within this operation.
Helper class for writing response data.
Websocket message class with more detailed protocol information.
std::uint64_t payload_len() const
Get payload len.
A helper class for running an exclusive action. Only the first action will run.
void run_if_first(Action &&action) noexcept(noexcept(action()))
void disable()
Disable action: the action will not be executed even on a first shot.
timer_guard_t m_timer_guard
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
message_handler_t m_msg_handler
Websocket message handler provided by user.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
WS_Message_Handler message_handler_t
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
ws_weak_handle_t m_websocket_weak_handle
A weak handle to the owning ws_t, used when calling the message handler.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
read_state_t
Websocket input states.
@ read_only_close_frame
Reads only close frame: skip all frames until close-frame.
@ read_nothing
Do not read anything (before activation).
@ read_any_frame
Reads any type of frame and serve it to user.
std::weak_ptr< ws_t > ws_weak_handle_t
void check_timeout_impl()
typename Traits::logger_t logger_t
write_state_t
Websocket output states.
@ write_enabled
Able to append outgoing data.
@ write_disabled
No more outgoing data can be added (e.g. close-frame was sent).
void guard_close_frame_from_peer_operation()
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
typename timer_manager_t::timer_guard_t timer_guard_t
one_shot_action_t m_close_impl
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder) noexcept
Trigger an error.
void start_read_header()
Start the process of reading ws messages from socket.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
logger_t & m_logger
Logger for operation.
one_shot_action_t m_close_frame_to_peer
one_shot_action_t m_close_frame_to_user
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void init_next_timeout_checking()
Schedule next timeout checking.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
void handle_current_write_ctx()
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
void call_close_handler_if_necessary(status_code_t status)
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
ws_connection_t(const ws_connection_t &)=delete
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
connection_input_t m_input
Input routine.
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
void call_handler_on_current_message()
write_state_t m_write_state
A state of a websocket output.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
ws_connection_t & operator=(const ws_connection_t &)=delete
ws_connection_t(ws_connection_t &&)=delete
restinio::impl::executor_wrapper_t< typename Traits::strand_t > executor_wrapper_base_t
void start_waiting_close_frame_only()
Start waiting for close-frame.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
typename Traits::strand_t strand_t
virtual void kill() override
Kill websocket.
ws_connection_t & operator=(ws_connection_t &&)=delete
void guard_write_operation()
Start guard write operation if necessary.
~ws_connection_t() override
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
std::shared_ptr< timer_manager_t > timer_manager_handle_t
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
void init_write()
Initiate write operation.
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
::restinio::impl::write_group_output_ctx_t::file_write_operation_t file_write_operation_t
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
::restinio::impl::write_group_output_ctx_t::none_write_operation_t none_write_operation_t
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void close_impl() noexcept
Standard close routine.
::restinio::impl::write_group_output_ctx_t::trivial_write_operation_t trivial_write_operation_t
typename Traits::stream_socket_t stream_socket_t
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
stream_socket_t m_socket
Connection.
void graceful_close()
Close WebSocket connection in a graceful manner.
read_state_t m_read_state
A state of a websocket input.
typename Traits::timer_manager_t timer_manager_t
A queue for outgoing buffers.
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
void append(write_group_t wg)
Add buffers to queue.
std::optional< write_group_t > pop_ready_buffers()
Class for websocket protocol validations.
Group of writable items transported to the context of underlying connection as one solid piece.
void invoke_after_write_notificator_if_exists(const asio_ns::error_code &ec)
Get after write notificator.
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
#define RESTINIO_FMT_FORMAT_STRING(s)
std::size_t uint64_to_size_t(std::uint64_t v)
Helper function for truncating uint64 to std::size_t with exception if that truncation will lead to d...
raw_data_t write_message_details(const message_details_t &message)
Serialize websocket message details into bytes buffer.
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
validation_state_t
States of validated frame.
constexpr final_frame_flag_t final_frame
const char * opcode_to_string(opcode_t opcode)
Helper function to get method string name.
std::string status_code_to_bin(status_code_t code)
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
asio_convertible_error_t
Enum for restinio errors that must be presented as asio_ns::error_code value.
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
std::uint64_t connection_id_t
Type for ID of connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
A kind of metafunction that deduces actual types related to connection count limiter in the dependency...