Skip to content

Commit b1ba0b9

Browse files
olikasgluos
authored andcommitted
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: rabbitmq#7508
1 parent 741fd43 commit b1ba0b9

File tree

3 files changed

+182
-10
lines changed

3 files changed

+182
-10
lines changed

Diff for: deps/amqp10_client/src/amqp10_msg.erl

+10-4
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) ->
306306
H = maps:fold(fun(durable, V, Acc) ->
307307
Acc#'v1_0.header'{durable = V};
308308
(priority, V, Acc) ->
309-
Acc#'v1_0.header'{priority = {uint, V}};
309+
Acc#'v1_0.header'{priority = {ubyte, V}};
310310
(first_acquirer, V, Acc) ->
311311
Acc#'v1_0.header'{first_acquirer = V};
312312
(ttl, V, Acc) ->
@@ -325,8 +325,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
325325
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
326326
% message_id can be any type but we restrict it here
327327
Acc#'v1_0.properties'{message_id = utf8(V)};
328-
(user_id, V, Acc) ->
329-
Acc#'v1_0.properties'{user_id = utf8(V)};
328+
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
329+
Acc#'v1_0.properties'{user_id = binary(V)};
330330
(to, V, Acc) ->
331331
Acc#'v1_0.properties'{to = utf8(V)};
332332
(subject, V, Acc) ->
@@ -407,8 +407,12 @@ wrap_ap_value(true) ->
407407
{boolean, true};
408408
wrap_ap_value(false) ->
409409
{boolean, false};
410-
wrap_ap_value(V) when is_integer(V) ->
410+
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
411411
{uint, V};
412+
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
413+
{int, V};
414+
wrap_ap_value(F) when is_float(F) ->
415+
{double, F};
412416
wrap_ap_value(V) when is_binary(V) ->
413417
utf8(V);
414418
wrap_ap_value(V) when is_list(V) ->
@@ -449,6 +453,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
449453
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
450454
sym(B) when is_binary(B) -> {symbol, B}.
451455
uint(B) -> {uint, B}.
456+
binary(B) when is_binary(B) -> {binary, B};
457+
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.
452458

453459
has_value(undefined) -> false;
454460
has_value(_) -> true.

Diff for: deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+70-5
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,13 @@ dest_endpoint(#{shovel_type := dynamic,
173173
dest := #{target_address := Addr}}) ->
174174
[{dest_address, Addr}].
175175

176-
-spec handle_source(Msg :: any(), state()) ->
176+
-spec handle_source(Msg :: any(), state()) ->
177177
not_handled | state() | {stop, any()}.
178178
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
179179
Tag = amqp10_msg:delivery_id(Msg),
180180
Payload = amqp10_msg:body_bin(Msg),
181-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
181+
Props = props_to_map(Msg),
182+
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
182183
handle_source({amqp10_event, {connection, Conn, opened}},
183184
State = #{source := #{current := #{conn := Conn}}}) ->
184185
State;
@@ -382,23 +383,27 @@ add_forward_headers(_, Msg) -> Msg.
382383
set_message_properties(Props, Msg) ->
383384
%% this is effectively special handling properties from amqp 0.9.1
384385
maps:fold(
385-
fun(content_type, Ct, M) ->
386+
fun(_Key, undefined, M) ->
387+
M;
388+
(content_type, Ct, M) ->
386389
amqp10_msg:set_properties(
387390
#{content_type => to_binary(Ct)}, M);
388391
(content_encoding, Ct, M) ->
389392
amqp10_msg:set_properties(
390393
#{content_encoding => to_binary(Ct)}, M);
391394
(delivery_mode, 2, M) ->
392395
amqp10_msg:set_headers(#{durable => true}, M);
396+
(priority, P, M) when is_integer(P) ->
397+
amqp10_msg:set_headers(#{priority => P}, M);
393398
(correlation_id, Ct, M) ->
394399
amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M);
395400
(reply_to, Ct, M) ->
396401
amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M);
397402
(message_id, Ct, M) ->
398403
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
399404
(timestamp, Ct, M) ->
400-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
401-
(user_id, Ct, M) ->
405+
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
406+
(user_id, Ct, M) when Ct =/= undefined ->
402407
amqp10_msg:set_properties(#{user_id => Ct}, M);
403408
(headers, Headers0, M) when is_list(Headers0) ->
404409
%% AMPQ 0.9.1 are added as applicatin properties
@@ -440,3 +445,63 @@ is_amqp10_compat(T) ->
440445
%% TODO: not all lists are compatible
441446
is_list(T) orelse
442447
is_boolean(T).
448+
449+
to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
450+
{Key, longstr, Value};
451+
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
452+
{Key, long, Value};
453+
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
454+
{Key, double, Value};
455+
to_amqp091_compatible_value(Key, true) ->
456+
{Key, bool, true};
457+
to_amqp091_compatible_value(Key, false) ->
458+
{Key, bool, false};
459+
to_amqp091_compatible_value(_Key, _Value) ->
460+
undefined.
461+
462+
delivery_mode(Headers) ->
463+
case maps:get(durable, Headers, undefined) of
464+
undefined -> undefined;
465+
true -> 2;
466+
false -> 1
467+
end.
468+
469+
timestamp_10_to_091(undefined) ->
470+
undefined;
471+
timestamp_10_to_091(T) ->
472+
trunc(T / 1000).
473+
474+
timestamp_091_to_10(T) when is_integer(T) ->
475+
T * 1000;
476+
timestamp_091_to_10(_Value) ->
477+
undefined.
478+
479+
ttl(T) when is_integer(T) ->
480+
erlang:integer_to_binary(T);
481+
ttl(_T) -> undefined.
482+
483+
props_to_map(Msg) ->
484+
AppProps = amqp10_msg:application_properties(Msg),
485+
AppProps091Headers = lists:filtermap(fun({K, V}) ->
486+
case to_amqp091_compatible_value(K, V) of
487+
undefined ->
488+
false;
489+
Value ->
490+
{true, Value}
491+
end
492+
end, maps:to_list(AppProps)),
493+
InProps = amqp10_msg:properties(Msg),
494+
Headers = amqp10_msg:headers(Msg),
495+
#{
496+
headers => AppProps091Headers,
497+
content_type => maps:get(content_type, InProps, undefined),
498+
content_encoding => maps:get(content_encoding, InProps, undefined),
499+
delivery_mode => delivery_mode(Headers),
500+
priority => maps:get(priority, Headers, undefined),
501+
correlation_id => maps:get(correlation_id, InProps, undefined),
502+
reply_to => maps:get(reply_to, InProps, undefined),
503+
expiration => ttl(maps:get(ttl, Headers, undefined)),
504+
message_id => maps:get(message_id, InProps, undefined),
505+
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
506+
user_id => maps:get(user_id, InProps, undefined)
507+
}.

Diff for: deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+102-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ groups() ->
2929
autodelete_amqp091_dest_on_confirm,
3030
autodelete_amqp091_dest_on_publish,
3131
simple_amqp10_dest,
32-
simple_amqp10_src
32+
simple_amqp10_src,
33+
message_prop_conversion
3334
]},
3435
{with_map_config, [], [
3536
simple,
@@ -168,6 +169,106 @@ simple_amqp10_src(Config) ->
168169
ok
169170
end).
170171

172+
message_prop_conversion(Config) ->
173+
MapConfig = ?config(map_config, Config),
174+
Src = ?config(srcq, Config),
175+
Dest = ?config(destq, Config),
176+
with_session(Config,
177+
fun (Sess) ->
178+
shovel_test_utils:set_param(
179+
Config,
180+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
181+
{<<"src-address">>, Src},
182+
{<<"dest-protocol">>, <<"amqp091">>},
183+
{<<"dest-queue">>, Dest},
184+
{<<"add-forward-headers">>, true},
185+
{<<"dest-add-timestamp-header">>, true},
186+
{<<"publish-properties">>,
187+
case MapConfig of
188+
true -> #{<<"cluster_id">> => <<"x">>};
189+
_ -> [{<<"cluster_id">>, <<"x">>}]
190+
end}
191+
]),
192+
LinkName = <<"dynamic-sender-", Dest/binary>>,
193+
Tag = <<"tag1">>,
194+
Payload = <<"payload">>,
195+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
196+
unsettled, unsettled_state),
197+
ok = await_amqp10_event(link, Sender, attached),
198+
Headers = #{durable => true, priority => 3, ttl => 180000},
199+
Msg = amqp10_msg:set_headers(Headers,
200+
amqp10_msg:new(Tag, Payload, false)),
201+
Msg2 = amqp10_msg:set_properties(#{
202+
message_id => <<"message-id">>,
203+
user_id => <<"guest">>,
204+
to => <<"to">>,
205+
subject => <<"subject">>,
206+
reply_to => <<"reply-to">>,
207+
correlation_id => <<"correlation-id">>,
208+
content_type => <<"content-type">>,
209+
content_encoding => <<"content-encoding">>,
210+
%absolute_expiry_time => 123456789,
211+
creation_time => 123456789,
212+
group_id => <<"group-id">>,
213+
group_sequence => 123,
214+
reply_to_group_id => <<"reply-to-group-id">>
215+
}, Msg),
216+
Msg3 = amqp10_msg:set_application_properties(#{
217+
<<"x-binary">> => <<"binary">>,
218+
<<"x-int">> => 33,
219+
<<"x-negative-int">> => -33,
220+
<<"x-float">> => 1.3,
221+
<<"x-true">> => true,
222+
<<"x-false">> => false
223+
}, Msg2),
224+
ok = amqp10_client:send_msg(Sender, Msg3),
225+
receive
226+
{amqp10_disposition, {accepted, Tag}} -> ok
227+
after 3000 ->
228+
exit(publish_disposition_not_received)
229+
end,
230+
amqp10_client:detach_link(Sender),
231+
Channel = rabbit_ct_client_helpers:open_channel(Config),
232+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
233+
content_type = ReceivedContentType,
234+
content_encoding = ReceivedContentEncoding,
235+
headers = Headers2,
236+
delivery_mode = ReceivedDeliveryMode,
237+
priority = ReceivedPriority,
238+
correlation_id = ReceivedCorrelationId,
239+
reply_to = ReceivedReplyTo,
240+
expiration = ReceivedExpiration,
241+
message_id = ReceivedMessageId,
242+
timestamp = ReceivedTimestamp,
243+
type = _ReceivedType,
244+
user_id = ReceivedUserId,
245+
app_id = _ReceivedAppId,
246+
cluster_id = _ReceivedClusterId
247+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
248+
249+
?assertEqual(<<"payload">>, Payload),
250+
?assertEqual(2, ReceivedDeliveryMode),
251+
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
252+
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
253+
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
254+
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
255+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
256+
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),
257+
258+
?assertEqual(<<"content-type">>, ReceivedContentType),
259+
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),
260+
261+
?assertEqual(3, ReceivedPriority),
262+
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
263+
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
264+
?assertEqual(<<"180000">>, ReceivedExpiration),
265+
?assertEqual(<<"message-id">>, ReceivedMessageId),
266+
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
267+
?assertEqual(<<"guest">>, ReceivedUserId),
268+
ok
269+
end).
270+
271+
171272
change_definition(Config) ->
172273
Src = ?config(srcq, Config),
173274
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)