Skip to content

Commit 2563327

Browse files
Configure capabilities on the source/target field in the ATTACH frame
1 parent b4efe04 commit 2563327

File tree

7 files changed

+519
-47
lines changed

7 files changed

+519
-47
lines changed

deps/amqp10_client/BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ rabbitmq_integration_suite(
116116
size = "medium",
117117
additional_beam = [
118118
"test/activemq_ct_helpers.beam",
119+
"test/ibmmq_ct_helpers.beam",
119120
"test/mock_server.beam",
120121
],
121122
data = [
@@ -139,6 +140,7 @@ eunit(
139140
name = "eunit",
140141
compiled_suites = [
141142
":test_activemq_ct_helpers_beam",
143+
":test_ibmmq_ct_helpers_beam",
142144
":test_mock_server_beam",
143145
],
144146
target = ":test_erlang_app",

deps/amqp10_client/app.bzl

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
126126
app_name = "amqp10_client",
127127
erlc_opts = "//:test_erlc_opts",
128128
)
129+
erlang_bytecode(
130+
name = "test_ibmmq_ct_helpers_beam",
131+
testonly = True,
132+
srcs = ["test/ibmmq_ct_helpers.erl"],
133+
outs = ["test/ibmmq_ct_helpers.beam"],
134+
app_name = "amqp10_client",
135+
erlc_opts = "//:test_erlc_opts",
136+
)
129137
erlang_bytecode(
130138
name = "test_mock_server_beam",
131139
testonly = True,

deps/amqp10_client/src/amqp10_client.erl

+4-1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ end_session(Pid) ->
166166
%% for the link before returning.
167167
attach_sender_link_sync(Session, Name, Target) ->
168168
attach_sender_link_sync(Session, Name, Target, mixed).
169+
-spec attach_sender_link_sync(pid(), binary(), binary()) ->
170+
{ok, link_ref()} | link_timeout.
169171

170172
%% @doc Synchronously attach a link on 'Session'.
171173
%% This is a convenience function that awaits attached event
@@ -273,7 +275,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
273275
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
274276
terminus_durability(), filter(), properties()) ->
275277
{ok, link_ref()}.
276-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
278+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
279+
Properties)
277280
when is_pid(Session) andalso
278281
is_binary(Name) andalso
279282
is_binary(Source) andalso

deps/amqp10_client/src/amqp10_client_session.erl

+26-5
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,14 @@
7373
-type rcv_settle_mode() :: first | second.
7474

7575
-type terminus_durability() :: none | configuration | unsettled_state.
76+
-type terminus_capabilities() :: none | binary() | list().
7677

7778
-type target_def() :: #{address => link_address(),
78-
durable => terminus_durability()}.
79+
durable => terminus_durability(),
80+
capabilities => terminus_capabilities()}.
7981
-type source_def() :: #{address => link_address(),
80-
durable => terminus_durability()}.
82+
durable => terminus_durability(),
83+
capabilities => terminus_capabilities()}.
8184

8285
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8386

@@ -109,6 +112,7 @@
109112
-export_type([snd_settle_mode/0,
110113
rcv_settle_mode/0,
111114
terminus_durability/0,
115+
terminus_capabilities/0,
112116
attach_args/0,
113117
attach_role/0,
114118
target_def/0,
@@ -713,20 +717,24 @@ make_source(#{role := {sender, _}}) ->
713717
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
714718
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
715719
TranslatedFilter = translate_filters(Filter),
720+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)),
716721
#'v1_0.source'{address = {utf8, Address},
717722
durable = {uint, Durable},
718-
filter = TranslatedFilter}.
723+
filter = TranslatedFilter,
724+
capabilities = Capabilities}.
719725

720726
make_target(#{role := {receiver, _Source, _Pid}}) ->
721727
#'v1_0.target'{};
722728
make_target(#{role := {sender, #{address := Address} = Target}}) ->
723729
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
730+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)),
724731
TargetAddr = case is_binary(Address) of
725732
true -> {utf8, Address};
726733
false -> Address
727734
end,
728735
#'v1_0.target'{address = TargetAddr,
729-
durable = {uint, Durable}}.
736+
durable = {uint, Durable},
737+
capabilities = Capabilities}.
730738

731739
max_message_size(#{max_message_size := Size})
732740
when is_integer(Size) andalso
@@ -771,6 +779,19 @@ filter_value_type({T, _} = V) when is_atom(T) ->
771779
%% looks like an already tagged type, just pass it through
772780
V.
773781

782+
translate_terminus_capabilities(none) ->
783+
undefined;
784+
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
785+
{symbol, Capabilities};
786+
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
787+
{array, symbol, [filter_capability(V) || V <- CapabilitiesList]}.
788+
789+
filter_capability(V) when is_binary(V) ->
790+
{symbol, V};
791+
filter_capability({T, _} = V) when is_atom(T) ->
792+
%% looks like an already tagged type, just pass it through
793+
V.
794+
774795
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
775796
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
776797
{map,
@@ -847,7 +868,7 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
847868
target = Target,
848869
max_message_size = MaxMessageSize},
849870
ok = Send(Attach, State),
850-
871+
851872
Ref = make_link_ref(Role, self(), OutHandle),
852873
Link = #link{name = Name,
853874
ref = Ref,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(ibmmq_ct_helpers).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
12+
-export([setup_steps/0,
13+
teardown_steps/0,
14+
init_config/1,
15+
start_ibmmq_server/1,
16+
stop_ibmmq_server/1]).
17+
18+
setup_steps() ->
19+
[fun init_config/1,
20+
fun start_ibmmq_server/1
21+
].
22+
23+
teardown_steps() ->
24+
[
25+
fun stop_ibmmq_server/1
26+
].
27+
28+
init_config(Config) ->
29+
NodeConfig = [{tcp_port_amqp, 5672}],
30+
rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]},
31+
{rmq_hostname, "localhost"},
32+
{tcp_hostname_amqp, "localhost"},
33+
{sasl, {plain, <<"app">>, <<"passw0rd">>}} ]).
34+
35+
start_ibmmq_server(Config) ->
36+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
37+
Cmd = [IBMmqCmd, "start"],
38+
ct:log("Running command ~p", [Cmd]),
39+
case rabbit_ct_helpers:exec(Cmd, []) of
40+
{ok, _} -> wait_for_ibmmq_nodes(Config);
41+
Error -> ct:pal("Error: ~tp", [Error]),
42+
{skip, "Failed to start IBM MQ"}
43+
end.
44+
45+
wait_for_ibmmq_nodes(Config) ->
46+
Hostname = ?config(rmq_hostname, Config),
47+
Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp),
48+
wait_for_ibmmq_ports(Config, Hostname, Ports).
49+
50+
wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) ->
51+
ct:log("Waiting for IBM MQ on port ~b", [Port]),
52+
case wait_for_ibmmq_port(Hostname, Port, 60) of
53+
ok ->
54+
ct:log("IBM MQ ready on port ~b", [Port]),
55+
wait_for_ibmmq_ports(Config, Hostname, Rest);
56+
{error, _} ->
57+
Msg = lists:flatten(
58+
io_lib:format(
59+
"Failed to start IBM MQ on port ~b; see IBM MQ logs",
60+
[Port])),
61+
ct:pal(?LOW_IMPORTANCE, Msg, []),
62+
{skip, Msg}
63+
end;
64+
wait_for_ibmmq_ports(Config, _, []) ->
65+
Config.
66+
67+
wait_for_ibmmq_port(_, _, 0) ->
68+
{error, econnrefused};
69+
wait_for_ibmmq_port(Hostname, Port, Retries) ->
70+
case gen_tcp:connect(Hostname, Port, []) of
71+
{ok, Connection} ->
72+
gen_tcp:close(Connection),
73+
ok;
74+
{error, econnrefused} ->
75+
timer:sleep(1000),
76+
wait_for_ibmmq_port(Hostname, Port, Retries - 1);
77+
Error ->
78+
Error
79+
end.
80+
81+
stop_ibmmq_server(Config) ->
82+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
83+
Cmd = [IBMmqCmd, "stop"],
84+
ct:log("Running command ~p", [Cmd]),
85+
case rabbit_ct_helpers:exec(Cmd, []) of
86+
{ok, _} -> Config;
87+
Error -> ct:pal("Error: ~tp", [Error]),
88+
{skip, "Failed to stop IBM MQ"}
89+
end.

0 commit comments

Comments
 (0)