From bd883f0424aa011bfae8f913b831e8bd16a31d02 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 19 Feb 2023 21:36:41 +0800 Subject: [PATCH 1/2] PIP-246: Improved PROTOBUF_NATIVE schema compatibility checks without using avro-protobuf. --- .../ProtobufNativeSchemaBreakCheckUtils.java | 172 +++++++++++++++ ...rotobufNativeSchemaCompatibilityCheck.java | 72 ++++--- .../ProtoBufCanReadCheckException.java | 26 +++ ...otobufNativeAlwaysCompatibleValidator.java | 33 +++ ...rotobufNativeNeverCompatibleValidator.java | 37 ++++ ...rotobufNativeSchemaBreakValidatorImpl.java | 80 +++++++ ...rotobufNativeSchemaValidationStrategy.java | 41 ++++ .../ProtobufNativeSchemaValidator.java | 28 +++ .../ProtobufNativeSchemaValidatorBuilder.java | 41 ++++ ...otobufNativeSchemaBreakCheckUtilsTest.java | 203 ++++++++++++++++++ ...bufNativeSchemaCompatibilityCheckTest.java | 127 ++++++++++- .../src/test/proto/ExternalReader.proto | 28 +++ pulsar-broker/src/test/proto/Reader.proto | 63 ++++++ pulsar-broker/src/test/proto/Writer.proto | 63 ++++++ .../test/proto/WriterAddRequiredField.proto | 65 ++++++ .../proto/WriterDeleteRequiredField.proto | 62 ++++++ .../proto/WriterWithAddHasDefaultValue.proto | 64 ++++++ .../proto/WriterWithAddNoDefaultValue.proto | 64 ++++++ .../src/test/proto/WriterWithEnumAdd.proto | 64 ++++++ .../src/test/proto/WriterWithEnumDelete.proto | 63 ++++++ .../proto/WriterWithFieldNameChange.proto | 63 ++++++ .../proto/WriterWithFieldNumberChange.proto | 63 ++++++ .../proto/WriterWithFieldTypeChange.proto | 63 ++++++ .../WriterWithRemoveDefaultValueField.proto | 63 ++++++ .../WriterWithRemoveNoDefaultValueField.proto | 63 ++++++ .../test/proto/WriterWithTypeNameChange.proto | 63 ++++++ .../impl/schema/ProtobufNativeSchema.java | 10 +- .../schema/ProtobufNativeSchemaUtils.java | 25 +++ .../schema/ProtobufNativeSchemaUtilsTest.java | 31 +++ 29 files changed, 1812 insertions(+), 28 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/ProtoBufCanReadCheckException.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaBreakValidatorImpl.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidationStrategy.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java create mode 100644 pulsar-broker/src/test/proto/ExternalReader.proto create mode 100644 pulsar-broker/src/test/proto/Reader.proto create mode 100644 pulsar-broker/src/test/proto/Writer.proto create mode 100644 pulsar-broker/src/test/proto/WriterAddRequiredField.proto create mode 100644 pulsar-broker/src/test/proto/WriterDeleteRequiredField.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithAddHasDefaultValue.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithAddNoDefaultValue.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithEnumAdd.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithEnumDelete.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithFieldNameChange.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithFieldNumberChange.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithFieldTypeChange.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithRemoveDefaultValueField.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithRemoveNoDefaultValueField.proto create mode 100644 pulsar-broker/src/test/proto/WriterWithTypeNameChange.proto diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java new file mode 100644 index 0000000000000..d1ff73377c413 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import static org.apache.pulsar.client.impl.schema.ProtobufNativeSchema.ProtoBufParsingInfo; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; + +@Slf4j +public class ProtobufNativeSchemaBreakCheckUtils { + + public static void checkSchemaCompatibility(Descriptors.Descriptor writtenSchema, + Descriptors.Descriptor readSchema) + throws ProtoBufCanReadCheckException { + String writtenSchemaRootName = writtenSchema.getName(); + String readSchemaRootName = readSchema.getName(); + if (!writtenSchemaRootName.equals(readSchemaRootName)) { + throw new ProtoBufCanReadCheckException("Protobuf root message isn't allow change!"); + } + + Map> writtenSchemaAllProto = new HashMap<>(); + ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(writtenSchema) + .forEach((s, fileDescriptorProto) -> { + ProtobufNativeSchemaUtils.coverAllNestedAndEnumFileDescriptor(fileDescriptorProto, + writtenSchemaAllProto); + }); + + Map> readSchemaAllProto = new HashMap<>(); + ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(readSchema) + .forEach((s, fileDescriptorProto) -> { + ProtobufNativeSchemaUtils.coverAllNestedAndEnumFileDescriptor(fileDescriptorProto, + readSchemaAllProto); + }); + + List writtenRootProtoBufParsingInfos = writtenSchemaAllProto.get(writtenSchemaRootName); + List readRootProtoBufParsingInfos = readSchemaAllProto.get(readSchemaRootName); + // root check first + check(writtenRootProtoBufParsingInfos, readRootProtoBufParsingInfos); + + for (String writtenSchemaMessageName : writtenSchemaAllProto.keySet()) { + // skip root + if (!writtenSchemaMessageName.equals(writtenSchemaRootName) + && readSchemaAllProto.containsKey(writtenSchemaMessageName)) { + List writtenProtoBufParsingInfoList = + writtenSchemaAllProto.get(writtenSchemaMessageName); + List readProtoBufParsingInfoList = + readSchemaAllProto.get(writtenSchemaMessageName); + check(writtenProtoBufParsingInfoList, readProtoBufParsingInfoList); + } + } + } + + private static void check(List writtenSchemaFieldInfoList, + List readSchemaFieldInfoList) + throws ProtoBufCanReadCheckException { + List readSchemaRequiredFields = new LinkedList<>(); + List writtenSchemaRequiredFields = new LinkedList<>(); + Map readSchemaFieldInfoByFieldNumberMap = new HashMap<>(); + Map writtenSchemaFieldInfoByFieldNumberMap = new HashMap<>(); + Map writtenSchemaFieldInfoByFieldNameMap = new HashMap<>(); + + readSchemaFieldInfoList.forEach(readSchemaFieldInfo -> { + if (readSchemaFieldInfo.getLabel() + .equals(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REQUIRED.name())) { + readSchemaRequiredFields.add(readSchemaFieldInfo); + } + readSchemaFieldInfoByFieldNumberMap.put(readSchemaFieldInfo.getNumber(), readSchemaFieldInfo); + }); + writtenSchemaFieldInfoList.forEach(writtenSchemaFieldInfo -> { + if (writtenSchemaFieldInfo.getLabel() + .equals(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REQUIRED.name())) { + writtenSchemaRequiredFields.add(writtenSchemaFieldInfo); + } + writtenSchemaFieldInfoByFieldNumberMap.put(writtenSchemaFieldInfo.getNumber(), writtenSchemaFieldInfo); + writtenSchemaFieldInfoByFieldNameMap.put(writtenSchemaFieldInfo.getName(), writtenSchemaFieldInfo); + }); + + if (!ListUtils.isEqualList(readSchemaRequiredFields, writtenSchemaRequiredFields)) { + throw new ProtoBufCanReadCheckException("Required Fields have been modified."); + } + + int readSchemaMaxFieldNumber = readSchemaFieldInfoList.stream() + .mapToInt(ProtoBufParsingInfo::getNumber).max().orElse(0); + int writtenSchemaMaxFieldNumber = writtenSchemaFieldInfoList.stream() + .mapToInt(ProtoBufParsingInfo::getNumber).max().orElse(0); + int maxFieldNumber = Math.max(readSchemaMaxFieldNumber, writtenSchemaMaxFieldNumber); + readSchemaFieldInfoList = fillUpProtoBufParsingInfoList(readSchemaFieldInfoByFieldNumberMap, + maxFieldNumber); + writtenSchemaFieldInfoList = fillUpProtoBufParsingInfoList(writtenSchemaFieldInfoByFieldNumberMap, + maxFieldNumber); + + ProtoBufParsingInfo readSchemaFieldInfo; + ProtoBufParsingInfo writtenSchemaFieldInfo; + for (int i = 0; i < maxFieldNumber; i++) { + readSchemaFieldInfo = readSchemaFieldInfoList.get(i); + writtenSchemaFieldInfo = writtenSchemaFieldInfoList.get(i); + if (readSchemaFieldInfo != null && !readSchemaFieldInfo.equals(writtenSchemaFieldInfo)) { + if (writtenSchemaFieldInfo != null + && readSchemaFieldInfo.getNumber() == writtenSchemaFieldInfo.getNumber() + && readSchemaFieldInfo.getName().equals(writtenSchemaFieldInfo.getName()) + && !readSchemaFieldInfo.getType().equals(writtenSchemaFieldInfo.getType())) { + // TODO: field type check, need to discuss first. + checkFieldTypeChanged(writtenSchemaFieldInfo, readSchemaFieldInfo); + } else if (writtenSchemaFieldInfoByFieldNameMap.containsKey(readSchemaFieldInfo.getName()) + && writtenSchemaFieldInfoByFieldNameMap.get(readSchemaFieldInfo.getName()).getNumber() + != readSchemaFieldInfo.getNumber()) { + throw new ProtoBufCanReadCheckException("The field number of the field have been changed."); + } else if (writtenSchemaFieldInfoByFieldNumberMap.containsKey(readSchemaFieldInfo.getNumber()) + && !writtenSchemaFieldInfoByFieldNumberMap.get(readSchemaFieldInfo.getNumber()).getName() + .equals(readSchemaFieldInfo.getName())) { + throw new ProtoBufCanReadCheckException("The field name of the field have been changed."); + } else if (writtenSchemaFieldInfo == null && !readSchemaFieldInfo.isHasDefaultValue()) { + throw new ProtoBufCanReadCheckException("No default value fields have been added or removed."); + } + } else if (readSchemaFieldInfo != null && readSchemaFieldInfo.equals(writtenSchemaFieldInfo) + && (readSchemaFieldInfo.getType() + .equals(DescriptorProtos.FieldDescriptorProto.Type.TYPE_ENUM.name()) + || readSchemaFieldInfo.getType() + .equals(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE.name()))) { + String[] readSchemaFieldEnumName = readSchemaFieldInfo.getTypeName().split("\\."); + String[] writtenSchemaFieldEnumName = writtenSchemaFieldInfo.getTypeName().split("\\."); + if (!readSchemaFieldEnumName[readSchemaFieldEnumName.length - 1] + .equals(writtenSchemaFieldEnumName[writtenSchemaFieldEnumName.length - 1])) { + throw new ProtoBufCanReadCheckException("The field type name have been changed."); + } + } + } + } + + private static void checkFieldTypeChanged(ProtoBufParsingInfo writtenSchemaFieldInfo, + ProtoBufParsingInfo readSchemaFieldInfo) + throws ProtoBufCanReadCheckException { + // TODO: field type check, need to discuss first. + // throw new ProtoBufCanReadCheckException("The field type have been changed."); + } + + private static List fillUpProtoBufParsingInfoList( + Map protoBufParsingInfoMap, int maxCapacity) { + List fullProtoBufParsingInfoList = new LinkedList<>(); + for (int i = 0; i < maxCapacity; i++) { + int currentFieldNumber = i + 1; + fullProtoBufParsingInfoList.add(protoBufParsingInfoMap.getOrDefault(currentFieldNumber, null)); + } + return fullProtoBufParsingInfoList; + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java index 16b3b33ec7894..4e461f8b458bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java @@ -18,8 +18,18 @@ */ package org.apache.pulsar.broker.service.schema; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.protobuf.Descriptors.Descriptor; +import java.util.Collections; +import java.util.LinkedList; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; +import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeAlwaysCompatibleValidator; +import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeNeverCompatibleValidator; +import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidationStrategy; +import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidator; +import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidatorBuilder; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -28,6 +38,7 @@ /** * The {@link SchemaCompatibilityCheck} implementation for {@link SchemaType#PROTOBUF_NATIVE}. */ +@Slf4j public class ProtobufNativeSchemaCompatibilityCheck implements SchemaCompatibilityCheck { @Override @@ -38,37 +49,50 @@ public SchemaType getSchemaType() { @Override public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { - Descriptor fromDescriptor = ProtobufNativeSchemaUtils.deserialize(from.getData()); - Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData()); - switch (strategy) { - case BACKWARD_TRANSITIVE: - case BACKWARD: - case FORWARD_TRANSITIVE: - case FORWARD: - case FULL_TRANSITIVE: - case FULL: - checkRootMessageChange(fromDescriptor, toDescriptor, strategy); - return; - case ALWAYS_COMPATIBLE: - return; - default: - throw new IncompatibleSchemaException("Unknown SchemaCompatibilityStrategy."); - } + checkCompatible(Collections.singletonList(from), to, strategy); } @Override public void checkCompatible(Iterable from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { - for (SchemaData schemaData : from) { - checkCompatible(schemaData, to, strategy); + checkArgument(from != null, "check compatibility list is null"); + LinkedList fromList = new LinkedList<>(); + try { + for (SchemaData schemaData : from) { + fromList.addFirst(ProtobufNativeSchemaUtils.deserialize(schemaData.getData())); + } + Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData()); + ProtobufNativeSchemaValidator schemaValidator = createSchemaValidator(strategy); + schemaValidator.validate(fromList, toDescriptor); + } catch (ProtoBufCanReadCheckException e) { + throw new IncompatibleSchemaException(e); } } - private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor, - SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { - if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) { - throw new IncompatibleSchemaException("Protobuf root message isn't allow change!"); - } + static ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) { + final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder = new + ProtobufNativeSchemaValidatorBuilder(); + return switch (compatibilityStrategy) { + case BACKWARD_TRANSITIVE -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy) + .isOnlyValidateLatest(false).build(); + case BACKWARD -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy) + .isOnlyValidateLatest(true).build(); + case FORWARD_TRANSITIVE -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy) + .isOnlyValidateLatest(false).build(); + case FORWARD -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy) + .isOnlyValidateLatest(true).build(); + case FULL_TRANSITIVE -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy) + .isOnlyValidateLatest(false).build(); + case FULL -> schemaValidatorBuilder + .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy) + .isOnlyValidateLatest(true).build(); + case ALWAYS_COMPATIBLE -> ProtobufNativeAlwaysCompatibleValidator.INSTANCE; + default -> ProtobufNativeNeverCompatibleValidator.INSTANCE; + }; } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/ProtoBufCanReadCheckException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/ProtoBufCanReadCheckException.java new file mode 100644 index 0000000000000..9f09045921dd6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/ProtoBufCanReadCheckException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.exceptions; + +public class ProtoBufCanReadCheckException extends Exception { + + public ProtoBufCanReadCheckException(String message) { + super(message); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java new file mode 100644 index 0000000000000..d380f285898c1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import com.google.protobuf.Descriptors; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; + +public class ProtobufNativeAlwaysCompatibleValidator implements ProtobufNativeSchemaValidator { + public static final ProtobufNativeAlwaysCompatibleValidator INSTANCE = + new ProtobufNativeAlwaysCompatibleValidator(); + + @Override + public void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException { + return; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java new file mode 100644 index 0000000000000..bff23aed321fa --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import com.google.protobuf.Descriptors; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProtobufNativeNeverCompatibleValidator implements ProtobufNativeSchemaValidator { + private static final Logger log = LoggerFactory.getLogger(ProtobufNativeNeverCompatibleValidator.class); + public static final ProtobufNativeNeverCompatibleValidator INSTANCE = new ProtobufNativeNeverCompatibleValidator(); + + @Override + public void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException { + for (Descriptors.Descriptor descriptor : fromDescriptor) { + throw new ProtoBufCanReadCheckException("Unknown SchemaCompatibilityStrategy."); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaBreakValidatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaBreakValidatorImpl.java new file mode 100644 index 0000000000000..f21fd9fb0fcab --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaBreakValidatorImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import com.google.protobuf.Descriptors; +import java.util.Iterator; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaBreakCheckUtils; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; + +@Slf4j +public class ProtobufNativeSchemaBreakValidatorImpl implements ProtobufNativeSchemaValidator { + private final ProtobufNativeSchemaValidationStrategy strategy; + private final boolean onlyValidateLatest; + + public ProtobufNativeSchemaBreakValidatorImpl(ProtobufNativeSchemaValidationStrategy strategy, + boolean onlyValidateLatest) { + this.strategy = strategy; + this.onlyValidateLatest = onlyValidateLatest; + } + + @Override + public void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException { + if (onlyValidateLatest) { + validateLatest(fromDescriptor, toDescriptor); + } else { + validateAll(fromDescriptor, toDescriptor); + } + } + + private void validateAll(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException { + for (Descriptors.Descriptor existing : fromDescriptor) { + validateWithStrategy(toDescriptor, existing); + } + } + + private void validateLatest(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException { + Iterator schemas = fromDescriptor.iterator(); + if (schemas.hasNext()) { + Descriptors.Descriptor existing = schemas.next(); + validateWithStrategy(toDescriptor, existing); + } + } + + private void validateWithStrategy(Descriptors.Descriptor toValidate, Descriptors.Descriptor fromDescriptor) + throws ProtoBufCanReadCheckException { + switch (strategy) { + case CanReadExistingStrategy -> canRead(fromDescriptor, toValidate); + case CanBeReadByExistingStrategy -> canRead(toValidate, fromDescriptor); + case CanBeReadMutualStrategy -> { + canRead(toValidate, fromDescriptor); + canRead(fromDescriptor, toValidate); + } + } + } + + private void canRead(Descriptors.Descriptor writtenSchema, Descriptors.Descriptor readSchema) + throws ProtoBufCanReadCheckException { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenSchema, readSchema); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidationStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidationStrategy.java new file mode 100644 index 0000000000000..a6f61c56c8999 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidationStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum ProtobufNativeSchemaValidationStrategy { + /** + * a schema can be used to read existing schema(s). + */ + CanReadExistingStrategy, + + /** + * a schema can be read by existing schema(s). + */ + CanBeReadByExistingStrategy, + + /** + * a schema can read existing schema(s). + */ + CanBeReadMutualStrategy +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java new file mode 100644 index 0000000000000..5e45e46e19651 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import com.google.protobuf.Descriptors; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; + +public interface ProtobufNativeSchemaValidator { + + void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + throws ProtoBufCanReadCheckException; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java new file mode 100644 index 0000000000000..2397c2a030299 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import lombok.NonNull; + +public class ProtobufNativeSchemaValidatorBuilder { + private ProtobufNativeSchemaValidationStrategy strategy; + private boolean onlyValidateLatest; + + public ProtobufNativeSchemaValidatorBuilder validatorStrategy( + @NonNull ProtobufNativeSchemaValidationStrategy strategy) { + this.strategy = strategy; + return this; + } + + public ProtobufNativeSchemaValidatorBuilder isOnlyValidateLatest(boolean onlyValidateLatest) { + this.onlyValidateLatest = onlyValidateLatest; + return this; + } + + public ProtobufNativeSchemaValidator build() { + return new ProtobufNativeSchemaBreakValidatorImpl(strategy, onlyValidateLatest); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java new file mode 100644 index 0000000000000..59783dc78ffdf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import static com.google.protobuf.Descriptors.Descriptor; +import lombok.extern.slf4j.Slf4j; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; +import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException; +import org.apache.pulsar.client.schema.proto.reader.Reader; +import org.apache.pulsar.client.schema.proto.writer.Writer; +import org.apache.pulsar.client.schema.proto.writerWithAddHasDefaultValue.WriterWithAddHasDefaultValue; +import org.apache.pulsar.client.schema.proto.writerWithAddNoDefaultValue.WriterWithAddNoDefaultValue; +import org.apache.pulsar.client.schema.proto.writerWithFieldNameChange.WriterWithFieldNameChange; +import org.apache.pulsar.client.schema.proto.writerWithRemoveDefaultValueField.WriterWithRemoveDefaultValueField; +import org.apache.pulsar.client.schema.proto.writerWithRemoveNoDefaultValueField.WriterWithRemoveNoDefaultValueField; +import org.apache.pulsar.client.schema.proto.writerAddRequiredField.WriterAddRequiredField; +import org.apache.pulsar.client.schema.proto.writerDeleteRequiredField.WriterDeleteRequiredField; +import org.apache.pulsar.client.schema.proto.writerWithEnumAdd.WriterWithEnumAdd; +import org.apache.pulsar.client.schema.proto.writerWithEnumDelete.WriterWithEnumDelete; +import org.apache.pulsar.client.schema.proto.writerWithFieldNumberChange.WriterWithFieldNumberChange; +import org.apache.pulsar.client.schema.proto.writerWithFieldTypeChange.WriterWithFieldTypeChange; +import org.apache.pulsar.client.schema.proto.writerWithTypeNameChange.WriterWithTypeNameChange; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +@Slf4j +public class ProtobufNativeSchemaBreakCheckUtilsTest { + private Descriptor readDescriptor; + + @BeforeTest + private void initReadSchema() { + this.readDescriptor = Reader.ProtobufSchema.getDescriptor(); + assertNotNull(readDescriptor); + } + + @Test + public void testCheckSchemaCompatibilityWithSameVersion() + throws ProtoBufCanReadCheckException { + Descriptor writtenDescriptor = Writer.ProtobufSchema.getDescriptor(); + assertNotNull(writtenDescriptor); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenDescriptor, readDescriptor); + } + + @Test + public void testCheckSchemaCompatibilityWithAddNoDefaultValueField( + ) throws ProtoBufCanReadCheckException { + Descriptor writerWithAddNoDefaultValue = WriterWithAddNoDefaultValue.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithAddNoDefaultValue); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithAddNoDefaultValue, + readDescriptor); + } + + @Test + public void testCheckSchemaCompatibilityWithAddDefaultValueField( + ) throws ProtoBufCanReadCheckException { + Descriptor writerWithAddHasDefaultValue = WriterWithAddHasDefaultValue.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithAddHasDefaultValue); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithAddHasDefaultValue, + readDescriptor); + } + + @Test + public void testCheckSchemaCompatibilityWithRemoveNoDefaultValueField( + ) { + Descriptor writerWithRemoveNoDefaultValueField = WriterWithRemoveNoDefaultValueField.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithRemoveNoDefaultValueField); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithRemoveNoDefaultValueField, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "No default value fields have been added or removed."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithRemoveDefaultValueField( + ) throws ProtoBufCanReadCheckException { + Descriptor writerWithRemoveDefaultValueField = WriterWithRemoveDefaultValueField.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithRemoveDefaultValueField); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithRemoveDefaultValueField, + readDescriptor); + } + + @Test + public void testCheckSchemaCompatibilityWithRequiredFieldChange() { + // add required field + Descriptor writtenAddRequiredDescriptor = WriterAddRequiredField.ProtobufSchema.getDescriptor(); + assertNotNull(writtenAddRequiredDescriptor); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenAddRequiredDescriptor, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "Required Fields have been modified."); + } + + // delete required field + Descriptor writtenDeleteRequiredDescriptor = WriterDeleteRequiredField.ProtobufSchema.getDescriptor(); + assertNotNull(writtenDeleteRequiredDescriptor); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenDeleteRequiredDescriptor, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "Required Fields have been modified."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithFieldTypeChange() { + Descriptor writerWithFieldTypeChange = WriterWithFieldTypeChange.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithFieldTypeChange); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithFieldTypeChange, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "The field type have been changed."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithFieldTypeNameChange() { + Descriptor writerWithTypeNameChange = WriterWithTypeNameChange.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithTypeNameChange); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithTypeNameChange, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "The field type name have been changed."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithFieldNumberChange() { + Descriptor writerWithFieldNumberChange = WriterWithFieldNumberChange.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithFieldNumberChange); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithFieldNumberChange, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "The field number of the field have been changed."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithFieldNameChange() { + Descriptor writerWithFieldNameChange = WriterWithFieldNameChange.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithFieldNameChange); + try { + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithFieldNameChange, + readDescriptor); + fail("Schema should be incompatible"); + } catch (ProtoBufCanReadCheckException e) { + log.warn(e.getMessage()); + assertEquals(e.getMessage(), "The field name of the field have been changed."); + } + } + + @Test + public void testCheckSchemaCompatibilityWithEnumChange() + throws ProtoBufCanReadCheckException { + // add enum field + Descriptor writerWithEnumAdd = WriterWithEnumAdd.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithEnumAdd); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithEnumAdd, + readDescriptor); + + // delete enum field + Descriptor writerWithEnumDelete = WriterWithEnumDelete.ProtobufSchema.getDescriptor(); + assertNotNull(writerWithEnumDelete); + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithEnumDelete, + readDescriptor); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java index dead7797c6811..bb3e9238f553f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java @@ -18,15 +18,23 @@ */ package org.apache.pulsar.broker.service.schema; +import static com.google.protobuf.Descriptors.Descriptor; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; +import org.apache.pulsar.client.schema.proto.reader.Reader; +import org.apache.pulsar.client.schema.proto.writerWithAddHasDefaultValue.WriterWithAddHasDefaultValue; +import org.apache.pulsar.client.schema.proto.writerWithAddNoDefaultValue.WriterWithAddNoDefaultValue; +import org.apache.pulsar.client.schema.proto.writerWithRemoveNoDefaultValueField.WriterWithRemoveNoDefaultValueField; +import org.apache.pulsar.client.schema.proto.writerWithRemoveDefaultValueField.WriterWithRemoveDefaultValueField; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.Collections; -import static com.google.protobuf.Descriptors.Descriptor; - +@Slf4j @Test(groups = "broker") public class ProtobufNativeSchemaCompatibilityCheckTest { @@ -34,17 +42,130 @@ public class ProtobufNativeSchemaCompatibilityCheckTest { private static final SchemaData schemaData2 = getSchemaData(org.apache.pulsar.client.api.schema.proto.Test.SubMessage.getDescriptor()); + private static final SchemaData reader = getSchemaData(Reader.ProtobufSchema.getDescriptor()); + private static final SchemaData reader2 = getSchemaData(Reader.ProtobufSchema.getDescriptor()); + private static final SchemaData writerWithAddHasDefaultValue = getSchemaData(WriterWithAddHasDefaultValue.ProtobufSchema.getDescriptor()); + private static final SchemaData writerWithAddNoDefaultValue = getSchemaData(WriterWithAddNoDefaultValue.ProtobufSchema.getDescriptor()); + private static final SchemaData writerWithRemoveNoDefaultValueField = getSchemaData(WriterWithRemoveNoDefaultValueField.ProtobufSchema.getDescriptor()); + private static final SchemaData writerWithRemoveDefaultValueField = getSchemaData(WriterWithRemoveDefaultValueField.ProtobufSchema.getDescriptor()); + /** * make sure protobuf root message isn't allow change */ @Test public void testRootMessageChange() { - ProtobufNativeSchemaCompatibilityCheck compatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + SchemaCompatibilityCheck compatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); Assert.assertFalse(compatibilityCheck.isCompatible(schemaData2, schemaData1, SchemaCompatibilityStrategy.FULL), "Protobuf root message isn't allow change"); } + @Test + public void testBackwardCompatibility() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + // adding a field with default is backwards compatible + log.info("adding a field with default is backwards compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, + SchemaCompatibilityStrategy.BACKWARD), + "adding a field with default is backwards compatible"); + // adding a field without default is NOT backwards compatible + log.info("adding a field without default is NOT backwards compatible"); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(reader, writerWithAddNoDefaultValue, + SchemaCompatibilityStrategy.BACKWARD), + "adding a field without default is NOT backwards compatible"); + // removing a field with no default is backwards compatible + log.info("removing a field with no default is backwards compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveNoDefaultValueField, + SchemaCompatibilityStrategy.BACKWARD), + "removing a field with no default is backwards compatible"); + // removing a field with default value is backwards compatible + log.info("removing a field with default value is backwards compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveDefaultValueField, + SchemaCompatibilityStrategy.BACKWARD), + "removing a field with default value is backwards compatible"); + } + + @Test + public void testForwardCompatibility() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + // adding a field with default is forward compatible + log.info("adding a field with default is forward compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, + SchemaCompatibilityStrategy.FORWARD), + "adding a field with default is forward compatible"); + // adding a field without default is forward compatible + log.info("adding a field without default is forward compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddNoDefaultValue, + SchemaCompatibilityStrategy.FORWARD), + "adding a field without default is forward compatible"); + // removing a field with no default is not forward compatible + log.info("removing a field with no default is not forward compatible"); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveNoDefaultValueField, + SchemaCompatibilityStrategy.FORWARD), + "removing a field with no default is not forward compatible"); + // removing a field with default value is forward compatible + log.info("removing a field with default value is forward compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveDefaultValueField, + SchemaCompatibilityStrategy.FORWARD), + "removing a field with default value is forward compatible"); + } + + @Test + public void testFullCompatibility() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, + SchemaCompatibilityStrategy.FULL), + "adding a field with default fully compatible"); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(reader, writerWithAddNoDefaultValue, + SchemaCompatibilityStrategy.FULL), + "adding a field without default is not fully compatible"); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveDefaultValueField, + SchemaCompatibilityStrategy.FULL), + "removing a field with default is fully compatible"); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(reader, writerWithRemoveNoDefaultValueField, + SchemaCompatibilityStrategy.FULL), + "removing a field with no default is not fully compatible"); + } + + @Test + public void testBackwardTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), writerWithAddHasDefaultValue, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2, writerWithAddHasDefaultValue), + writerWithRemoveNoDefaultValueField, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Collections.singletonList(reader), writerWithRemoveDefaultValueField, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithAddNoDefaultValue), + writerWithAddNoDefaultValue, SchemaCompatibilityStrategy.BACKWARD)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithAddNoDefaultValue), + writerWithAddNoDefaultValue, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + } + + @Test + public void testForwardTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), writerWithRemoveDefaultValueField, + SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2, writerWithRemoveDefaultValueField), + writerWithAddHasDefaultValue, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithRemoveNoDefaultValueField), + writerWithRemoveNoDefaultValueField, SchemaCompatibilityStrategy.FORWARD)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithRemoveNoDefaultValueField), + writerWithRemoveNoDefaultValueField, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + } + + @Test + public void testFullTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithRemoveDefaultValueField), + writerWithAddHasDefaultValue, SchemaCompatibilityStrategy.FULL)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), + writerWithAddNoDefaultValue, SchemaCompatibilityStrategy.FULL_TRANSITIVE)); + } + private static SchemaData getSchemaData(Descriptor descriptor) { return SchemaData.builder().data(ProtobufNativeSchemaUtils.serialize(descriptor)).type(SchemaType.PROTOBUF_NATIVE).build(); } diff --git a/pulsar-broker/src/test/proto/ExternalReader.proto b/pulsar-broker/src/test/proto/ExternalReader.proto new file mode 100644 index 0000000000000..8075ababf9e44 --- /dev/null +++ b/pulsar-broker/src/test/proto/ExternalReader.proto @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.external; + +option java_package = "org.apache.pulsar.client.schema.proto.external"; +option java_outer_classname = "ExternalReader"; + +message ExternalReaderMessage { + required string externalStringField = 1; + optional double externalDoubleField = 2; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/Reader.proto b/pulsar-broker/src/test/proto/Reader.proto new file mode 100644 index 0000000000000..a6258bb18dcf9 --- /dev/null +++ b/pulsar-broker/src/test/proto/Reader.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.reader; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.reader"; +option java_outer_classname = "Reader"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/Writer.proto b/pulsar-broker/src/test/proto/Writer.proto new file mode 100644 index 0000000000000..0c6f120201dd7 --- /dev/null +++ b/pulsar-broker/src/test/proto/Writer.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writer; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writer"; +option java_outer_classname = "Writer"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterAddRequiredField.proto b/pulsar-broker/src/test/proto/WriterAddRequiredField.proto new file mode 100644 index 0000000000000..f5ba61efee963 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterAddRequiredField.proto @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerAddRequiredField; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerAddRequiredField"; +option java_outer_classname = "WriterAddRequiredField"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; + required double protobufBar_new = 3; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; + required string schemaId_new = 21; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterDeleteRequiredField.proto b/pulsar-broker/src/test/proto/WriterDeleteRequiredField.proto new file mode 100644 index 0000000000000..3005e9e8636b8 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterDeleteRequiredField.proto @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerDeleteRequiredField; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerDeleteRequiredField"; +option java_outer_classname = "WriterDeleteRequiredField"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithAddHasDefaultValue.proto b/pulsar-broker/src/test/proto/WriterWithAddHasDefaultValue.proto new file mode 100644 index 0000000000000..8afdfb308008a --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithAddHasDefaultValue.proto @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithAddHasDefaultValue; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithAddHasDefaultValue"; +option java_outer_classname = "WriterWithAddHasDefaultValue"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; + optional bool boolField_new = 21 [default = true]; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithAddNoDefaultValue.proto b/pulsar-broker/src/test/proto/WriterWithAddNoDefaultValue.proto new file mode 100644 index 0000000000000..8d01c8f419f24 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithAddNoDefaultValue.proto @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithAddNoDefaultValue; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithAddNoDefaultValue"; +option java_outer_classname = "WriterWithAddNoDefaultValue"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; + optional bool boolField_new = 21; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithEnumAdd.proto b/pulsar-broker/src/test/proto/WriterWithEnumAdd.proto new file mode 100644 index 0000000000000..f7d0587b6c577 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithEnumAdd.proto @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithEnumAdd; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithEnumAdd"; +option java_outer_classname = "WriterWithEnumAdd"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; + WEEK_ADD = 7; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithEnumDelete.proto b/pulsar-broker/src/test/proto/WriterWithEnumDelete.proto new file mode 100644 index 0000000000000..f539672efba58 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithEnumDelete.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithEnumDelete; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithEnumDelete"; +option java_outer_classname = "WriterWithEnumDelete"; + +enum WeekEnum { +// MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithFieldNameChange.proto b/pulsar-broker/src/test/proto/WriterWithFieldNameChange.proto new file mode 100644 index 0000000000000..41d7b542f5e5b --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithFieldNameChange.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithFieldNameChange; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithFieldNameChange"; +option java_outer_classname = "WriterWithFieldNameChange"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field_change = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithFieldNumberChange.proto b/pulsar-broker/src/test/proto/WriterWithFieldNumberChange.proto new file mode 100644 index 0000000000000..ea880dd774ed3 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithFieldNumberChange.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithFieldNumberChange; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithFieldNumberChange"; +option java_outer_classname = "WriterWithFieldNumberChange"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 5; + optional float floatField = 3; + optional int64 int64Field = 4; +// optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithFieldTypeChange.proto b/pulsar-broker/src/test/proto/WriterWithFieldTypeChange.proto new file mode 100644 index 0000000000000..e218c45ed033b --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithFieldTypeChange.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithFieldTypeChange; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithFieldTypeChange"; +option java_outer_classname = "WriterWithFieldTypeChange"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional string doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithRemoveDefaultValueField.proto b/pulsar-broker/src/test/proto/WriterWithRemoveDefaultValueField.proto new file mode 100644 index 0000000000000..975a8140eb5c1 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithRemoveDefaultValueField.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithRemoveDefaultValueField; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithRemoveDefaultValueField"; +option java_outer_classname = "WriterWithRemoveDefaultValueField"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; +// optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithRemoveNoDefaultValueField.proto b/pulsar-broker/src/test/proto/WriterWithRemoveNoDefaultValueField.proto new file mode 100644 index 0000000000000..21493a00a7755 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithRemoveNoDefaultValueField.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithRemoveNoDefaultValueField; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithRemoveNoDefaultValueField"; +option java_outer_classname = "WriterWithRemoveNoDefaultValueField"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; +// optional sfixed32 sfixed32Field = 16; +// optional sfixed64 sfixed64Field = 17; +// optional sint32 sint32Field = 18; +// optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-broker/src/test/proto/WriterWithTypeNameChange.proto b/pulsar-broker/src/test/proto/WriterWithTypeNameChange.proto new file mode 100644 index 0000000000000..e424b70f83170 --- /dev/null +++ b/pulsar-broker/src/test/proto/WriterWithTypeNameChange.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; +package proto.writerWithTypeNameChange; + +import "ExternalReader.proto"; + +option java_package = "org.apache.pulsar.client.schema.proto.writerWithTypeNameChange"; +option java_outer_classname = "WriterWithTypeNameChange"; + +enum WeekEnum { + MONDAY = 0; + TUESDAY = 1; + WEDNESDAY = 2; + THURSDAY = 3; + FRIDAY = 4; + SATURDAY = 5; + SUNDAY = 6; +} + +message ProtobufMessage_V2 { + required string protobufFoo = 1; + required double protobufBar = 2; +} + +message ProtobufSchema { + required string schemaId = 1; + optional double doubleField = 2; + optional float floatField = 3; + optional int64 int64Field = 4; + optional uint64 uint64Field = 5; + optional int32 int32Field = 6; + optional fixed64 fixed64Field = 7; + optional fixed32 fixed32Field = 8; + optional bool boolField = 9 [default = false]; + optional string stringField = 10; + map groupField = 11; + optional ProtobufMessage_V2 messageField = 12; + optional bytes bytesField = 13; + optional uint32 uint32Field = 14; + optional WeekEnum enumField = 15; + optional sfixed32 sfixed32Field = 16; + optional sfixed64 sfixed64Field = 17; + optional sint32 sint32Field = 18; + optional sint64 sint64Field = 19; + optional proto.external.ExternalReaderMessage externalReaderMessage = 20; +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java index 46a2f7d806a61..10db3bd196668 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -30,7 +30,9 @@ import java.util.Optional; import java.util.function.Consumer; import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.ToString; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.reader.ProtobufNativeReader; import org.apache.pulsar.client.impl.schema.writer.ProtobufNativeWriter; @@ -47,13 +49,18 @@ public class ProtobufNativeSchema extends Abstract @Getter @AllArgsConstructor + @EqualsAndHashCode + @ToString public static class ProtoBufParsingInfo { private final int number; private final String name; private final String type; + @EqualsAndHashCode.Exclude + private final String typeName; private final String label; // For future nested fields private final Map definition; + private final boolean hasDefaultValue; } private static Descriptors.Descriptor createProtobufNativeSchema(Class pojo) { @@ -84,7 +91,8 @@ private String getParsingInfo(T protoMessageInstance) { public void accept(Descriptors.FieldDescriptor fieldDescriptor) { protoBufParsingInfos.add(new ProtoBufParsingInfo(fieldDescriptor.getNumber(), fieldDescriptor.getName(), fieldDescriptor.getType().name(), - fieldDescriptor.toProto().getLabel().name(), null)); + fieldDescriptor.toProto().getTypeName(), fieldDescriptor.toProto().getLabel().name(), null, + fieldDescriptor.hasDefaultValue())); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java index 2dd90ce0c3fa1..cdeb6582302fd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java @@ -24,6 +24,8 @@ import com.google.protobuf.Descriptors; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.SchemaSerializationException; @@ -154,6 +156,29 @@ private static void deserializeFileDescriptor(FileDescriptorProto fileDescriptor } } + public static Map getSchemaDependenciesFileDescriptorCache( + Descriptors.Descriptor rootDescriptor) { + Map dependenciesFileDescriptorCache = new HashMap<>(); + serializeFileDescriptor(rootDescriptor.getFile(), dependenciesFileDescriptorCache); + return dependenciesFileDescriptorCache; + } + + public static void coverAllNestedAndEnumFileDescriptor(FileDescriptorProto fileDescriptorProto, + Map> + fileDescriptorCache) { + fileDescriptorProto.getMessageTypeList().forEach(descriptorProto -> { + List protoBufParsingInfoList = new LinkedList<>(); + descriptorProto.getFieldList().forEach(fieldDescriptorProto -> { + protoBufParsingInfoList.add(new ProtobufNativeSchema.ProtoBufParsingInfo( + fieldDescriptorProto.getNumber(), fieldDescriptorProto.getName(), + fieldDescriptorProto.getType().name(), fieldDescriptorProto.getTypeName(), + fieldDescriptorProto.getLabel().name(), null, + fieldDescriptorProto.hasDefaultValue())); + }); + fileDescriptorCache.put(descriptorProto.getName(), protoBufParsingInfoList); + }); + } + private static final Logger logger = LoggerFactory.getLogger(ProtobufNativeSchemaUtils.class); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java index e82f93718e1d9..b363e9569ae90 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java @@ -18,10 +18,16 @@ */ package org.apache.pulsar.client.impl.schema; +import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; +import lombok.extern.slf4j.Slf4j; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +@Slf4j public class ProtobufNativeSchemaUtilsTest { @Test @@ -35,7 +41,32 @@ public static void testSerialize() { @Test public static void testNestedMessage() { + byte[] data = + ProtobufNativeSchemaUtils.serialize(org.apache.pulsar.client.schema.proto.Test.SubMessage.NestedMessage.getDescriptor()); + Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(data); + Assert.assertNotNull(descriptor); + } + + @Test + public void testGetSchemaDependenciesFileDescriptorCache() { + byte[] data = ProtobufNativeSchemaUtils.serialize(org.apache.pulsar.client.schema.proto.Test.TestMessage.getDescriptor()); + Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(data); + Map schemaDependenciesFileDescriptorCache = + ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(descriptor); + Assert.assertNotNull(schemaDependenciesFileDescriptorCache); + } + + @Test + public void testCoverAllNestedAndEnumFileDescriptor() { + byte[] data = ProtobufNativeSchemaUtils.serialize(org.apache.pulsar.client.schema.proto.Test.TestMessage.getDescriptor()); + Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(data); + Map> schemaAllProto = new HashMap<>(); + ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(descriptor) + .forEach((s, fileDescriptorProto) -> + ProtobufNativeSchemaUtils.coverAllNestedAndEnumFileDescriptor(fileDescriptorProto, + schemaAllProto)); + Assert.assertNotNull(schemaAllProto); } } From 8f5497c328d41c0b565f5a4871ef4767344e5157 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 5 Mar 2023 20:18:32 +0800 Subject: [PATCH 2/2] add a flag to modify ProtobufNativeSchemaValidator different implementations in the conf; fix test; when the field type changes there is no check, only a warning. --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../pulsar/broker/ServiceConfiguration.java | 7 ++ .../apache/pulsar/broker/PulsarService.java | 3 +- .../ProtobufNativeSchemaBreakCheckUtils.java | 11 +-- ...rotobufNativeSchemaCompatibilityCheck.java | 13 ++- .../service/schema/SchemaRegistryService.java | 11 ++- ...otobufNativeAlwaysCompatibleValidator.java | 2 +- ...rotobufNativeNeverCompatibleValidator.java | 4 +- .../ProtobufNativeSchemaValidator.java | 11 ++- .../ProtobufNativeSchemaValidatorBuilder.java | 17 +++- ...otobufNativeSchemaBreakCheckUtilsTest.java | 30 +++---- ...bufNativeSchemaCompatibilityCheckTest.java | 80 ++++++++++++++----- 13 files changed, 134 insertions(+), 63 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index f64d08a1de88c..ea219c14a876b 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -634,6 +634,10 @@ schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.Json # Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE +# Class name of the SchemaValidator implementation used to check whether a protobufNative schema is compatible with another protobufNative schema. +# Available values: org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaBreakValidatorImpl +protoBufNativeSchemaValidatorClassName= + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index ed883406883ed..cd7eda6b8ceae 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -492,6 +492,10 @@ schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.Json # Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE +# Class name of the SchemaValidator implementation used to check whether a protobufNative schema is compatible with another protobufNative schema. +# Available values: org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaBreakValidatorImpl +protoBufNativeSchemaValidatorClassName= + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 106410d855e22..5089245a204c2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2666,6 +2666,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "Class name of the SchemaValidator implementation used to check whether a protobufNative schema " + + "is compatible with another protobufNative schema." + ) + private String protoBufNativeSchemaValidatorClassName = ""; + /**** --- WebSocket. --- ****/ @FieldContext( category = CATEGORY_WEBSOCKET, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ae0fb1a9f283c..ab8bf44d6b3f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -766,7 +766,8 @@ public void start() throws PulsarServerException { schemaStorage = createAndStartSchemaStorage(); schemaRegistryService = SchemaRegistryService.create( - schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor); + schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), + config.getProtoBufNativeSchemaValidatorClassName(), this.executor); OffloadPoliciesImpl defaultOffloadPolicies = OffloadPoliciesImpl.create(this.getConfiguration().getProperties()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java index d1ff73377c413..1042bb8d36573 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtils.java @@ -124,8 +124,8 @@ private static void check(List writtenSchemaFieldInfoList, && readSchemaFieldInfo.getNumber() == writtenSchemaFieldInfo.getNumber() && readSchemaFieldInfo.getName().equals(writtenSchemaFieldInfo.getName()) && !readSchemaFieldInfo.getType().equals(writtenSchemaFieldInfo.getType())) { - // TODO: field type check, need to discuss first. - checkFieldTypeChanged(writtenSchemaFieldInfo, readSchemaFieldInfo); + log.warn("The field type for a field with number {} has been changed.", + readSchemaFieldInfo.getNumber()); } else if (writtenSchemaFieldInfoByFieldNameMap.containsKey(readSchemaFieldInfo.getName()) && writtenSchemaFieldInfoByFieldNameMap.get(readSchemaFieldInfo.getName()).getNumber() != readSchemaFieldInfo.getNumber()) { @@ -152,13 +152,6 @@ private static void check(List writtenSchemaFieldInfoList, } } - private static void checkFieldTypeChanged(ProtoBufParsingInfo writtenSchemaFieldInfo, - ProtoBufParsingInfo readSchemaFieldInfo) - throws ProtoBufCanReadCheckException { - // TODO: field type check, need to discuss first. - // throw new ProtoBufCanReadCheckException("The field type have been changed."); - } - private static List fillUpProtoBufParsingInfoList( Map protoBufParsingInfoMap, int maxCapacity) { List fullProtoBufParsingInfoList = new LinkedList<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java index 4e461f8b458bb..ad2b0dae7c239 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java @@ -41,6 +41,8 @@ @Slf4j public class ProtobufNativeSchemaCompatibilityCheck implements SchemaCompatibilityCheck { + private String schemaValidatorClassName; + @Override public SchemaType getSchemaType() { return SchemaType.PROTOBUF_NATIVE; @@ -69,9 +71,9 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp } } - static ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) { - final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder = new - ProtobufNativeSchemaValidatorBuilder(); + private ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) { + final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder = new ProtobufNativeSchemaValidatorBuilder() + .validatorClassName(schemaValidatorClassName); return switch (compatibilityStrategy) { case BACKWARD_TRANSITIVE -> schemaValidatorBuilder .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy) @@ -95,4 +97,9 @@ static ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilitySt default -> ProtobufNativeNeverCompatibleValidator.INSTANCE; }; } + + public void setProtobufNativeSchemaValidatorClassName(String schemaValidatorClassName) { + this.schemaValidatorClassName = schemaValidatorClassName; + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index 3c5e3aae7ff5d..dfa051338f944 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -33,21 +33,28 @@ public interface SchemaRegistryService extends SchemaRegistry { Logger LOG = LoggerFactory.getLogger(SchemaRegistryService.class); long NO_SCHEMA_VERSION = -1L; - static Map getCheckers(Set checkerClasses) throws Exception { + static Map getCheckers( + Set checkerClasses, String protobufNativeSchemaValidatorClassName) throws Exception { Map checkers = new HashMap<>(); for (String className : checkerClasses) { SchemaCompatibilityCheck schemaCompatibilityCheck = Reflections.createInstance(className, SchemaCompatibilityCheck.class, Thread.currentThread().getContextClassLoader()); + if (schemaCompatibilityCheck instanceof ProtobufNativeSchemaCompatibilityCheck) { + ((ProtobufNativeSchemaCompatibilityCheck) schemaCompatibilityCheck) + .setProtobufNativeSchemaValidatorClassName(protobufNativeSchemaValidatorClassName); + } checkers.put(schemaCompatibilityCheck.getSchemaType(), schemaCompatibilityCheck); } return checkers; } static SchemaRegistryService create(SchemaStorage schemaStorage, Set schemaRegistryCompatibilityCheckers, + String protobufNativeSchemaValidatorClassName, ScheduledExecutorService scheduler) { if (schemaStorage != null) { try { - Map checkers = getCheckers(schemaRegistryCompatibilityCheckers); + Map checkers = getCheckers(schemaRegistryCompatibilityCheckers, + protobufNativeSchemaValidatorClassName); checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers)); return SchemaRegistryServiceWithSchemaDataValidator.of( new SchemaRegistryServiceImpl(schemaStorage, checkers, scheduler)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java index d380f285898c1..a341124396617 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeAlwaysCompatibleValidator.java @@ -26,7 +26,7 @@ public class ProtobufNativeAlwaysCompatibleValidator implements ProtobufNativeSc new ProtobufNativeAlwaysCompatibleValidator(); @Override - public void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + public void validate(Iterable fromDescriptors, Descriptors.Descriptor toDescriptor) throws ProtoBufCanReadCheckException { return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java index bff23aed321fa..21b49f17606aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeNeverCompatibleValidator.java @@ -28,9 +28,9 @@ public class ProtobufNativeNeverCompatibleValidator implements ProtobufNativeSch public static final ProtobufNativeNeverCompatibleValidator INSTANCE = new ProtobufNativeNeverCompatibleValidator(); @Override - public void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + public void validate(Iterable fromDescriptors, Descriptors.Descriptor toDescriptor) throws ProtoBufCanReadCheckException { - for (Descriptors.Descriptor descriptor : fromDescriptor) { + for (Descriptors.Descriptor descriptor : fromDescriptors) { throw new ProtoBufCanReadCheckException("Unknown SchemaCompatibilityStrategy."); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java index 5e45e46e19651..c6b8ec9934db5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidator.java @@ -23,6 +23,15 @@ public interface ProtobufNativeSchemaValidator { - void validate(Iterable fromDescriptor, Descriptors.Descriptor toDescriptor) + void validate(Iterable fromDescriptors, Descriptors.Descriptor toDescriptor) throws ProtoBufCanReadCheckException; + + ProtobufNativeSchemaValidator DEFAULT = (fromDescriptors, toDescriptor) -> { + for (Descriptors.Descriptor fromDescriptor : fromDescriptors) { + if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) { + throw new ProtoBufCanReadCheckException("Protobuf root message isn't allow change!"); + } + } + }; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java index 2397c2a030299..1626f3765712f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaValidatorBuilder.java @@ -19,10 +19,13 @@ package org.apache.pulsar.broker.service.schema.validator; import lombok.NonNull; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.util.Reflections; public class ProtobufNativeSchemaValidatorBuilder { private ProtobufNativeSchemaValidationStrategy strategy; private boolean onlyValidateLatest; + private String validatorClassName; public ProtobufNativeSchemaValidatorBuilder validatorStrategy( @NonNull ProtobufNativeSchemaValidationStrategy strategy) { @@ -35,7 +38,19 @@ public ProtobufNativeSchemaValidatorBuilder isOnlyValidateLatest(boolean onlyVal return this; } + public ProtobufNativeSchemaValidatorBuilder validatorClassName(String validatorClassName) { + this.validatorClassName = validatorClassName; + return this; + } + public ProtobufNativeSchemaValidator build() { - return new ProtobufNativeSchemaBreakValidatorImpl(strategy, onlyValidateLatest); + if (StringUtils.isBlank(validatorClassName)) { + return ProtobufNativeSchemaValidator.DEFAULT; + } else { + Object[] params = {strategy, onlyValidateLatest}; + Class[] paramTypes = {ProtobufNativeSchemaValidationStrategy.class, boolean.class}; + return (ProtobufNativeSchemaValidator) Reflections.createInstance(validatorClassName, + Thread.currentThread().getContextClassLoader(), params, paramTypes); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java index 59783dc78ffdf..af840b84d6ddf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaBreakCheckUtilsTest.java @@ -52,16 +52,14 @@ private void initReadSchema() { } @Test - public void testCheckSchemaCompatibilityWithSameVersion() - throws ProtoBufCanReadCheckException { + public void testCheckSchemaCompatibilityWithSameVersion() throws ProtoBufCanReadCheckException { Descriptor writtenDescriptor = Writer.ProtobufSchema.getDescriptor(); assertNotNull(writtenDescriptor); ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenDescriptor, readDescriptor); } @Test - public void testCheckSchemaCompatibilityWithAddNoDefaultValueField( - ) throws ProtoBufCanReadCheckException { + public void testCheckSchemaCompatibilityWithAddNoDefaultValueField() throws ProtoBufCanReadCheckException { Descriptor writerWithAddNoDefaultValue = WriterWithAddNoDefaultValue.ProtobufSchema.getDescriptor(); assertNotNull(writerWithAddNoDefaultValue); ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithAddNoDefaultValue, @@ -69,8 +67,7 @@ public void testCheckSchemaCompatibilityWithAddNoDefaultValueField( } @Test - public void testCheckSchemaCompatibilityWithAddDefaultValueField( - ) throws ProtoBufCanReadCheckException { + public void testCheckSchemaCompatibilityWithAddDefaultValueField() throws ProtoBufCanReadCheckException { Descriptor writerWithAddHasDefaultValue = WriterWithAddHasDefaultValue.ProtobufSchema.getDescriptor(); assertNotNull(writerWithAddHasDefaultValue); ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithAddHasDefaultValue, @@ -78,8 +75,7 @@ public void testCheckSchemaCompatibilityWithAddDefaultValueField( } @Test - public void testCheckSchemaCompatibilityWithRemoveNoDefaultValueField( - ) { + public void testCheckSchemaCompatibilityWithRemoveNoDefaultValueField() { Descriptor writerWithRemoveNoDefaultValueField = WriterWithRemoveNoDefaultValueField.ProtobufSchema.getDescriptor(); assertNotNull(writerWithRemoveNoDefaultValueField); try { @@ -93,8 +89,7 @@ public void testCheckSchemaCompatibilityWithRemoveNoDefaultValueField( } @Test - public void testCheckSchemaCompatibilityWithRemoveDefaultValueField( - ) throws ProtoBufCanReadCheckException { + public void testCheckSchemaCompatibilityWithRemoveDefaultValueField() throws ProtoBufCanReadCheckException { Descriptor writerWithRemoveDefaultValueField = WriterWithRemoveDefaultValueField.ProtobufSchema.getDescriptor(); assertNotNull(writerWithRemoveDefaultValueField); ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithRemoveDefaultValueField, @@ -129,17 +124,11 @@ public void testCheckSchemaCompatibilityWithRequiredFieldChange() { } @Test - public void testCheckSchemaCompatibilityWithFieldTypeChange() { + public void testCheckSchemaCompatibilityWithFieldTypeChange() throws ProtoBufCanReadCheckException { Descriptor writerWithFieldTypeChange = WriterWithFieldTypeChange.ProtobufSchema.getDescriptor(); assertNotNull(writerWithFieldTypeChange); - try { - ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithFieldTypeChange, - readDescriptor); - fail("Schema should be incompatible"); - } catch (ProtoBufCanReadCheckException e) { - log.warn(e.getMessage()); - assertEquals(e.getMessage(), "The field type have been changed."); - } + ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writerWithFieldTypeChange, + readDescriptor); } @Test @@ -185,8 +174,7 @@ public void testCheckSchemaCompatibilityWithFieldNameChange() { } @Test - public void testCheckSchemaCompatibilityWithEnumChange() - throws ProtoBufCanReadCheckException { + public void testCheckSchemaCompatibilityWithEnumChange() throws ProtoBufCanReadCheckException { // add enum field Descriptor writerWithEnumAdd = WriterWithEnumAdd.ProtobufSchema.getDescriptor(); assertNotNull(writerWithEnumAdd); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java index bb3e9238f553f..709b14d2609d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Arrays; import java.util.Collections; @@ -49,20 +50,29 @@ public class ProtobufNativeSchemaCompatibilityCheckTest { private static final SchemaData writerWithRemoveNoDefaultValueField = getSchemaData(WriterWithRemoveNoDefaultValueField.ProtobufSchema.getDescriptor()); private static final SchemaData writerWithRemoveDefaultValueField = getSchemaData(WriterWithRemoveDefaultValueField.ProtobufSchema.getDescriptor()); + @DataProvider(name = "protobufNativeSchemaValidatorDomain") + public static Object[] protobufNativeSchemaValidatorDomain() { + return new Object[]{ "", "org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaBreakValidatorImpl"}; + } + /** * make sure protobuf root message isn't allow change */ - @Test - public void testRootMessageChange() { - SchemaCompatibilityCheck compatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); - Assert.assertFalse(compatibilityCheck.isCompatible(schemaData2, schemaData1, + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testRootMessageChange(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1, SchemaCompatibilityStrategy.FULL), "Protobuf root message isn't allow change"); } - @Test - public void testBackwardCompatibility() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testBackwardCompatibility(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } // adding a field with default is backwards compatible log.info("adding a field with default is backwards compatible"); Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, @@ -85,9 +95,13 @@ public void testBackwardCompatibility() { "removing a field with default value is backwards compatible"); } - @Test - public void testForwardCompatibility() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testForwardCompatibility(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } // adding a field with default is forward compatible log.info("adding a field with default is forward compatible"); Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, @@ -110,9 +124,13 @@ public void testForwardCompatibility() { "removing a field with default value is forward compatible"); } - @Test - public void testFullCompatibility() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testFullCompatibility(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } Assert.assertTrue(schemaCompatibilityCheck.isCompatible(reader, writerWithAddHasDefaultValue, SchemaCompatibilityStrategy.FULL), "adding a field with default fully compatible"); @@ -127,9 +145,13 @@ public void testFullCompatibility() { "removing a field with no default is not fully compatible"); } - @Test - public void testBackwardTransitive() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testBackwardTransitive(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), writerWithAddHasDefaultValue, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2, writerWithAddHasDefaultValue), @@ -144,9 +166,13 @@ public void testBackwardTransitive() { writerWithAddNoDefaultValue, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); } - @Test - public void testForwardTransitive() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testForwardTransitive(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), writerWithRemoveDefaultValueField, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2, writerWithRemoveDefaultValueField), @@ -157,9 +183,13 @@ public void testForwardTransitive() { writerWithRemoveNoDefaultValueField, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); } - @Test - public void testFullTransitive() { - SchemaCompatibilityCheck schemaCompatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + @Test(dataProvider = "protobufNativeSchemaValidatorDomain") + public void testFullTransitive(String schemaValidator) { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCompatibilityCheck(schemaValidator); + if (schemaValidator.isBlank()) { + testRootMessageChange(schemaValidator); + return; + } Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, writerWithRemoveDefaultValueField), writerWithAddHasDefaultValue, SchemaCompatibilityStrategy.FULL)); Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(reader, reader2), @@ -169,4 +199,10 @@ public void testFullTransitive() { private static SchemaData getSchemaData(Descriptor descriptor) { return SchemaData.builder().data(ProtobufNativeSchemaUtils.serialize(descriptor)).type(SchemaType.PROTOBUF_NATIVE).build(); } + + private static ProtobufNativeSchemaCompatibilityCheck getSchemaCompatibilityCheck(String schemaValidator) { + ProtobufNativeSchemaCompatibilityCheck compatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + compatibilityCheck.setProtobufNativeSchemaValidatorClassName(schemaValidator); + return compatibilityCheck; + } }