From ba5c2a72b3ae10f1b9e69b14d0604ffbee217913 Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Wed, 23 Oct 2024 09:17:06 -0700 Subject: [PATCH 1/3] PROTON-2857: improve sending performance with session flow control --- c/src/core/engine-internal.h | 1 + c/src/core/engine.c | 25 ++++++++++++++++++++++--- c/src/core/transport.c | 20 ++++++++++++++------ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 62aa12434..1de6c91f9 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -361,6 +361,7 @@ struct pn_delivery_t { pn_delivery_state_t state; pn_buffer_t *bytes; pn_record_t *context; + uint32_t bytes_offset; // start of content remaining to send on transport bool updated; bool settled; // tracks whether we're in the unsettled list or not bool work; diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 4ad8c4b05..bddd155ac 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -1627,6 +1627,7 @@ static void pn_delivery_finalize(void *object) pn_bytes_free(delivery->tag); delivery->tag = (pn_delivery_tag_t){0, NULL}; pn_buffer_clear(delivery->bytes); + delivery->bytes_offset = 0; pn_record_clear(delivery->context); delivery->settled = true; pn_connection_t *conn = link->session->connection; @@ -1728,6 +1729,7 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) delivery->tpwork_prev = NULL; delivery->tpwork = false; pn_buffer_clear(delivery->bytes); + delivery->bytes_offset = 0; delivery->done = false; delivery->aborted = false; pn_record_clear(delivery->context); @@ -1751,6 +1753,12 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) return delivery; } +static uint32_t pni_delivery_buffer_size(pn_delivery_t *delivery) +{ + assert(pn_buffer_size(delivery->bytes) >= delivery->bytes_offset); + return pn_buffer_size(delivery->bytes) - delivery->bytes_offset; +} + bool pn_delivery_buffered(pn_delivery_t *delivery) { assert(delivery); @@ -1760,7 +1768,7 @@ bool pn_delivery_buffered(pn_delivery_t *delivery) if (state->sent) { return false; } else { - return delivery->done || (pn_buffer_size(delivery->bytes) > 0); + return delivery->done || (pni_delivery_buffer_size(delivery) > 0); } } else { return false; @@ -2062,6 +2070,16 @@ ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) pn_delivery_t *current = pn_link_current(sender); if (!current) return PN_EOS; if (!bytes || !n) return 0; + // Performance optimization: if alternating several writes into and multiple reads + // from the bytes pn_buffer_t, only a single trim+defrag is necessary at transition + // from read to write. + if (current->bytes_offset) { + // Streaming message. Update bytes buffer accounting. + pn_buffer_trim(current->bytes, current->bytes_offset, 0); + current->bytes_offset = 0; + // Expensive defrag/rotate is here. Future calls to pn_buffer_bytes() are fast if no trim. + pn_buffer_bytes(current->bytes); + } pn_buffer_append(current->bytes, bytes, n); sender->session->outgoing_bytes += n; pni_add_tpwork(current); @@ -2259,7 +2277,7 @@ size_t pn_delivery_pending(pn_delivery_t *delivery) the PN_ABORTED error return code. */ if (delivery->aborted) return 1; - return pn_buffer_size(delivery->bytes); + return pni_delivery_buffer_size(delivery); } bool pn_delivery_partial(pn_delivery_t *delivery) @@ -2271,8 +2289,9 @@ void pn_delivery_abort(pn_delivery_t *delivery) { if (!delivery->local.settled) { /* Can't abort a settled delivery */ delivery->aborted = true; pn_delivery_settle(delivery); - delivery->link->session->outgoing_bytes -= pn_buffer_size(delivery->bytes); + delivery->link->session->outgoing_bytes -= pni_delivery_buffer_size(delivery); pn_buffer_clear(delivery->bytes); + delivery->bytes_offset = 0; } } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index d89625797..b7554d718 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -2209,15 +2209,22 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d pn_session_state_t *ssn_state = &link->session->state; pn_link_state_t *link_state = &link->state; bool xfr_posted = false; + uint32_t unsent = pn_buffer_size(delivery->bytes) - delivery->bytes_offset; if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) { - if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) && + if (!state->sent && (delivery->done || unsent > 0) && ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) { if (!state->init) { state = pni_delivery_map_push(&ssn_state->outgoing, delivery); } - + // Content may be consumed in chunks via multiple calls to transport_produce(). + // pn_link_send() ensures next call is always fast: buffer already in rotated state. pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes); - size_t full_size = bytes.size; + if (delivery->bytes_offset) { + // Account for previous sent data while avoiding expensive buffer rotate. + bytes.size -= delivery->bytes_offset; + bytes.start += delivery->bytes_offset; + } + size_t remaining_size = bytes.size; int count = pni_post_amqp_transfer_frame(transport, ssn_state->local_channel, link_state->local_handle, @@ -2237,10 +2244,11 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d ssn_state->outgoing_transfer_count += count; ssn_state->remote_incoming_window -= count; - int sent = full_size - bytes.size; - pn_buffer_trim(delivery->bytes, sent, 0); + int sent = remaining_size - bytes.size; + delivery->bytes_offset += sent; link->session->outgoing_bytes -= sent; - if (!pn_buffer_size(delivery->bytes) && delivery->done) { + remaining_size -= sent; + if (!remaining_size && delivery->done) { state->sent = true; link_state->delivery_count++; link_state->link_credit--; From 5c319fe3b786fab8a185a6ff3afc9edfeccf08cd Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Wed, 23 Oct 2024 09:56:03 -0700 Subject: [PATCH 2/3] PROTON-2858: Improve scheduling fairness for outgoing streaming messages --- c/src/core/engine.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/c/src/core/engine.c b/c/src/core/engine.c index bddd155ac..51c7b3114 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -2079,6 +2079,12 @@ ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) current->bytes_offset = 0; // Expensive defrag/rotate is here. Future calls to pn_buffer_bytes() are fast if no trim. pn_buffer_bytes(current->bytes); + if (current->tpwork) { + // Some content was sent: bytes_offset > 0. Give other senders a turn. + pn_connection_t *connection = current->link->session->connection; + LL_REMOVE(connection, tpwork, current); + current->tpwork = false; + } } pn_buffer_append(current->bytes, bytes, n); sender->session->outgoing_bytes += n; From 21ce8ca6d4cf8eba351feccdc74895a7523a10fe Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Wed, 23 Oct 2024 10:13:09 -0700 Subject: [PATCH 3/3] PROTON-2859: Improve performance of pn_buffer_t defrag --- c/src/core/buffer.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/c/src/core/buffer.c b/c/src/core/buffer.c index dc3a753d6..7ef3e0fe2 100644 --- a/c/src/core/buffer.c +++ b/c/src/core/buffer.c @@ -236,7 +236,10 @@ static void pn_buffer_rotate (pn_buffer_t *buf, size_t sz) { static inline int pn_buffer_defrag(pn_buffer_t *buf) { - pn_buffer_rotate(buf, buf->start); + if (pni_buffer_wrapped(buf)) + pn_buffer_rotate(buf, buf->start); + else + memmove(buf->bytes, buf->bytes + buf->start, buf->size); buf->start = 0; return 0; }