Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-246: Improved PROTOBUF_NATIVE schema compatibility checks without using avro-protobuf #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,10 @@ schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.Json
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Class name of the SchemaValidator implementation used to check whether a protobufNative schema is compatible with another protobufNative schema.
# Available values: org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaBreakValidatorImpl
protoBufNativeSchemaValidatorClassName=

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=true
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.Json
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Class name of the SchemaValidator implementation used to check whether a protobufNative schema is compatible with another protobufNative schema.
# Available values: org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaBreakValidatorImpl
protoBufNativeSchemaValidatorClassName=

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Class name of the SchemaValidator implementation used to check whether a protobufNative schema "
+ "is compatible with another protobufNative schema."
)
private String protoBufNativeSchemaValidatorClassName = "";

/**** --- WebSocket. --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ public void start() throws PulsarServerException {

schemaStorage = createAndStartSchemaStorage();
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(),
config.getProtoBufNativeSchemaValidatorClassName(), this.executor);

OffloadPoliciesImpl defaultOffloadPolicies =
OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import static org.apache.pulsar.client.impl.schema.ProtobufNativeSchema.ProtoBufParsingInfo;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils;

@Slf4j
public class ProtobufNativeSchemaBreakCheckUtils {

public static void checkSchemaCompatibility(Descriptors.Descriptor writtenSchema,
Descriptors.Descriptor readSchema)
throws ProtoBufCanReadCheckException {
String writtenSchemaRootName = writtenSchema.getName();
String readSchemaRootName = readSchema.getName();
if (!writtenSchemaRootName.equals(readSchemaRootName)) {
throw new ProtoBufCanReadCheckException("Protobuf root message isn't allow change!");
}

Map<String, List<ProtobufNativeSchema.ProtoBufParsingInfo>> writtenSchemaAllProto = new HashMap<>();
ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(writtenSchema)
.forEach((s, fileDescriptorProto) -> {
ProtobufNativeSchemaUtils.coverAllNestedAndEnumFileDescriptor(fileDescriptorProto,
writtenSchemaAllProto);
});

Map<String, List<ProtobufNativeSchema.ProtoBufParsingInfo>> readSchemaAllProto = new HashMap<>();
ProtobufNativeSchemaUtils.getSchemaDependenciesFileDescriptorCache(readSchema)
.forEach((s, fileDescriptorProto) -> {
ProtobufNativeSchemaUtils.coverAllNestedAndEnumFileDescriptor(fileDescriptorProto,
readSchemaAllProto);
});

List<ProtoBufParsingInfo> writtenRootProtoBufParsingInfos = writtenSchemaAllProto.get(writtenSchemaRootName);
List<ProtoBufParsingInfo> readRootProtoBufParsingInfos = readSchemaAllProto.get(readSchemaRootName);
// root check first
check(writtenRootProtoBufParsingInfos, readRootProtoBufParsingInfos);

for (String writtenSchemaMessageName : writtenSchemaAllProto.keySet()) {
// skip root
if (!writtenSchemaMessageName.equals(writtenSchemaRootName)
&& readSchemaAllProto.containsKey(writtenSchemaMessageName)) {
List<ProtoBufParsingInfo> writtenProtoBufParsingInfoList =
writtenSchemaAllProto.get(writtenSchemaMessageName);
List<ProtoBufParsingInfo> readProtoBufParsingInfoList =
readSchemaAllProto.get(writtenSchemaMessageName);
check(writtenProtoBufParsingInfoList, readProtoBufParsingInfoList);
}
}
}

private static void check(List<ProtoBufParsingInfo> writtenSchemaFieldInfoList,
List<ProtoBufParsingInfo> readSchemaFieldInfoList)
throws ProtoBufCanReadCheckException {
List<ProtoBufParsingInfo> readSchemaRequiredFields = new LinkedList<>();
List<ProtoBufParsingInfo> writtenSchemaRequiredFields = new LinkedList<>();
Map<Integer, ProtoBufParsingInfo> readSchemaFieldInfoByFieldNumberMap = new HashMap<>();
Map<Integer, ProtoBufParsingInfo> writtenSchemaFieldInfoByFieldNumberMap = new HashMap<>();
Map<String, ProtoBufParsingInfo> writtenSchemaFieldInfoByFieldNameMap = new HashMap<>();

readSchemaFieldInfoList.forEach(readSchemaFieldInfo -> {
if (readSchemaFieldInfo.getLabel()
.equals(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REQUIRED.name())) {
readSchemaRequiredFields.add(readSchemaFieldInfo);
}
readSchemaFieldInfoByFieldNumberMap.put(readSchemaFieldInfo.getNumber(), readSchemaFieldInfo);
});
writtenSchemaFieldInfoList.forEach(writtenSchemaFieldInfo -> {
if (writtenSchemaFieldInfo.getLabel()
.equals(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REQUIRED.name())) {
writtenSchemaRequiredFields.add(writtenSchemaFieldInfo);
}
writtenSchemaFieldInfoByFieldNumberMap.put(writtenSchemaFieldInfo.getNumber(), writtenSchemaFieldInfo);
writtenSchemaFieldInfoByFieldNameMap.put(writtenSchemaFieldInfo.getName(), writtenSchemaFieldInfo);
});

if (!ListUtils.isEqualList(readSchemaRequiredFields, writtenSchemaRequiredFields)) {
throw new ProtoBufCanReadCheckException("Required Fields have been modified.");
}

int readSchemaMaxFieldNumber = readSchemaFieldInfoList.stream()
.mapToInt(ProtoBufParsingInfo::getNumber).max().orElse(0);
int writtenSchemaMaxFieldNumber = writtenSchemaFieldInfoList.stream()
.mapToInt(ProtoBufParsingInfo::getNumber).max().orElse(0);
int maxFieldNumber = Math.max(readSchemaMaxFieldNumber, writtenSchemaMaxFieldNumber);
readSchemaFieldInfoList = fillUpProtoBufParsingInfoList(readSchemaFieldInfoByFieldNumberMap,
maxFieldNumber);
writtenSchemaFieldInfoList = fillUpProtoBufParsingInfoList(writtenSchemaFieldInfoByFieldNumberMap,
maxFieldNumber);

ProtoBufParsingInfo readSchemaFieldInfo;
ProtoBufParsingInfo writtenSchemaFieldInfo;
for (int i = 0; i < maxFieldNumber; i++) {
readSchemaFieldInfo = readSchemaFieldInfoList.get(i);
writtenSchemaFieldInfo = writtenSchemaFieldInfoList.get(i);
if (readSchemaFieldInfo != null && !readSchemaFieldInfo.equals(writtenSchemaFieldInfo)) {
if (writtenSchemaFieldInfo != null
&& readSchemaFieldInfo.getNumber() == writtenSchemaFieldInfo.getNumber()
&& readSchemaFieldInfo.getName().equals(writtenSchemaFieldInfo.getName())
&& !readSchemaFieldInfo.getType().equals(writtenSchemaFieldInfo.getType())) {
log.warn("The field type for a field with number {} has been changed.",
readSchemaFieldInfo.getNumber());
} else if (writtenSchemaFieldInfoByFieldNameMap.containsKey(readSchemaFieldInfo.getName())
&& writtenSchemaFieldInfoByFieldNameMap.get(readSchemaFieldInfo.getName()).getNumber()
!= readSchemaFieldInfo.getNumber()) {
throw new ProtoBufCanReadCheckException("The field number of the field have been changed.");
} else if (writtenSchemaFieldInfoByFieldNumberMap.containsKey(readSchemaFieldInfo.getNumber())
&& !writtenSchemaFieldInfoByFieldNumberMap.get(readSchemaFieldInfo.getNumber()).getName()
.equals(readSchemaFieldInfo.getName())) {
throw new ProtoBufCanReadCheckException("The field name of the field have been changed.");
} else if (writtenSchemaFieldInfo == null && !readSchemaFieldInfo.isHasDefaultValue()) {
throw new ProtoBufCanReadCheckException("No default value fields have been added or removed.");
}
} else if (readSchemaFieldInfo != null && readSchemaFieldInfo.equals(writtenSchemaFieldInfo)
&& (readSchemaFieldInfo.getType()
.equals(DescriptorProtos.FieldDescriptorProto.Type.TYPE_ENUM.name())
|| readSchemaFieldInfo.getType()
.equals(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE.name()))) {
String[] readSchemaFieldEnumName = readSchemaFieldInfo.getTypeName().split("\\.");
String[] writtenSchemaFieldEnumName = writtenSchemaFieldInfo.getTypeName().split("\\.");
if (!readSchemaFieldEnumName[readSchemaFieldEnumName.length - 1]
.equals(writtenSchemaFieldEnumName[writtenSchemaFieldEnumName.length - 1])) {
throw new ProtoBufCanReadCheckException("The field type name have been changed.");
}
}
}
}

private static List<ProtoBufParsingInfo> fillUpProtoBufParsingInfoList(
Map<Integer, ProtoBufParsingInfo> protoBufParsingInfoMap, int maxCapacity) {
List<ProtoBufParsingInfo> fullProtoBufParsingInfoList = new LinkedList<>();
for (int i = 0; i < maxCapacity; i++) {
int currentFieldNumber = i + 1;
fullProtoBufParsingInfoList.add(protoBufParsingInfoMap.getOrDefault(currentFieldNumber, null));
}
return fullProtoBufParsingInfoList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@
*/
package org.apache.pulsar.broker.service.schema;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.protobuf.Descriptors.Descriptor;
import java.util.Collections;
import java.util.LinkedList;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.ProtoBufCanReadCheckException;
import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeAlwaysCompatibleValidator;
import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeNeverCompatibleValidator;
import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidationStrategy;
import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidator;
import org.apache.pulsar.broker.service.schema.validator.ProtobufNativeSchemaValidatorBuilder;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand All @@ -28,8 +38,11 @@
/**
* The {@link SchemaCompatibilityCheck} implementation for {@link SchemaType#PROTOBUF_NATIVE}.
*/
@Slf4j
public class ProtobufNativeSchemaCompatibilityCheck implements SchemaCompatibilityCheck {

private String schemaValidatorClassName;

@Override
public SchemaType getSchemaType() {
return SchemaType.PROTOBUF_NATIVE;
Expand All @@ -38,37 +51,55 @@ public SchemaType getSchemaType() {
@Override
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
throws IncompatibleSchemaException {
Descriptor fromDescriptor = ProtobufNativeSchemaUtils.deserialize(from.getData());
Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData());
switch (strategy) {
case BACKWARD_TRANSITIVE:
case BACKWARD:
case FORWARD_TRANSITIVE:
case FORWARD:
case FULL_TRANSITIVE:
case FULL:
checkRootMessageChange(fromDescriptor, toDescriptor, strategy);
return;
case ALWAYS_COMPATIBLE:
return;
default:
throw new IncompatibleSchemaException("Unknown SchemaCompatibilityStrategy.");
}
checkCompatible(Collections.singletonList(from), to, strategy);
}

@Override
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy)
throws IncompatibleSchemaException {
for (SchemaData schemaData : from) {
checkCompatible(schemaData, to, strategy);
checkArgument(from != null, "check compatibility list is null");
LinkedList<Descriptor> fromList = new LinkedList<>();
try {
for (SchemaData schemaData : from) {
fromList.addFirst(ProtobufNativeSchemaUtils.deserialize(schemaData.getData()));
}
Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData());
ProtobufNativeSchemaValidator schemaValidator = createSchemaValidator(strategy);
schemaValidator.validate(fromList, toDescriptor);
} catch (ProtoBufCanReadCheckException e) {
throw new IncompatibleSchemaException(e);
}
}

private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor,
SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) {
throw new IncompatibleSchemaException("Protobuf root message isn't allow change!");
}
/**
 * Picks the validator matching the given compatibility strategy.
 *
 * <p>BACKWARD*, FORWARD* and FULL* map to the CanReadExisting, CanBeReadByExisting and CanBeReadMutual
 * validation strategies respectively; the non-transitive variants validate only against the latest
 * schema. ALWAYS_COMPATIBLE yields the always-compatible validator, anything else the never-compatible one.
 *
 * @param compatibilityStrategy the compatibility strategy to translate
 * @return the validator to run for that strategy
 */
private ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) {
    final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder = new ProtobufNativeSchemaValidatorBuilder()
            .validatorClassName(schemaValidatorClassName);

    final ProtobufNativeSchemaValidationStrategy validationStrategy;
    switch (compatibilityStrategy) {
        case BACKWARD_TRANSITIVE:
        case BACKWARD:
            validationStrategy = ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy;
            break;
        case FORWARD_TRANSITIVE:
        case FORWARD:
            validationStrategy = ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy;
            break;
        case FULL_TRANSITIVE:
        case FULL:
            validationStrategy = ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy;
            break;
        case ALWAYS_COMPATIBLE:
            return ProtobufNativeAlwaysCompatibleValidator.INSTANCE;
        default:
            return ProtobufNativeNeverCompatibleValidator.INSTANCE;
    }

    // the non-transitive strategies only look at the most recent schema
    final boolean onlyValidateLatest = compatibilityStrategy == SchemaCompatibilityStrategy.BACKWARD
            || compatibilityStrategy == SchemaCompatibilityStrategy.FORWARD
            || compatibilityStrategy == SchemaCompatibilityStrategy.FULL;

    return schemaValidatorBuilder
            .validatorStrategy(validationStrategy)
            .isOnlyValidateLatest(onlyValidateLatest)
            .build();
}

/**
 * Stores the validator class name that is handed to the validator builder whenever a
 * schema validator is created.
 *
 * @param schemaValidatorClassName class name of the validator implementation to use
 */
public void setProtobufNativeSchemaValidatorClassName(String schemaValidatorClassName) {
this.schemaValidatorClassName = schemaValidatorClassName;
}

}
Loading