Increase timeouts and improve error logging in stream test (backport #14240) #14242

Merged
2 commits merged on Jul 16, 2025
15 changes: 13 additions & 2 deletions deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
@@ -21,10 +21,17 @@ connect(Config, Node) ->
     connect(StreamPort).
 
 connect(StreamPort) ->
+    do_connect(StreamPort, #{}).
+
+connect_pp(StreamPort, PeerProperties) ->
+    do_connect(StreamPort, PeerProperties).
+
+do_connect(StreamPort, PeerProperties) ->
     {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
 
     C0 = rabbit_stream_core:init(0),
-    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
+    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
+                                                                 PeerProperties}}),
     ok = gen_tcp:send(Sock, PeerPropertiesFrame),
     {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
 
@@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
     subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
 
+
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
-    Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
+    subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
+    Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
            InitialCredit, Props},
     SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
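The refactor keeps connect/1 as a thin wrapper and adds two entry points: connect_pp/2 for custom peer properties and subscribe/7 for an explicit offset specification. A minimal usage sketch (stream name, connection name, subscription id, and credit are illustrative; the helper calls themselves all appear in this PR):

    %% Illustrative test snippet: every helper consumes the previous
    %% rabbit_stream_core state and returns the next one, so the states
    %% C0..C4 must be threaded through in order.
    example(Port) ->
        {ok, S, C0} = stream_test_utils:connect_pp(Port,
                                                   #{<<"connection_name">> => <<"my-test">>}),
        {ok, C1} = stream_test_utils:create_stream(S, C0, <<"my-stream">>),
        %% start from the first offset with an initial credit of 10
        {ok, C2} = stream_test_utils:subscribe(S, C1, <<"my-stream">>, 1, 10, #{}, first),
        {ok, C3} = stream_test_utils:unsubscribe(S, C2, 1),
        {ok, C4} = stream_test_utils:delete_stream(S, C3, <<"my-stream">>),
        {ok, _} = stream_test_utils:close(S, C4),
        ok.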
4 changes: 2 additions & 2 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -2344,8 +2344,8 @@ handle_frame_post_auth(Transport,
                  case {is_binary(Host), is_integer(Port)} of
                      {true, true} -> Acc#{Node => {Host, Port}};
                      _ ->
-                         rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp",
-                                            [Host, Port]),
+                         rabbit_log:warning("Error when retrieving broker '~tp' metadata: ~tp ~tp",
+                                            [Node, Host, Port]),
                          Acc
                  end
          end,
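For illustration only (sample values assumed, not from the PR): the added '~tp' names the node whose metadata entry was unusable, which the previous message omitted:

    %% Formatting sketch; rabbit_log:warning/2 takes io_lib-style control
    %% sequences, and the t modifier makes ~p Unicode-aware.
    io:format("Error when retrieving broker '~tp' metadata: ~tp ~tp~n",
              [rabbit@host1, undefined, undefined]).
    %% prints: Error when retrieving broker 'rabbit@host1' metadata: undefined undefined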
141 changes: 69 additions & 72 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
 
 offset_lag_calculation(Config) ->
     FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
-    T = gen_tcp,
-    Port = get_port(T, Config),
-    Opts = get_opts(T),
-    {ok, S} = T:connect("localhost", Port, Opts),
-    C = rabbit_stream_core:init(0),
+    Port = get_port(gen_tcp, Config),
     ConnectionName = FunctionName,
-    test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
-    test_authenticate(T, S, C),
+    {ok, S, C0} = stream_test_utils:connect_pp(Port,
+                                               #{<<"connection_name">> => ConnectionName}),
 
-    Stream = FunctionName,
-    test_create_stream(T, S, Stream, C),
+    St = FunctionName,
+    {ok, C1} = stream_test_utils:create_stream(S, C0, St),
 
     SubId = 1,
     TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
-    lists:foreach(fun(OffsetSpec) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 10, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          ConsumerInfo = consumer_offset_info(Config, ConnectionName),
-                          ?assertEqual({0, 0}, ConsumerInfo),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
-
-
-    PublisherId = 1,
-    test_declare_publisher(T, S, PublisherId, Stream, C),
+    C2 = lists:foldl(
+           fun(OffsetSpec, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           10, #{}, OffsetSpec),
+                   ConsumerInfo = consumer_offset_info(Config, ConnectionName),
+                   ?assertEqual({0, 0}, ConsumerInfo),
+                   {ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
+                   C02
+           end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
+
+    PubId = 1,
+    {ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
     MessageCount = 10,
     Body = <<"hello">>,
-    lists:foreach(fun(_) ->
-                          test_publish_confirm(T, S, PublisherId, Body, C)
-                  end, lists:seq(1, MessageCount - 1)),
+    {ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
+                                         lists:duplicate(MessageCount - 1, Body)),
     %% to make sure to have 2 chunks
    timer:sleep(200),
-    test_publish_confirm(T, S, PublisherId, Body, C),
-    test_delete_publisher(T, S, PublisherId, C),
+    {ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
+    {ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
 
     NextOffset = MessageCount,
-    lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 1, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          case ReceiveDeliver of
-                              true ->
-                                  {{deliver, SubId, _}, _} = receive_commands(T, S, C);
-                              _ ->
-                                  ok
-                          end,
-                          {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
-                          CheckFun(Offset, Lag),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [{first, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "first, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "first, not all messages consumed")
-                         end},
-                        {last, true,
-                         fun(Offset, _Lag) ->
-                                 ?assert(Offset > 0, "offset expected for last")
-                         end},
-                        {next, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "next, offset lag should be 0")
-                         end},
-                        {0, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
-                         end},
-                        {1_000, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
-                         end},
-                        {{timestamp, TheFuture}, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
-                         end}]),
-
-    test_delete_stream(T, S, Stream, C, false),
-    test_close(T, S, C),
-
+    C7 = lists:foldl(
+           fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           1, #{}, OffsetSpec),
+
+                   C03 = case ReceiveDeliver of
+                             true ->
+                                 {{deliver, SubId, _}, C02} = receive_commands(S, C01),
+                                 C02;
+                             _ ->
+                                 C01
+                         end,
+                   {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
+                   CheckFun(Offset, Lag),
+                   {ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
+                   C04
+           end, C6, [{first, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "first, at least one chunk consumed"),
+                              ?assert(Lag > 0, "first, not all messages consumed")
+                      end},
+                     {last, true,
+                      fun(Offset, _Lag) ->
+                              ?assert(Offset > 0, "offset expected for last")
+                      end},
+                     {next, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "next, offset lag should be 0")
+                      end},
+                     {0, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
+                              ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
+                      end},
+                     {1_000, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
+                      end},
+                     {{timestamp, TheFuture}, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
+                      end}]),
+
+    {ok, C8} = stream_test_utils:delete_stream(S, C7, St),
+    {ok, _} = stream_test_utils:close(S, C8),
     ok.
 
 authentication_error_should_close_with_delay(Config) ->
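The offset specs iterated over above cover all the subscription start positions the stream protocol defines. As a type sketch (the name offset_spec() is illustrative, not part of the PR):

    %% Illustrative typespec for the OffsetSpec values used above.
    -type offset_spec() :: first                    %% first available offset in the stream
                         | last                     %% start of the last chunk
                         | next                     %% next offset to be written
                         | non_neg_integer()        %% an absolute offset
                         | {timestamp, integer()}.  %% chunk closest to a time (ms since epoch)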
FailureTest.java
@@ -34,8 +34,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.ToLongFunction;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class FailureTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);
 
+  static String testMethod;
   TestUtils.ClientFactory cf;
   String stream;
   ExecutorService executorService;
@@ -57,6 +61,11 @@ static void wait(Duration duration) {
     }
   }
 
+  @BeforeEach
+  void init(TestInfo info) {
+    testMethod = info.getTestMethod().get().getName();
+  }
+
   @AfterEach
   void tearDown() {
     if (executorService != null) {
@@ -142,9 +151,9 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
     waitAtMost(
         Duration.ofSeconds(10),
         () -> {
-          LOGGER.info("Getting metadata for {}", stream);
+          log("Getting metadata for {}", stream);
           Client.StreamMetadata m = publisher.metadata(stream).get(stream);
-          LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
+          log("Metadata for {} (expecting 2 replicas): {}", stream, m);
           return m.getReplicas().size() == 2;
         });
@@ -195,6 +204,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     Map<Long, Message> published = new ConcurrentHashMap<>();
     Set<Message> confirmed = ConcurrentHashMap.newKeySet();
 
+    // match confirmed messages to published messages
     Client.PublishConfirmListener publishConfirmListener =
         (publisherId, publishingId) -> {
           Message confirmedMessage;
@@ -212,18 +222,22 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     AtomicReference<Client> publisher = new AtomicReference<>();
     CountDownLatch reconnectionLatch = new CountDownLatch(1);
     AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+    // shutdown listener reconnects to node 2 to locate the node the stream leader is on
+    // it then re-creates a publisher connected to this node
     Client.ShutdownListener shutdownListener =
         shutdownContext -> {
           if (shutdownContext.getShutdownReason()
               == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+            log("Connection got closed, reconnecting");
             // avoid long-running task in the IO thread
             executorService.submit(
                 () -> {
                   connected.set(false);
                   AtomicReference<Client> locator = new AtomicReference<>();
                   try {
+                    log("Reconnecting to node 2");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           try {
                             locator.set(
@@ -233,14 +247,35 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                             return false;
                           }
                         });
+                    log("Reconnected to node 2, looking up new stream leader");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
                           return m.getLeader() != null
                               && m.getLeader().getPort() != streamPortNode1();
                         });
+                    log("New stream leader is on another node than node 1");
                   } catch (Throwable e) {
+                    log("Error while trying to connect to new stream leader");
+                    if (locator.get() == null) {
+                      log("Could not reconnect");
+                    } else {
+                      try {
+                        Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
+                        if (m.getLeader() == null) {
+                          log("The stream has no leader");
+                        } else {
+                          log(
+                              "The stream is on node with port {} (node 1 = {}, node 2 = {})",
+                              m.getLeader().getPort(),
+                              streamPortNode1(),
+                              streamPortNode2());
+                        }
+                      } catch (Exception ex) {
+                        log("Error while checking failure: {}", ex.getMessage());
+                      }
+                    }
                     reconnectionLatch.countDown();
                     return;
                   }
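waitAtMost appears to be a helper from the suite's TestUtils; the bumped arguments above give each reconnection step 20 seconds instead of 5. Roughly, such a helper polls a condition until it holds or the deadline passes. A generic sketch, not the project's actual implementation (poll interval and exception type assumed):

    // Generic polling-helper sketch, not the test suite's actual code.
    import java.time.Duration;
    import java.util.concurrent.Callable;

    final class Waits {
      static void waitAtMost(Duration timeout, Callable<Boolean> condition) throws Exception {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadlineNanos) {
          if (Boolean.TRUE.equals(condition.call())) {
            return; // condition met before the deadline
          }
          Thread.sleep(100); // poll every 100 ms
        }
        throw new IllegalStateException("condition not met within " + timeout);
      }
    }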
@@ -278,6 +313,9 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
 
     AtomicBoolean keepPublishing = new AtomicBoolean(true);
 
+    AtomicLong publishSequence = new AtomicLong(0);
+    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
+
     executorService.submit(
         () -> {
           while (keepPublishing.get()) {
@@ -295,7 +333,11 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                     .build();
             try {
               long publishingId =
-                  publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
+                  publisher
+                      .get()
+                      .publish(
+                          (byte) 1, Collections.singletonList(message), publishSequenceFunction)
+                      .get(0);
               published.put(publishingId, message);
             } catch (Exception e) {
               // keep going
@@ -314,6 +356,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     int confirmedCount = confirmed.size();
 
     try {
+      // stop the first node (this is where the stream leader is)
       Host.rabbitmqctl("stop_app");
 
       assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -324,6 +367,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     } finally {
       Host.rabbitmqctl("start_app");
     }
+    // making sure we published a few messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
     confirmedCount = confirmed.size();
 
@@ -339,6 +383,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     // let's publish for a bit of time
     Thread.sleep(2000);
 
+    // making sure we published messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
 
     keepPublishing.set(false);
@@ -640,4 +685,8 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
     Host.killStreamLeaderProcess(stream);
     waitUntil(() -> metadataNotifications.get() == 2);
   }
+
+  private static void log(String format, Object... args) {
+    LOGGER.info("[" + testMethod + "] " + format, args);
+  }
 }
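The ToLongFunction added above keeps the publishing sequence in a counter owned by the test rather than by the Client instance, so publishing IDs keep increasing when the publisher is re-created after a failover instead of restarting at 0 and colliding with entries still tracked in the published map. A self-contained sketch of just that sequencing behavior (class name illustrative, no client library involved):

    // Demonstrates the sequencing contract of the function passed to publish():
    // each call returns the next ID, whatever object asks for it.
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.ToLongFunction;

    public class PublishSequenceDemo {
      public static void main(String[] args) {
        AtomicLong publishSequence = new AtomicLong(0);
        ToLongFunction<Object> publishSequenceFunction =
            msg -> publishSequence.getAndIncrement();
        System.out.println(publishSequenceFunction.applyAsLong("m1")); // 0
        System.out.println(publishSequenceFunction.applyAsLong("m2")); // 1
        System.out.println(publishSequenceFunction.applyAsLong("m3")); // 2
      }
    }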
Logging configuration for the Java test suite
@@ -6,6 +6,7 @@
   </appender>
 
   <logger name="com.rabbitmq.stream" level="info" />
+  <logger name="com.rabbitmq.stream.impl.Client" level="warn" />
 
   <root level="info">
     <appender-ref ref="STDOUT" />