From 16e2cb05c6f62d82ef06e568249854ae31f586ad Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 27 May 2023 13:57:08 +0800 Subject: [PATCH 1/9] [improve][broker] Supplement schema ledger if schema ledger is lost --- .../pulsar/broker/service/AbstractTopic.java | 25 +++ .../pulsar/broker/service/Consumer.java | 18 +- .../pulsar/broker/service/Producer.java | 19 ++- .../pulsar/broker/service/ServerCnx.java | 45 +++-- .../broker/service/SubscriptionOption.java | 5 +- .../apache/pulsar/broker/service/Topic.java | 4 +- .../service/persistent/PersistentTopic.java | 66 +++++-- .../schema/BookkeeperSchemaStorage.java | 135 ++++++++++++++- .../schema/DefaultSchemaRegistryService.java | 11 ++ .../broker/service/schema/SchemaRegistry.java | 4 + .../schema/SchemaRegistryServiceImpl.java | 31 +++- .../service/schema/SchemaServiceTest.java | 161 +++++++++++++++++- .../common/protocol/schema/SchemaStorage.java | 4 + 13 files changed, 491 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4614b846c8eee..1641c613a661d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -697,6 +697,31 @@ public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schem .checkConsumerCompatibility(id, schema, getSchemaCompatibilityStrategy()); } + protected CompletableFuture tryCompleteTheLostSchema(SchemaVersion schemaVersion, SchemaData schema) { + String id = getSchemaId(); + return brokerService.pulsar() + .getSchemaRegistryService() + .tryCompleteTheLostSchema(id, schemaVersion, schema); + } + + @Override + public CompletableFuture findSchemaVersion(SchemaData schema) { + String id = getSchemaId(); + return brokerService.pulsar() + .getSchemaRegistryService() + .findSchemaVersion(id, schema); + } + + protected CompletableFuture getLatestSchemaVersion() { + return brokerService.pulsar() + .getSchemaRegistryService() + .getLatestSchemaVersion(getSchemaId()) + .thenApply(schemaVersion -> { + log.warn("~~~~~~~~~~~~~~~~~~~~~~" + schemaVersion); + return schemaVersion; + }); + } + @Override public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index a3f9da41e6b35..89f186c0f0d58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -60,7 +60,7 @@ import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -146,7 +146,11 @@ public class Consumer { private long negtiveUnackedMsgsTimestamp; @Getter - private final SchemaType schemaType; + private final SchemaData schemaData; + @Getter + @Setter + private final long schemaVersion; + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, @@ -154,7 +158,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo Map metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) { this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId, - metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null); + metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null, -1L); } public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, @@ -162,7 +166,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo boolean isDurable, TransportCnx cnx, String appId, Map metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, - long consumerEpoch, SchemaType schemaType) { + long consumerEpoch, SchemaData schemaData, long schemaVersion) { this.subscription = subscription; this.subType = subType; this.topicName = topicName; @@ -220,7 +224,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService() .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); - this.schemaType = schemaType; + this.schemaData = schemaData; + this.schemaVersion = schemaVersion; } @VisibleForTesting @@ -248,7 +253,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.clientAddress = null; this.startMessageId = null; this.isAcknowledgmentAtBatchIndexLevelEnabled = false; - this.schemaType = null; + this.schemaData = null; + this.schemaVersion = -1L; MESSAGE_PERMITS_UPDATER.set(this, availablePermits); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 53b79f06e8e24..f8da0428d57e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; @@ -98,15 +99,26 @@ public class Producer { private final Map metadata; private final SchemaVersion schemaVersion; + private final SchemaData schemaData; private final String clientAddress; // IP address only, no port number included private final AtomicBoolean isDisconnecting = new AtomicBoolean(false); + public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, + boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, + boolean userProvidedProducerName, + ProducerAccessMode accessMode, + Optional topicEpoch, + boolean supportsPartialProducer) { + this(topic, cnx, producerId, producerName, appId, isEncrypted, metadata, schemaVersion, epoch, + userProvidedProducerName, accessMode, topicEpoch, supportsPartialProducer, null); + } + public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional topicEpoch, - boolean supportsPartialProducer) { + boolean supportsPartialProducer, SchemaData schemaData) { final ServiceConfiguration serviceConf = cnx.getBrokerService().pulsar().getConfiguration(); this.topic = topic; @@ -155,6 +167,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.isEncrypted = isEncrypted; this.schemaVersion = schemaVersion; + this.schemaData = schemaData; this.accessMode = accessMode; this.topicEpoch = topicEpoch; @@ -827,6 +840,10 @@ public SchemaVersion getSchemaVersion() { return schemaVersion; } + public SchemaData getSchemaData() { + return schemaData; + } + public ProducerAccessMode getAccessMode() { return accessMode; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 888668e15b167..ab533d5268a88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1222,23 +1222,31 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { "Subscription does not exist")); } - SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) + SubscriptionOption.SubscriptionOptionBuilder optionBuilder = SubscriptionOption.builder() + .cnx(ServerCnx.this) .subscriptionName(subscriptionName) - .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) - .consumerName(consumerName).isDurable(isDurable) - .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) + .consumerId(consumerId) + .subType(subType) + .priorityLevel(priorityLevel) + .consumerName(consumerName) + .isDurable(isDurable) + .startMessageId(startMessageId) + .metadata(metadata) + .readCompacted(readCompacted) .initialPosition(initialPosition) .startMessageRollbackDurationSec(startMessageRollbackDurationSec) - .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) + .replicatedSubscriptionStateArg(isReplicated) + .keySharedMeta(keySharedMeta) .subscriptionProperties(subscriptionProperties) .consumerEpoch(consumerEpoch) - .schemaType(schema == null ? null : schema.getType()) - .build(); + .schemaData(schema); if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe(option)); + .thenCompose(schemaVersion -> + topic.subscribe(optionBuilder.schemaVersion(schemaVersion).build())); } else { - return topic.subscribe(option); + topic.findSchemaVersion(schema).thenApply(optionBuilder::schemaVersion); + return topic.subscribe(optionBuilder.build()); } }) .thenAccept(consumer -> { @@ -1499,7 +1507,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, topicName, - producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); + producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture, schema); }); }).exceptionally(exception -> { Throwable cause = exception.getCause(); @@ -1572,11 +1580,11 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ boolean userProvidedProducerName, TopicName topicName, ProducerAccessMode producerAccessMode, Optional topicEpoch, boolean supportsPartialProducer, - CompletableFuture producerFuture){ + CompletableFuture producerFuture, SchemaData schemaData){ CompletableFuture producerQueuedFuture = new CompletableFuture<>(); Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, - userProvidedProducerName, producerAccessMode, topicEpoch, supportsPartialProducer); + userProvidedProducerName, producerAccessMode, topicEpoch, supportsPartialProducer, schemaData); topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> { if (isActive()) { @@ -2798,7 +2806,18 @@ private CompletableFuture tryAddSchema(Topic topic, SchemaData sc if (schema != null) { return topic.addSchema(schema); } else { - return topic.hasSchema().thenCompose((hasSchema) -> { + return topic.hasSchema() + // .handle((hasSchema, ex) -> { + // if (ex != null) { + // if (ex.getCause() instanceof BKException.BKNoSuchLedgerExistsException + // || ex.getCause() instanceof BKException.BKNoSuchLedgerExistsOnMetadataServerException) { + // topic.completeTheMissingSchema(SchemaVersion.Latest, schema); + // return true; + // } + // } + // return hasSchema; + // }) + .thenCompose((hasSchema) -> { if (log.isDebugEnabled()) { log.debug("[{}] {} configured with schema {}", remoteAddress, topic.getName(), hasSchema); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..398702b95de7d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -29,7 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeyValue; -import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.protocol.schema.SchemaData; @Getter @Builder @@ -50,7 +50,8 @@ public class SubscriptionOption { private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; - private SchemaType schemaType; + private SchemaData schemaData; + private long schemaVersion; public static Optional> getPropertiesMap(List list) { if (list == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7657d77e1299f..137ae38907930 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -307,7 +307,9 @@ CompletableFuture asyncGetStats(boolean getPreciseBack * add the passed schema to the topic. Otherwise, check that the passed schema is compatible * with what the topic already has. */ - CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema); + CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema); + + CompletableFuture findSchemaVersion(SchemaData schema); CompletableFuture deleteForcefully(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 15854f55c5cd1..f8b7a353fd734 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.time.Clock; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -161,6 +162,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; @@ -783,7 +785,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), - option.getConsumerEpoch(), option.getSchemaType()); + option.getConsumerEpoch(), option.getSchemaData(), option.getSchemaVersion()); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -797,7 +799,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, - SchemaType schemaType) { + SchemaData schemaData, + long schemaVersion) { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); @@ -885,8 +888,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, - readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType); - + readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaData, schemaVersion); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { if (subscription instanceof PersistentSubscription persistentSubscription) { checkBackloggedCursor(persistentSubscription); @@ -964,7 +966,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, 0L); } private CompletableFuture getDurableSubscription(String subscriptionName, @@ -3244,21 +3246,63 @@ public synchronized OffloadProcessStatus offloadStatus() { private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); @Override - public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema) { - return hasSchema().thenCompose((hasSchema) -> { + public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema) { + return hasSchema().handle((hasSchema, ex) -> { + log.warn("hasSchema: " + hasSchema + " ex: " + ex); + if (ex != null) { + log.error("addSchemaIfIdleOrCheckCompatible: " + ex.getMessage()); + if (ex.getMessage().contains("Failed to open ledger")) { + getLatestSchemaVersion().thenApply(schemaVersion -> { + log.warn("++++++++latest" + schemaVersion.toString()); + SchemaData schemaData = null; + Collection values = producers.values(); + // TODO: CompletableFuture.anyOf() + for (Producer value : values) { + log.warn("++++++++producer schemaversion:" + value.getSchemaVersion()); + if (((LongSchemaVersion) value.getSchemaVersion()).getVersion() + == ((LongSchemaVersion) schemaVersion).getVersion()) { + schemaData = value.getSchemaData(); + break; + } + } + // TODO: CompletableFuture.anyOf() + for (PersistentSubscription value : subscriptions.values()) { + for (Consumer consumer : value.getConsumers()) { + log.warn("++++++++consumer schemaversion:" + consumer.getSchemaVersion()); + if (consumer.getSchemaVersion() == ((LongSchemaVersion) schemaVersion).getVersion()) { + schemaData = consumer.getSchemaData(); + break; + } + } + break; + } + return tryCompleteTheLostSchema(schemaVersion, schemaData); + }); + return true; + } + } + return hasSchema; + }).thenCompose((hasSchema) -> { int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream() .mapToInt(subscription -> subscription.getConsumers().stream() - .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME) + .filter(consumer -> consumer.getSchemaData().getType() != SchemaType.AUTO_CONSUME) .toList().size()) .sum(); if (hasSchema || (!producers.isEmpty()) || (numActiveConsumersWithoutAutoSchema != 0) || (ledger.getTotalSize() != 0)) { - return checkSchemaCompatibleForConsumer(schema); + log.warn("################ checkSchemaCompatibleForConsumer:" + hasSchema); + checkSchemaCompatibleForConsumer(schema); + log.warn("################ checkSchemaCompatibleForConsumer success"); + return findSchemaVersion(schema); } else { - return addSchema(schema).thenCompose(schemaVersion -> - CompletableFuture.completedFuture(null)); + log.warn("################ addSchema:"); + return addSchema(schema).thenApply(version -> { + log.warn("################ addSchema:" + version); + LongSchemaVersion longVersion = (LongSchemaVersion) version; + return longVersion.getVersion(); + }); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index a8fc15f296598..b20b0a80d3273 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -23,11 +23,14 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -53,6 +56,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; +import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.protocol.schema.StoredSchema; @@ -274,6 +279,129 @@ public SchemaVersion versionFromBytes(byte[] version) { return new LongSchemaVersion(bb.getLong()); } + @NotNull + public CompletableFuture getLatestSchemaVersion(String key) { + return getSchemaLocator(getSchemaPath(key)).thenCompose(locator -> { + if (locator.isEmpty()) { + return completedFuture(null); + } + + SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator; + return completedFuture(new LongSchemaVersion(schemaLocator.getInfo().getVersion())); + }); + } + + @Override + public CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, + SchemaData schema) { + // TODO: clean up the code. + LocatorEntry entry = getLocator(schemaId).join().orElse(null); + + if (entry == null) { + return CompletableFuture.completedFuture(null); + } + + LongSchemaVersion longVersion = (LongSchemaVersion) version; + log.warn("locator Version:" + entry.version); + SchemaStorageFormat.IndexEntry oldIndexEntry = entry.locator.getIndexList() + .stream() + .filter(indexEntry -> indexEntry.getVersion() == longVersion.getVersion()) + .findFirst() + .orElse(null); + + if (oldIndexEntry == null) { + return CompletableFuture.completedFuture(null); + } + + // log.warn("old:" + oldIndexEntry.getPosition().getLedgerId() + oldIndexEntry.getPosition().getEntryId()); + // entry.locator.getIndexList().forEach(indexEntry -> { + // log.warn("LedgerId:" + indexEntry.getPosition().getLedgerId()); + // log.warn("EntryId:" + indexEntry.getPosition().getEntryId()); + // log.warn("Schema Version:" + indexEntry.getVersion()); + // }); + log.warn(entry.locator.getInfo().getVersion() + " " + entry.locator.getInfo().getPosition().getLedgerId() + + " " + entry.locator.getInfo().getPosition().getEntryId()); + + SchemaStorageFormat.PositionInfo.Builder positionInfoBuilder = SchemaStorageFormat.PositionInfo.newBuilder(); + byte[] hash = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(SchemaRegistryServiceImpl.Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(Clock.systemUTC().millis()) + .addAllProps(toPairs(schema.getProps())) + .build() + .toByteArray(); + + return createLedger(schemaId).thenCompose(ledgerHandle -> { + final long newLedgerId = ledgerHandle.getId(); + log.warn("ledgerHandle: " + newLedgerId); + SchemaStorageFormat.IndexEntry index = SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(oldIndexEntry.getVersion()) + .setHash(copyFrom(oldIndexEntry.getHash().toByteArray())) + .setPosition(positionInfoBuilder.setEntryId(oldIndexEntry.getPosition().getEntryId()) + .setLedgerId(newLedgerId)) + .build(); + + SchemaStorageFormat.SchemaEntry schemaEntry = SchemaStorageFormat.SchemaEntry.newBuilder() + .setSchemaData(copyFrom(hash)) + .addAllIndex(newArrayList(index)) + .build(); + + return addEntry(ledgerHandle, schemaEntry) + .thenApply(entryId -> { + ledgerHandle.closeAsync(); + return Functions.newPositionInfo(newLedgerId, entryId); + }) + .thenCompose(position -> { + long infoVersion = entry.locator.getInfo().getVersion(); + + SchemaStorageFormat.SchemaLocator locator = entry.locator; + SchemaStorageFormat.IndexEntry info = + SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(infoVersion) + .setPosition(position) + .setHash(copyFrom(entry.locator.getInfo().getHash().toByteArray())) + .build(); + + final ArrayList indexList = new ArrayList<>(); + for (SchemaStorageFormat.IndexEntry indexEntry : locator.getIndexList()) { + if (indexEntry.getVersion() == longVersion.getVersion()) { + indexList.add(index); + } else { + indexList.add(indexEntry); + } + } + + return updateSchemaLocator(getSchemaPath(schemaId), + SchemaStorageFormat.SchemaLocator.newBuilder() + .setInfo(info) + .addAllIndex(indexList) + .build() + , entry.version + ).thenApply(ignore -> infoVersion).whenComplete((__, ex) -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("[{}] Failed to update schema locator with position {}", schemaId, + position, ex); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(position.getLedgerId(), + new AsyncCallback.DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete ledger {} after updating" + + " schema locator failed, rc: {}", + schemaId, position.getLedgerId(), rc); + } + } + }, null); + } + }); + }); + }); + } + @Override public void close() throws Exception { if (bookKeeper != null) { @@ -673,6 +801,10 @@ static SchemaStorageFormat.SchemaEntry newSchemaEntry( List index, byte[] data ) { + for (SchemaStorageFormat.IndexEntry indexEntry : index) { + log.warn("newSchemaEntry: " + indexEntry.getPosition().getLedgerId() + " - " + + indexEntry.getPosition().getEntryId()); + } return SchemaStorageFormat.SchemaEntry.newBuilder() .setSchemaData(copyFrom(data)) .addAllIndex(index) @@ -705,7 +837,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon message += " - entry=" + entryId; } boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException - && rc != BKException.Code.NoSuchEntryException; + && rc != BKException.Code.NoSuchEntryException + && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException; return new SchemaException(recoverable, message); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 1f36419c815ff..35ab93c084ea9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -70,6 +70,17 @@ public CompletableFuture getSchemaVersionBySchemaData(List tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, + SchemaData schema) { + return completedFuture(null); + } + + @Override + public CompletableFuture getLatestSchemaVersion(String schemaId) { + return completedFuture(null); + } + @Override public CompletableFuture deleteSchema(String schemaId, String user, boolean force) { return completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index 8e9831ae34242..c9578d51f5793 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -59,6 +59,10 @@ CompletableFuture checkConsumerCompatibility(String schemaId, SchemaData s CompletableFuture getSchemaVersionBySchemaData(List schemaAndMetadataList, SchemaData schemaData); + CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, SchemaData schema); + + CompletableFuture getLatestSchemaVersion(String schemaId); + SchemaVersion versionFromBytes(byte[] version); class SchemaAndMetadata { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index ae56df248d85d..087db1b82287a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -445,8 +445,37 @@ public CompletableFuture getSchemaVersionBySchemaData( return completableFuture; } + @Override + public CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, + SchemaData schema) { + // long start = this.clock.millis(); + CompletableFuture longCompletableFuture = new CompletableFuture<>(); + schemaStorage + .tryCompleteTheLostSchemaLedger(schemaId, schemaVersion, schema) + .whenComplete((v, t) -> { + // TODO: add completeLostSchemaLedger stats? + if (t != null) { + // this.stats.recordDelFailed(schemaId); + log.error("[{}] Delete schema storage failed", schemaId); + longCompletableFuture.completeExceptionally(t); + } else { + // this.stats.recordDelLatency(schemaId, this.clock.millis() - start); + if (log.isDebugEnabled()) { + log.debug("[{}] Delete schema storage finished", schemaId); + } + longCompletableFuture.complete(null); + } + }); + return longCompletableFuture; + } + + @Override + public CompletableFuture getLatestSchemaVersion(String schemaId) { + return schemaStorage.getLatestSchemaVersion(schemaId); + } + private CompletableFuture checkCompatibilityWithLatest(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { + SchemaCompatibilityStrategy strategy) { if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index c7e30d5c3fc37..eb6de404de9a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -22,6 +22,7 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Multimap; import com.google.common.hash.HashFunction; @@ -32,6 +33,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -40,11 +42,23 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Cleanup; +import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -54,6 +68,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -82,13 +97,15 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { private static final SchemaData schemaData3 = getSchemaData(schemaJson3); private SchemaRegistryServiceImpl schemaRegistryService; + private BookkeeperSchemaStorage storage; @BeforeMethod @Override protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); - BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); + super.setupDefaultTenantAndNamespace(); + storage = new BookkeeperSchemaStorage(pulsar); storage.start(); Map checkMap = new HashMap<>(); checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck()); @@ -326,6 +343,148 @@ public void testSchemaStorageFailed() throws Exception { } } + @Test(dataProvider = "lostSchemaLedgerIndexes", timeOut = 30000) + public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws Exception { + final String namespace = "public/default"; + final String topic = namespace + "/testSchemaLedgerLost"; + final Schema schemaV1 = Schema.AVRO(V1Data.class); + final Schema schemaV2 = Schema.AVRO(V2Data.class); + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().setSchemaValidationEnforced(topic, true); + + Producer producer1 = pulsarClient.newProducer(schemaV1) + .topic(topic) + .producerName("producer1") + .create(); + Producer producer2 = pulsarClient.newProducer(schemaV2) + .topic(topic) + .producerName("producer2") + .create(); + Consumer consumer1 = pulsarClient.newConsumer(schemaV1) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub0") + .consumerName("consumer1") + .subscribe(); + // @Cleanup + // Consumer consumer2 = pulsarClient.newConsumer(schemaV2) + // .topic(topic) + // .subscriptionType(SubscriptionType.Shared) + // .subscriptionName("sub0") + // .consumerName("consumerAfterLostLedger2") + // .subscribe(); + + SchemaAndMetadata schemaAndMetadata0 = schemaRegistryService.getSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(0)).get(); + SchemaAndMetadata schemaAndMetadata1 = schemaRegistryService.getSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(1)).get(); + + // delete ledger + String key = TopicName.get(topic).getSchemaName(); + List schemaLedgerList = storage.getSchemaLedgerList(key); + Assert.assertEquals(schemaLedgerList.size(), 2); + for (int i = 0; i < schemaLedgerList.size(); i++){ + if (lostSchemaLedgerIndexes.contains(i)){ + storage.getBookKeeper().deleteLedger(schemaLedgerList.get(i)); + } + } + + // Without introducing this pr, connected producers or consumers are not affected if the schema ledger is lost + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer1.send(new V1Data(i)); + producer2.send(new V2Data(i, i + 1)); + } + for (int i = 0; i < numMessages; i++) { + Message msg = consumer1.receive(3, TimeUnit.SECONDS); + consumer1.acknowledge(msg); + } + + // try to fix the lost schema ledger + if (lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { + schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(0) + , schemaAndMetadata0.schema); + // TODO: BadVersion for /schemas/public/default/testSchemaLedgerLost. Need to fix. + // When lostSchemaLedgerIndexes contains 0 and 1. + schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(1) + , schemaAndMetadata1.schema); + } else if (lostSchemaLedgerIndexes.contains(0) && !lostSchemaLedgerIndexes.contains(1)) { + schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(0) + , schemaAndMetadata0.schema); + } else if (!lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { + schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) + .getSchemaName(), new LongSchemaVersion(1) + , schemaAndMetadata1.schema); + } + + @Cleanup + Producer producerAfterLostLedger1 = pulsarClient.newProducer(schemaV1) + .topic(topic) + .producerName("producerAfterLostLedger1") + .create(); + assertNotNull(producerAfterLostLedger1.send(new V1Data(10))); + @Cleanup + Producer producerAfterLostLedger2 = pulsarClient.newProducer(schemaV2) + .topic(topic) + .producerName("producerAfterLostLedger2") + .create(); + assertNotNull(producerAfterLostLedger2.send(new V2Data(10, 10))); + + @Cleanup + Consumer consumerAfterLostLedger1 = pulsarClient.newConsumer(schemaV1) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub1") + .consumerName("consumerAfterLostLedger1") + .subscribe(); + producer1.send(new V1Data(11)); + assertNotNull(consumerAfterLostLedger1.receive(3, TimeUnit.SECONDS)); + + @Cleanup + Consumer consumerAfterLostLedger2 = pulsarClient.newConsumer(schemaV2) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub0") + .consumerName("consumerAfterLostLedger2") + .subscribe(); + producer2.send(new V2Data(11, 11)); + assertNotNull(consumerAfterLostLedger2.receive(3, TimeUnit.SECONDS)); + + producer1.close(); + producer2.close(); + consumer1.close(); + } + + @DataProvider(name = "lostSchemaLedgerIndexes") + public Object[][] lostSchemaLedgerIndexes(){ + return new Object[][]{ + // {Arrays.asList(0,1)}, + {Arrays.asList(0)}, + {Arrays.asList(1)} + }; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + static class V1Data { + int i; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + static class V2Data { + int i; + Integer j; + } + private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception { putSchema(schemaId, schema, expectedVersion, SchemaCompatibilityStrategy.FULL); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index 9cdc85defd499..ec1058c197353 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -54,6 +54,10 @@ default CompletableFuture put(String key, SchemaVersion versionFromBytes(byte[] version); + CompletableFuture getLatestSchemaVersion(String key); + + CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema); + void start() throws Exception; void close() throws Exception; From 24f03c8a9542f053ea6be7ff653434d41f6f2826 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Fri, 2 Jun 2023 23:54:44 +0800 Subject: [PATCH 2/9] clean up tryCompleteTheLostSchemaLedger step1. --- .../schema/BookkeeperSchemaStorage.java | 185 +++++++++--------- .../schema/SchemaRegistryServiceImpl.java | 4 +- 2 files changed, 90 insertions(+), 99 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index b20b0a80d3273..9fe358ff4443c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -294,111 +294,102 @@ public CompletableFuture getLatestSchemaVersion(String key) { @Override public CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema) { - // TODO: clean up the code. - LocatorEntry entry = getLocator(schemaId).join().orElse(null); - - if (entry == null) { - return CompletableFuture.completedFuture(null); - } + return getLocator(schemaId).thenCompose(optEntry -> { + if (optEntry.isEmpty()) { + return CompletableFuture.completedFuture(null); + } - LongSchemaVersion longVersion = (LongSchemaVersion) version; - log.warn("locator Version:" + entry.version); - SchemaStorageFormat.IndexEntry oldIndexEntry = entry.locator.getIndexList() - .stream() - .filter(indexEntry -> indexEntry.getVersion() == longVersion.getVersion()) - .findFirst() - .orElse(null); + LocatorEntry entry = optEntry.get(); + LongSchemaVersion longVersion = (LongSchemaVersion) version; + Optional optOldIndexEntry = entry.locator.getIndexList() + .stream() + .filter(indexEntry -> indexEntry.getVersion() == longVersion.getVersion()) + .findFirst(); - if (oldIndexEntry == null) { - return CompletableFuture.completedFuture(null); - } + if (optOldIndexEntry.isEmpty()) { + return CompletableFuture.completedFuture(null); + } - // log.warn("old:" + oldIndexEntry.getPosition().getLedgerId() + oldIndexEntry.getPosition().getEntryId()); - // entry.locator.getIndexList().forEach(indexEntry -> { - // log.warn("LedgerId:" + indexEntry.getPosition().getLedgerId()); - // log.warn("EntryId:" + indexEntry.getPosition().getEntryId()); - // log.warn("Schema Version:" + indexEntry.getVersion()); - // }); - log.warn(entry.locator.getInfo().getVersion() + " " + entry.locator.getInfo().getPosition().getLedgerId() - + " " + entry.locator.getInfo().getPosition().getEntryId()); - - SchemaStorageFormat.PositionInfo.Builder positionInfoBuilder = SchemaStorageFormat.PositionInfo.newBuilder(); - byte[] hash = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(SchemaRegistryServiceImpl.Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(Clock.systemUTC().millis()) - .addAllProps(toPairs(schema.getProps())) - .build() - .toByteArray(); + SchemaStorageFormat.IndexEntry oldIndexEntry = optOldIndexEntry.get(); + byte[] hash = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(SchemaRegistryServiceImpl.Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(Clock.systemUTC().millis()) + .addAllProps(toPairs(schema.getProps())) + .build() + .toByteArray(); + + return createLedger(schemaId).thenCompose(ledgerHandle -> { + final long newLedgerId = ledgerHandle.getId(); + + SchemaStorageFormat.IndexEntry index = SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(oldIndexEntry.getVersion()) + .setHash(copyFrom(oldIndexEntry.getHash().toByteArray())) + .setPosition(SchemaStorageFormat.PositionInfo.newBuilder() + .setEntryId(oldIndexEntry.getPosition().getEntryId()) + .setLedgerId(newLedgerId)) + .build(); + + SchemaStorageFormat.SchemaEntry schemaEntry = SchemaStorageFormat.SchemaEntry.newBuilder() + .setSchemaData(copyFrom(hash)) + .addAllIndex(newArrayList(index)) + .build(); + + return addEntry(ledgerHandle, schemaEntry) + .thenApply(entryId -> { + ledgerHandle.closeAsync(); + return Functions.newPositionInfo(newLedgerId, entryId); + }) + .thenCompose(position -> updateExistsLocatorWithNewLedgerId(schemaId, position, + entry, longVersion, index)); + }); + }); + } - return createLedger(schemaId).thenCompose(ledgerHandle -> { - final long newLedgerId = ledgerHandle.getId(); - log.warn("ledgerHandle: " + newLedgerId); - SchemaStorageFormat.IndexEntry index = SchemaStorageFormat.IndexEntry.newBuilder() - .setVersion(oldIndexEntry.getVersion()) - .setHash(copyFrom(oldIndexEntry.getHash().toByteArray())) - .setPosition(positionInfoBuilder.setEntryId(oldIndexEntry.getPosition().getEntryId()) - .setLedgerId(newLedgerId)) - .build(); + private CompletableFuture updateExistsLocatorWithNewLedgerId(String schemaId, + SchemaStorageFormat.PositionInfo position, + LocatorEntry entry, + LongSchemaVersion longVersion, + SchemaStorageFormat.IndexEntry newIndexEntry) { + long infoVersion = entry.locator.getInfo().getVersion(); - SchemaStorageFormat.SchemaEntry schemaEntry = SchemaStorageFormat.SchemaEntry.newBuilder() - .setSchemaData(copyFrom(hash)) - .addAllIndex(newArrayList(index)) - .build(); + SchemaStorageFormat.SchemaLocator locator = entry.locator; + SchemaStorageFormat.IndexEntry info = + SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(infoVersion) + .setPosition(position) + .setHash(copyFrom(entry.locator.getInfo().getHash().toByteArray())) + .build(); - return addEntry(ledgerHandle, schemaEntry) - .thenApply(entryId -> { - ledgerHandle.closeAsync(); - return Functions.newPositionInfo(newLedgerId, entryId); - }) - .thenCompose(position -> { - long infoVersion = entry.locator.getInfo().getVersion(); - - SchemaStorageFormat.SchemaLocator locator = entry.locator; - SchemaStorageFormat.IndexEntry info = - SchemaStorageFormat.IndexEntry.newBuilder() - .setVersion(infoVersion) - .setPosition(position) - .setHash(copyFrom(entry.locator.getInfo().getHash().toByteArray())) - .build(); - - final ArrayList indexList = new ArrayList<>(); - for (SchemaStorageFormat.IndexEntry indexEntry : locator.getIndexList()) { - if (indexEntry.getVersion() == longVersion.getVersion()) { - indexList.add(index); - } else { - indexList.add(indexEntry); - } - } + final List indexList = locator.getIndexList().stream() + .map(indexEntry -> indexEntry.getVersion() == longVersion.getVersion() ? newIndexEntry : indexEntry) + .collect(Collectors.toList()); - return updateSchemaLocator(getSchemaPath(schemaId), - SchemaStorageFormat.SchemaLocator.newBuilder() - .setInfo(info) - .addAllIndex(indexList) - .build() - , entry.version - ).thenApply(ignore -> infoVersion).whenComplete((__, ex) -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("[{}] Failed to update schema locator with position {}", schemaId, - position, ex); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), - new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (rc != BKException.Code.OK) { - log.warn("[{}] Failed to delete ledger {} after updating" - + " schema locator failed, rc: {}", - schemaId, position.getLedgerId(), rc); - } - } - }, null); + return updateSchemaLocator(getSchemaPath(schemaId), + SchemaStorageFormat.SchemaLocator.newBuilder() + .setInfo(info) + .addAllIndex(indexList) + .build() + , entry.version + ).thenApply(ignore -> infoVersion).whenComplete((v, ex) -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("[{}] Failed to update exists schema locator with new ledgerId {}", schemaId, position, ex); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(position.getLedgerId(), + new AsyncCallback.DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete ledger {} after updating" + + " exists schema locator with new ledgerId failed, rc: {}", + schemaId, position.getLedgerId(), rc); + } } - }); - }); + }, null); + } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 087db1b82287a..d33d9b542058c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -456,12 +456,12 @@ public CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaV // TODO: add completeLostSchemaLedger stats? if (t != null) { // this.stats.recordDelFailed(schemaId); - log.error("[{}] Delete schema storage failed", schemaId); + log.error("[{}] Complete lost schema failed", schemaId); longCompletableFuture.completeExceptionally(t); } else { // this.stats.recordDelLatency(schemaId, this.clock.millis() - start); if (log.isDebugEnabled()) { - log.debug("[{}] Delete schema storage finished", schemaId); + log.debug("[{}] Complete lost schema finished", schemaId); } longCompletableFuture.complete(null); } From 1c7aa03f8a50f94b4269e9d66b4e45cc515d8281 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 3 Jun 2023 22:37:43 +0800 Subject: [PATCH 3/9] fix BadVersionException or AlreadyExistsException --- .../schema/BookkeeperSchemaStorage.java | 53 ++++++++++++------- .../schema/SchemaRegistryServiceImpl.java | 6 ++- .../service/schema/SchemaServiceTest.java | 21 ++++---- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 9fe358ff4443c..ea2212f6b383e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -294,7 +294,15 @@ public CompletableFuture getLatestSchemaVersion(String key) { @Override public CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema) { - return getLocator(schemaId).thenCompose(optEntry -> { + CompletableFuture promise = new CompletableFuture<>(); + tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise); + return promise; + } + + private void tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema, + CompletableFuture promise) { + CompletableFuture> schemasWithLocator = getLocator(schemaId); + schemasWithLocator.thenCompose(optEntry -> { if (optEntry.isEmpty()) { return CompletableFuture.completedFuture(null); } @@ -344,7 +352,30 @@ public CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, S return Functions.newPositionInfo(newLedgerId, entryId); }) .thenCompose(position -> updateExistsLocatorWithNewLedgerId(schemaId, position, - entry, longVersion, index)); + entry, longVersion, index)) + .whenComplete((infoVersion, ex) -> { + if (ex == null) { + promise.complete(infoVersion); + } else { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(newLedgerId, + new AsyncCallback.DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete ledger {} after updating" + + " exists schema locator with new ledgerId" + + " failed, rc: {}", schemaId, newLedgerId, rc); + } + } + }, null); + tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise); + } else { + promise.completeExceptionally(ex); + } + } + }); }); }); } @@ -374,23 +405,7 @@ private CompletableFuture updateExistsLocatorWithNewLedgerId(String schema .addAllIndex(indexList) .build() , entry.version - ).thenApply(ignore -> infoVersion).whenComplete((v, ex) -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("[{}] Failed to update exists schema locator with new ledgerId {}", schemaId, position, ex); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), - new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (rc != BKException.Code.OK) { - log.warn("[{}] Failed to delete ledger {} after updating" - + " exists schema locator with new ledgerId failed, rc: {}", - schemaId, position.getLedgerId(), rc); - } - } - }, null); - } - }); + ).thenApply(ignore -> infoVersion); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index d33d9b542058c..9d35be46b0854 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -456,12 +456,14 @@ public CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaV // TODO: add completeLostSchemaLedger stats? if (t != null) { // this.stats.recordDelFailed(schemaId); - log.error("[{}] Complete lost schema failed", schemaId); + log.error("[{}] Complete lost schema({}) failed", schemaId, + ((LongSchemaVersion) schemaVersion).getVersion()); longCompletableFuture.completeExceptionally(t); } else { // this.stats.recordDelLatency(schemaId, this.clock.millis() - start); if (log.isDebugEnabled()) { - log.debug("[{}] Complete lost schema finished", schemaId); + log.debug("[{}] Complete lost schema({}) finished", schemaId, + ((LongSchemaVersion) schemaVersion).getVersion()); } longCompletableFuture.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index eb6de404de9a2..e33460a9f6357 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -349,7 +349,7 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E final String topic = namespace + "/testSchemaLedgerLost"; final Schema schemaV1 = Schema.AVRO(V1Data.class); final Schema schemaV2 = Schema.AVRO(V2Data.class); - admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD); + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); admin.topics().createNonPartitionedTopic(topic); admin.topics().setSchemaValidationEnforced(topic, true); @@ -374,6 +374,7 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E // .subscriptionName("sub0") // .consumerName("consumerAfterLostLedger2") // .subscribe(); + assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); SchemaAndMetadata schemaAndMetadata0 = schemaRegistryService.getSchema(TopicName.get(topic) .getSchemaName(), new LongSchemaVersion(0)).get(); @@ -404,23 +405,19 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E // try to fix the lost schema ledger if (lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(0) - , schemaAndMetadata0.schema); - // TODO: BadVersion for /schemas/public/default/testSchemaLedgerLost. Need to fix. - // When lostSchemaLedgerIndexes contains 0 and 1. + .getSchemaName(), new LongSchemaVersion(0), schemaAndMetadata0.schema); schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(1) - , schemaAndMetadata1.schema); + .getSchemaName(), new LongSchemaVersion(1), schemaAndMetadata1.schema).join(); } else if (lostSchemaLedgerIndexes.contains(0) && !lostSchemaLedgerIndexes.contains(1)) { schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(0) - , schemaAndMetadata0.schema); + .getSchemaName(), new LongSchemaVersion(0), schemaAndMetadata0.schema).join(); } else if (!lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(1) - , schemaAndMetadata1.schema); + .getSchemaName(), new LongSchemaVersion(1), schemaAndMetadata1.schema).join(); } + assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + @Cleanup Producer producerAfterLostLedger1 = pulsarClient.newProducer(schemaV1) .topic(topic) @@ -462,7 +459,7 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E @DataProvider(name = "lostSchemaLedgerIndexes") public Object[][] lostSchemaLedgerIndexes(){ return new Object[][]{ - // {Arrays.asList(0,1)}, + {Arrays.asList(0,1)}, {Arrays.asList(0)}, {Arrays.asList(1)} }; From ea4af7578e7a9ac0ebcda5ad05e0c4367400ee82 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Mon, 5 Jun 2023 13:36:30 +0800 Subject: [PATCH 4/9] save the schema in the topic, not in each consumer and producer. --- .../pulsar/broker/service/AbstractTopic.java | 18 ++++++--- .../pulsar/broker/service/Consumer.java | 18 +++------ .../pulsar/broker/service/Producer.java | 19 +--------- .../pulsar/broker/service/ServerCnx.java | 7 +++- .../apache/pulsar/broker/service/Topic.java | 2 + .../service/persistent/PersistentTopic.java | 37 +++++-------------- 6 files changed, 37 insertions(+), 64 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1641c613a661d..e91115f869efd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -78,6 +78,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +152,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener> entryFilters; + protected ConcurrentOpenHashMap schemaCache; + public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -164,6 +167,10 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.lastActive = System.nanoTime(); this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable(); + this.schemaCache = ConcurrentOpenHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); } public SubscribeRate getSubscribeRate() { @@ -715,11 +722,7 @@ public CompletableFuture findSchemaVersion(SchemaData schema) { protected CompletableFuture getLatestSchemaVersion() { return brokerService.pulsar() .getSchemaRegistryService() - .getLatestSchemaVersion(getSchemaId()) - .thenApply(schemaVersion -> { - log.warn("~~~~~~~~~~~~~~~~~~~~~~" + schemaVersion); - return schemaVersion; - }); + .getLatestSchemaVersion(getSchemaId()); } @Override @@ -1384,4 +1387,9 @@ public static Optional getMigratedClusterUrl(PulsarService pulsar) { } return Optional.empty(); } + + public void putSchemaAndVersionInSchemaCache(long schemaVersion, SchemaData schemaData) { + schemaCache.putIfAbsent(schemaVersion, schemaData); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 89f186c0f0d58..a3f9da41e6b35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -60,7 +60,7 @@ import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -146,11 +146,7 @@ public class Consumer { private long negtiveUnackedMsgsTimestamp; @Getter - private final SchemaData schemaData; - @Getter - @Setter - private final long schemaVersion; - + private final SchemaType schemaType; public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, @@ -158,7 +154,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo Map metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) { this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId, - metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null, -1L); + metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null); } public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, @@ -166,7 +162,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo boolean isDurable, TransportCnx cnx, String appId, Map metadata, boolean readCompacted, KeySharedMeta keySharedMeta, MessageId startMessageId, - long consumerEpoch, SchemaData schemaData, long schemaVersion) { + long consumerEpoch, SchemaType schemaType) { this.subscription = subscription; this.subType = subType; this.topicName = topicName; @@ -224,8 +220,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService() .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); - this.schemaData = schemaData; - this.schemaVersion = schemaVersion; + this.schemaType = schemaType; } @VisibleForTesting @@ -253,8 +248,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.clientAddress = null; this.startMessageId = null; this.isAcknowledgmentAtBatchIndexLevelEnabled = false; - this.schemaData = null; - this.schemaVersion = -1L; + this.schemaType = null; MESSAGE_PERMITS_UPDATER.set(this, availablePermits); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index f8da0428d57e2..53b79f06e8e24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -55,7 +55,6 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; @@ -99,26 +98,15 @@ public class Producer { private final Map metadata; private final SchemaVersion schemaVersion; - private final SchemaData schemaData; private final String clientAddress; // IP address only, no port number included private final AtomicBoolean isDisconnecting = new AtomicBoolean(false); - public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, - boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, - boolean userProvidedProducerName, - ProducerAccessMode accessMode, - Optional topicEpoch, - boolean supportsPartialProducer) { - this(topic, cnx, producerId, producerName, appId, isEncrypted, metadata, schemaVersion, epoch, - userProvidedProducerName, accessMode, topicEpoch, supportsPartialProducer, null); - } - public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional topicEpoch, - boolean supportsPartialProducer, SchemaData schemaData) { + boolean supportsPartialProducer) { final ServiceConfiguration serviceConf = cnx.getBrokerService().pulsar().getConfiguration(); this.topic = topic; @@ -167,7 +155,6 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.isEncrypted = isEncrypted; this.schemaVersion = schemaVersion; - this.schemaData = schemaData; this.accessMode = accessMode; this.topicEpoch = topicEpoch; @@ -840,10 +827,6 @@ public SchemaVersion getSchemaVersion() { return schemaVersion; } - public SchemaData getSchemaData() { - return schemaData; - } - public ProducerAccessMode getAccessMode() { return accessMode; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ab533d5268a88..6f147cbf76b02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -161,6 +161,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; @@ -1246,7 +1247,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { topic.subscribe(optionBuilder.schemaVersion(schemaVersion).build())); } else { topic.findSchemaVersion(schema).thenApply(optionBuilder::schemaVersion); - return topic.subscribe(optionBuilder.build()); + return topic.subscribe(optionBuilder.schemaVersion(-1L).build()); } }) .thenAccept(consumer -> { @@ -1584,12 +1585,14 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ CompletableFuture producerQueuedFuture = new CompletableFuture<>(); Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, - userProvidedProducerName, producerAccessMode, topicEpoch, supportsPartialProducer, schemaData); + userProvidedProducerName, producerAccessMode, topicEpoch, supportsPartialProducer); topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> { if (isActive()) { if (producerFuture.complete(producer)) { log.info("[{}] Created new producer: {}", remoteAddress, producer); + topic.putSchemaAndVersionInSchemaCache(((LongSchemaVersion) schemaVersion).getVersion(), + schemaData); commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), newTopicEpoch, true /* producer is ready now */); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 137ae38907930..9605a4bd843fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -311,6 +311,8 @@ CompletableFuture asyncGetStats(boolean getPreciseBack CompletableFuture findSchemaVersion(SchemaData schema); + void putSchemaAndVersionInSchemaCache(long schemaVersion, SchemaData schemaData); + CompletableFuture deleteForcefully(); default Optional getDispatchRateLimiter() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f8b7a353fd734..fafb566f278a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -30,7 +30,6 @@ import io.netty.util.concurrent.FastThreadLocal; import java.time.Clock; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -888,7 +887,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, - readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaData, schemaVersion); + readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaData.getType()); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { if (subscription instanceof PersistentSubscription persistentSubscription) { checkBackloggedCursor(persistentSubscription); @@ -917,6 +916,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St new BrokerServiceException("Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); + if (schemaVersion != -1L) { + putSchemaAndVersionInSchemaCache(schemaVersion, schemaData); + } if (log.isDebugEnabled()) { log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); } @@ -966,7 +968,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, 0L); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, -1L); } private CompletableFuture getDurableSubscription(String subscriptionName, @@ -3253,30 +3255,11 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem log.error("addSchemaIfIdleOrCheckCompatible: " + ex.getMessage()); if (ex.getMessage().contains("Failed to open ledger")) { getLatestSchemaVersion().thenApply(schemaVersion -> { - log.warn("++++++++latest" + schemaVersion.toString()); - SchemaData schemaData = null; - Collection values = producers.values(); - // TODO: CompletableFuture.anyOf() - for (Producer value : values) { - log.warn("++++++++producer schemaversion:" + value.getSchemaVersion()); - if (((LongSchemaVersion) value.getSchemaVersion()).getVersion() - == ((LongSchemaVersion) schemaVersion).getVersion()) { - schemaData = value.getSchemaData(); - break; - } - } - // TODO: CompletableFuture.anyOf() - for (PersistentSubscription value : subscriptions.values()) { - for (Consumer consumer : value.getConsumers()) { - log.warn("++++++++consumer schemaversion:" + consumer.getSchemaVersion()); - if (consumer.getSchemaVersion() == ((LongSchemaVersion) schemaVersion).getVersion()) { - schemaData = consumer.getSchemaData(); - break; - } - } - break; + long version = ((LongSchemaVersion) schemaVersion).getVersion(); + if (schemaCache.containsKey(version)) { + return tryCompleteTheLostSchema(schemaVersion, schemaCache.get(version)); } - return tryCompleteTheLostSchema(schemaVersion, schemaData); + return CompletableFuture.failedFuture(ex); }); return true; } @@ -3285,7 +3268,7 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem }).thenCompose((hasSchema) -> { int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream() .mapToInt(subscription -> subscription.getConsumers().stream() - .filter(consumer -> consumer.getSchemaData().getType() != SchemaType.AUTO_CONSUME) + .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME) .toList().size()) .sum(); if (hasSchema From aafc5af6c302957ac79d52fa8be476eada7d7bb5 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 29 Jun 2023 22:52:12 +0800 Subject: [PATCH 5/9] forgotten code before. --- .../SchemaRegistryServiceWithSchemaDataValidator.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java index b3032d5498f18..9573db65ef17b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -84,6 +84,17 @@ public CompletableFuture getSchemaVersionBySchemaData(List tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, + SchemaData schema) { + return this.service.tryCompleteTheLostSchema(schemaId, schemaVersion, schema); + } + + @Override + public CompletableFuture getLatestSchemaVersion(String schemaId) { + return this.service.getLatestSchemaVersion(schemaId); + } + @Override public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, From 941b1ffebc2f1f8b88323db7a1ca3dde450a4013 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Tue, 16 Apr 2024 22:40:51 +0800 Subject: [PATCH 6/9] remove useless code --- .../pulsar/broker/service/AbstractTopic.java | 12 ----- .../pulsar/broker/service/ServerCnx.java | 20 ++------ .../broker/service/SubscriptionOption.java | 5 +- .../apache/pulsar/broker/service/Topic.java | 4 +- .../service/persistent/PersistentTopic.java | 47 ++++--------------- 5 files changed, 16 insertions(+), 72 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c2f1d7cb1dbea..c9b1d70ada0b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -81,7 +81,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,8 +158,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener activeRateLimiters; - protected ConcurrentOpenHashMap schemaCache; - public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -176,10 +173,6 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable(); topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicSnapshotClock()); updateActiveRateLimiters(); - this.schemaCache = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); } public SubscribeRate getSubscribeRate() { @@ -1378,9 +1371,4 @@ public static Optional getMigratedClusterUrl(PulsarService pulsar, S } return Optional.empty(); } - - public void putSchemaAndVersionInSchemaCache(long schemaVersion, SchemaData schemaData) { - schemaCache.putIfAbsent(schemaVersion, schemaData); - } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 64b965a910a99..a60f1d805ceb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -163,7 +163,6 @@ import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; @@ -1589,7 +1588,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, topicName, - producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture, schema); + producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); }); }).exceptionally(exception -> { Throwable cause = exception.getCause(); @@ -1678,7 +1677,7 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ boolean userProvidedProducerName, TopicName topicName, ProducerAccessMode producerAccessMode, Optional topicEpoch, boolean supportsPartialProducer, - CompletableFuture producerFuture, SchemaData schemaData){ + CompletableFuture producerFuture){ CompletableFuture producerQueuedFuture = new CompletableFuture<>(); Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, @@ -1688,8 +1687,6 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ if (isActive()) { if (producerFuture.complete(producer)) { log.info("[{}] Created new producer: {}", remoteAddress, producer); - topic.putSchemaAndVersionInSchemaCache(((LongSchemaVersion) schemaVersion).getVersion(), - schemaData); commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), newTopicEpoch, true /* producer is ready now */); @@ -2982,18 +2979,7 @@ private CompletableFuture tryAddSchema(Topic topic, SchemaData sc if (schema != null) { return topic.addSchema(schema); } else { - return topic.hasSchema() - // .handle((hasSchema, ex) -> { - // if (ex != null) { - // if (ex.getCause() instanceof BKException.BKNoSuchLedgerExistsException - // || ex.getCause() instanceof BKException.BKNoSuchLedgerExistsOnMetadataServerException) { - // topic.completeTheMissingSchema(SchemaVersion.Latest, schema); - // return true; - // } - // } - // return hasSchema; - // }) - .thenCompose((hasSchema) -> { + return topic.hasSchema().thenCompose((hasSchema) -> { if (log.isDebugEnabled()) { log.debug("[{}] {} configured with schema {}", remoteAddress, topic.getName(), hasSchema); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index 398702b95de7d..af56d023616b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -29,7 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeyValue; -import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; @Getter @Builder @@ -50,8 +50,7 @@ public class SubscriptionOption { private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; - private SchemaData schemaData; - private long schemaVersion; + private SchemaType schemaType; public static Optional> getPropertiesMap(List list) { if (list == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 4895c222117a0..8f52967127c7d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -303,12 +303,10 @@ CompletableFuture asyncGetStats(boolean getPreciseBack * add the passed schema to the topic. Otherwise, check that the passed schema is compatible * with what the topic already has. */ - CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema); + CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema); CompletableFuture findSchemaVersion(SchemaData schema); - void putSchemaAndVersionInSchemaCache(long schemaVersion, SchemaData schemaData); - CompletableFuture deleteForcefully(); default Optional getDispatchRateLimiter() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0fa80f0dd04c7..936091edce557 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -175,7 +175,6 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; @@ -860,7 +859,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), - option.getConsumerEpoch(), option.getSchemaData(), option.getSchemaVersion()); + option.getConsumerEpoch(), option.getSchemaType()); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -874,8 +873,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, - SchemaData schemaData, - long schemaVersion) { + SchemaType schemaType) { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); @@ -963,7 +961,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, - readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaData.getType()); + readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType); + return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { if (subscription instanceof PersistentSubscription persistentSubscription) { checkBackloggedCursor(persistentSubscription); @@ -992,9 +991,6 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St new BrokerServiceException("Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); - if (schemaVersion != -1L) { - putSchemaAndVersionInSchemaCache(schemaVersion, schemaData); - } if (log.isDebugEnabled()) { log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); } @@ -1044,7 +1040,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, -1L); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null); } private CompletableFuture getDurableSubscription(String subscriptionName, @@ -3749,24 +3745,8 @@ public synchronized OffloadProcessStatus offloadStatus() { private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); @Override - public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema) { - return hasSchema().handle((hasSchema, ex) -> { - log.warn("hasSchema: " + hasSchema + " ex: " + ex); - if (ex != null) { - log.error("addSchemaIfIdleOrCheckCompatible: " + ex.getMessage()); - if (ex.getMessage().contains("Failed to open ledger")) { - getLatestSchemaVersion().thenApply(schemaVersion -> { - long version = ((LongSchemaVersion) schemaVersion).getVersion(); - if (schemaCache.containsKey(version)) { - return tryCompleteTheLostSchema(schemaVersion, schemaCache.get(version)); - } - return CompletableFuture.failedFuture(ex); - }); - return true; - } - } - return hasSchema; - }).thenCompose((hasSchema) -> { + public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema) { + return hasSchema().thenCompose((hasSchema) -> { int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream() .mapToInt(subscription -> subscription.getConsumers().stream() .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME) @@ -3776,17 +3756,10 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem || (userCreatedProducerCount > 0) || (numActiveConsumersWithoutAutoSchema != 0) || (ledger.getTotalSize() != 0)) { - log.warn("################ checkSchemaCompatibleForConsumer:" + hasSchema); - checkSchemaCompatibleForConsumer(schema); - log.warn("################ checkSchemaCompatibleForConsumer success"); - return findSchemaVersion(schema); + return checkSchemaCompatibleForConsumer(schema); } else { - log.warn("################ addSchema:"); - return addSchema(schema).thenApply(version -> { - log.warn("################ addSchema:" + version); - LongSchemaVersion longVersion = (LongSchemaVersion) version; - return longVersion.getVersion(); - }); + return addSchema(schema).thenCompose(schemaVersion -> + CompletableFuture.completedFuture(null)); } }); } From 1fd6774ada1a62c9729a7c8390d7b7abad43f57d Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 18 Apr 2024 13:08:57 +0800 Subject: [PATCH 7/9] add information about the complement schema to the upload schema properties --- .../pulsar/broker/service/AbstractTopic.java | 21 -------------- .../apache/pulsar/broker/service/Topic.java | 2 -- .../schema/BookkeeperSchemaStorage.java | 2 +- .../schema/DefaultSchemaRegistryService.java | 6 ---- .../broker/service/schema/SchemaRegistry.java | 2 -- .../schema/SchemaRegistryServiceImpl.java | 29 +++++++++++-------- ...egistryServiceWithSchemaDataValidator.java | 6 ---- .../common/protocol/schema/SchemaStorage.java | 2 +- 8 files changed, 19 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c9b1d70ada0b1..05defa60c050b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -688,27 +688,6 @@ public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schem .checkConsumerCompatibility(id, schema, getSchemaCompatibilityStrategy()); } - protected CompletableFuture tryCompleteTheLostSchema(SchemaVersion schemaVersion, SchemaData schema) { - String id = getSchemaId(); - return brokerService.pulsar() - .getSchemaRegistryService() - .tryCompleteTheLostSchema(id, schemaVersion, schema); - } - - @Override - public CompletableFuture findSchemaVersion(SchemaData schema) { - String id = getSchemaId(); - return brokerService.pulsar() - .getSchemaRegistryService() - .findSchemaVersion(id, schema); - } - - protected CompletableFuture getLatestSchemaVersion() { - return brokerService.pulsar() - .getSchemaRegistryService() - .getLatestSchemaVersion(getSchemaId()); - } - @Override public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 8f52967127c7d..a296052a41191 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -305,8 +305,6 @@ CompletableFuture asyncGetStats(boolean getPreciseBack */ CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema); - CompletableFuture findSchemaVersion(SchemaData schema); - CompletableFuture deleteForcefully(); default Optional getDispatchRateLimiter() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 1bd8215a21e4e..93d7b25c52211 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -293,7 +293,7 @@ public CompletableFuture getLatestSchemaVersion(String key) { } @Override - public CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, + public CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema) { CompletableFuture promise = new CompletableFuture<>(); tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 35ab93c084ea9..0bca7996221ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -70,12 +70,6 @@ public CompletableFuture getSchemaVersionBySchemaData(List tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, - SchemaData schema) { - return completedFuture(null); - } - @Override public CompletableFuture getLatestSchemaVersion(String schemaId) { return completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index c9578d51f5793..1adbd31c56170 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -59,8 +59,6 @@ CompletableFuture checkConsumerCompatibility(String schemaId, SchemaData s CompletableFuture getSchemaVersionBySchemaData(List schemaAndMetadataList, SchemaData schemaData); - CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, SchemaData schema); - CompletableFuture getLatestSchemaVersion(String schemaId); SchemaVersion versionFromBytes(byte[] version); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 9d35be46b0854..0e8857f490518 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -30,6 +30,7 @@ import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.nio.ByteBuffer; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; @@ -67,6 +68,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { private final SchemaStorage schemaStorage; private final Clock clock; private final SchemaRegistryStats stats; + private static final String COMPLEMENT_SCHEMA_ENABLE = "complementSchemaEnabled"; + private static final String COMPLEMENT_SCHEMA_VERSION = "complementSchemaVersion"; @VisibleForTesting SchemaRegistryServiceImpl(SchemaStorage schemaStorage, @@ -230,6 +233,14 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem }))).whenComplete((v, ex) -> { if (ex != null) { log.error("[{}] Put schema failed", schemaId, ex); + if (ex.getMessage().contains("No such ledger exists on Bookies") + && schema.getProps().containsKey(COMPLEMENT_SCHEMA_ENABLE) + && schema.getProps().containsKey(COMPLEMENT_SCHEMA_VERSION)) { + ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); + bbVersion.putLong(Long.parseLong(schema.getProps().get(COMPLEMENT_SCHEMA_VERSION))); + SchemaVersion schemaVersion = versionFromBytes(bbVersion.array()); + tryComplementTheLostSchema(schemaId, schemaVersion, schema, promise); + } if (start.getValue() != 0) { this.stats.recordPutFailed(schemaId); } @@ -445,30 +456,24 @@ public CompletableFuture getSchemaVersionBySchemaData( return completableFuture; } - @Override - public CompletableFuture tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, - SchemaData schema) { - // long start = this.clock.millis(); - CompletableFuture longCompletableFuture = new CompletableFuture<>(); + private void tryComplementTheLostSchema(String schemaId, SchemaVersion schemaVersion, + SchemaData schema, + CompletableFuture promise) { schemaStorage - .tryCompleteTheLostSchemaLedger(schemaId, schemaVersion, schema) + .tryComplementTheLostSchemaLedger(schemaId, schemaVersion, schema) .whenComplete((v, t) -> { - // TODO: add completeLostSchemaLedger stats? if (t != null) { - // this.stats.recordDelFailed(schemaId); log.error("[{}] Complete lost schema({}) failed", schemaId, ((LongSchemaVersion) schemaVersion).getVersion()); - longCompletableFuture.completeExceptionally(t); + promise.completeExceptionally(t); } else { - // this.stats.recordDelLatency(schemaId, this.clock.millis() - start); if (log.isDebugEnabled()) { log.debug("[{}] Complete lost schema({}) finished", schemaId, ((LongSchemaVersion) schemaVersion).getVersion()); } - longCompletableFuture.complete(null); + promise.complete(schemaVersion); } }); - return longCompletableFuture; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java index 9573db65ef17b..be52ee5a857f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -84,12 +84,6 @@ public CompletableFuture getSchemaVersionBySchemaData(List tryCompleteTheLostSchema(String schemaId, SchemaVersion schemaVersion, - SchemaData schema) { - return this.service.tryCompleteTheLostSchema(schemaId, schemaVersion, schema); - } - @Override public CompletableFuture getLatestSchemaVersion(String schemaId) { return this.service.getLatestSchemaVersion(schemaId); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index ec1058c197353..d97c22f276aba 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -56,7 +56,7 @@ default CompletableFuture put(String key, CompletableFuture getLatestSchemaVersion(String key); - CompletableFuture tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema); + CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema); void start() throws Exception; From d9c4b0f3d7f9838355c0ac568aba4ce54a724257 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 18 Apr 2024 13:17:27 +0800 Subject: [PATCH 8/9] remove useless code --- .../service/schema/BookkeeperSchemaStorage.java | 12 ------------ .../service/schema/DefaultSchemaRegistryService.java | 5 ----- .../pulsar/broker/service/schema/SchemaRegistry.java | 2 -- .../service/schema/SchemaRegistryServiceImpl.java | 7 +------ ...SchemaRegistryServiceWithSchemaDataValidator.java | 5 ----- .../pulsar/common/protocol/schema/SchemaStorage.java | 2 -- 6 files changed, 1 insertion(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 93d7b25c52211..3920b866b1dc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -280,18 +280,6 @@ public SchemaVersion versionFromBytes(byte[] version) { return new LongSchemaVersion(bb.getLong()); } - @NotNull - public CompletableFuture getLatestSchemaVersion(String key) { - return getSchemaLocator(getSchemaPath(key)).thenCompose(locator -> { - if (locator.isEmpty()) { - return completedFuture(null); - } - - SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator; - return completedFuture(new LongSchemaVersion(schemaLocator.getInfo().getVersion())); - }); - } - @Override public CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 0bca7996221ab..1f36419c815ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -70,11 +70,6 @@ public CompletableFuture getSchemaVersionBySchemaData(List getLatestSchemaVersion(String schemaId) { - return completedFuture(null); - } - @Override public CompletableFuture deleteSchema(String schemaId, String user, boolean force) { return completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index 1adbd31c56170..8e9831ae34242 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -59,8 +59,6 @@ CompletableFuture checkConsumerCompatibility(String schemaId, SchemaData s CompletableFuture getSchemaVersionBySchemaData(List schemaAndMetadataList, SchemaData schemaData); - CompletableFuture getLatestSchemaVersion(String schemaId); - SchemaVersion versionFromBytes(byte[] version); class SchemaAndMetadata { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 0e8857f490518..0572d638affbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -476,13 +476,8 @@ private void tryComplementTheLostSchema(String schemaId, SchemaVersion schemaVer }); } - @Override - public CompletableFuture getLatestSchemaVersion(String schemaId) { - return schemaStorage.getLatestSchemaVersion(schemaId); - } - private CompletableFuture checkCompatibilityWithLatest(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { + SchemaCompatibilityStrategy strategy) { if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java index be52ee5a857f1..b3032d5498f18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -84,11 +84,6 @@ public CompletableFuture getSchemaVersionBySchemaData(List getLatestSchemaVersion(String schemaId) { - return this.service.getLatestSchemaVersion(schemaId); - } - @Override public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index d97c22f276aba..f6d33c798f4b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -54,8 +54,6 @@ default CompletableFuture put(String key, SchemaVersion versionFromBytes(byte[] version); - CompletableFuture getLatestSchemaVersion(String key); - CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema); void start() throws Exception; From 7a500f7cf83698f35285ca3e6b681a1204fcb404 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 25 Apr 2024 23:41:48 +0800 Subject: [PATCH 9/9] remove delete schema on metastore, and complete test code. --- .../schema/SchemaRegistryServiceImpl.java | 139 +++++++++--------- .../service/schema/SchemaServiceTest.java | 98 +++++++----- 2 files changed, 133 insertions(+), 104 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 0572d638affbd..195284deaac01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -191,73 +191,78 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem SchemaCompatibilityStrategy strategy) { MutableLong start = new MutableLong(0); CompletableFuture promise = new CompletableFuture<>(); - schemaStorage.put(schemaId, - schemasFuture -> schemasFuture - .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId, - convertToSchemaAndMetadata(schemaId, schemaFutureList))) - .thenCompose(schemaAndMetadataList -> getSchemaVersionBySchemaData(schemaAndMetadataList, schema) - .thenCompose(schemaVersion -> { - if (schemaVersion != null) { - if (log.isDebugEnabled()) { - log.debug("[{}] Schema is already exists", schemaId); - } - promise.complete(schemaVersion); - return CompletableFuture.completedFuture(null); - } - CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); - if (schemaAndMetadataList.size() != 0) { - if (isTransitiveStrategy(strategy)) { - checkCompatibilityFuture = - checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList); - } else { - checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy); - } - } else { - checkCompatibilityFuture.complete(null); - } - return checkCompatibilityFuture.thenCompose(v -> { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); - SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(clock.millis()) - .addAllProps(toPairs(schema.getProps())) - .build(); - - start.setValue(this.clock.millis()); - return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context)); - }); - }))).whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Put schema failed", schemaId, ex); - if (ex.getMessage().contains("No such ledger exists on Bookies") - && schema.getProps().containsKey(COMPLEMENT_SCHEMA_ENABLE) - && schema.getProps().containsKey(COMPLEMENT_SCHEMA_VERSION)) { - ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); - bbVersion.putLong(Long.parseLong(schema.getProps().get(COMPLEMENT_SCHEMA_VERSION))); - SchemaVersion schemaVersion = versionFromBytes(bbVersion.array()); - tryComplementTheLostSchema(schemaId, schemaVersion, schema, promise); - } + if (schema.getProps().containsKey(COMPLEMENT_SCHEMA_ENABLE) + && schema.getProps().containsKey(COMPLEMENT_SCHEMA_VERSION)) { + ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); + bbVersion.putLong(Long.parseLong(schema.getProps().get(COMPLEMENT_SCHEMA_VERSION))); + SchemaVersion schemaVersion = versionFromBytes(bbVersion.array()); + tryComplementTheLostSchema(schemaId, schemaVersion, schema, promise); + } else { + schemaStorage.put(schemaId, + schemasFuture -> schemasFuture + .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId, + convertToSchemaAndMetadata(schemaId, schemaFutureList))) + .thenCompose(schemaAndMetadataList -> + getSchemaVersionBySchemaData(schemaAndMetadataList, schema) + .thenCompose(schemaVersion -> { + if (schemaVersion != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Schema is already exists", schemaId); + } + promise.complete(schemaVersion); + return CompletableFuture.completedFuture(null); + } + CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); + if (schemaAndMetadataList.size() != 0) { + if (isTransitiveStrategy(strategy)) { + checkCompatibilityFuture = + checkCompatibilityWithAll(schemaId, schema, strategy, + schemaAndMetadataList); + } else { + checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, + schema, strategy); + } + } else { + checkCompatibilityFuture.complete(null); + } + return checkCompatibilityFuture.thenCompose(v -> { + byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); + SchemaRegistryFormat.SchemaInfo info = + SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); + + start.setValue(this.clock.millis()); + return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), + context)); + }); + }))).whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Put schema failed", schemaId, ex); + if (start.getValue() != 0) { + this.stats.recordPutFailed(schemaId); + } + promise.completeExceptionally(ex); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Put schema finished", schemaId); + } + // The schema storage will return null schema version if no schema is persisted to the storage + if (v != null) { + promise.complete(v); if (start.getValue() != 0) { - this.stats.recordPutFailed(schemaId); - } - promise.completeExceptionally(ex); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] Put schema finished", schemaId); - } - // The schema storage will return null schema version if no schema is persisted to the storage - if (v != null) { - promise.complete(v); - if (start.getValue() != 0) { - this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue()); - } + this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue()); } } + } }); + } return promise; } @@ -592,13 +597,7 @@ private CompletableFuture> trimDeletedSchemaAndGetList(S } }); trimDeletedSchemaAndGetList(list); - // clean up the broken schema from zk - deleteSchemaStorage(schemaId, true).handle((sv, th) -> { - log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", rc.getMessage(), - schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage())); - schemaResult.complete(list); - return null; - }); + schemaResult.complete(list); return null; } // trim the deleted schema and return the result if schema is retrieved successfully diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index b9c9df23beee8..ec4b4ca42880c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -59,8 +59,6 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; -import org.apache.pulsar.broker.stats.PrometheusMetricsTest; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -69,6 +67,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; +import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -76,6 +75,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -379,19 +379,8 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E .subscriptionName("sub0") .consumerName("consumer1") .subscribe(); - // @Cleanup - // Consumer consumer2 = pulsarClient.newConsumer(schemaV2) - // .topic(topic) - // .subscriptionType(SubscriptionType.Shared) - // .subscriptionName("sub0") - // .consumerName("consumerAfterLostLedger2") - // .subscribe(); - assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); - - SchemaAndMetadata schemaAndMetadata0 = schemaRegistryService.getSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(0)).get(); - SchemaAndMetadata schemaAndMetadata1 = schemaRegistryService.getSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(1)).get(); + + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); // delete ledger String key = TopicName.get(topic).getSchemaName(); @@ -414,34 +403,64 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E consumer1.acknowledge(msg); } - // try to fix the lost schema ledger - if (lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { - schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(0), schemaAndMetadata0.schema); - schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(1), schemaAndMetadata1.schema).join(); - } else if (lostSchemaLedgerIndexes.contains(0) && !lostSchemaLedgerIndexes.contains(1)) { - schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(0), schemaAndMetadata0.schema).join(); - } else if (!lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.contains(1)) { - schemaRegistryService.tryCompleteTheLostSchema(TopicName.get(topic) - .getSchemaName(), new LongSchemaVersion(1), schemaAndMetadata1.schema).join(); + // try to complement the lost schema ledger + PostSchemaPayload v1Payload = new PostSchemaPayload(); + v1Payload.setType("AVRO"); + PostSchemaPayload v2Payload = new PostSchemaPayload(); + v2Payload.setType("AVRO"); + Map v1Properties = new HashMap<>(); + v1Properties.put("complementSchemaEnabled", "true"); + v1Properties.put("complementSchemaVersion", "0"); + Map v2Properties = new HashMap<>(); + v2Properties.put("complementSchemaEnabled", "true"); + v2Properties.put("complementSchemaVersion", "1"); + + if (lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.size() == 1) { + SchemaInfo schemaInfo = schemaV1.getSchemaInfo(); + v1Payload.setSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)); + v1Properties.putAll(schemaInfo.getProperties()); + v1Payload.setProperties(v1Properties); + admin.schemas().createSchema(topic, v1Payload); + } else if (lostSchemaLedgerIndexes.contains(1) && lostSchemaLedgerIndexes.size() == 1) { + SchemaInfo schemaInfo = schemaV2.getSchemaInfo(); + v2Payload.setSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)); + v2Properties.putAll(schemaInfo.getProperties()); + v2Payload.setProperties(v2Properties); + admin.schemas().createSchema(topic, v2Payload); + } else if (lostSchemaLedgerIndexes.size() == 2) { + SchemaInfo v1SchemaInfo = schemaV1.getSchemaInfo(); + v1Payload.setSchema(new String(v1SchemaInfo.getSchema(), StandardCharsets.UTF_8)); + v1Properties.putAll(v1SchemaInfo.getProperties()); + v1Payload.setProperties(v1Properties); + admin.schemas().createSchema(topic, v1Payload); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1, admin.schemas().getAllSchemas(topic).size()); + }); + + SchemaInfo v2SchemaInfo = schemaV2.getSchemaInfo(); + v2Payload.setSchema(new String(v2SchemaInfo.getSchema(), StandardCharsets.UTF_8)); + v2Properties.putAll(v2SchemaInfo.getProperties()); + v2Payload.setProperties(v2Properties); + admin.schemas().createSchema(topic, v2Payload); + Awaitility.await().untilAsserted(() -> { + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + }); } - assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); - @Cleanup - Producer producerAfterLostLedger1 = pulsarClient.newProducer(schemaV1) - .topic(topic) - .producerName("producerAfterLostLedger1") - .create(); - assertNotNull(producerAfterLostLedger1.send(new V1Data(10))); @Cleanup Producer producerAfterLostLedger2 = pulsarClient.newProducer(schemaV2) .topic(topic) .producerName("producerAfterLostLedger2") .create(); assertNotNull(producerAfterLostLedger2.send(new V2Data(10, 10))); + @Cleanup + Producer producerAfterLostLedger1 = pulsarClient.newProducer(schemaV1) + .topic(topic) + .producerName("producerAfterLostLedger1") + .create(); + assertNotNull(producerAfterLostLedger1.send(new V1Data(10))); @Cleanup Consumer consumerAfterLostLedger1 = pulsarClient.newConsumer(schemaV1) @@ -463,6 +482,17 @@ public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws E producer2.send(new V2Data(11, 11)); assertNotNull(consumerAfterLostLedger2.receive(3, TimeUnit.SECONDS)); + for (int i = 0; i < numMessages; i++) { + producer1.send(new V1Data(i)); + producer2.send(new V2Data(i, i + 1)); + } + for (int i = 0; i < numMessages; i++) { + Message msg = consumer1.receive(3, TimeUnit.SECONDS); + consumer1.acknowledge(msg); + } + + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + producer1.close(); producer2.close(); consumer1.close();