Skip to content

Commit f5096ca

Browse files
Streams now have to be activated. This works around a race condition … (#200)
* Streams now have to be activated. This works around a race condition where the request kicks off before the stream container itself has had a chance to be seated by the user. * Reworked backpressure to be a connection-wide setting. Removed the request level option altogether. Added tests to verify unactivated streams still properly clean-up.
1 parent 8c12b8c commit f5096ca

30 files changed

+334
-149
lines changed

bin/elasticurl/main.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co
504504
fprintf(stderr, "failed to create request.");
505505
exit(1);
506506
}
507+
aws_http_stream_activate(stream);
507508

508509
/* Connection will stay alive until stream completes */
509510
aws_http_connection_release(app_ctx->connection);

include/aws/http/connection.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,15 @@ struct aws_http_client_connection_options {
210210
aws_http_on_client_connection_shutdown_fn *on_shutdown;
211211

212212
/**
213-
* If set to true, read back pressure mechanism will be enabled.
214-
*/
215-
bool enable_read_back_pressure;
213+
* Set to true to manually manage the read window size.
214+
*
215+
* If this is false, the connection will maintain a constant window size.
216+
*
217+
* If this is true, the caller must manually increment the window size using aws_http_stream_update_window().
218+
* If the window is not incremented, it will shrink by the amount of body data received. If the window size
219+
* reaches 0, no further data will be received.
220+
**/
221+
bool manual_window_management;
216222
};
217223

218224
/**

include/aws/http/private/connection_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ struct aws_http_connection {
7676
struct aws_atomic_var refcount;
7777

7878
/* Starts at either 1 or 2, increments by two with each new stream */
79-
struct aws_atomic_var next_stream_id;
79+
uint32_t next_stream_id;
8080

8181
union {
8282
struct aws_http_connection_client_data {
@@ -93,13 +93,16 @@ struct aws_http_connection {
9393
* Opposite is true on server connections */
9494
struct aws_http_connection_client_data *client_data;
9595
struct aws_http_connection_server_data *server_data;
96+
97+
bool manual_window_management;
9698
};
9799

98100
/* Gets a client connection up and running.
99101
* Responsible for firing on_setup and on_shutdown callbacks. */
100102
struct aws_http_client_bootstrap {
101103
struct aws_allocator *alloc;
102104
bool is_using_tls;
105+
bool manual_window_management;
103106
size_t initial_window_size;
104107
struct aws_http_connection_monitoring_options monitoring_options;
105108
void *user_data;
@@ -135,6 +138,8 @@ struct aws_crt_statistics_http1_channel *aws_h1_connection_get_statistics(struct
135138
/**
136139
* Gets the next available stream id within the connection. Valid for creating both h1 and h2 streams.
137140
*
141+
* This function is not thread-safe.
142+
*
138143
* Returns 0 if there was an error.
139144
*/
140145
AWS_HTTP_API

include/aws/http/private/h1_connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ AWS_EXTERN_C_BEGIN
2323
AWS_HTTP_API
2424
struct aws_http_connection *aws_http_connection_new_http1_1_server(
2525
struct aws_allocator *allocator,
26+
bool manual_window_management,
2627
size_t initial_window_size);
2728

2829
AWS_HTTP_API
2930
struct aws_http_connection *aws_http_connection_new_http1_1_client(
3031
struct aws_allocator *allocator,
32+
bool manual_window_management,
3133
size_t initial_window_size);
3234

3335
AWS_EXTERN_C_END

include/aws/http/private/h1_stream.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@ struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_re
5858

5959
AWS_EXTERN_C_END
6060

61+
/* we don't want this exported. We just want it to have external linkage between h1_stream and h1_connection compilation
62+
* units. it is defined in h1_connection.c */
63+
int aws_h1_stream_activate(struct aws_http_stream *stream);
64+
6165
#endif /* AWS_HTTP_H1_STREAM_H */

include/aws/http/private/h2_connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,13 @@ AWS_EXTERN_C_BEGIN
8484
AWS_HTTP_API
8585
struct aws_http_connection *aws_http_connection_new_http2_server(
8686
struct aws_allocator *allocator,
87+
bool manual_window_management,
8788
size_t initial_window_size);
8889

8990
AWS_HTTP_API
9091
struct aws_http_connection *aws_http_connection_new_http2_client(
9192
struct aws_allocator *allocator,
93+
bool manual_window_management,
9294
size_t initial_window_size);
9395

9496
AWS_EXTERN_C_END

include/aws/http/private/h2_stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,6 @@ enum aws_h2_stream_state aws_h2_stream_get_state(const struct aws_h2_stream *str
7575
/* Connection is ready to send frames from stream now */
7676
int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgoing_data);
7777

78+
int aws_h2_stream_activate(struct aws_http_stream *stream);
79+
7880
#endif /* AWS_HTTP_H2_STREAM_H */

include/aws/http/private/request_response_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
struct aws_http_stream_vtable {
2626
void (*destroy)(struct aws_http_stream *stream);
2727
void (*update_window)(struct aws_http_stream *stream, size_t increment_size);
28+
int (*activate)(struct aws_http_stream *stream);
2829
};
2930

3031
/**

include/aws/http/private/websocket_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ struct aws_websocket_client_bootstrap_system_vtable {
8585
struct aws_http_stream *(*aws_http_connection_make_request)(
8686
struct aws_http_connection *client_connection,
8787
const struct aws_http_make_request_options *options);
88+
int (*aws_http_stream_activate)(struct aws_http_stream *stream);
8889
void (*aws_http_stream_release)(struct aws_http_stream *stream);
8990
struct aws_http_connection *(*aws_http_stream_get_connection)(const struct aws_http_stream *stream);
9091
int (*aws_http_stream_get_incoming_response_status)(const struct aws_http_stream *stream, int *out_status);

include/aws/http/request_response.h

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ typedef int(aws_http_on_incoming_header_block_done_fn)(
171171
* The data must be copied immediately if you wish to preserve it.
172172
* This is always invoked on the HTTP connection's event-loop thread.
173173
*
174-
* Note that, if the stream is using manual_window_management then the window
174+
* Note that, if the connection is using manual_window_management then the window
175175
* size has shrunk by the amount of body data received. If the window size
176176
* reaches 0 no further data will be received. Increment the window size with
177177
* aws_http_stream_update_window().
@@ -243,17 +243,6 @@ struct aws_http_make_request_options {
243243
* See `aws_http_on_stream_complete_fn`.
244244
*/
245245
aws_http_on_stream_complete_fn *on_complete;
246-
247-
/**
248-
* Set to true to manually manage the read window size.
249-
*
250-
* If this is false, the connection will maintain a constant window size.
251-
*
252-
* If this is true, the caller must manually increment the window size using aws_http_stream_update_window().
253-
* If the window is not incremented, it will shrink by the amount of body data received. If the window size
254-
* reaches 0, no further data will be received.
255-
*/
256-
bool manual_window_management;
257246
};
258247

259248
struct aws_http_request_handler_options {
@@ -305,17 +294,6 @@ struct aws_http_request_handler_options {
305294
* See `aws_http_on_stream_complete_fn`.
306295
*/
307296
aws_http_on_stream_complete_fn *on_complete;
308-
309-
/**
310-
* Set to true to manually manage the read window size.
311-
*
312-
* If this is false, the connection will maintain a constant window size.
313-
*
314-
* If this is true, the caller must manually increment the window size using aws_http_stream_update_window().
315-
* If the window is not incremented, it will shrink by the amount of body data received. If the window size
316-
* reaches 0, no further data will be received.
317-
*/
318-
bool manual_window_management;
319297
};
320298

321299
#define AWS_HTTP_REQUEST_HANDLER_OPTIONS_INIT \
@@ -634,7 +612,9 @@ int aws_http_message_erase_header(struct aws_http_message *message, size_t index
634612

635613
/**
636614
* Create a stream, with a client connection sending a request.
637-
* The request starts sending automatically once the stream is created.
615+
* The request does not start sending automatically once the stream is created. You must call
616+
* aws_http_stream_activate to begin execution of the request.
617+
*
638618
* The `options` are copied during this call.
639619
*
640620
* Tip for language bindings: Do not bind the `options` struct. Use something more natural for your language,
@@ -664,6 +644,13 @@ struct aws_http_stream *aws_http_stream_new_server_request_handler(
664644
AWS_HTTP_API
665645
void aws_http_stream_release(struct aws_http_stream *stream);
666646

647+
/**
648+
* Only used for client initiated streams (immediately following a call to aws_http_connection_make_request).
649+
*
650+
* Activates the request's outgoing stream processing.
651+
*/
652+
AWS_HTTP_API int aws_http_stream_activate(struct aws_http_stream *stream);
653+
667654
AWS_HTTP_API
668655
struct aws_http_connection *aws_http_stream_get_connection(const struct aws_http_stream *stream);
669656

@@ -697,7 +684,8 @@ void aws_http_stream_update_window(struct aws_http_stream *stream, size_t increm
697684

698685
/**
699686
* Gets the Http/2 id associated with a stream. Even h1 streams have an id (using the same allocation procedure
700-
* as http/2) for easier tracking purposes.
687+
* as http/2) for easier tracking purposes. For client streams, this will only be non-zero after a successful call
688+
* to aws_http_stream_activate()
701689
*/
702690
uint32_t aws_http_stream_get_id(struct aws_http_stream *stream);
703691

include/aws/http/server.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,15 @@ struct aws_http_server_options {
105105
aws_http_server_on_destroy_fn *on_destroy_complete;
106106

107107
/**
108-
* If set to true, read back pressure mechanism will be enabled.
109-
*/
110-
bool enable_read_back_pressure;
108+
* Set to true to manually manage the read window size.
109+
*
110+
* If this is false, the connection will maintain a constant window size.
111+
*
112+
* If this is true, the caller must manually increment the window size using aws_http_stream_update_window().
113+
* If the window is not incremented, it will shrink by the amount of body data received. If the window size
114+
* reaches 0, no further data will be received.
115+
**/
116+
bool manual_window_management;
111117
};
112118

113119
/**

source/connection.c

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct aws_http_server {
5151
struct aws_allocator *alloc;
5252
struct aws_server_bootstrap *bootstrap;
5353
bool is_using_tls;
54+
bool manual_window_management;
5455
size_t initial_window_size;
5556
void *user_data;
5657
aws_http_server_on_incoming_connection_fn *on_incoming_connection;
@@ -83,6 +84,7 @@ static struct aws_http_connection *s_connection_new(
8384
struct aws_channel *channel,
8485
bool is_server,
8586
bool is_using_tls,
87+
bool manual_window_management,
8688
size_t initial_window_size) {
8789

8890
struct aws_channel_slot *connection_slot = NULL;
@@ -146,17 +148,19 @@ static struct aws_http_connection *s_connection_new(
146148
switch (version) {
147149
case AWS_HTTP_VERSION_1_1:
148150
if (is_server) {
149-
connection = aws_http_connection_new_http1_1_server(alloc, initial_window_size);
151+
connection =
152+
aws_http_connection_new_http1_1_server(alloc, manual_window_management, initial_window_size);
150153
} else {
151-
connection = aws_http_connection_new_http1_1_client(alloc, initial_window_size);
154+
connection =
155+
aws_http_connection_new_http1_1_client(alloc, manual_window_management, initial_window_size);
152156
}
153157
break;
154158
case AWS_HTTP_VERSION_2:
155159
AWS_FATAL_ASSERT(false && "H2 is not currently supported"); /* lol nice try */
156160
if (is_server) {
157-
connection = aws_http_connection_new_http2_server(alloc, initial_window_size);
161+
connection = aws_http_connection_new_http2_server(alloc, manual_window_management, initial_window_size);
158162
} else {
159-
connection = aws_http_connection_new_http2_client(alloc, initial_window_size);
163+
connection = aws_http_connection_new_http2_client(alloc, manual_window_management, initial_window_size);
160164
}
161165
break;
162166
default:
@@ -293,7 +297,13 @@ static void s_server_bootstrap_on_accept_channel_setup(
293297
goto error;
294298
}
295299
/* Create connection */
296-
connection = s_connection_new(server->alloc, channel, true, server->is_using_tls, server->initial_window_size);
300+
connection = s_connection_new(
301+
server->alloc,
302+
channel,
303+
true,
304+
server->is_using_tls,
305+
server->manual_window_management,
306+
server->initial_window_size);
297307
if (!connection) {
298308
AWS_LOGF_ERROR(
299309
AWS_LS_HTTP_SERVER,
@@ -466,6 +476,7 @@ struct aws_http_server *aws_http_server_new(const struct aws_http_server_options
466476
server->user_data = options->server_user_data;
467477
server->on_incoming_connection = options->on_incoming_connection;
468478
server->on_destroy_complete = options->on_destroy_complete;
479+
server->manual_window_management = options->manual_window_management;
469480

470481
int err = aws_mutex_init(&server->synced_data.lock);
471482
if (err) {
@@ -490,7 +501,7 @@ struct aws_http_server *aws_http_server_new(const struct aws_http_server_options
490501
}
491502

492503
struct aws_server_socket_channel_bootstrap_options bootstrap_options = {
493-
.enable_read_back_pressure = options->enable_read_back_pressure,
504+
.enable_read_back_pressure = options->manual_window_management,
494505
.tls_options = options->tls_options,
495506
.bootstrap = options->bootstrap,
496507
.socket_options = options->socket_options,
@@ -613,7 +624,12 @@ static void s_client_bootstrap_on_channel_setup(
613624
AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "static: Socket connected, creating client connection object.");
614625

615626
http_bootstrap->connection = s_connection_new(
616-
http_bootstrap->alloc, channel, false, http_bootstrap->is_using_tls, http_bootstrap->initial_window_size);
627+
http_bootstrap->alloc,
628+
channel,
629+
false,
630+
http_bootstrap->is_using_tls,
631+
http_bootstrap->manual_window_management,
632+
http_bootstrap->initial_window_size);
617633
if (!http_bootstrap->connection) {
618634
AWS_LOGF_ERROR(
619635
AWS_LS_HTTP_CONNECTION,
@@ -743,6 +759,7 @@ int aws_http_client_connect_internal(
743759

744760
http_bootstrap->alloc = options->allocator;
745761
http_bootstrap->is_using_tls = options->tls_options != NULL;
762+
http_bootstrap->manual_window_management = options->manual_window_management;
746763
http_bootstrap->initial_window_size = options->initial_window_size;
747764
http_bootstrap->user_data = options->user_data;
748765
http_bootstrap->on_setup = options->on_setup;
@@ -766,7 +783,7 @@ int aws_http_client_connect_internal(
766783
.tls_options = options->tls_options,
767784
.setup_callback = s_client_bootstrap_on_channel_setup,
768785
.shutdown_callback = s_client_bootstrap_on_channel_shutdown,
769-
.enable_read_back_pressure = options->enable_read_back_pressure,
786+
.enable_read_back_pressure = options->manual_window_management,
770787
.user_data = http_bootstrap,
771788
};
772789

@@ -845,13 +862,15 @@ static const uint32_t MAX_STREAM_ID = UINT32_MAX >> 1;
845862

846863
uint32_t aws_http_connection_get_next_stream_id(struct aws_http_connection *connection) {
847864

848-
uint32_t next_id = (uint32_t)aws_atomic_fetch_add(&connection->next_stream_id, 2);
849-
/* If next fetch would overflow next_stream_id, set it to 0 */
865+
uint32_t next_id = connection->next_stream_id;
866+
850867
if (AWS_UNLIKELY(next_id > MAX_STREAM_ID)) {
851868
AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION, "id=%p: All available stream ids are gone", (void *)connection);
852869

853870
next_id = 0;
854871
aws_raise_error(AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED);
872+
} else {
873+
connection->next_stream_id += 2;
855874
}
856875

857876
return next_id;

source/connection_manager.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ static int s_aws_http_connection_manager_new_connection(struct aws_http_connecti
691691
options.socket_options = &manager->socket_options;
692692
options.on_setup = s_aws_http_connection_manager_on_connection_setup;
693693
options.on_shutdown = s_aws_http_connection_manager_on_connection_shutdown;
694-
options.enable_read_back_pressure = manager->enable_read_back_pressure;
694+
options.manual_window_management = manager->enable_read_back_pressure;
695695

696696
if (aws_http_connection_monitoring_options_is_valid(&manager->monitoring_options)) {
697697
options.monitoring_options = &manager->monitoring_options;

0 commit comments

Comments
 (0)