Skip to content

Commit

Permalink
[feat][broker] Support configuring replicator rate limiter per cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 10, 2025
1 parent 8f8ecba commit 1d2177c
Show file tree
Hide file tree
Showing 19 changed files with 717 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1532,11 +1532,12 @@ protected SubscribeRate internalGetSubscribeRate() {
return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
}

protected void internalRemoveReplicatorDispatchRate() {
protected void internalRemoveReplicatorDispatchRate(String cluster) {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName());
policies.replicatorDispatchRate.remove(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
Expand All @@ -1548,12 +1549,14 @@ protected void internalRemoveReplicatorDispatchRate() {
}
}

protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
protected void internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policies.replicatorDispatchRate.put(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName(),
dispatchRate);
return policies;
});
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
Expand All @@ -1565,11 +1568,12 @@ protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate)
}
}

protected DispatchRate internalGetReplicatorDispatchRate() {
protected DispatchRate internalGetReplicatorDispatchRate(String cluster) {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
return policies.replicatorDispatchRate.get(
StringUtils.isNotEmpty(cluster) ? cluster : pulsar().getConfiguration().getClusterName());
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3597,25 +3597,64 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
});
}

protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(String cluster, boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
.orElseGet(() -> {
if (applied) {
DispatchRateImpl namespacePolicy = getNamespacePolicies(namespaceName)
.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
.thenApply(op -> op.map(n -> {
// Prioritize getting the dispatch rate from the replicatorDispatchRateMap if a specific cluster
// is provided.
// If the cluster is empty, it means the user has not explicitly set a rate for a particular
// cluster,
// so we still attempt to retrieve the value from the replicatorDispatchRateMap using the current
// cluster.
// If `applied` is true, we also need to consider the default cluster rate and finally fallback
// to `getReplicatorDispatchRate()` for backward compatibility.
if (StringUtils.isNotEmpty(cluster)) {
DispatchRateImpl dispatchRate = n.getReplicatorDispatchRateMap().get(cluster);
if (dispatchRate != null) {
return dispatchRate;
}
}

if (applied || StringUtils.isEmpty(cluster)) {
DispatchRateImpl dispatchRate =
n.getReplicatorDispatchRateMap().get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
}
// Backward compatibility.
return n.getReplicatorDispatchRate();
}
return null;
}).orElseGet(() -> {
if (!applied) {
return null;
}
Map<String, DispatchRateImpl> replicatorDispatchRate =
getNamespacePolicies(namespaceName).replicatorDispatchRate;
DispatchRateImpl namespacePolicy = replicatorDispatchRate.getOrDefault(cluster,
replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()));
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
}));
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(String cluster, DispatchRateImpl dispatchRate,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
boolean usingDefaultCluster = StringUtils.isEmpty(cluster);
if (dispatchRate == null) {
topicPolicies.getReplicatorDispatchRateMap()
.remove(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster);
} else {
topicPolicies.getReplicatorDispatchRateMap()
.put(usingDefaultCluster ? pulsar().getConfiguration().getClusterName() : cluster,
dispatchRate);
}
if (usingDefaultCluster) {
topicPolicies.setReplicatorDispatchRate(dispatchRate);
}
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,10 +753,11 @@ public void deleteSubscriptionDispatchRate(@PathParam("property") String propert
public void setReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("cluster") String queryCluster,
@ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, cluster, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(queryCluster, dispatchRate);
}

@GET
Expand All @@ -768,9 +769,10 @@ public void setReplicatorDispatchRate(
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String queryCluster) {
validateNamespaceName(tenant, cluster, namespace);
return internalGetReplicatorDispatchRate();
return internalGetReplicatorDispatchRate(cluster);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,21 +717,24 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void removeReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster
) {
validateNamespaceName(tenant, namespace);
internalRemoveReplicatorDispatchRate();
internalRemoveReplicatorDispatchRate(cluster);
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @ApiParam(value =
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster, @ApiParam(value =
"Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(cluster, dispatchRate);
}

@GET
Expand All @@ -742,9 +745,11 @@ public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("cluster") String cluster
) {
validateNamespaceName(tenant, namespace);
return internalGetReplicatorDispatchRate();
return internalGetReplicatorDispatchRate(cluster);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2531,11 +2531,12 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
.thenCompose(__ -> internalGetReplicatorDispatchRate(cluster, applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
Expand All @@ -2559,11 +2560,12 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster,
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, dispatchRate, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+ ", replicatorDispatchRate={}, isGlobal={}",
Expand All @@ -2590,11 +2592,12 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("cluster") String cluster) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
.thenCompose(__ -> internalSetReplicatorDispatchRate(cluster, null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -183,8 +184,14 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}

public DispatchRateImpl getReplicatorDispatchRate() {
return this.topicPolicies.getReplicatorDispatchRate().get();
public DispatchRateImpl getReplicatorDispatchRate(String remoteCluster) {
Map<String, DispatchRateImpl> dispatchRateMap = topicPolicies.getReplicatorDispatchRate().get();
DispatchRateImpl dispatchRate = dispatchRateMap.get(remoteCluster);
if (dispatchRate == null) {
// Use the default dispatch rate.
dispatchRate = dispatchRateMap.get(brokerService.pulsar().getConfiguration().getClusterName());
}
return normalize(dispatchRate);
}

public DispatchRateImpl getDispatchRate() {
Expand Down Expand Up @@ -225,7 +232,13 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
// Backward compatibility.
// Default use the current cluster name as key, {@link TopicPolicies#getReplicatorDispatchRate()} is value.
HashMap<String, DispatchRateImpl> replicatorDispatchRateMap =
new HashMap<>(data.getReplicatorDispatchRateMap());
replicatorDispatchRateMap.putIfAbsent(brokerService.pulsar().getConfiguration().getClusterName(),
data.getReplicatorDispatchRate());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(replicatorDispatchRateMap);
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
Expand Down Expand Up @@ -272,8 +285,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
updateNamespaceReplicatorDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getReplicatorDispatchRate().updateNamespaceValue(namespacePolicies.replicatorDispatchRate);
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
Expand Down Expand Up @@ -303,11 +315,6 @@ private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies,
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
}

private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
topicPolicies.getReplicatorDispatchRate()
.updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
}

private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
Expand Down Expand Up @@ -411,12 +418,14 @@ private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration c
.build();
}

private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
.ratePeriodInSecond(1)
.build();
private Map<String, DispatchRateImpl> replicatorDispatchRateInBroker(ServiceConfiguration config) {
Map<String, DispatchRateImpl> dispatchRate = new HashMap<>();
dispatchRate.put(brokerService.pulsar().getConfiguration().getClusterName(), DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
.ratePeriodInSecond(1)
.build());
return dispatchRate;
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Loading

0 comments on commit 1d2177c

Please sign in to comment.