Skip to content

Commit ad2858d

Browse files
Connection setup callback from connection manager is now guaranteed t… (#239)
Connection setup callback from connection manager is now guaranteed to run from the connection's thread, deleted some tests that made invalid assumptions about data access patterns.
1 parent 4fff6a8 commit ad2858d

File tree

3 files changed

+63
-79
lines changed

3 files changed

+63
-79
lines changed

source/connection_manager.c

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <aws/io/socket.h>
2727
#include <aws/io/tls_channel_handler.h>
2828

29-
#include <aws/common/atomics.h>
3029
#include <aws/common/hash_table.h>
3130
#include <aws/common/linked_list.h>
3231
#include <aws/common/mutex.h>
@@ -39,7 +38,8 @@ static struct aws_http_connection_manager_system_vtable s_default_system_vtable
3938
.create_connection = aws_http_client_connect,
4039
.release_connection = aws_http_connection_release,
4140
.close_connection = aws_http_connection_close,
42-
.is_connection_open = aws_http_connection_is_open};
41+
.is_connection_open = aws_http_connection_is_open,
42+
};
4343

4444
const struct aws_http_connection_manager_system_vtable *g_aws_http_connection_manager_default_system_vtable_ptr =
4545
&s_default_system_vtable;
@@ -310,14 +310,47 @@ static bool s_aws_http_connection_manager_should_destroy(struct aws_http_connect
310310
* simply by setting the error_code and moving it to the current transaction's completion list.
311311
*/
312312
struct aws_http_connection_acquisition {
313+
struct aws_allocator *allocator;
313314
struct aws_linked_list_node node;
314315
struct aws_http_connection_manager *manager; /* Only used by logging */
315316
aws_http_connection_manager_on_connection_setup_fn *callback;
316317
void *user_data;
317318
struct aws_http_connection *connection;
318319
int error_code;
320+
struct aws_channel_task acquisition_task;
319321
};
320322

323+
static void s_connection_acquisition_task(
324+
struct aws_channel_task *channel_task,
325+
void *arg,
326+
enum aws_task_status status) {
327+
(void)channel_task;
328+
329+
struct aws_http_connection_acquisition *pending_acquisition = arg;
330+
331+
/* this is a channel task. If it is canceled, that means the channel shutdown. In that case, that's equivalent
332+
* to a closed connection. */
333+
if (status != AWS_TASK_STATUS_RUN_READY) {
334+
AWS_LOGF_WARN(
335+
AWS_LS_HTTP_CONNECTION_MANAGER,
336+
"id=%p: Failed to complete connection acquisition because the connection was closed",
337+
(void *)pending_acquisition->manager);
338+
pending_acquisition->callback(NULL, AWS_ERROR_HTTP_CONNECTION_CLOSED, pending_acquisition->user_data);
339+
/* release it back to prevent a leak of the connection count. */
340+
aws_http_connection_manager_release_connection(pending_acquisition->manager, pending_acquisition->connection);
341+
} else {
342+
AWS_LOGF_DEBUG(
343+
AWS_LS_HTTP_CONNECTION_MANAGER,
344+
"id=%p: Successfully completed connection acquisition with connection id=%p",
345+
(void *)pending_acquisition->manager,
346+
(void *)pending_acquisition->connection);
347+
pending_acquisition->callback(
348+
pending_acquisition->connection, pending_acquisition->error_code, pending_acquisition->user_data);
349+
}
350+
351+
aws_mem_release(pending_acquisition->allocator, pending_acquisition);
352+
}
353+
321354
/*
322355
* Invokes a set of connection acquisition completion callbacks.
323356
*
@@ -335,24 +368,38 @@ static void s_aws_http_connection_manager_complete_acquisitions(
335368
struct aws_http_connection_acquisition *pending_acquisition =
336369
AWS_CONTAINER_OF(node, struct aws_http_connection_acquisition, node);
337370

338-
pending_acquisition->callback(
339-
pending_acquisition->connection, pending_acquisition->error_code, pending_acquisition->user_data);
371+
if (pending_acquisition->error_code == AWS_OP_SUCCESS) {
372+
struct aws_channel *channel = aws_http_connection_get_channel(pending_acquisition->connection);
373+
AWS_PRECONDITION(channel);
374+
375+
/* For some workloads, going ahead and moving the connection callback to the connection's thread is a
376+
* substantial performance improvement so let's do that */
377+
if (!aws_channel_thread_is_callers_thread(channel)) {
378+
aws_channel_task_init(
379+
&pending_acquisition->acquisition_task,
380+
s_connection_acquisition_task,
381+
pending_acquisition,
382+
"s_connection_acquisition_task");
383+
aws_channel_schedule_task_now(channel, &pending_acquisition->acquisition_task);
384+
return;
385+
}
386+
AWS_LOGF_DEBUG(
387+
AWS_LS_HTTP_CONNECTION_MANAGER,
388+
"id=%p: Successfully completed connection acquisition with connection id=%p",
389+
(void *)pending_acquisition->manager,
390+
(void *)pending_acquisition->connection);
340391

341-
if (pending_acquisition->error_code != 0) {
392+
} else {
342393
AWS_LOGF_WARN(
343394
AWS_LS_HTTP_CONNECTION_MANAGER,
344-
"id=%p: Failed to completed connection acquisition with error_code %d(%s)",
395+
"id=%p: Failed to complete connection acquisition with error_code %d(%s)",
345396
(void *)pending_acquisition->manager,
346397
pending_acquisition->error_code,
347398
aws_error_str(pending_acquisition->error_code));
348-
} else {
349-
AWS_LOGF_DEBUG(
350-
AWS_LS_HTTP_CONNECTION_MANAGER,
351-
"id=%p: Successfully completed connection acquisition with connection id=%p",
352-
(void *)pending_acquisition->manager,
353-
(void *)pending_acquisition->connection);
354399
}
355400

401+
pending_acquisition->callback(
402+
pending_acquisition->connection, pending_acquisition->error_code, pending_acquisition->user_data);
356403
aws_mem_release(allocator, pending_acquisition);
357404
}
358405
}
@@ -856,6 +903,7 @@ void aws_http_connection_manager_acquire_connection(
856903
return;
857904
}
858905

906+
request->allocator = manager->allocator;
859907
request->callback = callback;
860908
request->user_data = user_data;
861909

tests/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,7 @@ add_net_test_case(test_connection_manager_acquire_release_mix)
388388
add_net_test_case(test_connection_manager_acquire_release_mix_synchronous)
389389
add_net_test_case(test_connection_manager_connect_callback_failure)
390390
add_net_test_case(test_connection_manager_connect_immediate_failure)
391-
add_net_test_case(test_connection_manager_success_then_cancel_pending_from_failure)
392391
add_net_test_case(test_connection_manager_proxy_setup_shutdown)
393-
add_net_test_case(test_connection_manager_proxy_acquire_single)
394392

395393
add_test_case(h1_server_sanity_check)
396394
add_test_case(h1_server_receive_1line_request)

tests/test_connection_manager.c

Lines changed: 3 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,9 @@ static int s_test_connection_manager_acquire_release_mix_synchronous(struct aws_
591591
(void)ctx;
592592

593593
struct cm_tester_options options = {
594-
.allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks};
594+
.allocator = allocator,
595+
.max_connections = 5,
596+
};
595597

596598
ASSERT_SUCCESS(s_cm_tester_init(&options));
597599

@@ -669,39 +671,6 @@ static int s_test_connection_manager_connect_immediate_failure(struct aws_alloca
669671
}
670672
AWS_TEST_CASE(test_connection_manager_connect_immediate_failure, s_test_connection_manager_connect_immediate_failure);
671673

672-
static int s_test_connection_manager_success_then_cancel_pending_from_failure(
673-
struct aws_allocator *allocator,
674-
void *ctx) {
675-
(void)ctx;
676-
677-
struct cm_tester_options options = {
678-
.allocator = allocator, .max_connections = 1, .mock_table = &s_synchronous_mocks};
679-
680-
ASSERT_SUCCESS(s_cm_tester_init(&options));
681-
682-
s_add_mock_connections(1, AWS_NCRT_SUCCESS, true);
683-
s_add_mock_connections(1, AWS_NCRT_ERROR_FROM_CREATE, false);
684-
685-
s_acquire_connections(5);
686-
687-
ASSERT_SUCCESS(s_wait_on_connection_reply_count(1));
688-
689-
ASSERT_TRUE(s_tester.connection_errors == 0);
690-
691-
ASSERT_SUCCESS(s_release_connections(1, true));
692-
693-
ASSERT_SUCCESS(s_wait_on_connection_reply_count(5));
694-
695-
ASSERT_TRUE(s_tester.connection_errors == 4);
696-
697-
ASSERT_SUCCESS(s_cm_tester_clean_up());
698-
699-
return AWS_OP_SUCCESS;
700-
}
701-
AWS_TEST_CASE(
702-
test_connection_manager_success_then_cancel_pending_from_failure,
703-
s_test_connection_manager_success_then_cancel_pending_from_failure);
704-
705674
static int s_test_connection_manager_proxy_setup_shutdown(struct aws_allocator *allocator, void *ctx) {
706675
(void)ctx;
707676

@@ -724,34 +693,3 @@ static int s_test_connection_manager_proxy_setup_shutdown(struct aws_allocator *
724693
return AWS_OP_SUCCESS;
725694
}
726695
AWS_TEST_CASE(test_connection_manager_proxy_setup_shutdown, s_test_connection_manager_proxy_setup_shutdown);
727-
728-
static int s_test_connection_manager_proxy_acquire_single(struct aws_allocator *allocator, void *ctx) {
729-
(void)ctx;
730-
731-
struct aws_http_proxy_options proxy_options = {
732-
.host = aws_byte_cursor_from_c_str("127.0.0.1"),
733-
.port = 8080,
734-
};
735-
736-
struct cm_tester_options options = {
737-
.allocator = allocator,
738-
.max_connections = 1,
739-
.mock_table = &s_synchronous_mocks,
740-
.proxy_options = &proxy_options,
741-
};
742-
743-
ASSERT_SUCCESS(s_cm_tester_init(&options));
744-
745-
s_add_mock_connections(1, AWS_NCRT_SUCCESS, true);
746-
s_acquire_connections(1);
747-
s_wait_on_connection_reply_count(1);
748-
749-
ASSERT_TRUE(s_tester.connection_errors == 0);
750-
751-
ASSERT_SUCCESS(s_release_connections(1, true));
752-
753-
ASSERT_SUCCESS(s_cm_tester_clean_up());
754-
755-
return AWS_OP_SUCCESS;
756-
}
757-
AWS_TEST_CASE(test_connection_manager_proxy_acquire_single, s_test_connection_manager_proxy_acquire_single);

0 commit comments

Comments
 (0)