Skip to content

Commit 6791eb6

Browse files
Copilotstreamich
andcommitted
feat: implement Apache Avro encoder with schema validation and tests
Co-authored-by: streamich <[email protected]>
1 parent 2aaa33c commit 6791eb6

File tree

7 files changed

+2085
-0
lines changed

7 files changed

+2085
-0
lines changed

src/avro/AvroEncoder.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';
2+
import type {BinaryJsonEncoder} from '../types';
3+
4+
/**
5+
* Apache Avro binary encoder for basic value encoding.
6+
* Implements the Avro binary encoding specification without schema validation.
7+
* Based on https://avro.apache.org/docs/1.12.0/specification/
8+
*/
9+
export class AvroEncoder implements BinaryJsonEncoder {
10+
constructor(public readonly writer: IWriter & IWriterGrowable) {}
11+
12+
public encode(value: unknown): Uint8Array {
13+
const writer = this.writer;
14+
writer.reset();
15+
this.writeAny(value);
16+
return writer.flush();
17+
}
18+
19+
/**
20+
* Called when the encoder encounters a value that it does not know how to encode.
21+
*/
22+
public writeUnknown(value: unknown): void {
23+
this.writeNull();
24+
}
25+
26+
public writeAny(value: unknown): void {
27+
switch (typeof value) {
28+
case 'boolean':
29+
return this.writeBoolean(value);
30+
case 'number':
31+
return this.writeNumber(value);
32+
case 'string':
33+
return this.writeStr(value);
34+
case 'object': {
35+
if (value === null) return this.writeNull();
36+
const constructor = value.constructor;
37+
switch (constructor) {
38+
case Object:
39+
return this.writeObj(value as Record<string, unknown>);
40+
case Array:
41+
return this.writeArr(value as unknown[]);
42+
case Uint8Array:
43+
return this.writeBin(value as Uint8Array);
44+
default:
45+
return this.writeUnknown(value);
46+
}
47+
}
48+
case 'bigint':
49+
return this.writeLong(value);
50+
case 'undefined':
51+
return this.writeNull();
52+
default:
53+
return this.writeUnknown(value);
54+
}
55+
}
56+
57+
/**
58+
* Writes an Avro null value.
59+
*/
60+
public writeNull(): void {
61+
// Null values are encoded as zero bytes
62+
}
63+
64+
/**
65+
* Writes an Avro boolean value.
66+
*/
67+
public writeBoolean(bool: boolean): void {
68+
this.writer.u8(bool ? 1 : 0);
69+
}
70+
71+
/**
72+
* Writes an Avro int value using zigzag encoding.
73+
*/
74+
public writeInt(int: number): void {
75+
this.writeVarIntSigned(this.encodeZigZag32(Math.trunc(int)));
76+
}
77+
78+
/**
79+
* Writes an Avro long value using zigzag encoding.
80+
*/
81+
public writeLong(long: number | bigint): void {
82+
if (typeof long === 'bigint') {
83+
this.writeVarLong(this.encodeZigZag64(long));
84+
} else {
85+
this.writeVarLong(this.encodeZigZag64(BigInt(Math.trunc(long))));
86+
}
87+
}
88+
89+
/**
90+
* Writes an Avro float value using IEEE 754 single-precision.
91+
*/
92+
public writeFloatAvro(float: number): void {
93+
this.writer.ensureCapacity(4);
94+
this.writer.view.setFloat32(this.writer.x, float, true); // little-endian
95+
this.writer.move(4);
96+
}
97+
98+
/**
99+
* Writes an Avro double value using IEEE 754 double-precision.
100+
*/
101+
public writeDouble(double: number): void {
102+
this.writer.ensureCapacity(8);
103+
this.writer.view.setFloat64(this.writer.x, double, true); // little-endian
104+
this.writer.move(8);
105+
}
106+
107+
/**
108+
* Writes an Avro bytes value with length-prefixed encoding.
109+
*/
110+
public writeBin(bytes: Uint8Array): void {
111+
this.writeVarIntUnsigned(bytes.length);
112+
this.writer.buf(bytes, bytes.length);
113+
}
114+
115+
/**
116+
* Writes an Avro string value with UTF-8 encoding and length prefix.
117+
*/
118+
public writeStr(str: string): void {
119+
const bytes = new TextEncoder().encode(str);
120+
this.writeVarIntUnsigned(bytes.length);
121+
this.writer.buf(bytes, bytes.length);
122+
}
123+
124+
/**
125+
* Writes an Avro array with length-prefixed encoding.
126+
*/
127+
public writeArr(arr: unknown[]): void {
128+
this.writeVarIntUnsigned(arr.length);
129+
for (const item of arr) {
130+
this.writeAny(item);
131+
}
132+
this.writeVarIntUnsigned(0); // End of array marker
133+
}
134+
135+
/**
136+
* Writes an Avro map with length-prefixed encoding.
137+
*/
138+
public writeObj(obj: Record<string, unknown>): void {
139+
const entries = Object.entries(obj);
140+
this.writeVarIntUnsigned(entries.length);
141+
for (const [key, value] of entries) {
142+
this.writeStr(key);
143+
this.writeAny(value);
144+
}
145+
this.writeVarIntUnsigned(0); // End of map marker
146+
}
147+
148+
// BinaryJsonEncoder interface methods
149+
150+
/**
151+
* Generic number writing - determines type based on value
152+
*/
153+
public writeNumber(num: number): void {
154+
if (Number.isInteger(num)) {
155+
if (num >= -2147483648 && num <= 2147483647) {
156+
this.writeInt(num);
157+
} else {
158+
this.writeLong(num);
159+
}
160+
} else {
161+
this.writeDouble(num);
162+
}
163+
}
164+
165+
/**
166+
* Writes an integer value
167+
*/
168+
public writeInteger(int: number): void {
169+
this.writeInt(int);
170+
}
171+
172+
/**
173+
* Writes an unsigned integer value
174+
*/
175+
public writeUInteger(uint: number): void {
176+
this.writeInt(uint);
177+
}
178+
179+
/**
180+
* Writes a float value (interface method)
181+
*/
182+
public writeFloat(float: number): void {
183+
this.writeFloatValue(float);
184+
}
185+
186+
/**
187+
* Writes a float value using IEEE 754 single-precision.
188+
*/
189+
private writeFloatValue(float: number): void {
190+
this.writer.ensureCapacity(4);
191+
this.writer.view.setFloat32(this.writer.x, float, true); // little-endian
192+
this.writer.move(4);
193+
}
194+
195+
/**
196+
* Writes an ASCII string (same as regular string in Avro)
197+
*/
198+
public writeAsciiStr(str: string): void {
199+
this.writeStr(str);
200+
}
201+
202+
// Utility methods for Avro encoding
203+
204+
/**
205+
* Encodes a variable-length integer (for signed values with zigzag)
206+
*/
207+
private writeVarIntSigned(value: number): void {
208+
let n = value >>> 0; // Convert to unsigned 32-bit
209+
while (n >= 0x80) {
210+
this.writer.u8((n & 0x7f) | 0x80);
211+
n >>>= 7;
212+
}
213+
this.writer.u8(n & 0x7f);
214+
}
215+
216+
/**
217+
* Encodes a variable-length integer (for unsigned values like lengths)
218+
*/
219+
private writeVarIntUnsigned(value: number): void {
220+
let n = value >>> 0; // Convert to unsigned 32-bit
221+
while (n >= 0x80) {
222+
this.writer.u8((n & 0x7f) | 0x80);
223+
n >>>= 7;
224+
}
225+
this.writer.u8(n & 0x7f);
226+
}
227+
228+
/**
229+
* Encodes a variable-length long using Avro's encoding
230+
*/
231+
private writeVarLong(value: bigint): void {
232+
let n = value;
233+
const mask = BigInt(0x7f);
234+
const shift = BigInt(7);
235+
236+
while (n >= BigInt(0x80)) {
237+
this.writer.u8(Number((n & mask) | BigInt(0x80)));
238+
n >>= shift;
239+
}
240+
this.writer.u8(Number(n & mask));
241+
}
242+
243+
/**
244+
* Encodes a 32-bit integer using zigzag encoding
245+
*/
246+
private encodeZigZag32(value: number): number {
247+
return (value << 1) ^ (value >> 31);
248+
}
249+
250+
/**
251+
* Encodes a 64-bit integer using zigzag encoding
252+
*/
253+
private encodeZigZag64(value: bigint): bigint {
254+
return (value << BigInt(1)) ^ (value >> BigInt(63));
255+
}
256+
}

0 commit comments

Comments
 (0)