Skip to content

Commit 976998f

Browse files
authored
Merge pull request #44 from jsonjoy-com/copilot/fix-38
feat: implement Apache Avro decoders
2 parents 585b4b7 + e27ae53 commit 976998f

File tree

5 files changed

+1364
-0
lines changed

5 files changed

+1364
-0
lines changed

src/avro/AvroDecoder.ts

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
import {Reader} from '@jsonjoy.com/util/lib/buffers/Reader';
2+
import type {BinaryJsonDecoder} from '../types';
3+
4+
/**
 * Apache Avro binary decoder for basic value decoding.
 *
 * Implements the low-level primitives of the Avro binary encoding without any
 * schema validation: the caller decides which `read*` method to invoke for the
 * next value, and passes item/value/field readers as callbacks for the
 * complex types.
 *
 * Based on https://avro.apache.org/docs/1.12.0/specification/
 *
 * NOTE(review): lengths and block counts (bytes, string, array, map) are read
 * here as plain unsigned varints, whereas the Avro specification encodes them
 * as zigzag longs. Presumably this mirrors the paired encoder in this
 * package; confirm before relying on interoperability with other Avro
 * implementations.
 */
export class AvroDecoder implements BinaryJsonDecoder {
  /** Shared UTF-8 decoder; reused so strings do not allocate a decoder each. */
  private static readonly utf8 = new TextDecoder();

  /** Cursor over the input buffer; all `read*` methods advance it. */
  public reader = new Reader();

  /**
   * Resets the reader onto `uint8` and decodes a value via {@link readAny}.
   *
   * @param uint8 Encoded Avro data.
   * @throws Error Always, unless `readAny` is overridden, because a value
   *     cannot be decoded without knowing its schema type.
   */
  public read(uint8: Uint8Array): unknown {
    this.reader.reset(uint8);
    return this.readAny();
  }

  /**
   * Alias of {@link read}.
   *
   * @param uint8 Encoded Avro data.
   * @throws Error Under the same conditions as `read`.
   */
  public decode(uint8: Uint8Array): unknown {
    return this.read(uint8);
  }

  /**
   * Generic entry point for reading a value of unknown type. Avro binary data
   * is not self-describing, so this implementation always throws.
   *
   * @throws Error Always.
   */
  public readAny(): unknown {
    throw new Error('readAny() requires schema information. Use readNull, readBoolean, etc. directly.');
  }

  /**
   * Reads an Avro null value. Null occupies zero bytes, so the reader is not
   * advanced.
   */
  public readNull(): null {
    return null;
  }

  /**
   * Reads an Avro boolean value from a single byte.
   *
   * @returns `true` only if the byte equals 1; any other byte yields `false`.
   */
  public readBoolean(): boolean {
    return this.reader.u8() === 1;
  }

  /**
   * Reads an Avro int value: a varint holding a zigzag-encoded 32-bit integer.
   *
   * @throws Error If the varint is longer than five bytes.
   */
  public readInt(): number {
    return this.decodeZigZag32(this.readVarIntUnsigned());
  }

  /**
   * Reads an Avro long value: a varint holding a zigzag-encoded 64-bit integer.
   *
   * @returns A `number` when the value lies within the safe integer range,
   *     otherwise a `bigint`.
   * @throws Error If the varint is longer than ten bytes.
   */
  public readLong(): number | bigint {
    const decoded = this.decodeZigZag64(this.readVarLong());
    const isSafe = decoded >= BigInt(Number.MIN_SAFE_INTEGER) && decoded <= BigInt(Number.MAX_SAFE_INTEGER);
    return isSafe ? Number(decoded) : decoded;
  }

  /**
   * Reads an Avro float value: four bytes, IEEE 754 single-precision,
   * little-endian.
   */
  public readFloat(): number {
    const reader = this.reader;
    const value = reader.view.getFloat32(reader.x, true); // little-endian
    reader.x += 4;
    return value;
  }

  /**
   * Reads an Avro double value: eight bytes, IEEE 754 double-precision,
   * little-endian.
   */
  public readDouble(): number {
    const reader = this.reader;
    const value = reader.view.getFloat64(reader.x, true); // little-endian
    reader.x += 8;
    return value;
  }

  /**
   * Reads an Avro bytes value: an unsigned varint length followed by that
   * many raw bytes.
   *
   * @throws Error If fewer bytes than the declared length are available.
   */
  public readBytes(): Uint8Array {
    return this.readExact(this.readVarIntUnsigned());
  }

  /**
   * Reads an Avro string value: an unsigned varint byte length followed by
   * that many bytes of UTF-8 text.
   *
   * @throws Error If fewer bytes than the declared length are available.
   */
  public readString(): string {
    const bytes = this.readExact(this.readVarIntUnsigned());
    return AvroDecoder.utf8.decode(bytes);
  }

  /**
   * Reads an Avro array. The data is a series of blocks, each prefixed with
   * an item count; a count of zero terminates the array.
   *
   * @param itemReader Called once per item to decode it from the reader.
   * @returns All items from all blocks, in encounter order.
   */
  public readArray<T>(itemReader: () => T): T[] {
    const result: T[] = [];
    while (true) {
      const count = this.readVarIntUnsigned();
      if (count === 0) break; // End of array marker
      for (let i = 0; i < count; i++) result.push(itemReader());
    }
    return result;
  }

  /**
   * Reads an Avro map. The data is a series of blocks, each prefixed with an
   * entry count; a count of zero terminates the map. Every entry is a string
   * key followed by a value.
   *
   * @param valueReader Called once per entry, after its key has been read.
   * @returns A plain object holding the decoded entries; a repeated key keeps
   *     the last value read.
   * @throws Error `INVALID_KEY` if a key is `__proto__`, which would otherwise
   *     replace the result object's prototype instead of adding a property.
   */
  public readMap<T>(valueReader: () => T): Record<string, T> {
    const result: Record<string, T> = {};
    while (true) {
      const count = this.readVarIntUnsigned();
      if (count === 0) break; // End of map marker
      for (let i = 0; i < count; i++) {
        const key = this.readString();
        if (key === '__proto__') throw new Error('INVALID_KEY');
        result[key] = valueReader();
      }
    }
    return result;
  }

  /**
   * Reads an Avro union value: a zigzag-encoded branch index followed by the
   * value of that branch.
   *
   * @param schemaReaders One reader per union branch, in schema order.
   * @returns The selected branch index and the value its reader produced.
   * @throws Error If the index is negative or not less than the number of
   *     readers.
   */
  public readUnion<T>(schemaReaders: Array<() => T>): {index: number; value: T} {
    const index = this.decodeZigZag32(this.readVarIntUnsigned());
    if (index < 0 || index >= schemaReaders.length) {
      throw new Error(`Invalid union index: ${index}`);
    }
    const value = schemaReaders[index]();
    return {index, value};
  }

  /**
   * Reads an Avro enum value.
   *
   * @returns The zigzag-decoded symbol index; it is not validated against any
   *     symbol list.
   */
  public readEnum(): number {
    return this.decodeZigZag32(this.readVarIntUnsigned());
  }

  /**
   * Reads an Avro fixed value.
   *
   * @param size Number of bytes to read.
   * @throws Error If fewer than `size` bytes are available.
   */
  public readFixed(size: number): Uint8Array {
    return this.readExact(size);
  }

  /**
   * Reads an Avro record by invoking each field reader in order.
   *
   * This generic reader does not know field names, so values are stored under
   * the synthetic keys `field0`, `field1`, and so on.
   *
   * @param fieldReaders One reader per record field, in schema order.
   */
  public readRecord<T>(fieldReaders: Array<() => any>): T {
    const result: Record<string, unknown> = {};
    for (let i = 0; i < fieldReaders.length; i++) {
      result[`field${i}`] = fieldReaders[i]();
    }
    return result as T;
  }

  // Utility methods for Avro decoding

  /**
   * Reads exactly `size` bytes from the reader.
   *
   * @throws Error If the reader hands back a different number of bytes, which
   *     indicates truncated input or a corrupt length prefix.
   */
  private readExact(size: number): Uint8Array {
    const bytes = this.reader.buf(size);
    if (bytes.length !== size) throw new Error('Unexpected end of input');
    return bytes;
  }

  /**
   * Reads a variable-length unsigned 32-bit integer: seven payload bits per
   * byte, least-significant group first, high bit set on every byte but the
   * last.
   *
   * Payload bits beyond the 32nd are discarded by the 32-bit bitwise
   * arithmetic.
   *
   * @throws Error If a fifth byte still has its continuation bit set.
   */
  private readVarIntUnsigned(): number {
    const reader = this.reader;
    let result = 0;
    let shift = 0;
    while (true) {
      const byte = reader.u8();
      result |= (byte & 0x7f) << shift;
      if ((byte & 0x80) === 0) break;
      shift += 7;
      if (shift >= 32) {
        throw new Error('Variable-length integer is too long');
      }
    }
    return result >>> 0; // Convert to unsigned 32-bit
  }

  /**
   * Reads a variable-length unsigned 64-bit integer, using the same byte
   * layout as {@link readVarIntUnsigned}.
   *
   * @returns A value in the range `[0, 2^64)`.
   * @throws Error If a tenth byte still has its continuation bit set.
   */
  private readVarLong(): bigint {
    const reader = this.reader;
    const payloadMask = BigInt(0x7f);
    const continuationBit = BigInt(0x80);
    const zero = BigInt(0);
    const seven = BigInt(7);
    const limit = BigInt(64);
    let result = zero;
    let shift = zero;
    while (true) {
      const byte = BigInt(reader.u8());
      result |= (byte & payloadMask) << shift;
      if ((byte & continuationBit) === zero) break;
      shift += seven;
      if (shift >= limit) {
        throw new Error('Variable-length long is too long');
      }
    }
    // The tenth byte sits at shift 63 and can carry up to seven payload bits;
    // bigint shifts never overflow, so drop everything above bit 63 to keep
    // the result a genuine 64-bit value, as the 32-bit path does implicitly.
    return BigInt.asUintN(64, result);
  }

  /**
   * Decodes a zigzag-encoded unsigned 32-bit value into a signed 32-bit
   * integer (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...).
   */
  private decodeZigZag32(value: number): number {
    return (value >>> 1) ^ -(value & 1);
  }

  /**
   * Decodes a zigzag-encoded unsigned 64-bit value into a signed 64-bit
   * integer.
   */
  private decodeZigZag64(value: bigint): bigint {
    return (value >> BigInt(1)) ^ -(value & BigInt(1));
  }
}

0 commit comments

Comments
 (0)