diff --git a/deps/amqp10_client/src/amqp10_client_internal.hrl b/deps/amqp10_client/src/amqp10_client_internal.hrl index aec70e866ce4..637faf897a2b 100644 --- a/deps/amqp10_client/src/amqp10_client_internal.hrl +++ b/deps/amqp10_client/src/amqp10_client_internal.hrl @@ -7,7 +7,6 @@ -define(AMQP_PROTOCOL_HEADER, <<"AMQP", 0, 1, 0, 0>>). -define(SASL_PROTOCOL_HEADER, <<"AMQP", 3, 1, 0, 0>>). --define(FRAME_HEADER_SIZE, 8). -define(TIMEOUT, 5000). diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d40f1e301b6d..7a152b440a23 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -625,8 +625,8 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, channel = Channel, connection_config = Config}) -> OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config), - Transfer = amqp10_framing:encode_bin(Transfer0), - TransferSize = iolist_size(Transfer), + Transfer = Transfer0#'v1_0.transfer'{more = false}, + TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)), Sections = encode_sections(Sections0, FooterOpt), SectionsBin = iolist_to_binary(Sections), if is_integer(MaxMessageSize) andalso @@ -636,8 +636,8 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, true -> % TODO: this does not take the extended header into account % see: 2.3 - MaxPayloadSize = OutMaxFrameSize - TransferSize - ?FRAME_HEADER_SIZE, - Frames = build_frames(Channel, Transfer0, SectionsBin, MaxPayloadSize, []), + MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize, + Frames = build_frames(Channel, Transfer, SectionsBin, MaxPayloadSize, []), ok = socket_send(Socket, Frames), {ok, length(Frames)} end. @@ -722,7 +722,7 @@ set_flow_session_fields(Flow, #state{next_incoming_id = NID, build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc) when byte_size(Bin) =< MaxPayloadSize -> - T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}), + T = amqp10_framing:encode_bin(Trf), Frame = amqp10_binary_generator:build_frame(Channel, [T, Bin]), lists:reverse([Frame | Acc]); build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> diff --git a/deps/amqp10_common/include/amqp10_types.hrl b/deps/amqp10_common/include/amqp10_types.hrl index ad29b86d9c14..04fe5ca2ee20 100644 --- a/deps/amqp10_common/include/amqp10_types.hrl +++ b/deps/amqp10_common/include/amqp10_types.hrl @@ -2,6 +2,10 @@ % [1.6.5] -type uint() :: 0..?UINT_MAX. + +% [2.3.1] +-define(FRAME_HEADER_SIZE, 8). + % [2.8.4] -type link_handle() :: uint(). % [2.8.8] diff --git a/deps/amqp10_common/src/amqp10_binary_generator.erl b/deps/amqp10_common/src/amqp10_binary_generator.erl index b628fcaaa152..71effc61ae06 100644 --- a/deps/amqp10_common/src/amqp10_binary_generator.erl +++ b/deps/amqp10_common/src/amqp10_binary_generator.erl @@ -99,10 +99,10 @@ generate1({boolean, false}) -> [16#56, 16#00]; %% bits set to zero and values < 256. generate1({ubyte, V}) -> [16#50, V]; generate1({ushort, V}) -> <<16#60,V:16/unsigned>>; -generate1({uint, V}) when V =:= 0 -> 16#43; +generate1({uint, 0}) -> 16#43; generate1({uint, V}) when V < 256 -> [16#52, V]; generate1({uint, V}) -> <<16#70,V:32/unsigned>>; -generate1({ulong, V}) when V =:= 0 -> 16#44; +generate1({ulong, 0}) -> 16#44; generate1({ulong, V}) when V < 256 -> [16#53, V]; generate1({ulong, V}) -> <<16#80,V:64/unsigned>>; generate1({byte, V}) -> <<16#51,V:8/signed>>; @@ -204,9 +204,9 @@ constructor(array) -> 16#f0; % use large array type for all nested arrays constructor({described, Descriptor, Primitive}) -> [16#00, generate1(Descriptor), constructor(Primitive)]. -generate2(symbol, {symbol, V}) -> [<<(size(V)):32>>, V]; -generate2(utf8, {utf8, V}) -> [<<(size(V)):32>>, V]; -generate2(binary, {binary, V}) -> [<<(size(V)):32>>, V]; +generate2(symbol, {symbol, V}) -> [<<(byte_size(V)):32>>, V]; +generate2(utf8, {utf8, V}) -> [<<(byte_size(V)):32>>, V]; +generate2(binary, {binary, V}) -> [<<(byte_size(V)):32>>, V]; generate2(boolean, true) -> 16#01; generate2(boolean, false) -> 16#00; generate2(boolean, {boolean, true}) -> 16#01; diff --git a/deps/amqp10_common/src/amqp10_composite.erl b/deps/amqp10_common/src/amqp10_composite.erl new file mode 100644 index 000000000000..be86c1f0de36 --- /dev/null +++ b/deps/amqp10_common/src/amqp10_composite.erl @@ -0,0 +1,336 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp10_composite). + +-include("amqp10_framing.hrl"). + +-export([flow/2, + transfer/2, + disposition/2, + header/2, + properties/2]). + +-spec flow(#'v1_0.flow'{}, nonempty_list()) -> + #'v1_0.flow'{}. +flow(F, [F1, F2, F3, F4]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4)}; +flow(F, [F1, F2, F3, F4, F5]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5)}; +flow(F, [F1, F2, F3, F4, F5, F6]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9), + echo = ntu(F10)}; +flow(F, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + F#'v1_0.flow'{next_incoming_id = ntu(F1), + incoming_window = ntu(F2), + next_outgoing_id = ntu(F3), + outgoing_window = ntu(F4), + handle = ntu(F5), + delivery_count = ntu(F6), + link_credit = ntu(F7), + available = ntu(F8), + drain = ntu(F9), + echo = ntu(F10), + properties = amqp10_framing:decode(F11)}. + +-spec transfer(#'v1_0.transfer'{}, nonempty_list()) -> + #'v1_0.transfer'{}. +transfer(T, [F1]) -> + T#'v1_0.transfer'{handle = ntu(F1)}; +transfer(T, [F1, F2]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2)}; +transfer(T, [F1, F2, F3]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3)}; +transfer(T, [F1, F2, F3, F4]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4)}; +transfer(T, [F1, F2, F3, F4, F5]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5)}; +transfer(T, [F1, F2, F3, F4, F5, F6]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9), + aborted = ntu(F10)}; +transfer(T, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + T#'v1_0.transfer'{handle = ntu(F1), + delivery_id = ntu(F2), + delivery_tag = ntu(F3), + message_format = ntu(F4), + settled = ntu(F5), + more = ntu(F6), + rcv_settle_mode = ntu(F7), + state = amqp10_framing:decode(F8), + resume = ntu(F9), + aborted = ntu(F10), + batchable = ntu(F11)}. + +-spec disposition(#'v1_0.disposition'{}, nonempty_list()) -> + #'v1_0.disposition'{}. +disposition(D, [F1, F2]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2)}; +disposition(D, [F1, F2, F3]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3)}; +disposition(D, [F1, F2, F3, F4]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4)}; +disposition(D, [F1, F2, F3, F4, F5]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4), + state = amqp10_framing:decode(F5)}; +disposition(D, [F1, F2, F3, F4, F5, F6]) -> + D#'v1_0.disposition'{role = ntu(F1), + first = ntu(F2), + last = ntu(F3), + settled = ntu(F4), + state = amqp10_framing:decode(F5), + batchable = ntu(F6)}. + +-spec header(#'v1_0.header'{}, list()) -> + #'v1_0.header'{}. +header(H, []) -> + H; +header(H, [F1]) -> + H#'v1_0.header'{durable = ntu(F1)}; +header(H, [F1, F2]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2)}; +header(H, [F1, F2, F3]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3)}; +header(H, [F1, F2, F3, F4]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3), + first_acquirer = ntu(F4)}; +header(H, [F1, F2, F3, F4, F5]) -> + H#'v1_0.header'{durable = ntu(F1), + priority = ntu(F2), + ttl = ntu(F3), + first_acquirer = ntu(F4), + delivery_count = ntu(F5)}. + +-spec properties(#'v1_0.properties'{}, list()) -> + #'v1_0.properties'{}. +properties(P, []) -> + P; +properties(P, [F1]) -> + P#'v1_0.properties'{message_id = ntu(F1)}; +properties(P, [F1, F2]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2)}; +properties(P, [F1, F2, F3]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3)}; +properties(P, [F1, F2, F3, F4]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4)}; +properties(P, [F1, F2, F3, F4, F5]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5)}; +properties(P, [F1, F2, F3, F4, F5, F6]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11), + group_sequence = ntu(F12)}; +properties(P, [F1, F2, F3, F4, F5, F6, F7, F8, F9, F10, F11, F12, F13]) -> + P#'v1_0.properties'{message_id = ntu(F1), + user_id = ntu(F2), + to = ntu(F3), + subject = ntu(F4), + reply_to = ntu(F5), + correlation_id = ntu(F6), + content_type = ntu(F7), + content_encoding = ntu(F8), + absolute_expiry_time = ntu(F9), + creation_time = ntu(F10), + group_id = ntu(F11), + group_sequence = ntu(F12), + reply_to_group_id = ntu(F13)}. + +%% null to undefined +-compile({inline, [ntu/1]}). +ntu(null) -> + undefined; +ntu(Other) -> + Other. diff --git a/deps/amqp10_common/src/amqp10_framing.erl b/deps/amqp10_common/src/amqp10_framing.erl index 9baaa4a02a16..542f9104ee15 100644 --- a/deps/amqp10_common/src/amqp10_framing.erl +++ b/deps/amqp10_common/src/amqp10_framing.erl @@ -106,10 +106,18 @@ symbolify(FieldName) when is_atom(FieldName) -> %% elements) both describe an absence of a value and should be treated %% as semantically identical." (see section 1.3) -%% A sequence comes as an arbitrary list of values; it's not a -%% composite type. decode({described, Descriptor, {list, Fields} = Type}) -> case amqp10_framing0:record_for(Descriptor) of + #'v1_0.flow'{} = Flow -> + amqp10_composite:flow(Flow, Fields); + #'v1_0.transfer'{} = Transfer -> + amqp10_composite:transfer(Transfer, Fields); + #'v1_0.disposition'{} = Disposition -> + amqp10_composite:disposition(Disposition, Fields); + #'v1_0.header'{} = Header -> + amqp10_composite:header(Header, Fields); + #'v1_0.properties'{} = Properties -> + amqp10_composite:properties(Properties, Fields); #'v1_0.amqp_sequence'{} -> #'v1_0.amqp_sequence'{content = [decode(F) || F <- Fields]}; #'v1_0.amqp_value'{} -> @@ -169,9 +177,17 @@ encode_described(list, CodeNumber, #'v1_0.amqp_sequence'{content = Content}) -> {described, {ulong, CodeNumber}, {list, lists:map(fun encode/1, Content)}}; -encode_described(list, CodeNumber, Frame) -> - {described, {ulong, CodeNumber}, - {list, lists:map(fun encode/1, tl(tuple_to_list(Frame)))}}; +encode_described(list, CodeNumber, Rec) -> + L = if is_record(Rec, 'v1_0.flow') orelse + is_record(Rec, 'v1_0.transfer') orelse + is_record(Rec, 'v1_0.disposition') orelse + is_record(Rec, 'v1_0.header') orelse + is_record(Rec, 'v1_0.properties') -> + encode_fields_omit_trailing_null(Rec, true, tuple_size(Rec), []); + true -> + encode_fields(Rec, 2, tuple_size(Rec)) + end, + {described, {ulong, CodeNumber}, {list, L}}; encode_described(map, CodeNumber, #'v1_0.application_properties'{content = Content}) -> {described, {ulong, CodeNumber}, {map, Content}}; @@ -191,6 +207,21 @@ encode_described('*', CodeNumber, #'v1_0.amqp_value'{content = Content}) -> encode_described(annotations, CodeNumber, Frame) -> encode_described(map, CodeNumber, Frame). +encode_fields(_, N, Size) when N > Size -> + []; +encode_fields(Tup, N, Size) -> + [encode(element(N, Tup)) | encode_fields(Tup, N + 1, Size)]. + +encode_fields_omit_trailing_null(_, _, 1, L) -> + L; +encode_fields_omit_trailing_null(Tup, Omit, N, L) -> + case element(N, Tup) of + undefined when Omit -> + encode_fields_omit_trailing_null(Tup, Omit, N - 1, L); + Val -> + encode_fields_omit_trailing_null(Tup, false, N - 1, [encode(Val) | L]) + end. + encode(X) -> amqp10_framing0:encode(X). diff --git a/deps/amqp10_common/test/prop_SUITE.erl b/deps/amqp10_common/test/prop_SUITE.erl index 52c3cf984812..74d4eb62461c 100644 --- a/deps/amqp10_common/test/prop_SUITE.erl +++ b/deps/amqp10_common/test/prop_SUITE.erl @@ -25,7 +25,8 @@ groups() -> prop_many_primitive_types_parse_many, prop_annotated_message, prop_server_mode_body, - prop_server_mode_bare_message + prop_server_mode_bare_message, + prop_frame ]} ]. @@ -148,6 +149,16 @@ prop_server_mode_bare_message(_Config) -> end) end, [], 1000). +prop_frame(_Config) -> + run_proper( + fun() -> ?FORALL(Frame, + frame(), + begin + Bin = iolist_to_binary(amqp10_framing:encode_bin(Frame)), + equals([Frame], amqp10_framing:decode_bin(Bin)) + end) + end, [], 1000). + %%%%%%%%%%%%%%% %%% Helpers %%% %%%%%%%%%%%%%%% @@ -298,6 +309,13 @@ amqp_map() -> lists:uniq(fun({K, _V}) -> K end, KvList) )}. +fields() -> + {map, ?LET(KvList, + list({amqp_symbol(), + prefer_simple_type()}), + lists:uniq(fun({K, _V}) -> K end, KvList) + )}. + amqp_array() -> Gens = fixed_and_variable_width_types(), ?LET(N, @@ -329,11 +347,52 @@ zero_or_one(Section) -> ]). optional(Field) -> + frequency([ + {2, undefined}, + {1, Field} + ]). + +frame() -> oneof([ - undefined, - Field + flow(), + transfer(), + disposition() ]). +flow() -> + #'v1_0.flow'{next_incoming_id = optional(transfer_number()), + incoming_window = amqp_uint(), + next_outgoing_id = transfer_number(), + outgoing_window = amqp_uint(), + handle = optional(handle()), + delivery_count = optional(sequence_no()), + link_credit = optional(amqp_uint()), + available = optional(amqp_uint()), + drain = optional(amqp_boolean()), + echo = optional(amqp_boolean()), + properties = optional(fields())}. + +transfer() -> + #'v1_0.transfer'{handle = handle(), + delivery_id = optional(delivery_number()), + delivery_tag = optional(delivery_tag()), + message_format = optional(amqp_uint()), + settled = optional(amqp_boolean()), + more = optional(amqp_boolean()), + rcv_settle_mode = optional(receiver_settle_mode()), + state = optional(delivery_state()), + resume = optional(amqp_boolean()), + aborted = optional(amqp_boolean()), + batchable = optional(amqp_boolean())}. + +disposition() -> + #'v1_0.disposition'{role = role(), + first = delivery_number(), + last = optional(delivery_number()), + settled = optional(amqp_boolean()), + state = optional(delivery_state()), + batchable = optional(amqp_boolean())}. + annotated_message() -> ?LET(H, zero_or_one(header_section()), @@ -427,9 +486,63 @@ non_reserved_annotation_key() -> <<"x-", Bin/binary>> end)}. +delivery_state() -> + oneof([received(), + accepted(), + rejected(), + released(), + modified()]). + +received() -> + #'v1_0.received'{section_number = section_number(), + section_offset = section_offset()}. + +accepted() -> + #'v1_0.accepted'{}. + +rejected() -> + #'v1_0.rejected'{error = amqp_error()}. + +released() -> + #'v1_0.released'{}. + +modified() -> + #'v1_0.modified'{delivery_failed = optional(amqp_boolean()), + undeliverable_here = optional(amqp_boolean()), + message_annotations = optional(fields())}. + +amqp_error() -> + #'v1_0.error'{condition = amqp_symbol(), + description = optional(amqp_string()), + info = optional(fields())}. + +role() -> + amqp_boolean(). + +delivery_number() -> + sequence_no(). + +transfer_number() -> + sequence_no(). + +delivery_tag() -> + {binary, binary(32)}. + +receiver_settle_mode() -> + {ubyte, oneof([0, 1])}. + +handle() -> + amqp_uint(). + sequence_no() -> amqp_uint(). +section_number() -> + amqp_uint(). + +section_offset() -> + amqp_ulong(). + milliseconds() -> amqp_uint(). diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 1c1c3b9d7f22..00a696f7cb71 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -500,37 +500,29 @@ maps_upsert(Key, TaggedVal, KVList) -> encode(Sections) when is_list(Sections) -> [amqp10_framing:encode_bin(Section) || Section <- Sections, - not is_empty(Section)]. + not omit(Section)]. -is_empty(#'v1_0.header'{durable = undefined, - priority = undefined, - ttl = undefined, - first_acquirer = undefined, - delivery_count = undefined}) -> +omit(#'v1_0.message_annotations'{content = []}) -> true; -is_empty(#'v1_0.delivery_annotations'{content = []}) -> +omit(#'v1_0.properties'{message_id = undefined, + user_id = undefined, + to = undefined, + subject = undefined, + reply_to = undefined, + correlation_id = undefined, + content_type = undefined, + content_encoding = undefined, + absolute_expiry_time = undefined, + creation_time = undefined, + group_id = undefined, + group_sequence = undefined, + reply_to_group_id = undefined}) -> true; -is_empty(#'v1_0.message_annotations'{content = []}) -> +omit(#'v1_0.application_properties'{content = []}) -> true; -is_empty(#'v1_0.properties'{message_id = undefined, - user_id = undefined, - to = undefined, - subject = undefined, - reply_to = undefined, - correlation_id = undefined, - content_type = undefined, - content_encoding = undefined, - absolute_expiry_time = undefined, - creation_time = undefined, - group_id = undefined, - group_sequence = undefined, - reply_to_group_id = undefined}) -> +omit(#'v1_0.footer'{content = []}) -> true; -is_empty(#'v1_0.application_properties'{content = []}) -> - true; -is_empty(#'v1_0.footer'{content = []}) -> - true; -is_empty(_) -> +omit(_) -> false. message_annotation(Key, State, Default) diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index b9d2eaf82429..996cc5331024 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -827,14 +827,9 @@ send_to_new_session( container_id = ContainerId, name = ConnName}, writer = WriterPid} = State) -> - %% Subtract fixed frame header size. - OutgoingMaxFrameSize = case MaxFrame of - unlimited -> unlimited; - _ -> MaxFrame - 8 - end, ChildArgs = [WriterPid, ChannelNum, - OutgoingMaxFrameSize, + MaxFrame, User, Vhost, ContainerId, diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c666017194a0..27c6d9691398 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2182,7 +2182,8 @@ handle_deliver(ConsumerTag, AckRequired, delivery_id = ?UINT(DeliveryId), delivery_tag = {binary, Dtag}, message_format = ?UINT(?MESSAGE_FORMAT), - settled = SendSettled}, + settled = SendSettled, + more = false}, Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), Mc2 = mc:convert(mc_amqp, Mc1), Mc = mc:set_annotation(redelivered, Redelivered, Mc2), @@ -2332,7 +2333,8 @@ incoming_mgmt_link_transfer( delivery_id = ?UINT(OutgoingDeliveryId), delivery_tag = {binary, <<>>}, message_format = ?UINT(?MESSAGE_FORMAT), - settled = true}, + settled = true, + more = false}, validate_message_size(Response, OutgoingMaxMessageSize), Frames = transfer_frames(Transfer, Response, MaxFrameSize), PendingDelivery = #pending_management_delivery{frames = Frames}, @@ -3161,19 +3163,20 @@ transfer_frames(Transfer, Sections, unlimited) -> [[Transfer, Sections]]; transfer_frames(Transfer, Sections, MaxFrameSize) -> PerformativeSize = iolist_size(amqp10_framing:encode_bin(Transfer)), - encode_frames(Transfer, Sections, MaxFrameSize - PerformativeSize, []). + MaxPayloadSize = MaxFrameSize - ?FRAME_HEADER_SIZE - PerformativeSize, + split_msg(Transfer, Sections, MaxPayloadSize, []). -encode_frames(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 -> +split_msg(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 -> protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL, "Frame size is too small by ~b bytes", [-MaxPayloadSize]); -encode_frames(T, Msg, MaxPayloadSize, Transfers) -> +split_msg(T, Msg, MaxPayloadSize, Transfers) -> case iolist_size(Msg) > MaxPayloadSize of true -> MsgBin = iolist_to_binary(Msg), {Chunk, Rest} = split_binary(MsgBin, MaxPayloadSize), T1 = T#'v1_0.transfer'{more = true}, - encode_frames(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]); + split_msg(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]); false -> lists:reverse([[T, Msg] | Transfers]) end. diff --git a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs index aa6a2fd0b713..67758ec6a725 100755 --- a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs @@ -279,7 +279,11 @@ module Test = let fragmentation uri = for frameSize, size in - [1024u, 1024 + [1024u, 990 + 1024u, 1000 + 1024u, 1010 + 1024u, 1020 + 1024u, 1024 1024u, 1100 1024u, 2048 2048u, 2048] do