 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.ToLongFunction;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class FailureTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);

+  static String testMethod;
   TestUtils.ClientFactory cf;
   String stream;
   ExecutorService executorService;
@@ -57,6 +61,11 @@ static void wait(Duration duration) {
     }
   }

+  @BeforeEach
+  void init(TestInfo info) {
+    testMethod = info.getTestMethod().get().getName();
+  }
+
   @AfterEach
   void tearDown() {
     if (executorService != null) {
@@ -142,9 +151,9 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
     waitAtMost(
         Duration.ofSeconds(10),
         () -> {
-          LOGGER.info("Getting metadata for {}", stream);
+          log("Getting metadata for {}", stream);
           Client.StreamMetadata m = publisher.metadata(stream).get(stream);
-          LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
+          log("Metadata for {} (expecting 2 replicas): {}", stream, m);
           return m.getReplicas().size() == 2;
         });

@@ -195,6 +204,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     Map<Long, Message> published = new ConcurrentHashMap<>();
     Set<Message> confirmed = ConcurrentHashMap.newKeySet();

+    // match confirmed messages to published messages
     Client.PublishConfirmListener publishConfirmListener =
         (publisherId, publishingId) -> {
           Message confirmedMessage;
@@ -212,18 +222,22 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     AtomicReference<Client> publisher = new AtomicReference<>();
     CountDownLatch reconnectionLatch = new CountDownLatch(1);
     AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+    // shutdown listener reconnects to node 2 to locate the node the stream leader is on
+    // it then re-creates a publisher connected to this node
     Client.ShutdownListener shutdownListener =
         shutdownContext -> {
           if (shutdownContext.getShutdownReason()
               == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+            log("Connection got closed, reconnecting");
             // avoid long-running task in the IO thread
             executorService.submit(
                 () -> {
                   connected.set(false);
                   AtomicReference<Client> locator = new AtomicReference<>();
                   try {
+                    log("Reconnecting to node 2");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           try {
                             locator.set(
@@ -233,14 +247,35 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                             return false;
                           }
                         });
+                    log("Reconnected to node 2, looking up new stream leader");
                     waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                         () -> {
                           Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
                           return m.getLeader() != null
                               && m.getLeader().getPort() != streamPortNode1();
                         });
+                    log("New stream leader is on another node than node 1");
                   } catch (Throwable e) {
+                    log("Error while trying to connect to new stream leader");
+                    if (locator.get() == null) {
+                      log("Could not reconnect");
+                    } else {
+                      try {
+                        Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
+                        if (m.getLeader() == null) {
+                          log("The stream has no leader");
+                        } else {
+                          log(
+                              "The stream is on node with port {} (node 1 = {}, node 2 = {})",
+                              m.getLeader().getPort(),
+                              streamPortNode1(),
+                              streamPortNode2());
+                        }
+                      } catch (Exception ex) {
+                        log("Error while checking failure: {}", ex.getMessage());
+                      }
+                    }
                     reconnectionLatch.countDown();
                     return;
                   }
@@ -278,6 +313,9 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {

     AtomicBoolean keepPublishing = new AtomicBoolean(true);

+    AtomicLong publishSequence = new AtomicLong(0);
+    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
+
     executorService.submit(
         () -> {
           while (keepPublishing.get()) {
@@ -295,7 +333,11 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                     .build();
             try {
               long publishingId =
-                  publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
+                  publisher
+                      .get()
+                      .publish(
+                          (byte) 1, Collections.singletonList(message), publishSequenceFunction)
+                      .get(0);
               published.put(publishingId, message);
             } catch (Exception e) {
               // keep going
@@ -314,6 +356,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     int confirmedCount = confirmed.size();

     try {
+      // stop the first node (this is where the stream leader is)
       Host.rabbitmqctl("stop_app");

       assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -324,6 +367,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     } finally {
       Host.rabbitmqctl("start_app");
     }
+    // making sure we published a few messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
     confirmedCount = confirmed.size();

@@ -339,6 +383,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     // let's publish for a bit of time
     Thread.sleep(2000);

+    // making sure we published messages and got the confirmations
    assertThat(confirmed).hasSizeGreaterThan(confirmedCount);

     keepPublishing.set(false);
@@ -640,4 +685,8 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
     Host.killStreamLeaderProcess(stream);
     waitUntil(() -> metadataNotifications.get() == 2);
   }
+
+  private static void log(String format, Object... args) {
+    LOGGER.info("[" + testMethod + "] " + format, args);
+  }
 }
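
A note on the publishing-ID change above: the test re-creates its publisher Client after a failover, and a newly created client would otherwise restart its publishing sequence at 0, so confirmed IDs could collide in the published map keyed by publishing ID — that appears to be the motivation for passing an external sequence function to publish(...). The sketch below isolates the pattern, an AtomicLong-backed ToLongFunction shared across publisher lifetimes; the class and variable names are illustrative, not part of the commit.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;

// Illustrative only: the external publishing-sequence pattern used in the diff.
public class PublishSequenceSketch {

  public static void main(String[] args) {
    // The counter lives outside any publisher instance, so IDs keep growing
    // even when the publisher is torn down and re-created after a failover.
    AtomicLong publishSequence = new AtomicLong(0);
    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();

    // Simulate two publisher "lifetimes": the second one does not restart at 0.
    for (int lifetime = 1; lifetime <= 2; lifetime++) {
      for (int i = 0; i < 3; i++) {
        long publishingId = publishSequenceFunction.applyAsLong("message-" + i);
        System.out.println("lifetime " + lifetime + " -> publishing id " + publishingId);
      }
    }
  }
}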
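The TestInfo/testMethod additions and the log(...) helper give every log line a per-test prefix, which makes it easier to tell which failure scenario produced which message when several tests hit the broker in one run. Below is a minimal, self-contained version of that pattern, assuming JUnit 5 and SLF4J on the classpath; the class and test names are illustrative, not part of the commit.

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: the per-test log prefix pattern from the diff, in isolation.
class LogPrefixSketch {

  private static final Logger LOGGER = LoggerFactory.getLogger(LogPrefixSketch.class);

  // Captured before each test so the static log(...) helper can prefix messages.
  static String testMethod;

  @BeforeEach
  void init(TestInfo info) {
    // getTestMethod() returns an Optional<Method>; it is present for regular test methods.
    testMethod = info.getTestMethod().get().getName();
  }

  @Test
  void someScenario() {
    // logs "[someScenario] doing something with stream some-stream"
    log("doing something with stream {}", "some-stream");
  }

  private static void log(String format, Object... args) {
    // Only the prefix is concatenated; SLF4J still resolves the {} placeholders.
    LOGGER.info("[" + testMethod + "] " + format, args);
  }
}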