From eee9283666cc9d84d0ddb998c279600898121d2b Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Thu, 3 Oct 2024 12:50:11 -0700 Subject: [PATCH] [fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) (#23382) --- .../pulsar/broker/admin/impl/BrokersBase.java | 11 ++- .../extensions/BrokerRegistry.java | 5 + .../extensions/BrokerRegistryImpl.java | 69 ++++++++++--- .../extensions/ExtensibleLoadManagerImpl.java | 18 +++- .../channel/ServiceUnitStateChannelImpl.java | 98 ++++++++++++++++--- .../extensions/BrokerRegistryTest.java | 33 +++++++ .../ExtensibleLoadManagerImplTest.java | 15 +++ .../channel/ServiceUnitStateChannelTest.java | 70 +++++++++---- .../apache/pulsar/client/admin/Brokers.java | 12 ++- .../client/admin/internal/BrokersImpl.java | 18 +++- 10 files changed, 299 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 4d0b598a8e4f1..e13cb1858f79d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -48,6 +48,7 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; @@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) { @ApiOperation(value = "Run a healthCheck against the broker") @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is OK"), + @ApiResponse(code = 307, message = "Current broker is not the target broker"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")}) public void healthCheck(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Topic Version") - @QueryParam("topicVersion") TopicVersion topicVersion) { + @QueryParam("topicVersion") TopicVersion topicVersion, + @QueryParam("brokerId") String brokerId) { validateSuperUserAccessAsync() .thenAccept(__ -> checkDeadlockedThreads()) + .thenCompose(__ -> maybeRedirectToBroker( + StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId)) .thenCompose(__ -> internalRunHealthCheck(topicVersion)) .thenAccept(__ -> { LOG.info("[{}] Successfully run health check.", clientAppId()); asyncResponse.resume(Response.ok("ok").build()); }).exceptionally(ex -> { - LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + if (!isRedirectException(ex)) { + LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java index 79dba9c63342e..d154edfbb320e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java @@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable { */ boolean isStarted(); + /** + * Return the broker has been registered. + */ + boolean isRegistered(); + /** * Register local broker to metadata store. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index a13b332e6eb5f..5a8307df27a63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -52,6 +52,8 @@ @Slf4j public class BrokerRegistryImpl implements BrokerRegistry { + private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000; + private final PulsarService pulsar; private final ServiceConfiguration conf; @@ -77,10 +79,11 @@ protected enum State { @VisibleForTesting final AtomicReference state = new AtomicReference<>(State.Init); - public BrokerRegistryImpl(PulsarService pulsar) { + @VisibleForTesting + BrokerRegistryImpl(PulsarService pulsar, MetadataCache brokerLookupDataMetadataCache) { this.pulsar = pulsar; this.conf = pulsar.getConfiguration(); - this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); + this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache; this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); @@ -99,6 +102,10 @@ public BrokerRegistryImpl(PulsarService pulsar) { pulsar.getConfig().lookupProperties()); } + public BrokerRegistryImpl(PulsarService pulsar) { + this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class)); + } + @Override public synchronized void start() throws PulsarServerException { if (!this.state.compareAndSet(State.Init, State.Started)) { @@ -118,6 +125,12 @@ public boolean isStarted() { return state == State.Started || state == State.Registered; } + @Override + public boolean isRegistered() { + final var state = this.state.get(); + return state == State.Registered; + } + @Override public CompletableFuture registerAsync() { final var state = this.state.get(); @@ -127,12 +140,35 @@ public CompletableFuture registerAsync() { } log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state); return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) - .thenAccept(__ -> { - this.state.set(State.Registered); - log.info("[{}] Finished registering self", getBrokerId()); + .orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS) + .whenComplete((__, ex) -> { + if (ex == null) { + this.state.set(State.Registered); + log.info("[{}] Finished registering self", getBrokerId()); + } else { + log.error("[{}] Failed registering self", getBrokerId(), ex); + } }); } + private void doRegisterAsyncWithRetries(int retry, CompletableFuture future) { + pulsar.getExecutor().schedule(() -> { + registerAsync().whenComplete((__, e) -> { + if (e != null) { + doRegisterAsyncWithRetries(retry + 1, future); + } else { + future.complete(null); + } + }); + }, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS); + } + + private CompletableFuture registerAsyncWithRetries() { + var retryFuture = new CompletableFuture(); + doRegisterAsyncWithRetries(0, retryFuture); + return retryFuture; + } + @Override public synchronized void unregister() throws MetadataStoreException { if (state.compareAndSet(State.Registered, State.Unregistering)) { @@ -219,17 +255,26 @@ private void handleMetadataStoreNotification(Notification t) { // The registered node is an ephemeral node that could be deleted when the metadata store client's session // is expired. In this case, we should register again. final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); + + CompletableFuture register; if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { - registerAsync(); - } - if (listeners.isEmpty()) { - return; + this.state.set(State.Started); + register = registerAsyncWithRetries(); + } else { + register = CompletableFuture.completedFuture(null); } - this.scheduler.submit(() -> { - for (BiConsumer listener : listeners) { - listener.accept(brokerId, t.getType()); + // Make sure to run the listeners after re-registered. + register.thenAccept(__ -> { + if (listeners.isEmpty()) { + return; } + this.scheduler.submit(() -> { + for (BiConsumer listener : listeners) { + listener.accept(brokerId, t.getType()); + } + }); }); + } catch (RejectedExecutionException e) { // Executor is shutting down } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index d8a279b854576..abca2bb398232 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -35,8 +35,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -987,8 +989,12 @@ protected void monitor() { return; } + // Monitor broker registry + // Periodically check the broker registry in case metadata store fails. + validateBrokerRegistry(); + // Monitor role - // Periodically check the role in case ZK watcher fails. + // Periodically check the role in case metadata store fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); if (isChannelOwner) { // System topic config might fail due to the race condition @@ -1087,5 +1093,15 @@ private boolean isPersistentSystemTopicUsed() { .equals(pulsar.getConfiguration().getLoadManagerServiceUnitStateTableViewClassName()); } + private void validateBrokerRegistry() + throws ExecutionException, InterruptedException, TimeoutException { + var timeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + var lookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, TimeUnit.SECONDS); + if (lookup.isEmpty()) { + log.warn("Found this broker:{} has not registered yet. Trying to register it", + brokerRegistry.getBrokerId()); + brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ce975495feb2a..49d038d512e59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -86,11 +86,13 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Reflections; @@ -108,6 +110,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000; + private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3; + private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000; private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; @@ -115,6 +119,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final String brokerId; private final Map> cleanupJobs; private final StateChangeListeners stateChangeListeners; + private BrokerRegistry brokerRegistry; private LeaderElectionService leaderElectionService; @@ -350,6 +355,11 @@ protected LeaderElectionService getLeaderElectionService() { .get().getLeaderElectionService(); } + @VisibleForTesting + protected PulsarAdmin getPulsarAdmin() throws PulsarServerException { + return pulsar.getAdminClient(); + } + @Override public synchronized void close() throws PulsarServerException { channelState = Closed; @@ -448,6 +458,14 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { + + // If this broker's registry does not exist(possibly suffering from connecting to the metadata store), + // we return the owner without its activeness check. + // This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable. + if (!brokerRegistry.isRegistered()) { + return CompletableFuture.completedFuture(owner); + } + return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) { @@ -1255,19 +1273,25 @@ private MetadataState getMetadataState() { } private void handleBrokerCreationEvent(String broker) { - CompletableFuture future = cleanupJobs.remove(broker); - if (future != null) { - future.cancel(false); - totalInactiveBrokerCleanupCancelledCnt++; - log.info("Successfully cancelled the ownership cleanup for broker:{}." - + " Active cleanup job count:{}", - broker, cleanupJobs.size()); - } else { - if (debug()) { - log.info("No needs to cancel the ownership cleanup for broker:{}." - + " There was no scheduled cleanup job. Active cleanup job count:{}", - broker, cleanupJobs.size()); - } + + if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) { + healthCheckBrokerAsync(broker) + .thenAccept(__ -> { + CompletableFuture future = cleanupJobs.remove(broker); + if (future != null) { + future.cancel(false); + totalInactiveBrokerCleanupCancelledCnt++; + log.info("Successfully cancelled the ownership cleanup for broker:{}." + + " Active cleanup job count:{}", + broker, cleanupJobs.size()); + } else { + if (debug()) { + log.info("No needs to cancel the ownership cleanup for broker:{}." + + " There was no scheduled cleanup job. Active cleanup job count:{}", + broker, cleanupJobs.size()); + } + } + }); } } @@ -1431,6 +1455,37 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max System.currentTimeMillis() - started); } + private CompletableFuture healthCheckBrokerAsync(String brokerId) { + CompletableFuture future = new CompletableFuture<>(); + doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future); + return future; + } + + private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture future) { + try { + var admin = getPulsarAdmin(); + admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId)) + .whenComplete((__, e) -> { + if (e == null) { + log.info("Completed health-check broker :{}", brokerId, e); + future.complete(null); + return; + } + if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) { + log.error("Failed health-check broker :{}", brokerId, e); + future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + } else { + pulsar.getExecutor() + .schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future), + Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50), + MILLISECONDS); + } + }); + } catch (PulsarServerException e) { + future.completeExceptionally(e); + } + } + private synchronized void doCleanup(String broker, boolean gracefully) { try { if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) @@ -1444,6 +1499,23 @@ private synchronized void doCleanup(String broker, boolean gracefully) { return; } + // if not gracefully, verify the broker is inactive by health-check. + if (!gracefully) { + try { + healthCheckBrokerAsync(broker).get( + pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup", + broker); + return; + } catch (Exception e) { + if (debug()) { + log.info("Failed to check broker:{} health", broker, e); + } + log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker); + } + } + + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 28a2a18500f5f..941d0e4cbc3a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -48,6 +52,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; @@ -396,6 +401,34 @@ public void testKeyPath() { assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId"); } + @Test + public void testRegisterAsyncTimeout() throws Exception { + var pulsar1 = createPulsarService(); + pulsar1.start(); + pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1); + var metadataCache = mock(MetadataCache.class); + var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache); + + // happy case + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.start(); + + // unhappy case (timeout) + doAnswer(invocationOnMock -> { + return CompletableFuture.supplyAsync(() -> null, CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS)); + }).when(metadataCache).put(any(), any(), any()); + try { + brokerRegistry.registerAsync().join(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + // happy case again + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.registerAsync().join(); + } + + private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { return brokerRegistry.state.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 7871e612c847a..d8d3e5bb44ffb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -132,6 +132,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -2106,6 +2107,20 @@ public void compactionScheduleTest() { }); } + @Test(timeOut = 30 * 1000) + public void testMonitorBrokerRegistry() throws MetadataStoreException { + primaryLoadManager.getBrokerRegistry().unregister(); + assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered()); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { // wait until true + primaryLoadManager.monitor(); + assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered()); + }); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 92cdf61f44269..b6e38d4f6956c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -89,6 +90,8 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.Brokers; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -136,10 +139,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private BrokerRegistryImpl registry; + private PulsarAdmin pulsarAdmin; + private ExtensibleLoadManagerImpl loadManager; private final String serviceUnitStateTableViewClassName; + private Brokers brokers; + @DataProvider(name = "serviceUnitStateTableViewClassName") public static Object[][] serviceUnitStateTableViewClassName() { return new Object[][]{ @@ -174,7 +181,9 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(namespaceName2); pulsar1 = pulsar; - registry = new BrokerRegistryImpl(pulsar); + registry = spy(new BrokerRegistryImpl(pulsar1)); + registry.start(); + pulsarAdmin = spy(pulsar.getAdminClient()); loadManagerContext = mock(LoadManagerContext.class); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore(); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore(); @@ -207,6 +216,10 @@ protected void setup() throws Exception { childBundle31 = namespaceName2 + "/" + childBundle1Range; childBundle32 = namespaceName2 + "/" + childBundle2Range; + + brokers = mock(Brokers.class); + doReturn(CompletableFuture.failedFuture(new RuntimeException("failed"))).when(brokers) + .healthcheckAsync(any(), any()); } @BeforeMethod @@ -220,6 +233,7 @@ protected void initChannels() throws Exception { cleanMetadataState(channel1); cleanMetadataState(channel2); enableChannels(); + reset(pulsarAdmin); } @@ -719,17 +733,19 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { @Test(priority = 8) public void handleBrokerCreationEventTest() throws IllegalAccessException { var cleanupJobs = getCleanupJobs(channel1); - String broker = "broker-1"; + String broker = brokerId2; var future = new CompletableFuture(); cleanupJobs.put(broker, future); ((ServiceUnitStateChannelImpl) channel1).handleBrokerRegistrationEvent(broker, NotificationType.Created); - assertEquals(0, cleanupJobs.size()); - assertTrue(future.isCancelled()); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, cleanupJobs.size()); + assertTrue(future.isCancelled()); + }); + } @Test(priority = 9) - public void handleBrokerDeletionEventTest() - throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + public void handleBrokerDeletionEventTest() throws Exception { var cleanupJobs1 = getCleanupJobs(channel1); var cleanupJobs2 = getCleanupJobs(channel2); @@ -782,8 +798,12 @@ public void handleBrokerDeletionEventTest() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(brokerId2, @@ -841,6 +861,7 @@ public void handleBrokerDeletionEventTest() 3, 0, 0); + reset(pulsarAdmin); // broker is back online leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created); @@ -865,6 +886,7 @@ public void handleBrokerDeletionEventTest() // broker is offline again + doReturn(brokers).when(pulsarAdmin).brokers(); FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); @@ -906,6 +928,7 @@ public void handleBrokerDeletionEventTest() 4, 0, 1); + reset(pulsarAdmin); // test unstable state channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker))); @@ -1585,9 +1608,12 @@ public void testOverrideInactiveBrokerStateData() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + waitUntilNewOwner(channel2, releasingBundle, brokerId2); waitUntilNewOwner(channel2, childBundle11, brokerId2); waitUntilNewOwner(channel2, childBundle12, brokerId2); @@ -1600,7 +1626,7 @@ public void testOverrideInactiveBrokerStateData() // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 19) @@ -1736,13 +1762,10 @@ public void testActiveGetOwner() throws Exception { } // case 5: the owner lookup gets delayed - var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", spyRegistry, true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true); var delayedFuture = new CompletableFuture(); - doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker)); + doReturn(delayedFuture).when(registry).lookupAsync(eq(broker)); CompletableFuture.runAsync(() -> { try { Thread.sleep(500); @@ -1760,7 +1783,7 @@ public void testActiveGetOwner() throws Exception { // case 6: the owner is inactive doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(spyRegistry).lookupAsync(eq(broker)); + .when(registry).lookupAsync(eq(broker)); // verify getOwnerAsync times out start = System.currentTimeMillis(); @@ -1768,6 +1791,18 @@ public void testActiveGetOwner() throws Exception { assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); + try { + // verify getOwnerAsync returns immediately when not registered + registry.unregister(); + start = System.currentTimeMillis(); + assertEquals(broker, channel1.getOwnerAsync(bundle).get().get()); + elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed < 1000); + } finally { + registry.registerAsync().join(); + } + + // case 7: the ownership cleanup(no new owner) by the leader channel doReturn(CompletableFuture.completedFuture(Optional.empty())) .when(loadManager).selectAsync(any(), any(), any()); @@ -1781,6 +1816,7 @@ public void testActiveGetOwner() throws Exception { leaderChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout @@ -1792,7 +1828,7 @@ public void testActiveGetOwner() throws Exception { waitUntilState(channel2, bundle, Init); assertTrue(System.currentTimeMillis() - start < 20_000); - + reset(pulsarAdmin); // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel try { disableChannels(); @@ -1807,6 +1843,7 @@ public void testActiveGetOwner() throws Exception { FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); getCleanupJobs(leaderChannel).clear(); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout @@ -1817,10 +1854,8 @@ public void testActiveGetOwner() throws Exception { // test clean-up FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", registry, true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 21) @@ -2253,7 +2288,7 @@ private static void validateMonitorCounters(ServiceUnitStateChannel channel, } ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) - throws IllegalAccessException { + throws IllegalAccessException, PulsarServerException { var tmpChannel = new ServiceUnitStateChannelImpl(pulsar); FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true); var channel = spy(tmpChannel); @@ -2261,6 +2296,7 @@ ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) doReturn(loadManagerContext).when(channel).getContext(); doReturn(registry).when(channel).getBrokerRegistry(); doReturn(loadManager).when(channel).getLoadManager(); + doReturn(pulsarAdmin).when(channel).getPulsarAdmin(); var leaderElectionService = new LeaderElectionService( diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index dc0b7c9885a9a..eed73f38282ac 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -320,10 +321,19 @@ Map getOwnedNamespaces(String cluster, String */ void healthcheck(TopicVersion topicVersion) throws PulsarAdminException; + /** + * Run a healthcheck on the target broker or on the broker. + * @param brokerId target broker id to check the health. If empty, it checks the health on the connected broker. + * + * @throws PulsarAdminException if the healthcheck fails. + */ + void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException; + /** * Run a healthcheck on the broker asynchronously. */ - CompletableFuture healthcheckAsync(TopicVersion topicVersion); + CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId); + /** * Trigger the current broker to graceful-shutdown asynchronously. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index b82c3fd0f414b..35b261b196eee 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; @@ -168,26 +169,35 @@ public CompletableFuture backlogQuotaCheckAsync() { @Override @Deprecated public void healthcheck() throws PulsarAdminException { - healthcheck(TopicVersion.V1); + healthcheck(TopicVersion.V1, Optional.empty()); } @Override @Deprecated public CompletableFuture healthcheckAsync() { - return healthcheckAsync(TopicVersion.V1); + return healthcheckAsync(TopicVersion.V1, Optional.empty()); } + @Override public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException { - sync(() -> healthcheckAsync(topicVersion)); + sync(() -> healthcheckAsync(topicVersion, Optional.empty())); } @Override - public CompletableFuture healthcheckAsync(TopicVersion topicVersion) { + public void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException { + sync(() -> healthcheckAsync(topicVersion, brokerId)); + } + + @Override + public CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId) { WebTarget path = adminBrokers.path("health"); if (topicVersion != null) { path = path.queryParam("topicVersion", topicVersion); } + if (brokerId.isPresent()) { + path = path.queryParam("brokerId", brokerId.get()); + } final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() {