From a7ea6d293e1ab492ae3ab490e64e6d8478f703c7 Mon Sep 17 00:00:00 2001
From: Arnaud Cogoluègnes <514737+acogoluegnes@users.noreply.github.com>
Date: Wed, 16 Jul 2025 10:20:54 +0200
Subject: [PATCH 1/2] Increase timeouts and improve error logging in stream test

(cherry picked from commit 0d84c8e9a5d97dca47e9fabec8c0f7240e4834f2)
---
 .../src/rabbit_stream_reader.erl              |  4 +-
 .../java/com/rabbitmq/stream/FailureTest.java | 59 +++++++++++++++++--
 .../src/test/resources/logback-test.xml       |  1 +
 3 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 492b74a7cc95..901844383e80 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -2344,8 +2344,8 @@ handle_frame_post_auth(Transport,
                  case {is_binary(Host), is_integer(Port)} of
                      {true, true} ->
                          Acc#{Node => {Host, Port}};
                      _ ->
-                         rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp",
-                                            [Host, Port]),
+                         rabbit_log:warning("Error when retrieving broker '~tp' metadata: ~tp ~tp",
+                                            [Node, Host, Port]),
                          Acc
                  end
              end,
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index e04fd2042d40..016da1f59789 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -34,8 +34,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.ToLongFunction;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class FailureTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);

+  static String testMethod;
   TestUtils.ClientFactory cf;
   String stream;
   ExecutorService executorService;
@@ -57,6 +61,11 @@ static void wait(Duration duration) {
     }
   }

+  @BeforeEach
+  void init(TestInfo info) {
+    testMethod = info.getTestMethod().get().getName();
+  }
+
   @AfterEach
   void tearDown() {
     if (executorService != null) {
@@ -142,9 +151,9 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
     waitAtMost(
         Duration.ofSeconds(10),
         () -> {
-          LOGGER.info("Getting metadata for {}", stream);
+          log("Getting metadata for {}", stream);
           Client.StreamMetadata m = publisher.metadata(stream).get(stream);
-          LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
+          log("Metadata for {} (expecting 2 replicas): {}", stream, m);
           return m.getReplicas().size() == 2;
         });

@@ -195,6 +204,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     Map<Long, Message> published = new ConcurrentHashMap<>();
     Set<Message> confirmed = ConcurrentHashMap.newKeySet();

+    // match confirmed messages to published messages
     Client.PublishConfirmListener publishConfirmListener =
         (publisherId, publishingId) -> {
           Message confirmedMessage;
@@ -212,18 +222,22 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {

     AtomicReference<Client> publisher = new AtomicReference<>();
     CountDownLatch reconnectionLatch = new CountDownLatch(1);
     AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+    // shutdown listener reconnects to node 2 to locate the node the stream leader is on
+    // it then re-creates a publisher connected to this node
     Client.ShutdownListener shutdownListener =
         shutdownContext -> {
           if (shutdownContext.getShutdownReason()
               == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+            log("Connection got closed, reconnecting");
             // avoid long-running task in the IO thread
             executorService.submit(
                 () -> {
                   connected.set(false);
                   AtomicReference<Client> locator = new AtomicReference<>();
                   try {
+                    log("Reconnecting to node 2");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           try {
                             locator.set(
@@ -233,14 +247,35 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                             return false;
                           }
                         });
+                    log("Reconnected to node 2, looking up new stream leader");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
                           return m.getLeader() != null
                               && m.getLeader().getPort() != streamPortNode1();
                         });
+                    log("New stream leader is on another node than node 1");
                   } catch (Throwable e) {
+                    log("Error while trying to connect to new stream leader");
+                    if (locator.get() == null) {
+                      log("Could not reconnect");
+                    } else {
+                      try {
+                        Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
+                        if (m.getLeader() == null) {
+                          log("The stream has no leader");
+                        } else {
+                          log(
+                              "The stream is on node with port {} (node 1 = {}, node 2 = {})",
+                              m.getLeader().getPort(),
+                              streamPortNode1(),
+                              streamPortNode2());
+                        }
+                      } catch (Exception ex) {
+                        log("Error while checking failure: {}", ex.getMessage());
+                      }
+                    }
                     reconnectionLatch.countDown();
                     return;
                   }
@@ -278,6 +313,9 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {

     AtomicBoolean keepPublishing = new AtomicBoolean(true);

+    AtomicLong publishSequence = new AtomicLong(0);
+    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
+
     executorService.submit(
         () -> {
           while (keepPublishing.get()) {
@@ -295,7 +333,11 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                     .build();
             try {
               long publishingId =
-                  publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
+                  publisher
+                      .get()
+                      .publish(
+                          (byte) 1, Collections.singletonList(message), publishSequenceFunction)
+                      .get(0);
               published.put(publishingId, message);
             } catch (Exception e) {
               // keep going
@@ -314,6 +356,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     int confirmedCount = confirmed.size();

     try {
+      // stop the first node (this is where the stream leader is)
       Host.rabbitmqctl("stop_app");

       assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -324,6 +367,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     } finally {
       Host.rabbitmqctl("start_app");
     }
+    // making sure we published a few messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);

     confirmedCount = confirmed.size();
@@ -339,6 +383,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     // let's publish for a bit of time
     Thread.sleep(2000);

+    // making sure we published messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);

     keepPublishing.set(false);
@@ -640,4 +685,8 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
     Host.killStreamLeaderProcess(stream);
     waitUntil(() -> metadataNotifications.get() == 2);
   }
+
+  private static void log(String format, Object... args) {
+    LOGGER.info("[" + testMethod + "] " + format, args);
+  }
 }
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml
index 45d598991dca..4e84bbb65945 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml
@@ -6,6 +6,7 @@
+

From 056e68f77ed8ccb6fb4a708a3d36594c69cce206 Mon Sep 17 00:00:00 2001
From: Arnaud Cogoluègnes <514737+acogoluegnes@users.noreply.github.com>
Date: Wed, 16 Jul 2025 11:33:12 +0200
Subject: [PATCH 2/2] Propagate connection state in offset lag calculation test

This should fix some flakes.

(cherry picked from commit c1fd7c337657887b379209f6cbfee14ef0f880cd)
---
 .../src/stream_test_utils.erl    |  15 +-
 .../test/rabbit_stream_SUITE.erl | 141 +++++++++---------
 2 files changed, 82 insertions(+), 74 deletions(-)

diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
index b6e1dbc4a24d..faab0c7ed482 100644
--- a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
+++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
@@ -21,10 +21,17 @@ connect(Config, Node) ->
     connect(StreamPort).

 connect(StreamPort) ->
+    do_connect(StreamPort, #{}).
+
+connect_pp(StreamPort, PeerProperties) ->
+    do_connect(StreamPort, PeerProperties).
+
+do_connect(StreamPort, PeerProperties) ->
     {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
     C0 = rabbit_stream_core:init(0),
-    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
+    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
+                                                                 PeerProperties}}),
     ok = gen_tcp:send(Sock, PeerPropertiesFrame),
     {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
@@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->

 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
     subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
+
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
-    Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
+    subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
+    Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
           InitialCredit, Props},
     SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 5fdc48b61ab1..e7a40363ad14 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
 offset_lag_calculation(Config) ->
     FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
-    T = gen_tcp,
-    Port = get_port(T, Config),
-    Opts = get_opts(T),
-    {ok, S} = T:connect("localhost", Port, Opts),
-    C = rabbit_stream_core:init(0),
+    Port = get_port(gen_tcp, Config),
     ConnectionName = FunctionName,
-    test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
-    test_authenticate(T, S, C),
+    {ok, S, C0} = stream_test_utils:connect_pp(Port,
+                                               #{<<"connection_name">> => ConnectionName}),

-    Stream = FunctionName,
-    test_create_stream(T, S, Stream, C),
+    St = FunctionName,
+    {ok, C1} = stream_test_utils:create_stream(S, C0, St),
     SubId = 1,
     TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
-    lists:foreach(fun(OffsetSpec) ->
-                      test_subscribe(T, S, SubId, Stream,
-                                     OffsetSpec, 10, #{},
-                                     ?RESPONSE_CODE_OK, C),
-                      ConsumerInfo = consumer_offset_info(Config, ConnectionName),
-                      ?assertEqual({0, 0}, ConsumerInfo),
-                      test_unsubscribe(T, S, SubId, C)
-                  end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
-
-
-    PublisherId = 1,
-    test_declare_publisher(T, S, PublisherId, Stream, C),
+    C2 = lists:foldl(
+          fun(OffsetSpec, C00) ->
+                  {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                          10, #{}, OffsetSpec),
+                  ConsumerInfo = consumer_offset_info(Config, ConnectionName),
+                  ?assertEqual({0, 0}, ConsumerInfo),
+                  {ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
+                  C02
+          end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
+
+    PubId = 1,
+    {ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
     MessageCount = 10,
     Body = <<"hello">>,
-    lists:foreach(fun(_) ->
-                      test_publish_confirm(T, S, PublisherId, Body, C)
-                  end, lists:seq(1, MessageCount - 1)),
+    {ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
+                                         lists:duplicate(MessageCount - 1, Body)),
     %% to make sure to have 2 chunks
     timer:sleep(200),
-    test_publish_confirm(T, S, PublisherId, Body, C),
-    test_delete_publisher(T, S, PublisherId, C),
+    {ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
+    {ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),

     NextOffset = MessageCount,
-    lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
-                      test_subscribe(T, S, SubId, Stream,
-                                     OffsetSpec, 1, #{},
-                                     ?RESPONSE_CODE_OK, C),
-                      case ReceiveDeliver of
-                          true ->
-                              {{deliver, SubId, _}, _} = receive_commands(T, S, C);
-                          _ ->
-                              ok
-                      end,
-                      {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
-                      CheckFun(Offset, Lag),
-                      test_unsubscribe(T, S, SubId, C)
-                  end,
+    C7 = lists:foldl(
+          fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
+                  {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                          1, #{}, OffsetSpec),
+
+                  C03 = case ReceiveDeliver of
+                            true ->
+                                {{deliver, SubId, _}, C02} = receive_commands(S, C01),
+                                C02;
+                            _ ->
+                                C01
+                        end,
+                  {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
+                  CheckFun(Offset, Lag),
+                  {ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
+                  C04
+          end, C6, [{first, true,
+                     fun(Offset, Lag) ->
+                             ?assert(Offset >= 0, "first, at least one chunk consumed"),
+                             ?assert(Lag > 0, "first, not all messages consumed")
+                     end},
+                    {last, true,
+                     fun(Offset, _Lag) ->
+                             ?assert(Offset > 0, "offset expected for last")
+                     end},
+                    {next, false,
+                     fun(Offset, Lag) ->
+                             ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
+                             ?assert(Lag =:= 0, "next, offset lag should be 0")
+                     end},
+                    {0, true,
+                     fun(Offset, Lag) ->
+                             ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
+                             ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
+                     end},
+                    {1_000, false,
+                     fun(Offset, Lag) ->
+                             ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
+                             ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
+                     end},
+                    {{timestamp, TheFuture}, false,
+                     fun(Offset, Lag) ->
+                             ?assertEqual(NextOffset, Offset, "offset spec in the future, offset should be at the end of the stream"),
+                             ?assert(Lag =:= 0, "offset spec in the future, offset lag should be 0")
+                     end}]),

-    test_delete_stream(T, S, Stream, C, false),
-    test_close(T, S, C),
-
+    {ok, C8} = stream_test_utils:delete_stream(S, C7, St),
+    {ok, _} = stream_test_utils:close(S, C8),
     ok.

 authentication_error_should_close_with_delay(Config) ->
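
Note on the helper changes in PATCH 2/2: every stream_test_utils call now
threads the rabbit_stream_core connection state through, so each step must
consume the state returned by the previous one. A minimal usage sketch of the
extended helpers, assuming the {ok, NextState} return shapes shown in the
suite above (the port variable, stream name, and offset spec are
illustrative, not part of the patch):

    example_roundtrip(StreamPort) ->
        %% connect_pp/2 sends custom peer properties with the initial frame
        {ok, Sock, C0} = stream_test_utils:connect_pp(StreamPort,
                             #{<<"connection_name">> => <<"example">>}),
        {ok, C1} = stream_test_utils:create_stream(Sock, C0, <<"example-stream">>),
        %% subscribe/7 takes the offset spec as the last argument;
        %% subscribe/6 keeps the old behavior and defaults to first
        {ok, C2} = stream_test_utils:subscribe(Sock, C1, <<"example-stream">>,
                                               _SubscriptionId = 1,
                                               _InitialCredit = 10, #{}, last),
        %% ... receive deliveries here, threading the state along ...
        {ok, C3} = stream_test_utils:unsubscribe(Sock, C2, 1),
        {ok, C4} = stream_test_utils:delete_stream(Sock, C3, <<"example-stream">>),
        {ok, _} = stream_test_utils:close(Sock, C4),
        ok.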