Skip to content

Commit

Permalink
Expose record boundary information in JSON decoder (#7092)
Browse files Browse the repository at this point in the history
* Expose record boundary information in JSON decoder

* fix doc links
  • Loading branch information
scovich authored Feb 11, 2025
1 parent 19f01e3 commit 27d2a75
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
42 changes: 40 additions & 2 deletions arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,27 @@ impl Decoder {
self.tape_decoder.serialize(rows)
}

/// True if the decoder is currently part way through decoding a record.
///
/// This forwards to the underlying tape decoder's `has_partial_row`. While it
/// returns true, [`Self::flush`] returns an error instead of a batch.
pub fn has_partial_record(&self) -> bool {
self.tape_decoder.has_partial_row()
}

/// The number of unflushed records, including the partially decoded record (if any).
///
/// The count is read from the underlying tape decoder's `num_buffered_rows`, so a
/// record that has been started but not yet completed is already included.
pub fn len(&self) -> usize {
self.tape_decoder.num_buffered_rows()
}

/// Reports whether the decoder holds no records to flush.
///
/// Equivalent to checking that [`Self::len`] is zero. A partially decoded
/// record counts towards the length, so this is false while one is in progress.
pub fn is_empty(&self) -> bool {
    matches!(self.len(), 0)
}

/// Flushes the currently buffered data to a [`RecordBatch`]
///
/// Returns `Ok(None)` if no buffered data
/// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
///
/// Note: if called part way through decoding a record, this will return an error
/// Note: This will return an error if called part way through decoding a record,
/// i.e. [`Self::has_partial_record`] is true.
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
let tape = self.tape_decoder.finish()?;

Expand Down Expand Up @@ -803,6 +819,20 @@ mod tests {
Field::new("e", DataType::Date64, true),
]));

let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
assert!(decoder.is_empty());
assert_eq!(decoder.len(), 0);
assert!(!decoder.has_partial_record());
assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
assert!(!decoder.is_empty());
assert_eq!(decoder.len(), 6);
assert!(!decoder.has_partial_record());
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 6);
assert!(decoder.is_empty());
assert_eq!(decoder.len(), 0);
assert!(!decoder.has_partial_record());

let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

Expand Down Expand Up @@ -2158,6 +2188,14 @@ mod tests {
true,
)]));

let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
assert!(decoder.tape_decoder.has_partial_row());
assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
let _ = decoder.flush().unwrap_err();
assert!(decoder.tape_decoder.has_partial_row());
assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

let parse_err = |s: &str| {
ReaderBuilder::new(schema.clone())
.build(Cursor::new(s.as_bytes()))
Expand Down
23 changes: 22 additions & 1 deletion arrow-json/src/reader/tape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,17 @@ impl TapeDecoder {
Ok(())
}

/// The number of buffered rows, including the partially decoded row (if any).
///
/// Returns the current value of `cur_row`. As exercised by this module's tests,
/// the count is left untouched by `finish()` and drops back to zero only after
/// `clear()` is called.
pub fn num_buffered_rows(&self) -> usize {
self.cur_row
}

/// True if the decoder is part way through decoding a row. If so, calling [`Self::finish`]
/// would return an error.
///
/// A row is considered partial for as long as `stack` is non-empty.
pub fn has_partial_row(&self) -> bool {
// `finish` inspects the top of this same stack before producing a tape.
!self.stack.is_empty()
}

/// Finishes the current [`Tape`]
pub fn finish(&self) -> Result<Tape<'_>, ArrowError> {
if let Some(b) = self.stack.last() {
Expand Down Expand Up @@ -726,8 +737,12 @@ mod tests {
"#;
let mut decoder = TapeDecoder::new(16, 2);
decoder.decode(a.as_bytes()).unwrap();
assert!(!decoder.has_partial_row());
assert_eq!(decoder.num_buffered_rows(), 7);

let finished = decoder.finish().unwrap();
assert!(!decoder.has_partial_row());
assert_eq!(decoder.num_buffered_rows(), 7); // didn't call clear() yet
assert_eq!(
finished.elements,
&[
Expand Down Expand Up @@ -820,7 +835,11 @@ mod tests {
0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35, 41, 47,
52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75, 76, 77
]
)
);

decoder.clear();
assert!(!decoder.has_partial_row());
assert_eq!(decoder.num_buffered_rows(), 0);
}

#[test]
Expand Down Expand Up @@ -874,6 +893,8 @@ mod tests {
// Test truncation
let mut decoder = TapeDecoder::new(16, 2);
decoder.decode(b"{\"he").unwrap();
assert!(decoder.has_partial_row());
assert_eq!(decoder.num_buffered_rows(), 1);
let err = decoder.finish().unwrap_err().to_string();
assert_eq!(err, "Json error: Truncated record whilst reading string");

Expand Down

0 comments on commit 27d2a75

Please sign in to comment.