Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into managed-ledger-exec…
Browse files Browse the repository at this point in the history
…utor-cache
  • Loading branch information
merlimat committed Oct 20, 2022
2 parents 42eb83b + 7b52a92 commit c27ba86
Show file tree
Hide file tree
Showing 1,819 changed files with 2,756 additions and 492,534 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci-semantic-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
name: Check pull request title
runs-on: ubuntu-latest
steps:
- uses: amannn/action-semantic-pull-request@v4
- uses: amannn/action-semantic-pull-request@v5.0.2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand All @@ -49,16 +49,18 @@ jobs:
revert
# Scope abbreviation comments:
# cli -> command line interface
# fn -> Pulasr Functions
# fn -> Pulsar Functions
# io -> Pulsar Connectors
# offload -> tiered storage
# sec -> security
# sql -> Pulsar Trino Plugin
# txn -> transaction
# ws -> websocket
# ml -> managed ledger
scopes: |
admin
broker
ml
build
ci
cli
Expand Down
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,10 @@ splitTopicAndPartitionLabelInPrometheus=false
# Otherwise, aggregate it by list index.
aggregatePublisherStatsByProducerName=false

# Interval between checks to see if cluster is migrated and marks topic migrated
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
5 changes: 5 additions & 0 deletions conf/zookeeper.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ forceSync=yes
# Default: false
sslQuorum=false

# Enable TLS Certificate reloading for Quorum and Server connentions
# Follows Pulsar's general default to reload these files.
sslQuorumReloadCertFiles=true
client.certReload=true

# Specifies that the client port should accept SSL connections
# (using the same configuration as the secure client port).
# Default: false
Expand Down
4 changes: 2 additions & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ MIT License

Protocol Buffers License
* Protocol Buffers
- com.google.protobuf-protobuf-java-3.19.2.jar -- licenses/LICENSE-protobuf.txt
- com.google.protobuf-protobuf-java-util-3.19.2.jar -- licenses/LICENSE-protobuf.txt
- com.google.protobuf-protobuf-java-3.19.6.jar -- licenses/LICENSE-protobuf.txt
- com.google.protobuf-protobuf-java-util-3.19.6.jar -- licenses/LICENSE-protobuf.txt

CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L

void asyncTerminate(TerminateCallback callback, Object ctx);

CompletableFuture<Position> asyncMigrate();

/**
* Terminate the managed ledger and return the last committed entry.
*
Expand Down Expand Up @@ -534,6 +536,11 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
*/
boolean isTerminated();

/**
* Returns whether the managed ledger was migrated.
*/
boolean isMigrated();

/**
* Returns managed-ledger config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2326,8 +2326,8 @@ List<Entry> filterReadEntries(List<Entry> entries) {
log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", ledger.getName(), name, entriesRange,
individualDeletedMessages);
}
if (individualDeletedMessages.isEmpty() || individualDeletedMessages.span() == null
|| !entriesRange.isConnected(individualDeletedMessages.span())) {
Range<PositionImpl> span = individualDeletedMessages.isEmpty() ? null : individualDeletedMessages.span();
if (span == null || !entriesRange.isConnected(span)) {
// There are no individually deleted messages in this entry list, no need to perform filtering
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No filtering needed for entries {}", ledger.getName(), name, entriesRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -241,6 +242,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";

public enum State {
None, // Uninitialized
Expand Down Expand Up @@ -268,6 +270,7 @@ public enum PositionBound {
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private volatile boolean migrated = false;

@Getter
private final OrderedScheduler scheduledExecutor;
Expand Down Expand Up @@ -343,7 +346,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = new HashMap();
this.propertiesMap = new ConcurrentHashMap<>();
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
Expand All @@ -367,7 +370,6 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry);
}

for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
Expand All @@ -379,6 +381,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
propertiesMap.put(property.getKey(), property.getValue());
}
}
migrated = mlInfo.hasTerminatedPosition() && propertiesMap.containsKey(MIGRATION_STATE_PROPERTY);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
}
Expand Down Expand Up @@ -1271,6 +1274,27 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum
}
}

public CompletableFuture<Position> asyncMigrate() {
propertiesMap.put(MIGRATION_STATE_PROPERTY, Boolean.TRUE.toString());
CompletableFuture<Position> result = new CompletableFuture<>();
asyncTerminate(new TerminateCallback() {

@Override
public void terminateComplete(Position lastCommittedPosition, Object ctx) {
migrated = true;
log.info("[{}] topic successfully terminated and migrated at {}", name, lastCommittedPosition);
result.complete(lastCommittedPosition);
}

@Override
public void terminateFailed(ManagedLedgerException exception, Object ctx) {
log.info("[{}] topic failed to terminate and migrate ", name, exception);
result.completeExceptionally(exception);
}
}, null);
return result;
}

@Override
public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) {
if (state == State.Fenced) {
Expand Down Expand Up @@ -1363,6 +1387,11 @@ public boolean isTerminated() {
return state == State.Terminated;
}

@Override
public boolean isMigrated() {
return migrated;
}

@Override
public void close() throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,12 @@ public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
if (entry == null || timestampExtractor.getTimestamp(entry.getValue()) > maxTimestamp) {
break;
}

entry = entries.pollFirstEntry();
if (entry == null) {
Value value = entry.getValue();
boolean removeHits = entries.remove(entry.getKey(), value);
if (!removeHits) {
break;
}

Value value = entry.getValue();
removedSize += weighter.getSize(value);
removedCount++;
value.release();
Expand Down
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ flexible messaging model and an intuitive client API.</description>
<docker-maven.version>0.40.2</docker-maven.version>
<docker.verbose>true</docker.verbose>
<typetools.version>0.5.0</typetools.version>
<protobuf3.version>3.19.2</protobuf3.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.45.1</grpc.version>
<google-http-client.version>1.41.0</google-http-client.version>
Expand Down Expand Up @@ -247,6 +247,7 @@ flexible messaging model and an intuitive client API.</description>
<objenesis.version>3.1</objenesis.version>
<awaitility.version>4.2.0</awaitility.version>
<reload4j.version>1.2.22</reload4j.version>
<jettison.version>1.5.1</jettison.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down Expand Up @@ -798,6 +799,13 @@ flexible messaging model and an intuitive client API.</description>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>${jettison.version}</version>
</dependency>


<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
+ " if cluster is marked migrated. Disable with value 0. (Default disabled)."
)
private int clusterMigrationCheckDurationSeconds = 0;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ private void startBookieWithMetadataStore() throws Exception {
} else {
log.info("Starting BK with metadata store:", metadataStoreUrl);
}

ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
.bkPort(bkPort)
.numBookies(numOfBk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ public CompletableFuture<Void> closeAsync() {
if (transactionExecutorProvider != null) {
transactionExecutorProvider.shutdownNow();
}
MLPendingAckStoreProvider.closeBufferedWriterMetrics();
MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
if (this.offloaderStats != null) {
this.offloaderStats.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,40 +742,40 @@ protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityS
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> {
CompletableFuture<SchemaCompatibilityStrategy> future;
if (config().isTopicLevelPoliciesEnabled()) {
future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
} else {
future = CompletableFuture.completedFuture(null);
}

return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
}
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy =
policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
});
}).whenComplete((__, ex) -> {
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
}
});
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
if (config().isTopicLevelPoliciesEnabled()) {
future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
}

return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
}
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy =
policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
});
}

@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return Objects.requireNonNull(reference);
Expand Down
Loading

0 comments on commit c27ba86

Please sign in to comment.