Skip to content

Commit

Permalink
[feat][broker] Support configuring replicator rate limiter per cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 6, 2025
1 parent 8f8ecba commit 7dd7e00
Show file tree
Hide file tree
Showing 18 changed files with 659 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1532,11 +1532,12 @@ protected SubscribeRate internalGetSubscribeRate() {
return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
}

protected void internalRemoveReplicatorDispatchRate() {
protected void internalRemoveReplicatorDispatchRate(String cluster) {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName());
policies.replicatorDispatchRate.remove(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
Expand All @@ -1548,12 +1549,14 @@ protected void internalRemoveReplicatorDispatchRate() {
}
}

protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
protected void internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policies.replicatorDispatchRate.put(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName(),
dispatchRate);
return policies;
});
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
Expand All @@ -1565,11 +1568,12 @@ protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate)
}
}

protected DispatchRate internalGetReplicatorDispatchRate() {
protected DispatchRate internalGetReplicatorDispatchRate(String cluster) {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
return policies.replicatorDispatchRate.get(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3597,25 +3597,42 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
});
}

protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(String cluster, boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
.orElseGet(() -> {
.thenApply(op -> op.map(n -> {
String key = StringUtils.isNotEmpty(cluster) ? cluster :
pulsar().getConfiguration().getClusterName();
return n.getFinalReplicatorDispatchRateMap(pulsar().getConfiguration().getClusterName()).get(key);
}).orElseGet(() -> {
if (applied) {
String key = StringUtils.isNotEmpty(cluster) ? cluster :
pulsar().getConfiguration().getClusterName();
DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName)
.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
.replicatorDispatchRate.get(key);
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
}
return null;
}));
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
boolean usingDefaultCluster = StringUtils.isEmpty(cluster);
if (dispatchRate == null) {
topicPolicies.getReplicatorDispatchRateMap()
.remove(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster);
} else {
topicPolicies.getReplicatorDispatchRateMap()
.put(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster,
dispatchRate);
}
if (usingDefaultCluster) {
topicPolicies.setReplicatorDispatchRate(dispatchRate);
}
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,10 +753,11 @@ public void deleteSubscriptionDispatchRate(@PathParam("property") String propert
public void setReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("cluster") String queryCluster,
@ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, cluster, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(queryCluster, dispatchRate);
}

@GET
Expand All @@ -768,9 +769,10 @@ public void setReplicatorDispatchRate(
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String queryCluster) {
validateNamespaceName(tenant, cluster, namespace);
return internalGetReplicatorDispatchRate();
return internalGetReplicatorDispatchRate(cluster);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,21 +717,24 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void removeReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster
) {
validateNamespaceName(tenant, namespace);
internalRemoveReplicatorDispatchRate();
internalRemoveReplicatorDispatchRate(cluster);
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @ApiParam(value =
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster, @ApiParam(value =
"Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(cluster, dispatchRate);
}

@GET
Expand All @@ -742,9 +745,11 @@ public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster
) {
validateNamespaceName(tenant, namespace);
return internalGetReplicatorDispatchRate();
return internalGetReplicatorDispatchRate(cluster);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2531,11 +2531,12 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
.thenCompose(__ -> internalGetReplicatorDispatchRate(cluster, applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
Expand All @@ -2559,11 +2560,12 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster,
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, dispatchRate, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+ ", replicatorDispatchRate={}, isGlobal={}",
Expand All @@ -2590,11 +2592,12 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,14 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}

public DispatchRateImpl getReplicatorDispatchRate() {
return this.topicPolicies.getReplicatorDispatchRate().get();
public DispatchRateImpl getReplicatorDispatchRate(String remoteCluster) {
Map<String, DispatchRateImpl> dispatchRateMap = topicPolicies.getReplicatorDispatchRate().get();
DispatchRateImpl dispatchRate = dispatchRateMap.get(remoteCluster);
if (dispatchRate == null) {
// Use the default dispatch rate.
dispatchRate = dispatchRateMap.get(brokerService.pulsar().getConfiguration().getClusterName());
}
return normalize(dispatchRate);
}

public DispatchRateImpl getDispatchRate() {
Expand Down Expand Up @@ -225,7 +231,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
data.getFinalReplicatorDispatchRateMap(brokerService.pulsar().getConfiguration().getClusterName()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
Expand Down Expand Up @@ -272,8 +279,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
updateNamespaceReplicatorDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getReplicatorDispatchRate().updateNamespaceValue(namespacePolicies.replicatorDispatchRate);
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
Expand Down Expand Up @@ -303,11 +309,6 @@ private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies,
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
}

private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
topicPolicies.getReplicatorDispatchRate()
.updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
}

private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
Expand Down Expand Up @@ -411,12 +412,14 @@ private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration c
.build();
}

private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
.ratePeriodInSecond(1)
.build();
private Map<String, DispatchRateImpl> replicatorDispatchRateInBroker(ServiceConfiguration config) {
Map<String, DispatchRateImpl> dispatchRate = new ConcurrentHashMap<>();
dispatchRate.put(brokerService.pulsar().getConfiguration().getClusterName(), DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
.ratePeriodInSecond(1)
.build());
return dispatchRate;
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,16 @@ private DispatchRate createDispatchRate() {
.build();
}

public void updateDispatchRate(){
updateDispatchRate((String) null);
}

/**
* Update dispatch-throttling-rate.
* Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
* broker-level
*/
public void updateDispatchRate() {
public void updateDispatchRate(String remoteCluster) {
switch (type) {
case TOPIC:
updateDispatchRate(topic.getDispatchRate());
Expand All @@ -177,7 +181,7 @@ public void updateDispatchRate() {
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
case REPLICATOR:
updateDispatchRate(topic.getReplicatorDispatchRate());
updateDispatchRate(topic.getReplicatorDispatchRate(remoteCluster));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ public Optional<ResourceGroupDispatchLimiter> getResourceGroupDispatchRateLimite
public void initializeDispatchRateLimiterIfNeeded() {
synchronized (dispatchRateLimiterLock) {
if (!dispatchRateLimiter.isPresent()
&& DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
&& DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate(remoteCluster))) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
}

Expand All @@ -866,7 +866,7 @@ public void initializeDispatchRateLimiterIfNeeded() {
@Override
public void updateRateLimiter() {
initializeDispatchRateLimiterIfNeeded();
dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
dispatchRateLimiter.ifPresent(n -> n.updateDispatchRate(remoteCluster));
}

private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
Expand Down
Loading

0 comments on commit 7dd7e00

Please sign in to comment.