Skip to content

Commit 3d72f46

Browse files
authored
HTTP stream telemetry api (#433)
1 parent c9faa8a commit 3d72f46

17 files changed

+224
-4
lines changed

.github/workflows/ci.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ jobs:
158158
- name: Build and test
159159
run: |
160160
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
161-
python3 builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON
161+
python3 builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON --config Debug
162162
163163
localhost-test-mac:
164164
runs-on: macos-11 # latest
@@ -168,7 +168,7 @@ jobs:
168168
- name: Build and test
169169
run: |
170170
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
171-
python3 builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON
171+
python3 builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON --config Debug
172172
173173
localhost-test-win:
174174
runs-on: windows-2022 # latest
@@ -178,4 +178,4 @@ jobs:
178178
- name: Build and test
179179
run: |
180180
python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
181-
python builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON
181+
python builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON --config Debug

include/aws/http/connection.h

+7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
struct aws_client_bootstrap;
1212
struct aws_socket_options;
13+
struct aws_socket_endpoint;
1314
struct aws_tls_connection_options;
1415
struct aws_http2_setting;
1516
struct proxy_env_var_settings;
@@ -512,6 +513,12 @@ enum aws_http_version aws_http_connection_get_version(const struct aws_http_conn
512513
AWS_HTTP_API
513514
struct aws_channel *aws_http_connection_get_channel(struct aws_http_connection *connection);
514515

516+
/**
517+
* Returns the remote endpoint of the HTTP connection.
518+
*/
519+
AWS_HTTP_API
520+
const struct aws_socket_endpoint *aws_http_connection_get_remote_endpoint(const struct aws_http_connection *connection);
521+
515522
/**
516523
* Initialize an map copied from the *src map, which maps `struct aws_string *` to `enum aws_http_version`.
517524
*/

include/aws/http/private/h2_connection.h

+1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct aws_h2_connection {
125125
uint64_t outgoing_timestamp_ns;
126126
/* Timestamp when connection has data to receive, which is when there is an active stream */
127127
uint64_t incoming_timestamp_ns;
128+
128129
} thread_data;
129130

130131
/* Any thread may touch this data, but the lock must be held (unless it's an atomic) */

include/aws/http/private/request_response_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ struct aws_http_stream {
4343
aws_http_on_incoming_headers_fn *on_incoming_headers;
4444
aws_http_on_incoming_header_block_done_fn *on_incoming_header_block_done;
4545
aws_http_on_incoming_body_fn *on_incoming_body;
46+
aws_http_on_stream_metrics_fn *on_metrics;
4647
aws_http_on_stream_complete_fn *on_complete;
4748
aws_http_on_stream_destroy_fn *on_destroy;
4849

4950
struct aws_atomic_var refcount;
5051
enum aws_http_method request_method;
52+
struct aws_http_stream_metrics metrics;
5153

5254
union {
5355
struct aws_http_stream_client_data {

include/aws/http/request_response.h

+53
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,46 @@ typedef void(aws_http_on_stream_complete_fn)(struct aws_http_stream *stream, int
194194
*/
195195
typedef void(aws_http_on_stream_destroy_fn)(void *user_data);
196196

197+
/**
198+
* Tracing metrics for aws_http_stream.
199+
* Data maybe not be available if the data of stream was never sent/received before it completes.
200+
*/
201+
struct aws_http_stream_metrics {
202+
/* The time stamp when the request started to be encoded. -1 means data not available. Timestamp
203+
* are from `aws_high_res_clock_get_ticks` */
204+
int64_t send_start_timestamp_ns;
205+
/* The time stamp when the request finished to be encoded. -1 means data not available.
206+
* Timestamp are from `aws_high_res_clock_get_ticks` */
207+
int64_t send_end_timestamp_ns;
208+
/* The time duration for the request from start encoding to finish encoding (send_end_timestamp_ns -
209+
* send_start_timestamp_ns). -1 means data not available. */
210+
int64_t sending_duration_ns;
211+
212+
/* The time stamp when the response started to be received from the network channel. -1 means data not available.
213+
* Timestamp are from `aws_high_res_clock_get_ticks` */
214+
int64_t receive_start_timestamp_ns;
215+
/* The time stamp when the response finished to be received from the network channel. -1 means data not available.
216+
* Timestamp are from `aws_high_res_clock_get_ticks` */
217+
int64_t receive_end_timestamp_ns;
218+
/* The time duration for the request from start receiving to finish receiving. receive_end_timestamp_ns -
219+
* receive_start_timestamp_ns. -1 means data not available. */
220+
int64_t receiving_duration_ns;
221+
222+
/* The stream-id on the connection when this stream was activated. */
223+
uint32_t stream_id;
224+
};
225+
226+
/**
227+
* Invoked right before request/response stream is complete to report the tracing metrics for aws_http_stream.
228+
* This may be invoked synchronously when aws_http_stream_release() is called.
229+
* This is invoked even if the stream is never activated.
230+
* See `aws_http_stream_metrics` for details.
231+
*/
232+
typedef void(aws_http_on_stream_metrics_fn)(
233+
struct aws_http_stream *stream,
234+
const struct aws_http_stream_metrics *metrics,
235+
void *user_data);
236+
197237
/**
198238
* Options for creating a stream which sends a request from the client and receives a response from the server.
199239
*/
@@ -234,6 +274,13 @@ struct aws_http_make_request_options {
234274
*/
235275
aws_http_on_incoming_body_fn *on_response_body;
236276

277+
/**
278+
* Invoked right before stream is complete, whether successful or unsuccessful
279+
* Optional.
280+
* See `aws_http_on_stream_metrics_fn`
281+
*/
282+
aws_http_on_stream_metrics_fn *on_metrics;
283+
237284
/**
238285
* Invoked when request/response stream is complete, whether successful or unsuccessful
239286
* Optional.
@@ -972,6 +1019,12 @@ AWS_HTTP_API
9721019
struct aws_http_stream *aws_http_stream_new_server_request_handler(
9731020
const struct aws_http_request_handler_options *options);
9741021

1022+
/**
1023+
* Acquire refcount on the stream to prevent it from being cleaned up until it is released.
1024+
*/
1025+
AWS_HTTP_API
1026+
struct aws_http_stream *aws_http_stream_acquire(struct aws_http_stream *stream);
1027+
9751028
/**
9761029
* Users must release the stream when they are done with it, or its memory will never be cleaned up.
9771030
* This will not cancel the stream, its callbacks will still fire if the stream is still in progress.

source/connection.c

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <aws/io/channel_bootstrap.h>
1919
#include <aws/io/logging.h>
2020
#include <aws/io/socket.h>
21+
#include <aws/io/socket_channel_handler.h>
2122
#include <aws/io/tls_channel_handler.h>
2223

2324
#ifdef _MSC_VER
@@ -366,6 +367,16 @@ struct aws_channel *aws_http_connection_get_channel(struct aws_http_connection *
366367
return connection->channel_slot->channel;
367368
}
368369

370+
const struct aws_socket_endpoint *aws_http_connection_get_remote_endpoint(
371+
const struct aws_http_connection *connection) {
372+
AWS_ASSERT(connection);
373+
struct aws_channel *channel = connection->channel_slot->channel;
374+
/* The first slot for an HTTP connection is always socket */
375+
struct aws_channel_slot *socket_slot = aws_channel_get_first_slot(channel);
376+
const struct aws_socket *socket = aws_socket_handler_get_socket(socket_slot->handler);
377+
return &socket->remote_endpoint;
378+
}
379+
369380
int aws_http_alpn_map_init(struct aws_allocator *allocator, struct aws_hash_table *map) {
370381
AWS_ASSERT(allocator);
371382
AWS_ASSERT(map);

source/h1_connection.c

+36
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) {
371371

372372
/* connection keeps activated stream alive until stream completes */
373373
aws_atomic_fetch_add(&stream->refcount, 1);
374+
stream->metrics.stream_id = stream->id;
374375

375376
if (should_schedule_task) {
376377
AWS_LOGF_TRACE(
@@ -633,6 +634,10 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
633634
aws_h1_chunk_complete_and_destroy(chunk, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
634635
}
635636

637+
if (stream->base.on_metrics) {
638+
stream->base.on_metrics(&stream->base, &stream->base.metrics, stream->base.user_data);
639+
}
640+
636641
/* Invoke callback and clean up stream. */
637642
if (stream->base.on_complete) {
638643
stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
@@ -731,6 +736,12 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
731736
/* If current stream is done sending data... */
732737
if (current && !aws_h1_encoder_is_message_in_progress(&connection->thread_data.encoder)) {
733738
current->is_outgoing_message_done = true;
739+
AWS_ASSERT(current->base.metrics.send_end_timestamp_ns == -1);
740+
aws_high_res_clock_get_ticks((uint64_t *)&current->base.metrics.send_end_timestamp_ns);
741+
AWS_ASSERT(current->base.metrics.send_start_timestamp_ns != -1);
742+
AWS_ASSERT(current->base.metrics.send_end_timestamp_ns >= current->base.metrics.send_start_timestamp_ns);
743+
current->base.metrics.sending_duration_ns =
744+
current->base.metrics.send_end_timestamp_ns - current->base.metrics.send_start_timestamp_ns;
734745

735746
/* RFC-7230 section 6.6: Tear-down.
736747
* If this was the final stream, don't allows any further streams to be sent */
@@ -801,9 +812,13 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
801812
s_set_outgoing_stream_ptr(connection, current);
802813

803814
if (current) {
815+
AWS_ASSERT(current->base.metrics.send_start_timestamp_ns == -1);
816+
aws_high_res_clock_get_ticks((uint64_t *)&current->base.metrics.send_start_timestamp_ns);
817+
804818
err = aws_h1_encoder_start_message(
805819
&connection->thread_data.encoder, &current->encoder_message, &current->base);
806820
(void)err;
821+
AWS_ASSERT(connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_INIT);
807822
AWS_ASSERT(!err);
808823
}
809824

@@ -1110,6 +1125,15 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
11101125
"id=%p: Received 'Connection: close' header, no more request data will be sent.",
11111126
(void *)&incoming_stream->base);
11121127
incoming_stream->is_outgoing_message_done = true;
1128+
AWS_ASSERT(incoming_stream->base.metrics.send_end_timestamp_ns == -1);
1129+
aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.send_end_timestamp_ns);
1130+
AWS_ASSERT(incoming_stream->base.metrics.send_start_timestamp_ns != -1);
1131+
AWS_ASSERT(
1132+
incoming_stream->base.metrics.send_end_timestamp_ns >=
1133+
incoming_stream->base.metrics.send_start_timestamp_ns);
1134+
incoming_stream->base.metrics.sending_duration_ns =
1135+
incoming_stream->base.metrics.send_end_timestamp_ns -
1136+
incoming_stream->base.metrics.send_start_timestamp_ns;
11131137
}
11141138
/* Stop writing right now.
11151139
* Shutdown will be scheduled after we finishing parsing the response */
@@ -1270,6 +1294,13 @@ static int s_decoder_on_done(void *user_data) {
12701294

12711295
/* Otherwise the incoming stream is finished decoding and we will update it if needed */
12721296
incoming_stream->is_incoming_message_done = true;
1297+
aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_end_timestamp_ns);
1298+
AWS_ASSERT(incoming_stream->base.metrics.receive_start_timestamp_ns != -1);
1299+
AWS_ASSERT(
1300+
incoming_stream->base.metrics.receive_end_timestamp_ns >=
1301+
incoming_stream->base.metrics.receive_start_timestamp_ns);
1302+
incoming_stream->base.metrics.receiving_duration_ns = incoming_stream->base.metrics.receive_end_timestamp_ns -
1303+
incoming_stream->base.metrics.receive_start_timestamp_ns;
12731304

12741305
/* RFC-7230 section 6.6
12751306
* After reading the final message, the connection must not read any more */
@@ -1822,6 +1853,11 @@ static int s_try_process_next_stream_read_message(struct aws_h1_connection *conn
18221853
bool body_headers_ignored = incoming_stream->base.request_method == AWS_HTTP_METHOD_HEAD;
18231854
aws_h1_decoder_set_body_headers_ignored(connection->thread_data.incoming_stream_decoder, body_headers_ignored);
18241855

1856+
if (incoming_stream->base.metrics.receive_start_timestamp_ns == -1) {
1857+
/* That's the first time for the stream receives any message */
1858+
aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_start_timestamp_ns);
1859+
}
1860+
18251861
/* As decoder runs, it invokes the internal s_decoder_X callbacks, which in turn invoke user callbacks.
18261862
* The decoder will stop once it hits the end of the request/response OR the end of the message data. */
18271863
if (aws_h1_decode(connection->thread_data.incoming_stream_decoder, &message_cursor)) {

source/h1_stream.c

+7
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,12 @@ static struct aws_h1_stream *s_stream_new_common(
361361
stream->base.on_incoming_body = on_incoming_body;
362362
stream->base.on_complete = on_complete;
363363
stream->base.on_destroy = on_destroy;
364+
stream->base.metrics.send_start_timestamp_ns = -1;
365+
stream->base.metrics.send_end_timestamp_ns = -1;
366+
stream->base.metrics.sending_duration_ns = -1;
367+
stream->base.metrics.receive_start_timestamp_ns = -1;
368+
stream->base.metrics.receive_end_timestamp_ns = -1;
369+
stream->base.metrics.receiving_duration_ns = -1;
364370

365371
aws_channel_task_init(
366372
&stream->cross_thread_work_task, s_stream_cross_thread_work_task, stream, "http1_stream_cross_thread_work");
@@ -401,6 +407,7 @@ struct aws_h1_stream *aws_h1_stream_new_request(
401407

402408
stream->base.client_data = &stream->base.client_or_server_data.client;
403409
stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN;
410+
stream->base.on_metrics = options->on_metrics;
404411

405412
/* Validate request and cache info that the encoder will eventually need */
406413
if (aws_h1_encoder_message_init_from_request(

source/h2_connection.c

+1
Original file line numberDiff line numberDiff line change
@@ -2057,6 +2057,7 @@ int aws_h2_stream_activate(struct aws_http_stream *stream) {
20572057

20582058
/* connection keeps activated stream alive until stream completes */
20592059
aws_atomic_fetch_add(&stream->refcount, 1);
2060+
stream->metrics.stream_id = stream->id;
20602061

20612062
if (!was_cross_thread_work_scheduled) {
20622063
CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");

source/h2_stream.c

+31-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <aws/http/private/h2_stream.h>
77

8+
#include <aws/common/clock.h>
89
#include <aws/http/private/h2_connection.h>
910
#include <aws/http/private/strutil.h>
1011
#include <aws/http/status_code.h>
@@ -240,10 +241,17 @@ struct aws_h2_stream *aws_h2_stream_new_request(
240241
stream->base.on_incoming_headers = options->on_response_headers;
241242
stream->base.on_incoming_header_block_done = options->on_response_header_block_done;
242243
stream->base.on_incoming_body = options->on_response_body;
244+
stream->base.on_metrics = options->on_metrics;
243245
stream->base.on_complete = options->on_complete;
244246
stream->base.on_destroy = options->on_destroy;
245247
stream->base.client_data = &stream->base.client_or_server_data.client;
246248
stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN;
249+
stream->base.metrics.send_start_timestamp_ns = -1;
250+
stream->base.metrics.send_end_timestamp_ns = -1;
251+
stream->base.metrics.sending_duration_ns = -1;
252+
stream->base.metrics.receive_start_timestamp_ns = -1;
253+
stream->base.metrics.receive_end_timestamp_ns = -1;
254+
stream->base.metrics.receiving_duration_ns = -1;
247255
aws_linked_list_init(&stream->thread_data.outgoing_writes);
248256
aws_linked_list_init(&stream->synced_data.pending_write_list);
249257

@@ -446,6 +454,9 @@ void aws_h2_stream_complete(struct aws_h2_stream *stream, int error_code) {
446454
s_h2_stream_destroy_pending_writes(stream);
447455

448456
/* Invoke callback */
457+
if (stream->base.on_metrics) {
458+
stream->base.on_metrics(&stream->base, &stream->base.metrics, stream->base.user_data);
459+
}
449460
if (stream->base.on_complete) {
450461
stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
451462
}
@@ -706,7 +717,8 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_
706717
AWS_H2_STREAM_LOGF(ERROR, stream, "Failed to create HEADERS frame: %s", aws_error_name(aws_last_error()));
707718
goto error;
708719
}
709-
720+
AWS_ASSERT(stream->base.metrics.send_start_timestamp_ns == -1);
721+
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_start_timestamp_ns);
710722
/* Initialize the flow-control window size */
711723
stream->thread_data.window_size_peer =
712724
connection->thread_data.settings_peer[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
@@ -721,6 +733,11 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_
721733
/* If stream has no body, then HEADERS frame marks the end of outgoing data */
722734
stream->thread_data.state = AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL;
723735
AWS_H2_STREAM_LOG(TRACE, stream, "Sending HEADERS with END_STREAM. State -> HALF_CLOSED_LOCAL");
736+
/* There is no further frames to be sent, now is the end timestamp of sending. */
737+
AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1);
738+
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns);
739+
stream->base.metrics.sending_duration_ns =
740+
stream->base.metrics.send_end_timestamp_ns - stream->base.metrics.send_start_timestamp_ns;
724741
}
725742

726743
if (s_h2_stream_has_outgoing_writes(stream)) {
@@ -798,6 +815,11 @@ int aws_h2_stream_encode_data_frame(
798815
*/
799816
if (input_stream_complete && ends_stream) {
800817
/* Done sending data. No more data will be sent. */
818+
AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1);
819+
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns);
820+
stream->base.metrics.sending_duration_ns =
821+
stream->base.metrics.send_end_timestamp_ns - stream->base.metrics.send_start_timestamp_ns;
822+
801823
if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE) {
802824
/* Both sides have sent END_STREAM */
803825
stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
@@ -841,6 +863,7 @@ struct aws_h2err aws_h2_stream_on_decoder_headers_begin(struct aws_h2_stream *st
841863
if (aws_h2err_failed(stream_err)) {
842864
return s_send_rst_and_close_stream(stream, stream_err);
843865
}
866+
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.receive_start_timestamp_ns);
844867

845868
return AWS_H2ERR_SUCCESS;
846869
}
@@ -1150,6 +1173,13 @@ struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *strea
11501173
* an actual frame type. It's a flag on DATA or HEADERS frames, and we
11511174
* already checked the legality of those frames in their respective callbacks. */
11521175

1176+
AWS_ASSERT(stream->base.metrics.receive_start_timestamp_ns != -1);
1177+
AWS_ASSERT(stream->base.metrics.receive_end_timestamp_ns == -1);
1178+
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.receive_end_timestamp_ns);
1179+
AWS_ASSERT(stream->base.metrics.receive_end_timestamp_ns >= stream->base.metrics.receive_start_timestamp_ns);
1180+
stream->base.metrics.receiving_duration_ns =
1181+
stream->base.metrics.receive_end_timestamp_ns - stream->base.metrics.receive_start_timestamp_ns;
1182+
11531183
if (stream->thread_data.content_length_received) {
11541184
if (stream->base.request_method != AWS_HTTP_METHOD_HEAD &&
11551185
stream->base.client_data->response_status != AWS_HTTP_STATUS_CODE_304_NOT_MODIFIED) {

source/request_response.c

+9
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,15 @@ int aws_http_stream_send_response(struct aws_http_stream *stream, struct aws_htt
11071107
return stream->owning_connection->vtable->stream_send_response(stream, response);
11081108
}
11091109

1110+
struct aws_http_stream *aws_http_stream_acquire(struct aws_http_stream *stream) {
1111+
AWS_PRECONDITION(stream);
1112+
1113+
size_t prev_refcount = aws_atomic_fetch_add(&stream->refcount, 1);
1114+
AWS_LOGF_TRACE(
1115+
AWS_LS_HTTP_STREAM, "id=%p: Stream refcount acquired, %zu remaining.", (void *)stream, prev_refcount + 1);
1116+
return stream;
1117+
}
1118+
11101119
void aws_http_stream_release(struct aws_http_stream *stream) {
11111120
if (!stream) {
11121121
return;

0 commit comments

Comments
 (0)