From 603466b5d8a02c367aa769169a1f5dec589335a4 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 15 Jul 2025 07:41:46 +0000 Subject: [PATCH 1/7] Use byte_size/1 instead of size/1 --- deps/amqp10_common/src/amqp10_binary_generator.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/amqp10_common/src/amqp10_binary_generator.erl b/deps/amqp10_common/src/amqp10_binary_generator.erl index b628fcaaa15..96de7f9f42b 100644 --- a/deps/amqp10_common/src/amqp10_binary_generator.erl +++ b/deps/amqp10_common/src/amqp10_binary_generator.erl @@ -204,9 +204,9 @@ constructor(array) -> 16#f0; % use large array type for all nested arrays constructor({described, Descriptor, Primitive}) -> [16#00, generate1(Descriptor), constructor(Primitive)]. -generate2(symbol, {symbol, V}) -> [<<(size(V)):32>>, V]; -generate2(utf8, {utf8, V}) -> [<<(size(V)):32>>, V]; -generate2(binary, {binary, V}) -> [<<(size(V)):32>>, V]; +generate2(symbol, {symbol, V}) -> [<<(byte_size(V)):32>>, V]; +generate2(utf8, {utf8, V}) -> [<<(byte_size(V)):32>>, V]; +generate2(binary, {binary, V}) -> [<<(byte_size(V)):32>>, V]; generate2(boolean, true) -> 16#01; generate2(boolean, false) -> 16#00; generate2(boolean, {boolean, true}) -> 16#01; From 09f9a77799c90366fc697619366d77e731181d01 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 15 Jul 2025 07:25:09 +0000 Subject: [PATCH 2/7] Delete dead code --- deps/rabbit/src/mc_amqp.erl | 44 +++++++++++++++---------------------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 1c1c3b9d7f2..00a696f7cb7 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -500,37 +500,29 @@ maps_upsert(Key, TaggedVal, KVList) -> encode(Sections) when is_list(Sections) -> [amqp10_framing:encode_bin(Section) || Section <- Sections, - not is_empty(Section)]. + not omit(Section)]. -is_empty(#'v1_0.header'{durable = undefined, - priority = undefined, - ttl = undefined, - first_acquirer = undefined, - delivery_count = undefined}) -> +omit(#'v1_0.message_annotations'{content = []}) -> true; -is_empty(#'v1_0.delivery_annotations'{content = []}) -> +omit(#'v1_0.properties'{message_id = undefined, + user_id = undefined, + to = undefined, + subject = undefined, + reply_to = undefined, + correlation_id = undefined, + content_type = undefined, + content_encoding = undefined, + absolute_expiry_time = undefined, + creation_time = undefined, + group_id = undefined, + group_sequence = undefined, + reply_to_group_id = undefined}) -> true; -is_empty(#'v1_0.message_annotations'{content = []}) -> +omit(#'v1_0.application_properties'{content = []}) -> true; -is_empty(#'v1_0.properties'{message_id = undefined, - user_id = undefined, - to = undefined, - subject = undefined, - reply_to = undefined, - correlation_id = undefined, - content_type = undefined, - content_encoding = undefined, - absolute_expiry_time = undefined, - creation_time = undefined, - group_id = undefined, - group_sequence = undefined, - reply_to_group_id = undefined}) -> +omit(#'v1_0.footer'{content = []}) -> true; -is_empty(#'v1_0.application_properties'{content = []}) -> - true; -is_empty(#'v1_0.footer'{content = []}) -> - true; -is_empty(_) -> +omit(_) -> false. message_annotation(Key, State, Default) From bf23a7fb30290c303d269a09aac2b3da428ee50c Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 15 Jul 2025 11:01:01 +0000 Subject: [PATCH 3/7] Speed up AMQP 1.0 parser by generating less garbage This commit results in great performance improvements for AMQP 1.0. Stream filtering via AMQP SQL Filters or AMQP Property Filters where the broker reads messages from the stream into memory and the filter returns false, i.e. messages are not sent to the client: Before this commit: ~400,000 msgs/s After this commit: ~500,000 msgs/s There is also a ~10% increase in end-to-end throughput for normal AMQP workloads, e.g. when sending to and receiving from a classic queue. Prior to this commit, a lot of garbage was created leading to many minor garbage collections. This commit reduces the garbage being generated. The new module amqp10_composite performs very well: * Single tuple update setting multiple fields at once, sometimes done even in-place without copying the tuple. * The list of fields is passed into X registers, no need for any new allocations. On the serialisation side, this commit also generates less garbage by: 1. Avoiding tuple_to_list/1 2. Omitting trailing elements of the list that are null This will also lead to fewer bytes per message sent on the wire and less resource usage for the clients as they need to parse fewer fields. --- deps/amqp10_common/src/amqp10_composite.erl | 336 ++++++++++++++++++++ deps/amqp10_common/src/amqp10_framing.erl | 41 ++- deps/amqp10_common/test/prop_SUITE.erl | 119 ++++++- 3 files changed, 488 insertions(+), 8 deletions(-) create mode 100644 deps/amqp10_common/src/amqp10_composite.erl diff --git a/deps/amqp10_common/src/amqp10_composite.erl b/deps/amqp10_common/src/amqp10_composite.erl new file mode 100644 index 00000000000..be86c1f0de3 --- /dev/null +++ b/deps/amqp10_common/src/amqp10_composite.erl @@ -0,0 +1,336 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp10_composite). + +-include("amqp10_framing.hrl"). + +-export([flow/2, + transfer/2, + disposition/2, + header/2, + properties/2]). + +-spec flow(#'v1_0.flow'{}, nonempty_list()) -> + #'v1_0.flow'{}. +flow(F, [F1, F2, F3, F4]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4)}; +flow(F, [F1, F2, F3, F4, F5]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5)}; +flow(F, [F1, F2, F3, F4, F5, F6]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9), + echo = ntu(F10)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9), + echo = ntu(F10), + properties = amqp10_framing:decode(F11)}. + +-spec transfer(#'v1_0.transfer'{}, nonempty_list()) -> + #'v1_0.transfer'{}. +transfer(T, [F1]) -> + T#'v1_0.transfer'{handle = ntu(F1)}; +transfer(T, [F1, F2]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2)}; +transfer(T, [F1, F2, F3]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3)}; +transfer(T, [F1, F2, F3, F4]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4)}; +transfer(T, [F1, F2, F3, F4, F5]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5)}; +transfer(T, [F1, F2, F3, F4, F5, F6]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9), + aborted = ntu(F10)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9), + aborted = ntu(F10), + batchable = ntu(F11)}. + +-spec disposition(#'v1_0.disposition'{}, nonempty_list()) -> + #'v1_0.disposition'{}. +disposition(D, [F1, F2]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2)}; +disposition(D, [F1, F2, F3]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3)}; +disposition(D, [F1, F2, F3, F4]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4)}; +disposition(D, [F1, F2, F3, F4, F5]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4), + state = amqp10_framing:decode(F5)}; +disposition(D, [F1, F2, F3, F4, F5, F6]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4), + state = amqp10_framing:decode(F5), + batchable = ntu(F6)}. + +-spec header(#'v1_0.header'{}, list()) -> + #'v1_0.header'{}. +header(H, []) -> + H; +header(H, [F1]) -> + H#'v1_0.header'{durable = ntu(F1)}; +header(H, [F1, F2]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2)}; +header(H, [F1, F2, F3]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3)}; +header(H, [F1, F2, F3, F4]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3), + first_acquirer = ntu(F4)}; +header(H, [F1, F2, F3, F4, F5]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3), + first_acquirer = ntu(F4), + delivery_count = ntu(F5)}. + +-spec properties(#'v1_0.properties'{}, list()) -> + #'v1_0.properties'{}. +properties(P, []) -> + P; +properties(P, [F1]) -> + P#'v1_0.properties'{message_id = ntu(F1)}; +properties(P, [F1, F2]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2)}; +properties(P, [F1, F2, F3]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3)}; +properties(P, [F1, F2, F3, F4]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4)}; +properties(P, [F1, F2, F3, F4, F5]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5)}; +properties(P, [F1, F2, F3, F4, F5, F6]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11), + group_sequence = ntu(F12)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12, F13]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11), + group_sequence = ntu(F12), + reply_to_group_id = ntu(F13)}. + +%% null to undefined +-compile({inline, [ntu/1]}). +ntu(null) -> + undefined; +ntu(Other) -> + Other. diff --git a/deps/amqp10_common/src/amqp10_framing.erl b/deps/amqp10_common/src/amqp10_framing.erl index 9baaa4a02a1..542f9104ee1 100644 --- a/deps/amqp10_common/src/amqp10_framing.erl +++ b/deps/amqp10_common/src/amqp10_framing.erl @@ -106,10 +106,18 @@ symbolify(FieldName) when is_atom(FieldName) -> %% elements) both describe an absence of a value and should be treated %% as semantically identical." (see section 1.3) -%% A sequence comes as an arbitrary list of values; it's not a -%% composite type. decode({described, Descriptor, {list, Fields} = Type}) -> case amqp10_framing0:record_for(Descriptor) of + #'v1_0.flow'{} = Flow -> + amqp10_composite:flow(Flow, Fields); + #'v1_0.transfer'{} = Transfer -> + amqp10_composite:transfer(Transfer, Fields); + #'v1_0.disposition'{} = Disposition -> + amqp10_composite:disposition(Disposition, Fields); + #'v1_0.header'{} = Header -> + amqp10_composite:header(Header, Fields); + #'v1_0.properties'{} = Properties -> + amqp10_composite:properties(Properties, Fields); #'v1_0.amqp_sequence'{} -> #'v1_0.amqp_sequence'{content = [decode(F) || F <- Fields]}; #'v1_0.amqp_value'{} -> @@ -169,9 +177,17 @@ encode_described(list, CodeNumber, #'v1_0.amqp_sequence'{content = Content}) -> {described, {ulong, CodeNumber}, {list, lists:map(fun encode/1, Content)}}; -encode_described(list, CodeNumber, Frame) -> - {described, {ulong, CodeNumber}, - {list, lists:map(fun encode/1, tl(tuple_to_list(Frame)))}}; +encode_described(list, CodeNumber, Rec) -> + L = if is_record(Rec, 'v1_0.flow') orelse + is_record(Rec, 'v1_0.transfer') orelse + is_record(Rec, 'v1_0.disposition') orelse + is_record(Rec, 'v1_0.header') orelse + is_record(Rec, 'v1_0.properties') -> + encode_fields_omit_trailing_null(Rec, true, tuple_size(Rec), []); + true -> + encode_fields(Rec, 2, tuple_size(Rec)) + end, + {described, {ulong, CodeNumber}, {list, L}}; encode_described(map, CodeNumber, #'v1_0.application_properties'{content = Content}) -> {described, {ulong, CodeNumber}, {map, Content}}; @@ -191,6 +207,21 @@ encode_described('*', CodeNumber, #'v1_0.amqp_value'{content = Content}) -> encode_described(annotations, CodeNumber, Frame) -> encode_described(map, CodeNumber, Frame). +encode_fields(_, N, Size) when N > Size -> + []; +encode_fields(Tup, N, Size) -> + [encode(element(N, Tup)) | encode_fields(Tup, N + 1, Size)]. + +encode_fields_omit_trailing_null(_, _, 1, L) -> + L; +encode_fields_omit_trailing_null(Tup, Omit, N, L) -> + case element(N, Tup) of + undefined when Omit -> + encode_fields_omit_trailing_null(Tup, Omit, N - 1, L); + Val -> + encode_fields_omit_trailing_null(Tup, false, N - 1, [encode(Val) | L]) + end. + encode(X) -> amqp10_framing0:encode(X). diff --git a/deps/amqp10_common/test/prop_SUITE.erl b/deps/amqp10_common/test/prop_SUITE.erl index 52c3cf98481..74d4eb62461 100644 --- a/deps/amqp10_common/test/prop_SUITE.erl +++ b/deps/amqp10_common/test/prop_SUITE.erl @@ -25,7 +25,8 @@ groups() -> prop_many_primitive_types_parse_many, prop_annotated_message, prop_server_mode_body, - prop_server_mode_bare_message + prop_server_mode_bare_message, + prop_frame ]} ]. @@ -148,6 +149,16 @@ prop_server_mode_bare_message(_Config) -> end) end, [], 1000). +prop_frame(_Config) -> + run_proper( + fun() -> ?FORALL(Frame, + frame(), + begin + Bin = iolist_to_binary(amqp10_framing:encode_bin(Frame)), + equals([Frame], amqp10_framing:decode_bin(Bin)) + end) + end, [], 1000). + %%%%%%%%%%%%%%% %%% Helpers %%% %%%%%%%%%%%%%%% @@ -298,6 +309,13 @@ amqp_map() -> lists:uniq(fun({K, _V}) -> K end, KvList) )}. +fields() -> + {map, ?LET(KvList, + list({amqp_symbol(), + prefer_simple_type()}), + lists:uniq(fun({K, _V}) -> K end, KvList) + )}. + amqp_array() -> Gens = fixed_and_variable_width_types(), ?LET(N, @@ -329,11 +347,52 @@ zero_or_one(Section) -> ]). optional(Field) -> + frequency([ + {2, undefined}, + {1, Field} + ]). + +frame() -> oneof([ - undefined, - Field + flow(), + transfer(), + disposition() ]). +flow() -> + #'v1_0.flow'{next_incoming_id = optional(transfer_number()), + incoming_window = amqp_uint(), + next_outgoing_id = transfer_number(), + outgoing_window = amqp_uint(), + handle = optional(handle()), + delivery_count = optional(sequence_no()), + link_credit = optional(amqp_uint()), + available = optional(amqp_uint()), + drain = optional(amqp_boolean()), + echo = optional(amqp_boolean()), + properties = optional(fields())}. + +transfer() -> + #'v1_0.transfer'{handle = handle(), + delivery_id = optional(delivery_number()), + delivery_tag = optional(delivery_tag()), + message_format = optional(amqp_uint()), + settled = optional(amqp_boolean()), + more = optional(amqp_boolean()), + rcv_settle_mode = optional(receiver_settle_mode()), + state = optional(delivery_state()), + resume = optional(amqp_boolean()), + aborted = optional(amqp_boolean()), + batchable = optional(amqp_boolean())}. + +disposition() -> + #'v1_0.disposition'{role = role(), + first = delivery_number(), + last = optional(delivery_number()), + settled = optional(amqp_boolean()), + state = optional(delivery_state()), + batchable = optional(amqp_boolean())}. + annotated_message() -> ?LET(H, zero_or_one(header_section()), @@ -427,9 +486,63 @@ non_reserved_annotation_key() -> <<"x-", Bin/binary>> end)}. +delivery_state() -> + oneof([received(), + accepted(), + rejected(), + released(), + modified()]). + +received() -> + #'v1_0.received'{section_number = section_number(), + section_offset = section_offset()}. + +accepted() -> + #'v1_0.accepted'{}. + +rejected() -> + #'v1_0.rejected'{error = amqp_error()}. + +released() -> + #'v1_0.released'{}. + +modified() -> + #'v1_0.modified'{delivery_failed = optional(amqp_boolean()), + undeliverable_here = optional(amqp_boolean()), + message_annotations = optional(fields())}. + +amqp_error() -> + #'v1_0.error'{condition = amqp_symbol(), + description = optional(amqp_string()), + info = optional(fields())}. + +role() -> + amqp_boolean(). + +delivery_number() -> + sequence_no(). + +transfer_number() -> + sequence_no(). + +delivery_tag() -> + {binary, binary(32)}. + +receiver_settle_mode() -> + {ubyte, oneof([0, 1])}. + +handle() -> + amqp_uint(). + sequence_no() -> amqp_uint(). +section_number() -> + amqp_uint(). + +section_offset() -> + amqp_ulong(). + milliseconds() -> amqp_uint(). From 64fdb25f975c3db50ac2981f933762f3be644eba Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 16 Jul 2025 09:33:57 +0200 Subject: [PATCH 4/7] Fix splitting large messages in AMQP client This commit fixes the failing test cases ``` make -C deps/rabbit ct-msg_size_metrics t=tests:message_size make -C deps/amqp10_client ct-system t=rabbitmq:roundtrip_large_messages ``` The second test case failed with: ``` flush {amqp10_event, {connection,<0.193.0>, {closed, {framing_error, <<"frame size (131073 bytes) > maximum frame size (131072 bytes)">>}}}} ``` --- deps/amqp10_client/src/amqp10_client_session.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d40f1e301b6..467082cf014 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -625,8 +625,8 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, channel = Channel, connection_config = Config}) -> OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config), - Transfer = amqp10_framing:encode_bin(Transfer0), - TransferSize = iolist_size(Transfer), + Transfer = Transfer0#'v1_0.transfer'{more = false}, + TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)), Sections = encode_sections(Sections0, FooterOpt), SectionsBin = iolist_to_binary(Sections), if is_integer(MaxMessageSize) andalso @@ -637,7 +637,7 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, % TODO: this does not take the extended header into account % see: 2.3 MaxPayloadSize = OutMaxFrameSize - TransferSize - ?FRAME_HEADER_SIZE, - Frames = build_frames(Channel, Transfer0, SectionsBin, MaxPayloadSize, []), + Frames = build_frames(Channel, Transfer, SectionsBin, MaxPayloadSize, []), ok = socket_send(Socket, Frames), {ok, length(Frames)} end. @@ -722,7 +722,7 @@ set_flow_session_fields(Flow, #state{next_incoming_id = NID, build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc) when byte_size(Bin) =< MaxPayloadSize -> - T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}), + T = amqp10_framing:encode_bin(Trf), Frame = amqp10_binary_generator:build_frame(Channel, [T, Bin]), lists:reverse([Frame | Acc]); build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> From ce3726c064a1389eb90767ca37dfa524f8a07cef Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 16 Jul 2025 10:29:08 +0200 Subject: [PATCH 5/7] Fix splitting large messages in server This commit fixes the following test case: ``` make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:fragmentation ``` Previously, the server sent a frame that was 1 byte too large. --- deps/rabbit/src/rabbit_amqp_session.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c666017194a..a6aa7e7ef78 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2182,7 +2182,8 @@ handle_deliver(ConsumerTag, AckRequired, delivery_id = ?UINT(DeliveryId), delivery_tag = {binary, Dtag}, message_format = ?UINT(?MESSAGE_FORMAT), - settled = SendSettled}, + settled = SendSettled, + more = false}, Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), Mc2 = mc:convert(mc_amqp, Mc1), Mc = mc:set_annotation(redelivered, Redelivered, Mc2), @@ -2332,7 +2333,8 @@ incoming_mgmt_link_transfer( delivery_id = ?UINT(OutgoingDeliveryId), delivery_tag = {binary, <<>>}, message_format = ?UINT(?MESSAGE_FORMAT), - settled = true}, + settled = true, + more = false}, validate_message_size(Response, OutgoingMaxMessageSize), Frames = transfer_frames(Transfer, Response, MaxFrameSize), PendingDelivery = #pending_management_delivery{frames = Frames}, From cf3bbe99a75463168c6b37ed5c26d205d5785fd2 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 16 Jul 2025 10:32:21 +0200 Subject: [PATCH 6/7] Simplify pattern matching --- deps/amqp10_common/src/amqp10_binary_generator.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_common/src/amqp10_binary_generator.erl b/deps/amqp10_common/src/amqp10_binary_generator.erl index 96de7f9f42b..71effc61ae0 100644 --- a/deps/amqp10_common/src/amqp10_binary_generator.erl +++ b/deps/amqp10_common/src/amqp10_binary_generator.erl @@ -99,10 +99,10 @@ generate1({boolean, false}) -> [16#56, 16#00]; %% bits set to zero and values < 256. generate1({ubyte, V}) -> [16#50, V]; generate1({ushort, V}) -> <<16#60,V:16/unsigned>>; -generate1({uint, V}) when V =:= 0 -> 16#43; +generate1({uint, 0}) -> 16#43; generate1({uint, V}) when V < 256 -> [16#52, V]; generate1({uint, V}) -> <<16#70,V:32/unsigned>>; -generate1({ulong, V}) when V =:= 0 -> 16#44; +generate1({ulong, 0}) -> 16#44; generate1({ulong, V}) when V < 256 -> [16#53, V]; generate1({ulong, V}) -> <<16#80,V:64/unsigned>>; generate1({byte, V}) -> <<16#51,V:8/signed>>; From 80a687f5253dea003bceb77007ad04d967ff0493 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 16 Jul 2025 11:32:50 +0200 Subject: [PATCH 7/7] Simplify splitting large messages --- deps/amqp10_client/src/amqp10_client_internal.hrl | 1 - deps/amqp10_client/src/amqp10_client_session.erl | 2 +- deps/amqp10_common/include/amqp10_types.hrl | 4 ++++ deps/rabbit/src/rabbit_amqp_reader.erl | 7 +------ deps/rabbit/src/rabbit_amqp_session.erl | 9 +++++---- .../test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs | 6 +++++- 6 files changed, 16 insertions(+), 13 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_internal.hrl b/deps/amqp10_client/src/amqp10_client_internal.hrl index aec70e866ce..637faf897a2 100644 --- a/deps/amqp10_client/src/amqp10_client_internal.hrl +++ b/deps/amqp10_client/src/amqp10_client_internal.hrl @@ -7,7 +7,6 @@ -define(AMQP_PROTOCOL_HEADER, <<"AMQP", 0, 1, 0, 0>>). -define(SASL_PROTOCOL_HEADER, <<"AMQP", 3, 1, 0, 0>>). --define(FRAME_HEADER_SIZE, 8). -define(TIMEOUT, 5000). diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 467082cf014..7a152b440a2 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -636,7 +636,7 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, true -> % TODO: this does not take the extended header into account % see: 2.3 - MaxPayloadSize = OutMaxFrameSize - TransferSize - ?FRAME_HEADER_SIZE, + MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize, Frames = build_frames(Channel, Transfer, SectionsBin, MaxPayloadSize, []), ok = socket_send(Socket, Frames), {ok, length(Frames)} diff --git a/deps/amqp10_common/include/amqp10_types.hrl b/deps/amqp10_common/include/amqp10_types.hrl index ad29b86d9c1..04fe5ca2ee2 100644 --- a/deps/amqp10_common/include/amqp10_types.hrl +++ b/deps/amqp10_common/include/amqp10_types.hrl @@ -2,6 +2,10 @@ % [1.6.5] -type uint() :: 0..?UINT_MAX. + +% [2.3.1] +-define(FRAME_HEADER_SIZE, 8). + % [2.8.4] -type link_handle() :: uint(). % [2.8.8] diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index b9d2eaf8242..996cc533102 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -827,14 +827,9 @@ send_to_new_session( container_id = ContainerId, name = ConnName}, writer = WriterPid} = State) -> - %% Subtract fixed frame header size. - OutgoingMaxFrameSize = case MaxFrame of - unlimited -> unlimited; - _ -> MaxFrame - 8 - end, ChildArgs = [WriterPid, ChannelNum, - OutgoingMaxFrameSize, + MaxFrame, User, Vhost, ContainerId, diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index a6aa7e7ef78..27c6d969139 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -3163,19 +3163,20 @@ transfer_frames(Transfer, Sections, unlimited) -> [[Transfer, Sections]]; transfer_frames(Transfer, Sections, MaxFrameSize) -> PerformativeSize = iolist_size(amqp10_framing:encode_bin(Transfer)), - encode_frames(Transfer, Sections, MaxFrameSize - PerformativeSize, []). + MaxPayloadSize = MaxFrameSize - ?FRAME_HEADER_SIZE - PerformativeSize, + split_msg(Transfer, Sections, MaxPayloadSize, []). -encode_frames(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 -> +split_msg(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 -> protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL, "Frame size is too small by ~b bytes", [-MaxPayloadSize]); -encode_frames(T, Msg, MaxPayloadSize, Transfers) -> +split_msg(T, Msg, MaxPayloadSize, Transfers) -> case iolist_size(Msg) > MaxPayloadSize of true -> MsgBin = iolist_to_binary(Msg), {Chunk, Rest} = split_binary(MsgBin, MaxPayloadSize), T1 = T#'v1_0.transfer'{more = true}, - encode_frames(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]); + split_msg(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]); false -> lists:reverse([[T, Msg] | Transfers]) end. diff --git a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs index aa6a2fd0b71..67758ec6a72 100755 --- a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs @@ -279,7 +279,11 @@ module Test = let fragmentation uri = for frameSize, size in - [1024u, 1024 + [1024u, 990 + 1024u, 1000 + 1024u, 1010 + 1024u, 1020 + 1024u, 1024 1024u, 1100 1024u, 2048 2048u, 2048] do