Skip to content

Commit 736587e

Browse files
committed
parquet serialization
1 parent 33ed5b0 commit 736587e

File tree

1 file changed

+68
-21
lines changed
  • src/storage-operators/src/oneshot_source

1 file changed

+68
-21
lines changed

src/storage-operators/src/oneshot_source/parquet.rs

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use std::fmt;
1313
use std::sync::Arc;
1414

15-
use arrow::array::{make_array, Array, RecordBatch, StructArray};
15+
use arrow::array::{Array, RecordBatch, StructArray};
1616
use bytes::{Bytes, BytesMut};
1717
use futures::future::BoxFuture;
1818
use futures::stream::BoxStream;
@@ -25,8 +25,8 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
2525
use parquet::errors::ParquetError;
2626
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
2727
use prost::Message;
28-
use serde::de::DeserializeOwned;
29-
use serde::{Deserialize, Serialize};
28+
use serde::de::Visitor;
29+
use serde::{Deserialize, Deserializer, Serialize};
3030
use smallvec::{smallvec, SmallVec};
3131

3232
use crate::oneshot_source::{
@@ -57,24 +57,6 @@ pub struct ParquetRowGroup {
5757
record_batch: RecordBatch,
5858
}
5959

60-
impl Serialize for ParquetRowGroup {
61-
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
62-
where
63-
S: serde::Serializer,
64-
{
65-
todo!()
66-
}
67-
}
68-
69-
impl<'de> Deserialize<'de> for ParquetRowGroup {
70-
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71-
where
72-
D: serde::Deserializer<'de>,
73-
{
74-
todo!()
75-
}
76-
}
77-
7860
impl OneshotFormat for ParquetFormat {
7961
type WorkRequest<S>
8062
= ParquetWorkRequest<S::Object, S::Checksum>
@@ -234,3 +216,68 @@ impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
234216
})
235217
}
236218
}
219+
220+
// Note(parkmycar): Instead of a manual implementation of Serialize and Deserialize we could
221+
// change `ParquetRowGroup` to have a type which we can derive the impl for. But no types from the
222+
// `arrow` crate do, and we'd prefer not to use a Vec<u8> since serialization is only required when
223+
// Timely workers span multiple processes.
224+
impl Serialize for ParquetRowGroup {
225+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
226+
where
227+
S: serde::Serializer,
228+
{
229+
// Note: This implementation isn't very efficient, but it should only be rarely used so
230+
// it's not too much of a concern.
231+
let struct_array = StructArray::from(self.record_batch.clone());
232+
let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
233+
let encoded_proto = proto_array.encode_to_vec();
234+
encoded_proto.serialize(serializer)
235+
}
236+
}
237+
238+
impl<'de> Deserialize<'de> for ParquetRowGroup {
239+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
240+
where
241+
D: serde::Deserializer<'de>,
242+
{
243+
fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
244+
deserializer: D,
245+
) -> Result<StructArray, D::Error> {
246+
struct StructArrayVisitor;
247+
248+
impl<'a> Visitor<'a> for StructArrayVisitor {
249+
type Value = StructArray;
250+
251+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
252+
formatter.write_str("binary data")
253+
}
254+
255+
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
256+
where
257+
E: serde::de::Error,
258+
{
259+
let serde_err =
260+
|| serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);
261+
262+
let array_data = ProtoArrayData::decode(v)
263+
.map_err(|_| serde_err())
264+
.and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
265+
let array_ref = arrow::array::make_array(array_data);
266+
let struct_array = array_ref
267+
.as_any()
268+
.downcast_ref::<StructArray>()
269+
.ok_or_else(|| serde_err())?;
270+
271+
Ok(struct_array.clone())
272+
}
273+
}
274+
275+
deserializer.deserialize_bytes(StructArrayVisitor)
276+
}
277+
278+
let struct_array = struct_array(deserializer)?;
279+
let record_batch = RecordBatch::from(struct_array);
280+
281+
Ok(ParquetRowGroup { record_batch })
282+
}
283+
}

0 commit comments

Comments
 (0)