Skip to content

Commit 13b6c32

Browse files
committed
Make the state machine implementation more complete
1 parent 6440289 commit 13b6c32

File tree

12 files changed

+1180
-493
lines changed

12 files changed

+1180
-493
lines changed

Cargo.lock

Lines changed: 0 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

avro/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ categories.workspace = true
2929
documentation.workspace = true
3030

3131
[features]
32+
default = ["futures", "sync"]
3233
bzip = ["dep:bzip2"]
3334
derive = ["dep:apache-avro-derive"]
3435
snappy = ["dep:crc32fast", "dep:snap"]
3536
xz = ["dep:xz2"]
3637
zstandard = ["dep:zstd"]
38+
futures = []
39+
sync = []
3740

3841
[lib]
3942
# disable benchmarks to allow passing criterion arguments to `cargo bench`
@@ -73,7 +76,6 @@ thiserror = { default-features = false, version = "2.0.12" }
7376
uuid = { default-features = false, version = "1.17.0", features = ["serde", "std"] }
7477
xz2 = { default-features = false, version = "0.1.7", optional = true }
7578
zstd = { default-features = false, version = "0.13.3", optional = true }
76-
winnow = { version = "0.7.12", features = ["simd"] }
7779
oval = { version = "2.0.0", features = ["bytes"] }
7880
futures = "0.3.31"
7981
async-stream = "0.3.6"

avro/src/bigdecimal.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> AvroResult<Vec<u8>>
4747
Ok(final_buffer)
4848
}
4949

50-
pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> AvroResult<BigDecimal> {
51-
let mut bytes: &[u8] = bytes.as_slice();
50+
pub(crate) fn deserialize_big_decimal(mut bytes: &[u8]) -> AvroResult<BigDecimal> {
5251
let mut big_decimal_buffer = match decode_len(&mut bytes) {
5352
Ok(size) => vec![0u8; size],
5453
Err(err) => return Err(Details::BigDecimalLen(Box::new(err)).into()),

avro/src/state_machines/reading/async_impl.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ use crate::{
99
Error, Schema,
1010
error::Details,
1111
state_machines::reading::{
12-
ItemRead, StateMachine, StateMachineControlFlow, deserialize_from_tape,
12+
ItemRead, StateMachine, StateMachineControlFlow,
13+
commands::CommandTape,
14+
deserialize_from_tape,
1315
object_container_file::{
1416
ObjectContainerFileBodyStateMachine, ObjectContainerFileHeader,
1517
ObjectContainerFileHeaderStateMachine,
1618
},
17-
schema_to_command_tape, value_from_tape,
19+
value_from_tape,
1820
},
1921
types::Value,
2022
};
@@ -57,11 +59,15 @@ impl<'a, R: AsyncRead> ObjectContainerFileReader<'a, R> {
5759
}
5860
};
5961

60-
let tape = schema_to_command_tape(&header.schema);
62+
let tape = CommandTape::build_from_schema(&header.schema)?;
6163

6264
Ok(Self {
6365
reader_schema: None,
64-
fsm: Some(ObjectContainerFileBodyStateMachine::new(tape, header.sync)),
66+
fsm: Some(ObjectContainerFileBodyStateMachine::new(
67+
tape,
68+
header.sync,
69+
header.codec,
70+
)),
6571
header,
6672
reader,
6773
buffer,
@@ -112,17 +118,19 @@ impl<'a, R: AsyncRead> ObjectContainerFileReader<'a, R> {
112118
) -> impl Stream<Item = Result<T, Error>> {
113119
try_stream! {
114120
while let Some(object) = self.next_object().await {
115-
let mut tape = object?;
116-
yield deserialize_from_tape(&mut tape, self.reader_schema.unwrap_or(&self.header.schema))?;
121+
let _tape = object?;
122+
yield todo!();
123+
// yield deserialize_from_tape(&mut tape, self.reader_schema.unwrap_or(&self.header.schema))?;
117124
}
118125
}
119126
}
120127

121128
pub async fn stream(&mut self) -> impl Stream<Item = Result<Value, Error>> {
122129
try_stream! {
123130
while let Some(object) = self.next_object().await {
124-
let mut tape = object?;
125-
yield value_from_tape(&mut tape, self.reader_schema.unwrap_or(&self.header.schema))?;
131+
let _tape = object?;
132+
yield todo!();
133+
// yield value_from_tape(&mut tape, self.reader_schema.unwrap_or(&self.header.schema))?;
126134
}
127135
}
128136
}

avro/src/state_machines/reading/block.rs

Lines changed: 6 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ use crate::{
77
error::Details,
88
state_machines::reading::{
99
CommandTape, ItemRead, StateMachine, StateMachineControlFlow, SubStateMachine,
10-
bytes::BytesStateMachine, decode_zigzag, object::ObjectStateMachine, replace_drop,
10+
decode_zigzag_buffer, object::ObjectStateMachine, replace_drop,
1111
},
1212
};
1313

14-
pub struct ArrayStateMachine {
14+
pub struct BlockStateMachine {
1515
command_tape: CommandTape,
1616
current_sub_machine: Box<SubStateMachine>,
1717
tape: Vec<ItemRead>,
1818
left_in_current_block: usize,
1919
need_to_read_block_byte_size: bool,
2020
}
2121

22-
impl ArrayStateMachine {
22+
impl BlockStateMachine {
2323
pub fn new(command_tape: CommandTape, tape: Vec<ItemRead>) -> Self {
2424
Self {
2525
command_tape,
@@ -31,17 +31,16 @@ impl ArrayStateMachine {
3131
}
3232
}
3333

34-
impl StateMachine for ArrayStateMachine {
34+
impl StateMachine for BlockStateMachine {
3535
type Output = Vec<ItemRead>;
36-
3736
fn parse(
3837
mut self,
3938
buffer: &mut Buffer,
4039
) -> Result<StateMachineControlFlow<Self, Self::Output>, Error> {
4140
loop {
4241
// If we finished the last block (or are newly created) read the block info
4342
if self.left_in_current_block == 0 {
44-
let Some(block) = decode_zigzag(buffer)? else {
43+
let Some(block) = decode_zigzag_buffer(buffer)? else {
4544
// Not enough data left in the buffer
4645
return Ok(StateMachineControlFlow::NeedMore(self));
4746
};
@@ -57,7 +56,7 @@ impl StateMachine for ArrayStateMachine {
5756
}
5857
// If the block length was negative we need to read the block size
5958
if self.need_to_read_block_byte_size {
60-
let Some(block) = decode_zigzag(buffer)? else {
59+
let Some(block) = decode_zigzag_buffer(buffer)? else {
6160
// Not enough data left in the buffer
6261
return Ok(StateMachineControlFlow::NeedMore(self));
6362
};
@@ -107,167 +106,3 @@ impl StateMachine for ArrayStateMachine {
107106
}
108107
}
109108
}
110-
111-
pub struct MapStateMachine {
112-
command_tape: CommandTape,
113-
current_sub_machine: Box<SubStateMachine>,
114-
tape: Vec<ItemRead>,
115-
left_in_current_block: usize,
116-
need_to_read_block_byte_size: bool,
117-
}
118-
119-
impl MapStateMachine {
120-
pub fn new(command_tape: CommandTape, tape: Vec<ItemRead>) -> Self {
121-
Self {
122-
command_tape,
123-
current_sub_machine: Box::new(SubStateMachine::None),
124-
tape,
125-
left_in_current_block: 0,
126-
need_to_read_block_byte_size: false,
127-
}
128-
}
129-
}
130-
131-
impl StateMachine for MapStateMachine {
132-
type Output = Vec<ItemRead>;
133-
134-
fn parse(
135-
mut self,
136-
buffer: &mut Buffer,
137-
) -> Result<StateMachineControlFlow<Self, Self::Output>, Error> {
138-
loop {
139-
// If we finished the last block (or are newly created) read the block info
140-
if self.left_in_current_block == 0 {
141-
let Some(block) = decode_zigzag(buffer)? else {
142-
// Not enough data left in the buffer
143-
return Ok(StateMachineControlFlow::NeedMore(self));
144-
};
145-
self.need_to_read_block_byte_size = block.is_negative();
146-
let abs_block = block.unsigned_abs();
147-
let abs_block = usize::try_from(abs_block)
148-
.map_err(|e| Details::ConvertU64ToUsize(e, abs_block))?;
149-
self.tape.push(ItemRead::Block(abs_block));
150-
if abs_block == 0 {
151-
// Done parsing the map
152-
return Ok(StateMachineControlFlow::Done(self.tape));
153-
}
154-
}
155-
// If the block length was negative we need to read the block size
156-
if self.need_to_read_block_byte_size {
157-
let Some(block) = decode_zigzag(buffer)? else {
158-
// Not enough data left in the buffer
159-
return Ok(StateMachineControlFlow::NeedMore(self));
160-
};
161-
// Make sure the value is sane
162-
let _ = usize::try_from(block).map_err(|e| Details::ConvertI64ToUsize(e, block))?;
163-
self.need_to_read_block_byte_size = false;
164-
}
165-
166-
// Either run the existing state machine or create a new one and run that
167-
match std::mem::take(self.current_sub_machine.deref_mut()) {
168-
SubStateMachine::None => {
169-
let fsm = BytesStateMachine::new();
170-
// Optimistically run the state machine
171-
match fsm.parse(buffer)? {
172-
StateMachineControlFlow::NeedMore(fsm) => {
173-
replace_drop(
174-
self.current_sub_machine.deref_mut(),
175-
SubStateMachine::String(fsm),
176-
);
177-
return Ok(StateMachineControlFlow::NeedMore(self));
178-
}
179-
StateMachineControlFlow::Done(bytes) => {
180-
let string =
181-
String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?;
182-
self.tape.push(ItemRead::String(string.into_boxed_str()));
183-
replace_drop(
184-
self.current_sub_machine.deref_mut(),
185-
SubStateMachine::None,
186-
);
187-
}
188-
}
189-
// Finished reading the key, start on the value
190-
let fsm = ObjectStateMachine::new_with_tape(
191-
self.command_tape.clone(),
192-
std::mem::take(&mut self.tape),
193-
);
194-
// Optimistically run the state machine
195-
match fsm.parse(buffer)? {
196-
StateMachineControlFlow::NeedMore(fsm) => {
197-
replace_drop(
198-
self.current_sub_machine.deref_mut(),
199-
SubStateMachine::Object(fsm),
200-
);
201-
return Ok(StateMachineControlFlow::NeedMore(self));
202-
}
203-
StateMachineControlFlow::Done(tape) => {
204-
self.tape = tape;
205-
self.left_in_current_block -= 1;
206-
}
207-
}
208-
}
209-
SubStateMachine::String(fsm) => {
210-
// We didn't finish reading the key last loop
211-
match fsm.parse(buffer)? {
212-
StateMachineControlFlow::NeedMore(fsm) => {
213-
replace_drop(
214-
self.current_sub_machine.deref_mut(),
215-
SubStateMachine::String(fsm),
216-
);
217-
return Ok(StateMachineControlFlow::NeedMore(self));
218-
}
219-
StateMachineControlFlow::Done(bytes) => {
220-
let string =
221-
String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?;
222-
self.tape.push(ItemRead::String(string.into_boxed_str()));
223-
replace_drop(
224-
self.current_sub_machine.deref_mut(),
225-
SubStateMachine::None,
226-
);
227-
}
228-
}
229-
// Finished reading the key, start on the value
230-
let fsm = ObjectStateMachine::new_with_tape(
231-
self.command_tape.clone(),
232-
std::mem::take(&mut self.tape),
233-
);
234-
// Optimistically run the state machine
235-
match fsm.parse(buffer)? {
236-
StateMachineControlFlow::NeedMore(fsm) => {
237-
replace_drop(
238-
self.current_sub_machine.deref_mut(),
239-
SubStateMachine::Object(fsm),
240-
);
241-
return Ok(StateMachineControlFlow::NeedMore(self));
242-
}
243-
StateMachineControlFlow::Done(tape) => {
244-
self.tape = tape;
245-
self.left_in_current_block -= 1;
246-
}
247-
}
248-
}
249-
SubStateMachine::Object(fsm) => {
250-
// We didn't finish reading the value last loop
251-
match fsm.parse(buffer)? {
252-
StateMachineControlFlow::NeedMore(fsm) => {
253-
replace_drop(
254-
self.current_sub_machine.deref_mut(),
255-
SubStateMachine::Object(fsm),
256-
);
257-
return Ok(StateMachineControlFlow::NeedMore(self));
258-
}
259-
StateMachineControlFlow::Done(tape) => {
260-
self.tape = tape;
261-
replace_drop(
262-
self.current_sub_machine.deref_mut(),
263-
SubStateMachine::None,
264-
);
265-
self.left_in_current_block -= 1;
266-
}
267-
}
268-
}
269-
_ => unreachable!(),
270-
}
271-
}
272-
}
273-
}

avro/src/state_machines/reading/bytes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use oval::Buffer;
22

33
use crate::{
44
error::Details,
5-
state_machines::reading::{StateMachine, StateMachineControlFlow, decode_zigzag},
5+
state_machines::reading::{StateMachine, StateMachineControlFlow, decode_zigzag_buffer},
66
};
77

88
use super::StateMachineResult;
@@ -41,7 +41,7 @@ impl StateMachine for BytesStateMachine {
4141

4242
fn parse(mut self, buffer: &mut Buffer) -> StateMachineResult<Self, Self::Output> {
4343
if self.length.is_none() {
44-
let Some(length) = decode_zigzag(buffer)? else {
44+
let Some(length) = decode_zigzag_buffer(buffer)? else {
4545
// Not enough data left in the buffer varint byte plus we know
4646
// there at least 127 bytes in the buffer now (as otherwise we wouldn't need one more varint byte).
4747
return Ok(StateMachineControlFlow::NeedMore(self));

0 commit comments

Comments
 (0)