RESTinio
Loading...
Searching...
No Matches
ws_connection.hpp
Go to the documentation of this file.
1/*
2 restinio
3*/
4
5/*!
6 WebSocket connection routine.
7*/
8
9#pragma once
10
11#include <queue>
12
13#include <restinio/asio_include.hpp>
14
15#include <llhttp.h>
16
17#include <restinio/impl/include_fmtlib.hpp>
18
19#include <restinio/core.hpp>
20#include <restinio/impl/executor_wrapper.hpp>
21#include <restinio/impl/write_group_output_ctx.hpp>
22#include <restinio/websocket/message.hpp>
23#include <restinio/websocket/impl/ws_parser.hpp>
24#include <restinio/websocket/impl/ws_protocol_validator.hpp>
25
26#include <restinio/utils/impl/safe_uint_truncate.hpp>
27
28#include <restinio/compiler_features.hpp>
29
30namespace restinio
31{
32
33namespace websocket
34{
35
36namespace basic
37{
38
39namespace impl
40{
41
43
44//! Max possible size of websocket frame header (a part before payload).
45constexpr size_t
47{
48 return 14;
49}
50
51//
52// ws_outgoing_data_t
53//
54
55//! A queue for outgoing buffers.
57{
58 public:
59 //! Add buffers to queue.
60 void
62 {
63 m_awaiting_write_groups.emplace( std::move( wg ) );
64 }
65
66 std::optional< write_group_t >
68 {
69 std::optional< write_group_t > result;
70
71 if( !m_awaiting_write_groups.empty() )
72 {
73 result = std::move( m_awaiting_write_groups.front() );
74 m_awaiting_write_groups.pop();
75 }
76
77 return result;
78 }
79
80 private:
81 //! A queue of buffers.
83};
84
85//
86// connection_input_t
87//
88
89//! Websocket input state: parser, input buffer and the payload of the current message.
91{
92 connection_input_t( std::size_t buffer_size )
94 {}
95
96 //! Websocket parser.
98
99 //! Input buffer.
101
102 //! Current payload.
103 std::string m_payload;
104
105 //! Reset the parser and clear the current payload so that a new message can be read.
106 void
108 {
109 m_parser.reset();
110 m_payload.clear();
111 }
112};
113
114//
115// ws_connection_t
116//
117
118//! Context for handling websocket connections.
119template <
120 typename Traits,
121 typename WS_Message_Handler >
123 : public ws_connection_base_t
124 , public restinio::impl::executor_wrapper_t< typename Traits::strand_t >
125{
127
128 public:
129 using message_handler_t = WS_Message_Handler;
130
131 using timer_manager_t = typename Traits::timer_manager_t;
132 using timer_manager_handle_t = std::shared_ptr< timer_manager_t >;
134 using logger_t = typename Traits::logger_t;
135 using strand_t = typename Traits::strand_t;
136 using stream_socket_t = typename Traits::stream_socket_t;
139
141
142 ws_connection_t(
143 //! Connection id.
144 connection_id_t conn_id,
145 //! Data inherited from http-connection.
146 //! \{
147 restinio::impl::connection_settings_handle_t< Traits > settings,
148 stream_socket_t socket,
149 lifetime_monitor_t lifetime_monitor,
150 //! \}
151 message_handler_t msg_handler )
152 : ws_connection_base_t{ conn_id }
153 , executor_wrapper_base_t{ socket.get_executor() }
154 , m_settings{ std::move( settings ) }
155 , m_socket{ std::move( socket ) }
156 , m_lifetime_monitor{ std::move( lifetime_monitor ) }
157 , m_timer_guard{ m_settings->create_timer_guard() }
158 , m_input{ websocket_header_max_size() }
159 , m_msg_handler{ std::move( msg_handler ) }
160 , m_logger{ *( m_settings->m_logger ) }
161 {
162 // Notify of a new connection instance.
163 m_logger.trace( [&]{
164 return fmt::format(
166 "[connection:{}] move socket to [ws_connection:{}]" ),
167 connection_id(),
168 connection_id() );
169 } );
170
171 m_logger.trace( [&]{
172 return fmt::format(
174 "[ws_connection:{}] start connection with {}" ),
175 connection_id(),
176 fmtlib_tools::streamed( m_socket.remote_endpoint() ) );
177 } );
178
179 // Inform the state listener, if one is used.
180 m_settings->call_state_listener( [this]() noexcept {
181 return connection_state::notice_t {
182 connection_id(),
183 m_socket.remote_endpoint(),
184 connection_state::upgraded_to_websocket_t{}
185 };
186 } );
187 }
188
189 ws_connection_t( const ws_connection_t & ) = delete;
191 ws_connection_t & operator = ( const ws_connection_t & ) = delete;
193
194 ~ws_connection_t() override
195 {
196 try
197 {
198 // Log that this connection instance is being destroyed. Anything thrown while logging is swallowed below so that nothing escapes the destructor.
199 m_logger.trace( [&]{
200 return fmt::format(
202 "[ws_connection:{}] destructor called" ),
203 connection_id() );
204 } );
205 }
206 catch( ... )
207 {}
208 }
209
210 //! Shutdown websocket.
211 virtual void
212 shutdown() override
213 {
214 asio_ns::dispatch(
215 this->get_executor(),
216 [ this, ctx = shared_from_this() ]
217 // NOTE: this lambda is noexcept since v.0.6.0.
218 () noexcept {
219 try
220 {
221 // An exception from logger shouldn't prevent
222 // main shutdown actions.
223 restinio::utils::log_trace_noexcept( m_logger,
224 [&]{
225 return fmt::format(
227 "[ws_connection:{}] shutdown" ),
228 connection_id() );
229 } );
230
231 m_close_frame_to_user.disable();
233 }
234 catch( const std::exception & ex )
235 {
236 restinio::utils::log_error_noexcept( m_logger,
237 [&]{
238 return fmt::format(
240 "[ws_connection:{}] shutdown operation error: {}" ),
241 connection_id(),
242 ex.what() );
243 } );
244 }
245 } );
246 }
247
248 //! Kill websocket.
249 virtual void
250 kill() override
251 {
252 asio_ns::dispatch(
253 this->get_executor(),
254 [ this, ctx = shared_from_this() ]
255 // NOTE: this lambda is noexcept since v.0.6.0.
256 () noexcept
257 {
258 try
259 {
260 // An exception from logger shouldn't prevent
261 // main kill actions.
262 restinio::utils::log_trace_noexcept( m_logger,
263 [&]{
264 return fmt::format(
266 "[ws_connection:{}] kill" ),
267 connection_id() );
268 } );
269
270 m_close_frame_to_user.disable();
271 m_close_frame_to_peer.disable();
272
274 }
275 catch( const std::exception & ex )
276 {
277 restinio::utils::log_error_noexcept( m_logger,
278 [&]{
279 return fmt::format(
281 "[ws_connection:{}] kill operation error: {}" ),
282 connection_id(),
283 ex.what() );
284 } );
285 }
286 } );
287 }
288
289 //! Start reading ws-messages: remembers a weak handle to the websocket and switches the read state to read_any_frame.
290 void
291 init_read( ws_handle_t wsh ) override
292 {
293 ws_weak_handle_t wswh{ wsh };
294
295 // Run read initialization on this connection's executor (direct invocation if possible).
296 asio_ns::dispatch(
297 this->get_executor(),
298 [ this, ctx = shared_from_this(), wswh = std::move( wswh ) ]
299 // NOTE: this lambda is noexcept since v.0.6.0.
300 () noexcept
301 {
302 try
303 {
304 // Start timeout checking.
305 m_prepared_weak_ctx = shared_from_this();
307
308 m_websocket_weak_handle = std::move( wswh );
309 m_read_state = read_state_t::read_any_frame;
311 }
312 catch( const std::exception & ex )
313 {
314 trigger_error_and_close(
316 [&]{
317 return fmt::format(
319 "[ws_connection:{}] unable to init read: {}" ),
320 connection_id(),
321 ex.what() );
322 } );
323 }
324 } );
325 }
326
327 //! Write pieces of outgoing data.
328 virtual void
330 write_group_t wg,
331 bool is_close_frame ) override
332 {
333 //! Run write message on io_context loop if possible.
334 asio_ns::dispatch(
335 this->get_executor(),
336 [ this,
337 actual_wg = std::move( wg ),
338 ctx = shared_from_this(),
339 is_close_frame ]
340 // NOTE: this lambda is noexcept since v.0.6.0.
341 () mutable noexcept
342 {
343 try
344 {
345 if( write_state_t::write_enabled == m_write_state )
346 write_data_impl(
347 std::move( actual_wg ),
348 is_close_frame );
349 else
350 {
351 m_logger.warn( [&]{
352 return fmt::format(
354 "[ws_connection:{}] cannot write to websocket: "
355 "write operations disabled" ),
356 connection_id() );
357 } );
358 }
359 }
360 catch( const std::exception & ex )
361 {
362 trigger_error_and_close(
364 [&]{
365 return fmt::format(
367 "[ws_connection:{}] unable to write data: {}" ),
368 connection_id(),
369 ex.what() );
370 } );
371 }
372 } );
373 }
374 private:
375 //! Standard close routine.
376 /*!
377 * @note
378 * This method is noexcept since v.0.6.0.
379 */
380 void
381 close_impl() noexcept
382 {
383 m_close_impl.run_if_first(
384 [&]() noexcept {
385 restinio::utils::log_trace_noexcept( m_logger,
386 [&]{
387 return fmt::format(
389 "[ws_connection:{}] close socket" ),
390 connection_id() );
391 } );
392
393 // These actions can throw and because of that we have
394 // to wrap them...
395 restinio::utils::suppress_exceptions(
396 m_logger,
397 "ws_connection.close_impl.socket.shutdown",
398 [&] {
399 asio_ns::error_code ignored_ec;
400 m_socket.shutdown(
401 asio_ns::ip::tcp::socket::shutdown_both,
402 ignored_ec );
403 } );
404
405 restinio::utils::suppress_exceptions(
406 m_logger,
407 "ws_connection.close_impl.socket.close",
408 [&] {
409 m_socket.close();
410 } );
411 } );
412 }
413
414 //! Start waiting for close-frame.
415 void
421
422 //! Close WebSocket connection in a graceful manner.
423 void
425 {
426 m_close_frame_to_peer.run_if_first(
427 [&]{
428 send_close_frame_to_peer( status_code_t::normal_closure );
430 } );
431 }
432
433 //! Send close frame to peer.
434 void
435 send_close_frame_to_peer( std::string payload )
436 {
437 writable_items_container_t bufs;
438 bufs.reserve( 2 );
439
440 bufs.emplace_back(
443 opcode_t::connection_close_frame,
444 payload.size() ) );
445
446 bufs.emplace_back( std::move( payload ) );
447 m_outgoing_data.append( write_group_t{ std::move( bufs ) } );
448
450
451 // No more data must be written.
452 m_write_state = write_state_t::write_disabled;
453 }
454
455 //! Send close frame to peer.
456 void
458 status_code_t code,
459 std::string desc = std::string{} )
460 {
461 send_close_frame_to_peer( std::string{ status_code_to_bin( code ) + desc } );
462 }
463
464 //! Trigger an error.
465 /*!
466 Writes error message to log,
467 closes socket,
468 and sends close frame to user if necessary.
469
470 @note
471 This method is noexcept since v.0.6.0
472 */
473 template< typename MSG_BUILDER >
474 void
476 status_code_t status,
477 MSG_BUILDER msg_builder ) noexcept
478 {
479 // An exception in logger shouldn't prevent the main actions.
480 restinio::utils::log_error_noexcept(
481 m_logger, std::move( msg_builder ) );
482
483 // This can throw but we have to suppress any exceptions.
484 restinio::utils::suppress_exceptions(
485 m_logger, "ws_connection.call_close_handler_if_necessary",
486 [this, status] {
488 } );
489
491 }
492
493
494 //! Start the process of reading ws messages from socket.
495 void
497 {
498 m_logger.trace( [&]{
499 return fmt::format(
501 "[ws_connection:{}] start reading header" ),
502 connection_id() );
503 } );
504
505 // Prepare parser for consuming new message.
506 m_input.reset_parser_and_payload();
507
508 if( 0 == m_input.m_buf.length() )
509 {
511 }
512 else
513 {
514 // Has something to read from m_input.m_buf.
515 consume_header_from_buffer(
516 m_input.m_buf.bytes(), m_input.m_buf.length() );
517 }
518 }
519
520 //! Initiate read operation on socket to receive bytes for header.
521 void
523 {
524 m_logger.trace( [&]{
525 return fmt::format(
527 "[ws_connection:{}] continue reading message" ),
528 connection_id() );
529 } );
530
531 m_socket.async_read_some(
532 m_input.m_buf.make_asio_buffer(),
533 asio_ns::bind_executor(
534 this->get_executor(),
535 [ this, ctx = shared_from_this() ]
536 // NOTE: this lambda is noexcept since v.0.6.0.
537 ( const asio_ns::error_code & ec, std::size_t length ) noexcept
538 {
539 try
540 {
541 after_read_header( ec, length );
542 }
543 catch( const std::exception & ex )
544 {
545 trigger_error_and_close(
546 status_code_t::unexpected_condition,
547 [&]{
548 return fmt::format(
550 "[ws_connection:{}] after read header "
551 "callback error: {}" ),
552 connection_id(),
553 ex.what() );
554 } );
555 }
556 } ) );
557 }
558
559 //! Handle read error (reading header or payload)
560 void
561 handle_read_error( const char * desc, const asio_ns::error_code & ec )
562 {
563 // Assume that connection is lost.
564 trigger_error_and_close(
566 [&]{
567 return fmt::format(
568 RESTINIO_FMT_FORMAT_STRING( "[ws_connection:{}] {}: {}" ),
569 connection_id(),
570 desc,
571 ec.message() );
572 } );
573 }
574
575 //! Handle read operation result, when reading header.
576 void
578 const asio_ns::error_code & ec,
579 std::size_t length )
580 {
581 if( !ec )
582 {
583 m_logger.trace( [&]{
584 return fmt::format(
586 "[ws_connection:{}] received {} bytes" ),
587 this->connection_id(),
588 length );
589 } );
590
591 m_input.m_buf.obtained_bytes( length );
592 consume_header_from_buffer( m_input.m_buf.bytes(), length );
593 }
594 else
595 {
596 handle_read_error( "reading message header error", ec );
597 }
598 }
599
600 //! Parse header from internal buffer.
601 void
602 consume_header_from_buffer( const char * data, std::size_t length )
603 {
604 const auto nparsed = m_input.m_parser.parser_execute( data, length );
605
606 m_input.m_buf.consumed_bytes( nparsed );
607
608 if( m_input.m_parser.header_parsed() )
609 {
610 handle_parsed_header( m_input.m_parser.current_message() );
611 }
612 else
613 {
614 assert( nparsed == length );
616 }
617 }
618
619 //! Handle parsed header.
620 void
622 {
623 m_logger.trace( [&]{
624 return fmt::format(
626 "[ws_connection:{}] start handling {} ({:#x})" ),
627 connection_id(),
629 static_cast<std::uint16_t>(md.m_opcode) );
630 } );
631
632 const auto validation_result =
633 m_protocol_validator.process_new_frame( md );
634
635 if( validation_state_t::frame_header_is_valid != validation_result )
636 {
637 m_logger.error( [&]{
638 return fmt::format(
640 "[ws_connection:{}] invalid header" ),
641 connection_id() );
642 } );
643
644 if( read_state_t::read_any_frame == m_read_state )
645 {
646 m_close_frame_to_peer.run_if_first(
647 [&]{
648 send_close_frame_to_peer( status_code_t::protocol_error );
649 // Do not wait anything in return, because
650 // protocol is violated.
651 } );
652
654 }
655 else if( read_state_t::read_only_close_frame == m_read_state )
656 {
657 // Wait for close frame cannot be done.
659 }
660
661 return;
662 }
663
665 }
666
667 //! Handle parsed and valid header.
668 void
670 {
671 const auto payload_length =
673
674 m_input.m_payload.resize( payload_length );
675
676 if( payload_length == 0 )
677 {
678 // Callback for message with 0-size payload.
680 }
681 else
682 {
683 const auto payload_part_size =
684 std::min( m_input.m_buf.length(), payload_length );
685
686 std::memcpy(
687 &m_input.m_payload.front(),
688 m_input.m_buf.bytes(),
689 payload_part_size );
690
691 m_input.m_buf.consumed_bytes( payload_part_size );
692
693 const std::size_t length_remaining =
694 payload_length - payload_part_size;
695
696 if( validate_payload_part(
697 &m_input.m_payload.front(),
698 payload_part_size,
699 length_remaining ) )
700 {
701 if( 0 == length_remaining )
702 {
703 // All message is obtained.
705 }
706 else
707 {
708 // Read the rest of payload:
709 start_read_payload(
710 &m_input.m_payload.front() + payload_part_size,
711 length_remaining );
712 }
713 }
714 // Else payload is invalid and validate_payload_part()
715 // has handled the case so do nothing.
716 }
717 }
718
719 //! Start reading message payload.
720 void
722 //! A pointer to the remainder of unfetched payload.
723 char * payload_data,
724 //! The size of the remainder of unfetched payload.
725 std::size_t length_remaining,
726 //! Validate payload and call handler.
727 bool do_validate_payload_and_call_msg_handler = true )
728 {
729 m_socket.async_read_some(
730 asio_ns::buffer( payload_data, length_remaining ),
731 asio_ns::bind_executor(
732 this->get_executor(),
733 [ this,
734 ctx = shared_from_this(),
735 payload_data,
736 length_remaining,
737 do_validate_payload_and_call_msg_handler ]
738 // NOTE: this lambda is noexcept since v.0.6.0.
739 ( const asio_ns::error_code & ec, std::size_t length ) noexcept
740 {
741 try
742 {
744 payload_data,
745 length_remaining,
746 ec,
747 length,
748 do_validate_payload_and_call_msg_handler );
749 }
750 catch( const std::exception & ex )
751 {
752 trigger_error_and_close(
754 [&]{
755 return fmt::format(
757 "[ws_connection:{}] after read payload "
758 "callback error: {}" ),
759 connection_id(),
760 ex.what() );
761 } );
762 }
763 } ) );
764 }
765
766 //! Handle read operation result, when reading payload.
767 void
769 char * payload_data,
770 std::size_t length_remaining,
771 const asio_ns::error_code & ec,
772 std::size_t length,
773 bool do_validate_payload_and_call_msg_handler = true )
774 {
775 if( !ec )
776 {
777 m_logger.trace( [&]{
778 return fmt::format(
780 "[ws_connection:{}] received {} bytes" ),
781 this->connection_id(),
782 length );
783 } );
784
785 assert( length <= length_remaining );
786
787 const std::size_t next_length_remaining =
788 length_remaining - length;
789
790 if( do_validate_payload_and_call_msg_handler )
791 {
792 if( validate_payload_part( payload_data, length, next_length_remaining ) )
793 {
794 if( 0 == next_length_remaining )
795 {
796 // Here: all the payload is ready.
797
798 // All message is obtained.
800 }
801 else
802 {
803 // Here: not all payload is obtained,
804 // so initiate read once again:
806 payload_data + length,
807 next_length_remaining,
808 do_validate_payload_and_call_msg_handler );
809 }
810 }
811 // Else payload is invalid and validate_payload_part()
812 // has handled the case so do nothing.
813 }
814 else
815 {
816 if( 0 == next_length_remaining )
817 {
819 }
820 else
821 {
823 payload_data + length,
824 length_remaining - length,
825 do_validate_payload_and_call_msg_handler );
826 }
827 }
828 }
829 else
830 {
831 handle_read_error( "reading message payload error", ec );
832 }
833 }
834
835 //! Call user message handler with the given message. Nothing is done when the weak websocket handle cannot be locked; a std::exception raised inside the try block is logged and not rethrown.
836 void
837 call_message_handler( message_handle_t close_frame )
838 {
839 if( auto wsh = m_websocket_weak_handle.lock() )
840 {
841 try
842 {
844 std::move( wsh ),
845 std::move( close_frame ) );
846 }
847 catch( const std::exception & ex )
848 {
849 m_logger.error( [&]{
850 return fmt::format(
852 "[ws_connection:{}] execute handler error: {}" ),
853 connection_id(),
854 ex.what() );
855 } );
856 }
857 }
858 }
859
860 //! Validates a part of received payload.
861 bool
863 char * payload_data,
864 std::size_t length,
865 std::size_t next_length_remaining )
866 {
867 const auto validation_result =
868 m_protocol_validator.process_and_unmask_next_payload_part( payload_data, length );
869
870 if( validation_state_t::payload_part_is_valid != validation_result )
871 {
872 handle_invalid_payload( validation_result );
873
874 if( validation_state_t::incorrect_utf8_data == validation_result )
875 {
876 // Can skip this payload because it was not a bad close frame.
877
878 // It is the case we are expecting close frame
879 // so validator must be ready to receive more headers
880 // and payloads after this frame.
881 m_protocol_validator.reset();
882
883 if( 0 == next_length_remaining )
884 {
886 }
887 else
888 {
889 // Skip checking payload for this frame:
890 const bool do_validate_payload_and_call_msg_handler = false;
892 payload_data + length,
893 next_length_remaining,
894 do_validate_payload_and_call_msg_handler );
895 }
896 }
897 return false;
898 }
899
900 return true;
901 }
902
903 //! Handle payload errors.
904 void
906 {
907 m_logger.error( [&]{
908 return fmt::format(
910 "[ws_connection:{}] invalid paload" ),
911 connection_id() );
912 } );
913
914 if( validation_state_t::invalid_close_code == validation_result )
915 {
916 // A corner case: invalid payload in close frame.
917
918 if( read_state_t::read_any_frame == m_read_state )
919 {
920 // Case: close frame was not expected.
921
922 // This actually must be executed:
923 m_close_frame_to_peer.run_if_first(
924 [&]{
925 send_close_frame_to_peer( status_code_t::protocol_error );
926 // Do not wait anything in return, because
927 // protocol is violated.
928 } );
929
930 // Notify user of a close but use a correct close code.
932 }
933 else if( read_state_t::read_only_close_frame == m_read_state )
934 {
935 // Case: close frame was expected.
936
937 // We got a close frame but it is incorrect,
938 // so just close (there is not too much we can do).
940 }
941 }
942 else
943 {
944 if( read_state_t::read_any_frame == m_read_state )
945 {
946 m_close_frame_to_peer.run_if_first(
947 [&]{
948 send_close_frame_to_peer( status_code_t::invalid_message_data );
950 } );
951
953 }
954 }
955 }
956
957 void
959 {
960 auto & md = m_input.m_parser.current_message();
961
962 const auto validation_result = m_protocol_validator.finish_frame();
963 if( validation_state_t::frame_is_valid == validation_result )
964 {
965 if( read_state_t::read_any_frame == m_read_state )
966 {
967 if( opcode_t::connection_close_frame == md.m_opcode )
968 {
969 m_logger.trace( [&]{
970 return fmt::format(
972 "[ws_connection:{}] got close frame from "
973 "peer, status: {}" ),
974 connection_id(),
975 static_cast<std::uint16_t>(
976 status_code_from_bin( m_input.m_payload )) );
977 } );
978
979 m_close_frame_to_user.disable();
980 m_close_frame_to_peer.run_if_first(
981 [&]{
982 send_close_frame_to_peer( m_input.m_payload );
983 } );
984
985 m_read_state = read_state_t::read_nothing;
986 }
987
988 call_message_handler(
989 std::make_shared< message_t >(
990 md.m_final_flag ? final_frame : not_final_frame,
991 md.m_opcode,
992 std::move( m_input.m_payload ) ) );
993
994 if( read_state_t::read_nothing != m_read_state )
996 }
997 else
998 {
999 assert( read_state_t::read_only_close_frame == m_read_state );
1000
1001 if( opcode_t::connection_close_frame == md.m_opcode )
1002 {
1003 // Got it!
1004 m_timer_guard.cancel();
1005
1007
1008 m_logger.trace( [&]{
1009 return fmt::format(
1011 "[ws_connection:{}] expected close frame came" ),
1012 connection_id() );
1013 } );
1014 }
1015 else
1016 {
1017 // Wait for next frame.
1019 }
1020 }
1021 }
1022 else
1023 {
1024 handle_invalid_payload( validation_result );
1025 }
1026 }
1027
1028 void
1030 {
1031 m_close_frame_to_user.run_if_first(
1032 [&]{
1034 std::make_shared< message_t >(
1036 opcode_t::connection_close_frame,
1037 status_code_to_bin( status ) ) );
1038 } );
1039 }
1040
1041 //! Implementation of writing data performed on the asio_ns::io_context.
1042 void
1043 write_data_impl( write_group_t wg, bool is_close_frame )
1044 {
1045 if( m_socket.is_open() )
1046 {
1047 if( is_close_frame )
1048 {
1049 m_logger.trace( [&]{
1050 return fmt::format(
1052 "[ws_connection:{}] user sends close frame" ),
1053 connection_id() );
1054 } );
1055
1056 m_close_frame_to_peer.disable(); // It is formed and sent by user
1057 m_close_frame_to_user.disable(); // And user knows that websocket is closed.
1058 // No more writes.
1059 m_write_state = write_state_t::write_disabled;
1060
1061 // Start waiting only close-frame.
1063 }
1064
1065 // Push write_group to queue.
1066 m_outgoing_data.append( std::move( wg ) );
1067
1069 }
1070 else
1071 {
1072 m_logger.warn( [&]{
1073 return fmt::format(
1075 "[ws_connection:{}] try to write while "
1076 "socket is closed" ),
1077 connection_id() );
1078 } );
1079
1080 try
1081 {
1085 }
1086 catch( ... )
1087 {}
1088 }
1089 }
1090
1091 //! Checks if there is something to write,
1092 //! and if so starts write operation.
1093 void
1095 {
1096 if( !m_write_output_ctx.transmitting() )
1097 {
1099 }
1100 }
1101
1102 //! Initiate write operation.
1103 void
1105 {
1106 // Here: not writing anything to socket, so
1107 // write operation can be initiated.
1108 auto next_write_group = m_outgoing_data.pop_ready_buffers();
1109
1110 if( next_write_group )
1111 {
1112 m_logger.trace( [&]{
1113 return fmt::format(
1115 "[ws_connection:{}] start next write group, "
1116 "size: {}" ),
1117 this->connection_id(),
1118 next_write_group->items_count() );
1119 } );
1120
1121 // Initialize write context with a new write group.
1122 m_write_output_ctx.start_next_write_group(
1123 std::move( next_write_group ) );
1124
1125 // Start the loop of sending data from current write group.
1127 }
1128 }
1129
1130 // Use aliases for shorter names.
1134
1135 void
1137 {
1138 try
1139 {
1140 auto wo = m_write_output_ctx.extract_next_write_operation();
1141
1142 if( std::holds_alternative< trivial_write_operation_t >( wo ) )
1143 {
1145 }
1146 else if( std::holds_alternative< none_write_operation_t >( wo ) )
1147 {
1149 }
1150 else
1151 {
1152 assert( std::holds_alternative< file_write_operation_t >( wo ) );
1153 throw exception_t{ "sendfile write operation not implemented" };
1154 }
1155 }
1156 catch( const std::exception & ex )
1157 {
1158 trigger_error_and_close(
1160 [&]{
1161 return fmt::format(
1163 "[ws_connection:{}] handle_current_write_ctx failed: {}" ),
1164 connection_id(),
1165 ex.what() );
1166 } );
1167 }
1168 }
1169
1170 void
1172 {
1173 // Asio buffers (param for async write):
1174 auto & bufs = op.get_trivial_bufs();
1175
1176 m_logger.trace( [&]{
1177 return fmt::format(
1179 "[ws_connection:{}] sending data with "
1180 "buf count: {}, "
1181 "total size: {}" ),
1182 connection_id(),
1183 bufs.size(),
1184 op.size() ); } );
1185
1187
1188 // There is something to write.
1189 asio_ns::async_write(
1190 m_socket,
1191 bufs,
1192 asio_ns::bind_executor(
1193 this->get_executor(),
1194 [ this,
1195 ctx = shared_from_this() ]
1196 // NOTE: this lambda is noexcept since v.0.6.0.
1197 ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1198 {
1199 try
1200 {
1201 if( !ec )
1202 {
1203 m_logger.trace( [&]{
1204 return fmt::format(
1206 "[ws_connection:{}] outgoing data was "
1207 "sent: {} bytes" ),
1208 connection_id(),
1209 written );
1210 } );
1211 }
1212
1213 after_write( ec );
1214 }
1215 catch( const std::exception & ex )
1216 {
1217 trigger_error_and_close(
1219 [&]{
1220 return fmt::format(
1222 "[ws_connection:{}] after write "
1223 "callback error: {}" ),
1224 connection_id(),
1225 ex.what() );
1226 } );
1227 }
1228 } ) );
1229 }
1230
1231 //! Do post write actions for current write group.
1232 void
1234 {
1235 // Finishing writing this group.
1236 m_logger.trace( [&]{
1237 return fmt::format(
1239 "[ws_connection:{}] finishing current write group" ),
1240 this->connection_id() );
1241 } );
1242
1243 // Group notificators are called from here (if exist):
1244 m_write_output_ctx.finish_write_group();
1245
1246 // Start another write operation
1247 // if there is something to send.
1249 }
1250
1251 //! Handle completion of a write operation. On error: reports it via trigger_error_and_close() and then fails the current write group; a std::exception from fail_write_group() is only logged.
1252 void
1253 after_write( const asio_ns::error_code & ec )
1254 {
1255 if( !ec )
1256 {
1258 }
1259 else
1260 {
1261 trigger_error_and_close(
1263 [&]{
1264 return fmt::format(
1266 "[ws_connection:{}] unable to write: {}" ),
1267 connection_id(),
1268 ec.message() );
1269 } );
1270
1271 try
1272 {
1273 m_write_output_ctx.fail_write_group( ec );
1274 }
1275 catch( const std::exception & ex )
1276 {
1277 m_logger.error( [&]{
1278 return fmt::format(
1280 "[ws_connection:{}] notificator error: {}" ),
1281 connection_id(),
1282 ex.what() );
1283 } );
1284 }
1285 }
1286 }
1287
1288 //! Common parameters of a connection.
1290
1291 //! Connection.
1293
1294 /*!
1295 * @brief Monitor of the connection lifetime.
1296 *
1297 * @since v.0.6.12
1298 */
1300
1301 //! Timers.
1302 //! \{
1303 static ws_connection_t &
1305 {
1306 return static_cast< ws_connection_t & >( base );
1307 }
1308
1309 virtual void
1311 {
1312 asio_ns::dispatch(
1313 this->get_executor(),
1314 [ ctx = std::move( self ) ]
1315 // NOTE: this lambda is noexcept since v.0.6.0.
1316 () noexcept
1317 {
1318 auto & conn_object = cast_to_self( *ctx );
1319 // If an exception will be thrown we can only
1320 // close the connection.
1321 try
1322 {
1323 conn_object.check_timeout_impl();
1324 }
1325 catch( const std::exception & x )
1326 {
1327 conn_object.trigger_error_and_close(
1329 [&] {
1330 return fmt::format(
1332 "[connection: {}] unexpected "
1333 "error during timeout handling: {}" ),
1334 conn_object.connection_id(),
1335 x.what() );
1336 } );
1337 }
1338 } );
1339 }
1340
1341 std::chrono::steady_clock::time_point m_write_operation_timeout_after;
1342 std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after =
1346
1347 void
1349 {
1350 const auto now = std::chrono::steady_clock::now();
1351 if( m_write_output_ctx.transmitting() && now > m_write_operation_timeout_after )
1352 {
1353 m_logger.trace( [&]{
1354 return fmt::format(
1356 "[wd_connection:{}] write operation timed out" ),
1357 connection_id() );
1358 } );
1359 m_close_frame_to_peer.disable();
1362 }
1363 else if( now > m_close_frame_from_peer_timeout_after )
1364 {
1365 m_logger.trace( [&]{
1366 return fmt::format(
1368 "[wd_connection:{}] waiting for close-frame "
1369 "from peer timed out" ),
1370 connection_id() );
1371 } );
1373 }
1374 else
1375 {
1377 }
1378 }
1379
1380 //! schedule next timeout checking.
1381 void
1383 {
1384 m_timer_guard.schedule( m_prepared_weak_ctx );
1385 }
1386
1387 //! Start guard write operation if necessary.
1388 void
1390 {
1391 m_write_operation_timeout_after =
1392 std::chrono::steady_clock::now() + m_settings->m_write_http_response_timelimit;
1393 }
1394
1395 void
1397 {
1398 m_close_frame_from_peer_timeout_after =
1399 std::chrono::steady_clock::now() + m_settings->m_read_next_http_message_timelimit;
1400 }
1401 //! \}
1402
1403 //! Input routine.
1405
1406 //! Helper for validating protocol.
1408
1409 //! Websocket message handler provided by user.
1411
1412 //! Logger for operation
1414
1415 //! Write to socket operation context.
1417
1418 //! Output buffers queue.
1420
1421 //! A weak handle to the owning ws_t, used when calling the message handler.
1423
1424 //! Websocket output states.
1425 enum class write_state_t
1426 {
1427 //! Able to append outgoing data.
1429 //! No more outgoing data can be added (e.g. close-frame was sent).
1431 };
1432
1433 //! A state of a websocket output.
1435
1436 //! Websocket input states.
1437 enum class read_state_t
1438 {
1439 //! Reads any type of frame and serve it to user.
1441 //! Reads only close frame: skip all frames until close-frame.
1443 //! Do not read anything (before activation).
1445 };
1446
1447 //! A state of a websocket input.
1449
1450 //! A helper class for running an exclusive (one-shot) action.
1451 //! Only the first action will run; the flag is cleared before the action is invoked, so an action that throws is not run again.
1453 {
1454 public:
1455 template < typename Action >
1456 void
1457 run_if_first( Action && action ) noexcept(noexcept(action()))
1458 {
1459 if( m_not_executed_yet )
1460 {
1461 m_not_executed_yet = false;
1462 action();
1463 }
1464 }
1465
1466 //! Disable action: action will not be executed even on a first shot.
1467 void
1469 {
1470 m_not_executed_yet = false;
1471 }
1472
1473 private:
1475 };
1476
1480};
1481
1482} /* namespace impl */
1483
1484} /* namespace basic */
1485
1486} /* namespace websocket */
1487
1488} /* namespace restinio */
Exception class for all exceptions thrown by RESTinio.
Definition exception.hpp:26
exception_t(const char *err)
Definition exception.hpp:29
Wrapper for an executor (strand) used by connections.
Helper class for reading bytes and feeding them to parser.
auto size() const noexcept
The size of data within this operation.
Helper class for writing response data.
Websocket message class with more detailed protocol information.
Definition ws_parser.hpp:63
std::uint64_t payload_len() const
Get payload len.
Definition ws_parser.hpp:92
A helper class for running exclusive action. Only a first action will run.
void run_if_first(Action &&action) noexcept(noexcept(action()))
void disable()
Disable action: action will not be executed even on a first shot.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
message_handler_t m_msg_handler
Websocket message handler provided by user.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
ws_weak_handle_t m_websocket_weak_handle
A weak handle to the owning ws_t, used when calling the message handler.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
@ read_only_close_frame
Reads only close frame: skip all frames until close-frame.
@ read_nothing
Do not read anything (before activation).
@ read_any_frame
Reads any type of frame and serve it to user.
@ write_disabled
No more outgoing data can be added (e.g. close-frame was sent).
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
typename timer_manager_t::timer_guard_t timer_guard_t
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder) noexcept
Trigger an error.
void start_read_header()
Start the process of reading ws messages from socket.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void init_next_timeout_checking()
Schedule next timeout checking.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
ws_connection_t(const ws_connection_t &)=delete
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
write_state_t m_write_state
A state of a websocket output.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
ws_connection_t & operator=(const ws_connection_t &)=delete
restinio::impl::executor_wrapper_t< typename Traits::strand_t > executor_wrapper_base_t
void start_waiting_close_frame_only()
Start waiting for close-frame.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
virtual void kill() override
Kill websocket.
ws_connection_t & operator=(ws_connection_t &&)=delete
void guard_write_operation()
Start guard write operation if necessary.
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
std::shared_ptr< timer_manager_t > timer_manager_handle_t
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
::restinio::impl::write_group_output_ctx_t::file_write_operation_t file_write_operation_t
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
::restinio::impl::write_group_output_ctx_t::none_write_operation_t none_write_operation_t
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void close_impl() noexcept
Standard close routine.
::restinio::impl::write_group_output_ctx_t::trivial_write_operation_t trivial_write_operation_t
typename Traits::stream_socket_t stream_socket_t
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
void graceful_close()
Close WebSocket connection in a graceful manner.
read_state_t m_read_state
A state of a websocket input.
typename Traits::timer_manager_t timer_manager_t
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
void append(write_group_t wg)
Add buffers to queue.
std::optional< write_group_t > pop_ready_buffers()
Group of writable items transported to the context of underlying connection as one solid piece.
Definition buffers.hpp:727
void invoke_after_write_notificator_if_exists(const asio_ns::error_code &ec)
Get after write notificator.
Definition buffers.hpp:850
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
#define RESTINIO_FMT_FORMAT_STRING(s)
std::size_t uint64_to_size_t(std::uint64_t v)
Helper function for truncating uint64 to std::size_t with exception if that truncation will lead to d...
raw_data_t write_message_details(const message_details_t &message)
Serialize websocket message details into bytes buffer.
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
validation_state_t
States of validated frame.
constexpr final_frame_flag_t final_frame
Definition message.hpp:135
const char * opcode_to_string(opcode_t opcode)
Helper function to get method string name.
Definition message.hpp:46
std::string status_code_to_bin(status_code_t code)
Definition message.hpp:101
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
asio_convertible_error_t
Enum for restinio errors that must be presented as asio_ns::error_code value.
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
std::uint64_t connection_id_t
Type for ID of connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
A kind of metafunction that deduces actual types related to connection count limiter in the dependency...
restinio::impl::fixed_buffer_t m_buf
Input buffer.
void reset_parser_and_payload()
Prepare parser for reading new http-message.