|
40 | 40 | import com.mongodb.internal.connection.TestClusterListener; |
41 | 41 | import com.mongodb.internal.connection.TestCommandListener; |
42 | 42 | import com.mongodb.internal.connection.TestConnectionPoolListener; |
| 43 | +import com.mongodb.internal.connection.TestServerListener; |
43 | 44 | import com.mongodb.internal.logging.LogMessage; |
44 | 45 | import com.mongodb.lang.NonNull; |
45 | 46 | import com.mongodb.lang.Nullable; |
|
71 | 72 | import java.util.Collections; |
72 | 73 | import java.util.HashSet; |
73 | 74 | import java.util.List; |
74 | | -import java.util.Map; |
75 | 75 | import java.util.Set; |
76 | 76 | import java.util.concurrent.ExecutionException; |
77 | 77 | import java.util.concurrent.ExecutorService; |
@@ -119,6 +119,7 @@ public abstract class UnifiedTest { |
119 | 119 | private static final String TOPOLOGY_CLOSED_EVENT = "topologyClosedEvent"; |
120 | 120 | private static final List<String> TOPOLOGY_EVENT_NAMES = asList("topologyOpeningEvent", "topologyDescriptionChangedEvent", |
121 | 121 | TOPOLOGY_CLOSED_EVENT); |
| 122 | + private static final String SERVER_DESCRIPTION_CHANGED_EVENT = "serverDescriptionChangedEvent"; |
122 | 123 |
|
123 | 124 | public static final int RETRY_ATTEMPTS = 3; |
124 | 125 | public static final int FORCE_FLAKY_ATTEMPTS = 10; |
@@ -421,33 +422,46 @@ private void compareEvents(final UnifiedTestContext context, final BsonDocument |
421 | 422 | context.getEventMatcher().assertConnectionPoolEventsEquality(client, ignoreExtraEvents, expectedEvents, |
422 | 423 | listener.getEvents()); |
423 | 424 | } else if (eventType.equals("sdam")) { |
| 425 | + List<BsonDocument> expectedTopologyEvents = new ArrayList<>(); |
| 426 | + List<BsonDocument> expectedServerDescriptionChangedEvents = new ArrayList<>(); |
| 427 | + List<BsonDocument> expectedHeartbeatEvents = new ArrayList<>(); |
| 428 | + |
| 429 | + for (BsonValue event : expectedEvents) { |
| 430 | + BsonDocument doc = event.asDocument(); |
| 431 | + if (TOPOLOGY_EVENT_NAMES.stream().anyMatch(doc::containsKey)) { |
| 432 | + expectedTopologyEvents.add(doc); |
| 433 | + } else if (doc.containsKey(SERVER_DESCRIPTION_CHANGED_EVENT)) { |
| 434 | + expectedServerDescriptionChangedEvents.add(doc); |
| 435 | + } else { |
| 436 | + expectedHeartbeatEvents.add(doc); |
| 437 | + } |
| 438 | + } |
424 | 439 |
|
425 | | - // SDAM tests also include topology events, so we need to separate them to be able to assert them separately. |
426 | | - // Partition the expected events into two lists with the key being if it's a topology based event or not. |
427 | | - Map<Boolean, List<BsonDocument>> partitionedEventsMap = expectedEvents.stream() |
428 | | - .map(BsonValue::asDocument) |
429 | | - .collect(Collectors.partitioningBy(doc -> TOPOLOGY_EVENT_NAMES.stream().anyMatch(doc::containsKey))); |
430 | | - |
431 | | - BsonArray expectedTopologyEvents = new BsonArray(partitionedEventsMap.get(true)); |
432 | 440 | if (!expectedTopologyEvents.isEmpty()) { |
433 | 441 | TestClusterListener clusterListener = entities.getClusterListener(client); |
434 | | - // Unfortunately, some tests expect the cluster to be closed, but do not define it as a waitForEvent in the spec - |
435 | | - // causing a race condition in the test. |
436 | | - if (expectedTopologyEvents.stream().anyMatch(doc -> doc.asDocument().containsKey(TOPOLOGY_CLOSED_EVENT))) { |
| 442 | + // Race guard: some tests expect topologyClosedEvent without a prior waitForEvent. |
| 443 | + if (expectedTopologyEvents.stream().anyMatch(doc -> doc.containsKey(TOPOLOGY_CLOSED_EVENT))) { |
437 | 444 | context.getEventMatcher().waitForClusterClosedEvent(client, clusterListener); |
438 | 445 | } |
439 | | - |
440 | 446 | List<Object> topologyEvents = new ArrayList<>(); |
441 | 447 | topologyEvents.add(clusterListener.getClusterOpeningEvent()); |
442 | 448 | topologyEvents.addAll(clusterListener.getClusterDescriptionChangedEvents()); |
443 | 449 | topologyEvents.add(clusterListener.getClusterClosingEvent()); |
444 | | - context.getEventMatcher().assertTopologyEventsEquality(client, ignoreExtraEvents, expectedTopologyEvents, topologyEvents); |
| 450 | + context.getEventMatcher().assertTopologyEventsEquality(client, ignoreExtraEvents, |
| 451 | + new BsonArray(expectedTopologyEvents), topologyEvents); |
| 452 | + } |
| 453 | + |
| 454 | + if (!expectedServerDescriptionChangedEvents.isEmpty()) { |
| 455 | + TestServerListener serverListener = entities.getServerListener(client); |
| 456 | + context.getEventMatcher().assertServerMonitorEventsEquality(client, ignoreExtraEvents, |
| 457 | + new BsonArray(expectedServerDescriptionChangedEvents), |
| 458 | + serverListener.getServerDescriptionChangedEvents()); |
445 | 459 | } |
446 | 460 |
|
447 | | - BsonArray expectedSdamEvents = new BsonArray(partitionedEventsMap.get(false)); |
448 | | - if (!expectedSdamEvents.isEmpty()) { |
| 461 | + if (!expectedHeartbeatEvents.isEmpty()) { |
449 | 462 | TestServerMonitorListener serverMonitorListener = entities.getServerMonitorListener(client); |
450 | | - context.getEventMatcher().assertServerMonitorEventsEquality(client, ignoreExtraEvents, expectedSdamEvents, serverMonitorListener.getEvents()); |
| 463 | + context.getEventMatcher().assertServerMonitorEventsEquality(client, ignoreExtraEvents, |
| 464 | + new BsonArray(expectedHeartbeatEvents), serverMonitorListener.getEvents()); |
451 | 465 | } |
452 | 466 | } else { |
453 | 467 | throw new UnsupportedOperationException("Unexpected event type: " + eventType); |
|
0 commit comments