Skip to content

Commit 70839ed

Browse files
Configure capabilities on the source/target field in the ATTACH frame
1 parent 8f1219a commit 70839ed

File tree

9 files changed

+514
-40
lines changed

9 files changed

+514
-40
lines changed

deps/amqp10_client/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ amqp10_client.d
2626

2727
# Downloaded ActiveMQ.
2828
/test/system_SUITE_data/apache-activemq-*
29+
/test/system_SUITE_data/ibmmq/mq-container
30+
/test/system_SUITE_data/ibmmq/*.tar.gz

deps/amqp10_client/BUILD.bazel

+4-2
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,14 @@ rabbitmq_integration_suite(
116116
size = "medium",
117117
additional_beam = [
118118
"test/activemq_ct_helpers.beam",
119+
"test/ibmmq_ct_helpers.beam",
119120
"test/mock_server.beam",
120121
],
121122
data = [
122-
"@activemq//:exec_dir",
123+
"@activemq//:exec_dir",
123124
],
124125
test_env = {
125-
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq",
126+
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq"
126127
},
127128
deps = TEST_DEPS,
128129
)
@@ -139,6 +140,7 @@ eunit(
139140
name = "eunit",
140141
compiled_suites = [
141142
":test_activemq_ct_helpers_beam",
143+
":test_ibmmq_ct_helpers_beam",
142144
":test_mock_server_beam",
143145
],
144146
target = ":test_erlang_app",

deps/amqp10_client/app.bzl

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
126126
app_name = "amqp10_client",
127127
erlc_opts = "//:test_erlc_opts",
128128
)
129+
erlang_bytecode(
130+
name = "test_ibmmq_ct_helpers_beam",
131+
testonly = True,
132+
srcs = ["test/ibmmq_ct_helpers.erl"],
133+
outs = ["test/ibmmq_ct_helpers.beam"],
134+
app_name = "amqp10_client",
135+
erlc_opts = "//:test_erlc_opts",
136+
)
129137
erlang_bytecode(
130138
name = "test_mock_server_beam",
131139
testonly = True,

deps/amqp10_client/src/amqp10_client.erl

+4-1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ end_session(Pid) ->
164164
%% @doc Synchronously attach a link on 'Session'.
165165
%% This is a convenience function that awaits attached event
166166
%% for the link before returning.
167+
-spec attach_sender_link_sync(pid(), binary(), binary()) ->
168+
{ok, link_ref()} | link_timeout.
167169
attach_sender_link_sync(Session, Name, Target) ->
168170
attach_sender_link_sync(Session, Name, Target, mixed).
169171

@@ -273,7 +275,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
273275
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
274276
terminus_durability(), filter(), properties()) ->
275277
{ok, link_ref()}.
276-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
278+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
279+
Properties)
277280
when is_pid(Session) andalso
278281
is_binary(Name) andalso
279282
is_binary(Source) andalso

deps/amqp10_client/src/amqp10_client_session.erl

+23-4
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,14 @@
7373
-type rcv_settle_mode() :: first | second.
7474

7575
-type terminus_durability() :: none | configuration | unsettled_state.
76+
-type terminus_capabilities() :: binary() | [binary(),...].
7677

7778
-type target_def() :: #{address => link_address(),
78-
durable => terminus_durability()}.
79+
durable => terminus_durability(),
80+
capabilities => terminus_capabilities()}.
7981
-type source_def() :: #{address => link_address(),
80-
durable => terminus_durability()}.
82+
durable => terminus_durability(),
83+
capabilities => terminus_capabilities()}.
8184

8285
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8386

@@ -713,20 +716,24 @@ make_source(#{role := {sender, _}}) ->
713716
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
714717
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
715718
TranslatedFilter = translate_filters(Filter),
719+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])),
716720
#'v1_0.source'{address = {utf8, Address},
717721
durable = {uint, Durable},
718-
filter = TranslatedFilter}.
722+
filter = TranslatedFilter,
723+
capabilities = Capabilities}.
719724

720725
make_target(#{role := {receiver, _Source, _Pid}}) ->
721726
#'v1_0.target'{};
722727
make_target(#{role := {sender, #{address := Address} = Target}}) ->
723728
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
729+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])),
724730
TargetAddr = case is_binary(Address) of
725731
true -> {utf8, Address};
726732
false -> Address
727733
end,
728734
#'v1_0.target'{address = TargetAddr,
729-
durable = {uint, Durable}}.
735+
durable = {uint, Durable},
736+
capabilities = Capabilities}.
730737

731738
max_message_size(#{max_message_size := Size})
732739
when is_integer(Size) andalso
@@ -771,6 +778,17 @@ filter_value_type({T, _} = V) when is_atom(T) ->
771778
%% looks like an already tagged type, just pass it through
772779
V.
773780

781+
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
782+
{symbol, Capabilities};
783+
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
784+
{array, symbol, [filter_capability(V) || V <- CapabilitiesList]}.
785+
786+
filter_capability(V) when is_binary(V) ->
787+
{symbol, V};
788+
filter_capability(_) ->
789+
%% Any other type is ignored
790+
{}.
791+
774792
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
775793
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
776794
{map,
@@ -846,6 +864,7 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
846864
rcv_settle_mode = rcv_settle_mode(Args),
847865
target = Target,
848866
max_message_size = MaxMessageSize},
867+
849868
ok = Send(Attach, State),
850869

851870
Ref = make_link_ref(Role, self(), OutHandle),

deps/amqp10_client/test/activemq_ct_helpers.erl

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ start_activemq_nodes(Config) ->
6363
ActivemqCmd = ?config(activemq_cmd, Config1),
6464
TCPPort = rabbit_ct_broker_helpers:get_node_config(Config1, 0, tcp_port_amqp),
6565
ConfigFile = ?config(activemq_config_filename, Config1),
66+
ct:log("Running ~p", [ActivemqCmd]),
6667
Cmd = [ActivemqCmd,
6768
"start",
6869
{"-Dtestsuite.tcp_port_amqp=~b", [TCPPort]},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(ibmmq_ct_helpers).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
12+
-export([setup_steps/0,
13+
teardown_steps/0,
14+
init_config/1,
15+
start_ibmmq_server/1,
16+
stop_ibmmq_server/1]).
17+
18+
setup_steps() ->
19+
[fun init_config/1,
20+
fun start_ibmmq_server/1
21+
].
22+
23+
teardown_steps() ->
24+
[
25+
fun stop_ibmmq_server/1
26+
].
27+
28+
init_config(Config) ->
29+
NodeConfig = [{tcp_port_amqp, 5672}],
30+
rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]},
31+
{rmq_hostname, "localhost"},
32+
{tcp_hostname_amqp, "localhost"},
33+
{sasl, {plain, <<"app">>, <<"passw0rd">>}} ]).
34+
35+
start_ibmmq_server(Config) ->
36+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
37+
Cmd = [IBMmqCmd, "start"],
38+
ct:log("Running command ~p", [Cmd]),
39+
case rabbit_ct_helpers:exec(Cmd, []) of
40+
{ok, _} -> wait_for_ibmmq_nodes(Config);
41+
Error -> ct:pal("Error: ~tp", [Error]),
42+
{skip, "Failed to start IBM MQ"}
43+
end.
44+
45+
wait_for_ibmmq_nodes(Config) ->
46+
Hostname = ?config(rmq_hostname, Config),
47+
Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp),
48+
wait_for_ibmmq_ports(Config, Hostname, Ports).
49+
50+
wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) ->
51+
ct:log("Waiting for IBM MQ on port ~b", [Port]),
52+
case wait_for_ibmmq_port(Hostname, Port, 60) of
53+
ok ->
54+
ct:log("IBM MQ ready on port ~b", [Port]),
55+
wait_for_ibmmq_ports(Config, Hostname, Rest);
56+
{error, _} ->
57+
Msg = lists:flatten(
58+
io_lib:format(
59+
"Failed to start IBM MQ on port ~b; see IBM MQ logs",
60+
[Port])),
61+
ct:pal(?LOW_IMPORTANCE, Msg, []),
62+
{skip, Msg}
63+
end;
64+
wait_for_ibmmq_ports(Config, _, []) ->
65+
Config.
66+
67+
wait_for_ibmmq_port(_, _, 0) ->
68+
{error, econnrefused};
69+
wait_for_ibmmq_port(Hostname, Port, Retries) ->
70+
case gen_tcp:connect(Hostname, Port, []) of
71+
{ok, Connection} ->
72+
gen_tcp:close(Connection),
73+
ok;
74+
{error, econnrefused} ->
75+
timer:sleep(1000),
76+
wait_for_ibmmq_port(Hostname, Port, Retries - 1);
77+
Error ->
78+
Error
79+
end.
80+
81+
stop_ibmmq_server(Config) ->
82+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
83+
Cmd = [IBMmqCmd, "stop"],
84+
ct:log("Running command ~p", [Cmd]),
85+
case rabbit_ct_helpers:exec(Cmd, []) of
86+
{ok, _} -> Config;
87+
Error -> ct:pal("Error: ~tp", [Error]),
88+
{skip, "Failed to stop IBM MQ"}
89+
end.

0 commit comments

Comments
 (0)