@@ -43,7 +43,7 @@ struct aws_websocket {
43
43
aws_websocket_on_incoming_frame_complete_fn * on_incoming_frame_complete ;
44
44
45
45
struct aws_channel_task move_synced_data_to_thread_task ;
46
- struct aws_channel_task shutdown_channel_task ;
46
+ struct aws_channel_task shutdown_channel_from_offthread_task ;
47
47
struct aws_channel_task increment_read_window_task ;
48
48
struct aws_channel_task waiting_on_payload_stream_task ;
49
49
struct aws_channel_task close_timeout_task ;
@@ -85,7 +85,10 @@ struct aws_websocket {
85
85
/* True when no more frames will be read, due to:
86
86
* - a CLOSE frame was received
87
87
* - decoder error
88
- * - channel shutdown in read-dir */
88
+ * - channel shutdown in read-dir
89
+ * - user calling aws_websocket_close()
90
+ * - user dropping the last refcount
91
+ */
89
92
bool is_reading_stopped ;
90
93
91
94
/* True when no more frames will be written, due to:
@@ -124,9 +127,9 @@ struct aws_websocket {
124
127
/* Error-code returned by aws_websocket_send_frame() when is_writing_stopped is true */
125
128
int send_frame_error_code ;
126
129
127
- /* Use a task to issue a channel shutdown. */
128
- int shutdown_channel_task_error_code ;
129
- bool is_shutdown_channel_task_scheduled ;
130
+ /* Use a task to issue a channel shutdown from off-thread . */
131
+ int shutdown_channel_from_offthread_task_error_code ;
132
+ bool is_shutdown_channel_from_offthread_task_scheduled ;
130
133
131
134
bool is_move_synced_data_to_thread_task_scheduled ;
132
135
@@ -186,10 +189,13 @@ static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aw
186
189
static void s_midchannel_send_complete (struct aws_websocket * websocket , int error_code , void * user_data );
187
190
static void s_move_synced_data_to_thread_task (struct aws_channel_task * task , void * arg , enum aws_task_status status );
188
191
static void s_increment_read_window_task (struct aws_channel_task * task , void * arg , enum aws_task_status status );
189
- static void s_shutdown_channel_task (struct aws_channel_task * task , void * arg , enum aws_task_status status );
192
+ static void s_shutdown_channel_from_offthread_task (
193
+ struct aws_channel_task * task ,
194
+ void * arg ,
195
+ enum aws_task_status status );
190
196
static void s_waiting_on_payload_stream_task (struct aws_channel_task * task , void * arg , enum aws_task_status status );
191
197
static void s_close_timeout_task (struct aws_channel_task * task , void * arg , enum aws_task_status status );
192
- static void s_schedule_channel_shutdown (struct aws_websocket * websocket , int error_code );
198
+ static void s_schedule_channel_shutdown_from_offthread (struct aws_websocket * websocket , int error_code );
193
199
static void s_shutdown_due_to_write_err (struct aws_websocket * websocket , int error_code );
194
200
static void s_shutdown_due_to_read_err (struct aws_websocket * websocket , int error_code );
195
201
static void s_stop_writing (struct aws_websocket * websocket , int send_frame_error_code );
@@ -285,7 +291,10 @@ struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handl
285
291
websocket ,
286
292
"websocket_move_synced_data_to_thread" );
287
293
aws_channel_task_init (
288
- & websocket -> shutdown_channel_task , s_shutdown_channel_task , websocket , "websocket_shutdown_channel" );
294
+ & websocket -> shutdown_channel_from_offthread_task ,
295
+ s_shutdown_channel_from_offthread_task ,
296
+ websocket ,
297
+ "websocket_shutdown_channel" );
289
298
aws_channel_task_init (
290
299
& websocket -> increment_read_window_task ,
291
300
s_increment_read_window_task ,
@@ -377,7 +386,7 @@ static void s_websocket_on_refcount_zero(void *user_data) {
377
386
AWS_LS_HTTP_WEBSOCKET , "id=%p: Websocket ref-count is zero, shut down if necessary." , (void * )websocket );
378
387
379
388
/* Channel might already be shut down, but make sure */
380
- s_schedule_channel_shutdown (websocket , AWS_ERROR_SUCCESS );
389
+ s_schedule_channel_shutdown_from_offthread (websocket , AWS_ERROR_SUCCESS );
381
390
382
391
/* Channel won't destroy its slots/handlers until its refcount reaches 0 */
383
392
aws_channel_release_hold (websocket -> channel_slot -> channel );
@@ -897,6 +906,21 @@ static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_li
897
906
aws_linked_list_init (frames );
898
907
}
899
908
909
+ /* Set is_reading_stopped = true, all further read data will be ignored.
910
+ * But also increment the read window, so that channel shutdown won't deadlock
911
+ * due to pending read-data in an upstream handler or the underlying OS socket. */
912
+ static void s_stop_reading_and_dont_block_shutdown (struct aws_websocket * websocket ) {
913
+ AWS_ASSERT (aws_channel_thread_is_callers_thread (websocket -> channel_slot -> channel ));
914
+ if (websocket -> thread_data .is_reading_stopped ) {
915
+ return ;
916
+ }
917
+
918
+ AWS_LOGF_TRACE (AWS_LS_HTTP_WEBSOCKET , "id=%p: Websocket will ignore any further read data." , (void * )websocket );
919
+ websocket -> thread_data .is_reading_stopped = true;
920
+
921
+ aws_channel_slot_increment_read_window (websocket -> channel_slot , SIZE_MAX );
922
+ }
923
+
900
924
static void s_stop_writing (struct aws_websocket * websocket , int send_frame_error_code ) {
901
925
AWS_ASSERT (aws_channel_thread_is_callers_thread (websocket -> channel_slot -> channel ));
902
926
AWS_ASSERT (send_frame_error_code != AWS_ERROR_SUCCESS );
@@ -947,7 +971,7 @@ static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int err
947
971
(void * )websocket ,
948
972
error_code ,
949
973
aws_error_name (error_code ));
950
- s_schedule_channel_shutdown (websocket , error_code );
974
+ aws_channel_shutdown (websocket -> channel_slot -> channel , error_code );
951
975
}
952
976
}
953
977
@@ -961,18 +985,22 @@ static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int erro
961
985
error_code ,
962
986
aws_error_name (error_code ));
963
987
964
- websocket -> thread_data . is_reading_stopped = true ;
988
+ s_stop_reading_and_dont_block_shutdown ( websocket ) ;
965
989
966
990
/* If there's a current incoming frame, complete it with the specific error code. */
967
991
if (websocket -> thread_data .current_incoming_frame ) {
968
992
s_complete_incoming_frame (websocket , error_code , NULL );
969
993
}
970
994
971
995
/* Tell channel to shutdown (it's ok to call this redundantly) */
972
- s_schedule_channel_shutdown (websocket , error_code );
996
+ aws_channel_shutdown (websocket -> channel_slot -> channel , error_code );
973
997
}
974
998
975
- static void s_shutdown_channel_task (struct aws_channel_task * task , void * arg , enum aws_task_status status ) {
999
+ static void s_shutdown_channel_from_offthread_task (
1000
+ struct aws_channel_task * task ,
1001
+ void * arg ,
1002
+ enum aws_task_status status ) {
1003
+
976
1004
(void )task ;
977
1005
978
1006
if (status != AWS_TASK_STATUS_RUN_READY ) {
@@ -985,39 +1013,39 @@ static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, en
985
1013
/* BEGIN CRITICAL SECTION */
986
1014
s_lock_synced_data (websocket );
987
1015
988
- error_code = websocket -> synced_data .shutdown_channel_task_error_code ;
1016
+ error_code = websocket -> synced_data .shutdown_channel_from_offthread_task_error_code ;
989
1017
990
1018
s_unlock_synced_data (websocket );
991
1019
/* END CRITICAL SECTION */
992
- websocket -> thread_data .is_reading_stopped = true;
993
- websocket -> thread_data .is_writing_stopped = true;
1020
+
1021
+ /* Stop reading, so that shutdown won't be blocked.
1022
+ * If something off-thread is causing shutdown (aws_websocket_close(), refcount 0, etc),
1023
+ * the user may never interact with the websocket again. We can't rely on them
1024
+ * to keep the window open and prevent deadlock during shutdown. */
1025
+ s_stop_reading_and_dont_block_shutdown (websocket );
994
1026
995
1027
aws_channel_shutdown (websocket -> channel_slot -> channel , error_code );
996
- /* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the upstream
997
- * handler. */
998
- aws_channel_slot_increment_read_window (websocket -> channel_slot , SIZE_MAX );
999
1028
}
1000
1029
1001
- /* Tell the channel to shut down. It is safe to call this multiple times.
1002
- * The call to aws_channel_shutdown() is delayed so that a user invoking aws_websocket_close doesn't
1003
- * have completion callbacks firing before the function call even returns */
1004
- static void s_schedule_channel_shutdown (struct aws_websocket * websocket , int error_code ) {
1030
+ /* Tell the channel to shut down, from off-thread. It is safe to call this multiple times. */
1031
+ static void s_schedule_channel_shutdown_from_offthread (struct aws_websocket * websocket , int error_code ) {
1005
1032
bool schedule_shutdown = false;
1006
1033
1007
1034
/* BEGIN CRITICAL SECTION */
1008
1035
s_lock_synced_data (websocket );
1009
1036
1010
- if (!websocket -> synced_data .is_shutdown_channel_task_scheduled ) {
1037
+ if (!websocket -> synced_data .is_shutdown_channel_from_offthread_task_scheduled ) {
1011
1038
schedule_shutdown = true;
1012
- websocket -> synced_data .is_shutdown_channel_task_scheduled = true;
1013
- websocket -> synced_data .shutdown_channel_task_error_code = error_code ;
1039
+ websocket -> synced_data .is_shutdown_channel_from_offthread_task_scheduled = true;
1040
+ websocket -> synced_data .shutdown_channel_from_offthread_task_error_code = error_code ;
1014
1041
}
1015
1042
1016
1043
s_unlock_synced_data (websocket );
1017
1044
/* END CRITICAL SECTION */
1018
1045
1019
1046
if (schedule_shutdown ) {
1020
- aws_channel_schedule_task_now (websocket -> channel_slot -> channel , & websocket -> shutdown_channel_task );
1047
+ aws_channel_schedule_task_now (
1048
+ websocket -> channel_slot -> channel , & websocket -> shutdown_channel_from_offthread_task );
1021
1049
}
1022
1050
}
1023
1051
@@ -1038,14 +1066,13 @@ void aws_websocket_close(struct aws_websocket *websocket, bool free_scarce_resou
1038
1066
return ;
1039
1067
}
1040
1068
1041
- /* TODO: aws_channel_shutdown() should let users specify error_code and "immediate" as separate parameters.
1042
- * Currently, any non-zero error_code results in "immediate" shutdown */
1069
+ /* TODO: aws_channel_shutdown() should let users specify error_code and "immediate" as separate parameters. */
1043
1070
int error_code = AWS_ERROR_SUCCESS ;
1044
1071
if (free_scarce_resources_immediately ) {
1045
1072
error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED ;
1046
1073
}
1047
1074
1048
- s_schedule_channel_shutdown (websocket , error_code );
1075
+ s_schedule_channel_shutdown_from_offthread (websocket , error_code );
1049
1076
}
1050
1077
1051
1078
static int s_handler_shutdown (
@@ -1255,17 +1282,7 @@ static int s_handler_process_read_message(
1255
1282
}
1256
1283
1257
1284
if (websocket -> thread_data .incoming_message_window_update > 0 ) {
1258
- err = aws_channel_slot_increment_read_window (slot , websocket -> thread_data .incoming_message_window_update );
1259
- if (err ) {
1260
- AWS_LOGF_ERROR (
1261
- AWS_LS_HTTP_WEBSOCKET ,
1262
- "id=%p: Failed to increment read window after message processing, error %d (%s). Closing "
1263
- "connection." ,
1264
- (void * )websocket ,
1265
- aws_last_error (),
1266
- aws_error_name (aws_last_error ()));
1267
- goto error ;
1268
- }
1285
+ aws_channel_slot_increment_read_window (slot , websocket -> thread_data .incoming_message_window_update );
1269
1286
}
1270
1287
1271
1288
goto clean_up ;
@@ -1508,7 +1525,7 @@ static void s_complete_incoming_frame(struct aws_websocket *websocket, int error
1508
1525
AWS_LS_HTTP_WEBSOCKET ,
1509
1526
"id=%p: Close frame received, any further data received will be ignored." ,
1510
1527
(void * )websocket );
1511
- websocket -> thread_data . is_reading_stopped = true ;
1528
+ s_stop_reading_and_dont_block_shutdown ( websocket ) ;
1512
1529
1513
1530
/* TODO: auto-close if there's a channel-handler to the right */
1514
1531
@@ -1598,37 +1615,17 @@ static int s_handler_increment_read_window(
1598
1615
}
1599
1616
1600
1617
if (increment != 0 ) {
1601
- int err = aws_channel_slot_increment_read_window (slot , increment );
1602
- if (err ) {
1603
- goto error ;
1604
- }
1618
+ aws_channel_slot_increment_read_window (slot , increment );
1605
1619
}
1606
1620
1607
1621
return AWS_OP_SUCCESS ;
1608
1622
1609
1623
error :
1610
- websocket -> thread_data .is_reading_stopped = true;
1611
1624
/* Shutting down channel because I know that no one ever checks these errors */
1612
1625
s_shutdown_due_to_read_err (websocket , aws_last_error ());
1613
1626
return AWS_OP_ERR ;
1614
1627
}
1615
1628
1616
- static void s_increment_read_window_action (struct aws_websocket * websocket , size_t size ) {
1617
- AWS_ASSERT (aws_channel_thread_is_callers_thread (websocket -> channel_slot -> channel ));
1618
-
1619
- int err = aws_channel_slot_increment_read_window (websocket -> channel_slot , size );
1620
- if (err ) {
1621
- AWS_LOGF_ERROR (
1622
- AWS_LS_HTTP_WEBSOCKET ,
1623
- "id=%p: Failed to increment read window, error %d (%s). Closing websocket." ,
1624
- (void * )websocket ,
1625
- aws_last_error (),
1626
- aws_error_name (aws_last_error ()));
1627
-
1628
- s_schedule_channel_shutdown (websocket , aws_last_error ());
1629
- }
1630
- }
1631
-
1632
1629
static void s_increment_read_window_task (struct aws_channel_task * task , void * arg , enum aws_task_status status ) {
1633
1630
(void )task ;
1634
1631
@@ -1651,7 +1648,7 @@ static void s_increment_read_window_task(struct aws_channel_task *task, void *ar
1651
1648
AWS_LOGF_TRACE (
1652
1649
AWS_LS_HTTP_WEBSOCKET , "id=%p: Running task to increment read window by %zu." , (void * )websocket , size );
1653
1650
1654
- s_increment_read_window_action (websocket , size );
1651
+ aws_channel_slot_increment_read_window (websocket -> channel_slot , size );
1655
1652
}
1656
1653
1657
1654
void aws_websocket_increment_read_window (struct aws_websocket * websocket , size_t size ) {
0 commit comments