Skip to content

Commit 3114199

Browse files
[fix][broker] avoid offload system topic (apache#22497)
Co-authored-by: 道君 <[email protected]>
1 parent 80d4675 commit 3114199

File tree

2 files changed

+101
-1
lines changed

2 files changed

+101
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1963,7 +1963,13 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
19631963
topicLevelOffloadPolicies,
19641964
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
19651965
getPulsar().getConfig().getProperties());
1966-
if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
1966+
if (NamespaceService.isSystemServiceNamespace(namespace.toString())
1967+
|| SystemTopicNames.isSystemTopic(topicName)) {
1968+
/*
1969+
Avoid setting broker internal system topics using off-loader because some of them are the
1970+
preconditions of other topics. The slow replying log speed will cause a delay in all the topic
1971+
loading.(timeout)
1972+
*/
19671973
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
19681974
} else {
19691975
if (topicLevelOffloadPolicies != null) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@
6767
import java.util.concurrent.atomic.AtomicReference;
6868
import lombok.Cleanup;
6969
import lombok.extern.slf4j.Slf4j;
70+
import org.apache.bookkeeper.client.api.ReadHandle;
71+
import org.apache.bookkeeper.mledger.LedgerOffloader;
7072
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
7173
import org.apache.bookkeeper.mledger.ManagedLedgerException;
7274
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
7375
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
7476
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
77+
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
7578
import org.apache.http.HttpResponse;
7679
import org.apache.http.client.methods.HttpGet;
7780
import org.apache.http.impl.client.CloseableHttpClient;
@@ -112,6 +115,9 @@
112115
import org.apache.pulsar.common.naming.TopicName;
113116
import org.apache.pulsar.common.policies.data.BundlesData;
114117
import org.apache.pulsar.common.policies.data.LocalPolicies;
118+
import org.apache.pulsar.common.policies.data.OffloadPolicies;
119+
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
120+
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
115121
import org.apache.pulsar.common.policies.data.SubscriptionStats;
116122
import org.apache.pulsar.common.policies.data.TopicStats;
117123
import org.apache.pulsar.common.protocol.Commands;
@@ -1745,4 +1751,92 @@ public void testUnsubscribeNonDurableSub() throws Exception {
17451751
fail("Unsubscribe failed");
17461752
}
17471753
}
1754+
1755+
1756+
@Test
1757+
public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException {
1758+
final String driver = "aws-s3";
1759+
final String region = "test-region";
1760+
final String bucket = "test-bucket";
1761+
final String role = "test-role";
1762+
final String roleSessionName = "test-role-session-name";
1763+
final String credentialId = "test-credential-id";
1764+
final String credentialSecret = "test-credential-secret";
1765+
final String endPoint = "test-endpoint";
1766+
final Integer maxBlockSizeInBytes = 5;
1767+
final Integer readBufferSizeInBytes = 2;
1768+
final Long offloadThresholdInBytes = 10L;
1769+
final Long offloadThresholdInSeconds = 1000L;
1770+
final Long offloadDeletionLagInMillis = 5L;
1771+
1772+
final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
1773+
driver,
1774+
region,
1775+
bucket,
1776+
endPoint,
1777+
role,
1778+
roleSessionName,
1779+
credentialId,
1780+
credentialSecret,
1781+
maxBlockSizeInBytes,
1782+
readBufferSizeInBytes,
1783+
offloadThresholdInBytes,
1784+
offloadThresholdInSeconds,
1785+
offloadDeletionLagInMillis,
1786+
OffloadedReadPriority.TIERED_STORAGE_FIRST
1787+
);
1788+
1789+
var fakeOffloader = new LedgerOffloader() {
1790+
@Override
1791+
public String getOffloadDriverName() {
1792+
return driver;
1793+
}
1794+
1795+
@Override
1796+
public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) {
1797+
return CompletableFuture.completedFuture(null);
1798+
}
1799+
1800+
@Override
1801+
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
1802+
return CompletableFuture.completedFuture(null);
1803+
}
1804+
1805+
@Override
1806+
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
1807+
return CompletableFuture.completedFuture(null);
1808+
}
1809+
1810+
@Override
1811+
public OffloadPolicies getOffloadPolicies() {
1812+
return offloadPolicies;
1813+
}
1814+
1815+
@Override
1816+
public void close() {
1817+
}
1818+
};
1819+
1820+
final BrokerService brokerService = pulsar.getBrokerService();
1821+
final String namespace = "prop/" + UUID.randomUUID();
1822+
admin.namespaces().createNamespace(namespace);
1823+
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
1824+
1825+
// Inject the cache to avoid real load off-loader jar
1826+
final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = pulsar.getLedgerOffloaderMap();
1827+
ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader);
1828+
1829+
// (1) test normal topic
1830+
final String normalTopic = "persistent://" + namespace + "/" + UUID.randomUUID();
1831+
var managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join();
1832+
1833+
Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), fakeOffloader);
1834+
1835+
// (2) test system topic
1836+
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
1837+
managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
1838+
Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
1839+
}
1840+
}
17481841
}
1842+

0 commit comments

Comments
 (0)