
AVSC files without a top-level name, type and fields are considered invalid, but they shouldn't be. #169

Open
samhwang opened this issue Nov 12, 2021 · 1 comment

Comments


samhwang commented Nov 12, 2021

Hey KafkaJS Team, just wanted to say that this project is superb. However, I just ran into this weird hiccup, which I think shouldn't happen. I'm still new to how Kafka and Avro (and KafkaJS) work, so if I'm wrong, please point me in the right direction :D.

Issue Description

I'm trying to read an avsc file that does not have a single top-level record; instead, it has an array of records.

[
  {
    "name": "ARecord",
    "namespace": "com.samhwang.testavro",
    "type": "record",
    "fields": [
      {"name": "first_name", "type": "string"},
      {"name": "last_name", "type": "string"}
    ]
  },
  {
    "name": "BRecord",
    "namespace": "com.samhwang.testavro",
    "type": "record",
    "fields": [
      {"name": "test_attr", "type": "string"}
    ]
  }
]

This is validated to be correct, both by a locally hosted Kafka Schema Registry (using the docker image confluentinc/cp-schema-registry:6.2.1) and on Confluent Cloud (see screenshot attached).
[Screenshot: the schema validates successfully on Confluent Cloud]

Reproduction steps/repo

I have created a sample repo in Typescript to reproduce this issue at: samhwang/test-node-kafka.

Reproduction steps

  • Install the reproduction repo
git clone https://github.com/samhwang/test-node-kafka.git
cd test-node-kafka
npm install
docker-compose up -d
  • Register the schema via an HTTP request to the Schema Registry. This works fine.
❯ npx ts-node src/register_schema_axios.ts
{
  'Schema Registry URL': 'http://localhost:8081',
  Topic: 'test-topic',
  'Schema file': '/Users/sam/projects/test-node-kafka/src/sam.avsc'
}
Schema Registered Successfully! { id: 1 }

This script is essentially:

    try {
        const schema = await fs.readFile(absolutePathToSchema, {encoding: 'utf-8'});
        const url = schemaRegistryUrl + "/subjects/" + topic + "-value/versions";
        const headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"};

        const response = await axios.post(url, {schema}, {
            headers,
        });
        const {data} = response;

        console.log("Schema Registered Successfully!", data);
    } catch (error: any) {
        console.error(error);
    }
  • Register the schema using the package @kafkajs/confluent-schema-registry. This breaks just reading the avsc file.
❯ npx ts-node src/register_schema_kafkajs.ts
ConfluentSchemaRegistryInvalidSchemaError: /Users/sam/projects/test-node-kafka/src/sam.avsc is not recognized as a valid AVSC file (expecting valid top-level name, type and fields attributes)
    at validatedSchema (/Users/sam/projects/test-node-kafka/node_modules/@kafkajs/confluent-schema-registry/src/utils/readAVSC.ts:21:11)
    at readAVSCAsync (/Users/sam/projects/test-node-kafka/node_modules/@kafkajs/confluent-schema-registry/src/utils/readAVSC.ts:35:10)
    at async registerSchema (/Users/sam/projects/test-node-kafka/src/register_schema_kafkajs.ts:8:24)
This script is essentially:

    try {
        const schema = await readAVSCAsync(absolutePathToSchema);
        const data = await registry.register(schema); // register() returns a promise
        console.log("Schema Registered Successfully!", data);
    } catch (error: any) {
        console.error(error);
    }
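Incidentally, the URL built in the first script follows Confluent's default TopicNameStrategy: the subject that stores a topic's value schema is named `<topic>-value`. A tiny helper (hypothetical, for illustration only) makes that convention explicit:

```typescript
// TopicNameStrategy (Confluent's default subject naming): the subject
// holding a topic's value schema is "<topic>-value", and new versions
// are registered by POSTing to /subjects/<subject>/versions.
function valueSubjectUrl(schemaRegistryUrl: string, topic: string): string {
  return `${schemaRegistryUrl}/subjects/${topic}-value/versions`;
}
```

This is exactly the URL the axios script posts to for topic `test-topic`.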

Possible solution?

  • The problem seems to be the validatedSchema function in src/utils/readAVSC.ts expecting a top-level record. Is there something in the avsc library that could be used for validation instead?
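A relaxed check could accept either shape. This is only a sketch of the idea (the helper names here are made up, not part of the library): treat the schema as valid when it is a single top-level record, or an array (an Avro union) whose members are all complete records.

```typescript
// Hypothetical relaxed validation: accept a single top-level record
// OR an array (Avro union) whose members are all complete records.
interface RecordLike {
  name: string;
  type: string;
  fields: unknown[];
}

function isRecordLike(schema: unknown): schema is RecordLike {
  if (typeof schema !== 'object' || schema === null || Array.isArray(schema)) {
    return false;
  }
  const s = schema as Record<string, unknown>;
  return typeof s.name === 'string' && s.type === 'record' && Array.isArray(s.fields);
}

function isValidAvscShape(schema: unknown): boolean {
  return Array.isArray(schema)
    ? schema.length > 0 && schema.every(isRecordLike)
    : isRecordLike(schema);
}
```

With a check like this, the two-record union from the issue would pass, while an object missing name/type/fields would still be rejected.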

pmsfc commented Apr 24, 2024

@Nevon We're having the same issue when consuming a topic whose schema has no top-level record.

For example this code:

// this fails!
registry.decode(buffer, {
   [SchemaType.AVRO]: {readerSchema}, // We can't use a RawAvroSchema type without a Record
})

But if we use the schema id that's included in the Kafka message, it works.

// this works!
registry.decode(buffer)

On the other hand, the avsc lib allows this; we can check its typings:

export type AvroSchema = DefinedType | DefinedType[]; // Note the array here
type DefinedType = PrimitiveType | ComplexType | LogicalType | string;
type PrimitiveType = 'null' | 'boolean' | 'int' | 'long' | 'float' | 'double' | 'bytes' | 'string';
type ComplexType = NamedType | RecordType | EnumType | MapType | ArrayType | FixedType;
type LogicalType = ComplexType & LogicalTypeExtension;
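Given that typing, distinguishing the two shapes at runtime is just an Array.isArray check. A small sketch (with simplified local types mirroring the avsc typings above, and a made-up helper name) that collects the named branches of a schema:

```typescript
// Simplified mirror of avsc's AvroSchema = DefinedType | DefinedType[]:
// a schema is either a single type or a union expressed as an array.
type SimpleDefinedType = string | { name?: string; type?: string };
type SimpleAvroSchema = SimpleDefinedType | SimpleDefinedType[];

// Collect the branch names of a union schema (or the single type's name).
function branchNames(schema: SimpleAvroSchema): string[] {
  const branches = Array.isArray(schema) ? schema : [schema];
  return branches.map(b => (typeof b === 'string' ? b : b.name ?? b.type ?? 'unknown'));
}
```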
