Skip to content

Commit

Permalink
reenable the test
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Nov 12, 2024
1 parent d4b8b15 commit 6238e2c
Showing 1 changed file with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;

import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -174,4 +181,42 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro
public void testReplicationCountMetrics() throws Exception {
super.testReplicationCountMetrics();
}

@Test
public void testRemoveCluster() throws Exception {
// Initialize.
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events";
admin1.namespaces().createNamespace(ns1);
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.topics().createNonPartitionedTopic(topic);

// Wait for loading topic up.
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
Awaitility.await().untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertTrue(tps.containsKey(topic));
assertTrue(tps.containsKey(topicChangeEvents));
});

// The topics under the namespace of the cluster-1 will be deleted.
// Verify the result.
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps
= pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topic));
assertFalse(tps.containsKey(topicChangeEvents));
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists());
assertFalse(pulsar1.getNamespaceService()
.checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists());
});

// cleanup.
p.close();
admin2.topics().delete(topic);
admin2.namespaces().deleteNamespace(ns1);
}
}

0 comments on commit 6238e2c

Please sign in to comment.