1212-include_lib (" amqp_client/include/amqp_client.hrl" ).
1313-compile ([nowarn_export_all , export_all ]).
1414
15+ -define (EXCHANGE_LIMIT , 10 ).
1516
1617all () ->
1718 [
@@ -22,7 +23,8 @@ groups() ->
2223 [
2324 {clustered , [],
2425 [
25- {size_2 , [], [queue_limit ]}
26+ {size_2 , [], [queue_limit ,
27+ exchange_limit ]}
2628 ]}
2729 ].
2830
@@ -34,7 +36,8 @@ init_per_suite(Config0) ->
3436 rabbit_ct_helpers :log_environment (),
3537 Config1 = rabbit_ct_helpers :merge_app_env (
3638 Config0 , {rabbit , [{quorum_tick_interval , 1000 },
37- {cluster_queue_limit , 3 }]}),
39+ {cluster_queue_limit , 3 },
40+ {cluster_exchange_limit , ? EXCHANGE_LIMIT }]}),
3841 rabbit_ct_helpers :run_setup_steps (Config1 , []).
3942
4043end_per_suite (Config ) ->
@@ -101,48 +104,99 @@ queue_limit(Config) ->
101104 Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
102105 Q1 = ? config (queue_name , Config ),
103106 ? assertEqual ({'queue.declare_ok' , Q1 , 0 , 0 },
104- declare (Ch , Q1 )),
107+ declare_queue (Ch , Q1 )),
105108
106109 Q2 = ? config (alt_queue_name , Config ),
107110 ? assertEqual ({'queue.declare_ok' , Q2 , 0 , 0 },
108- declare (Ch , Q2 )),
111+ declare_queue (Ch , Q2 )),
109112
110113 Q3 = ? config (alt_2_queue_name , Config ),
111114 ? assertEqual ({'queue.declare_ok' , Q3 , 0 , 0 },
112- declare (Ch , Q3 )),
115+ declare_queue (Ch , Q3 )),
113116 Q4 = ? config (over_limit_queue_name , Config ),
114117 ExpectedError = list_to_binary (io_lib :format (" PRECONDITION_FAILED - cannot declare queue '~s ': queue limit in cluster (3) is reached" , [Q4 ])),
115118 ? assertExit (
116119 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
117- declare (Ch , Q4 )),
120+ declare_queue (Ch , Q4 )),
118121
119122 % % Trying the second server, in the cluster, but no queues on it,
120123 % % but should still fail as the limit is cluster wide.
121124 ? assertExit (
122125 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
123- declare (Ch2 , Q4 )),
126+ declare_queue (Ch2 , Q4 )),
124127
125128 % Trying other types of queues
126129 ChQQ = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
127130 ChStream = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
128131 ? assertExit (
129132 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
130- declare (ChQQ , Q4 , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
133+ declare_queue (ChQQ , Q4 , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
131134 ? assertExit (
132135 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
133- declare (ChStream , Q4 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
136+ declare_queue (ChStream , Q4 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
134137 rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_queues , []),
135138 ok .
136139
137- declare (Ch , Q ) ->
138- declare (Ch , Q , []).
140+ exchange_limit (Config ) ->
141+ DefaultXs = rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_exchange , count , []),
142+ ? assert (? EXCHANGE_LIMIT > DefaultXs ),
139143
140- declare (Ch , Q , Args ) ->
144+ [Server0 , Server1 ] =
145+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
146+ Ch1 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
147+ Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
148+
149+ % % Reach the limit.
150+ [begin
151+ XName = list_to_binary (rabbit_misc :format (" x-~b " , [N ])),
152+ # 'exchange.declare_ok' {} = declare_exchange (Ch1 , XName , <<" fanout" >>)
153+ end || N <- lists :seq (DefaultXs , ? EXCHANGE_LIMIT - 1 )],
154+
155+ % % Trying to declare the next exchange fails.
156+ OverLimitXName = <<" over-limit-x" >>,
157+ ? assertExit (
158+ {{shutdown , {server_initiated_close , 406 ,
159+ <<" PRECONDITION_FAILED" , _ /binary >>}}, _ },
160+ declare_exchange (Ch1 , OverLimitXName , <<" fanout" >>)),
161+
162+ % % Existing exchanges can be re-declared.
163+ ExistingX = list_to_binary (rabbit_misc :format (" x-~b " , [DefaultXs ])),
164+ # 'exchange.declare_ok' {} = declare_exchange (Ch2 , ExistingX , <<" fanout" >>),
165+
166+ % % The limit is cluster wide: the other node cannot declare the exchange
167+ % % either.
168+ ? assertExit (
169+ {{shutdown , {server_initiated_close , 406 ,
170+ <<" PRECONDITION_FAILED" , _ /binary >>}}, _ },
171+ declare_exchange (Ch2 , OverLimitXName , <<" fanout" >>)),
172+
173+ % % Clean up extra exchanges
174+ Ch3 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
175+ [begin
176+ XName = list_to_binary (rabbit_misc :format (" x-~b " , [N ])),
177+ # 'exchange.delete_ok' {} = amqp_channel :call (
178+ Ch3 ,
179+ # 'exchange.delete' {exchange = XName })
180+ end || N <- lists :seq (DefaultXs , ? EXCHANGE_LIMIT - 1 )],
181+
182+ ok .
183+
184+ % % -------------------------------------------------------------------
185+
186+ declare_queue (Ch , Q ) ->
187+ declare_queue (Ch , Q , []).
188+
189+ declare_queue (Ch , Q , Args ) ->
141190 amqp_channel :call (Ch , # 'queue.declare' {queue = Q ,
142191 durable = true ,
143192 auto_delete = false ,
144193 arguments = Args }).
145194
195+ declare_exchange (Ch , Name , Type ) ->
196+ amqp_channel :call (Ch , # 'exchange.declare' {exchange = Name ,
197+ type = Type ,
198+ durable = true }).
199+
146200delete_queues () ->
147201 [rabbit_amqqueue :delete (Q , false , false , <<" dummy" >>)
148202 || Q <- rabbit_amqqueue :list ()].
0 commit comments