diff --git a/Cargo.lock b/Cargo.lock
index 2107cc8..4ae8bec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -115,6 +115,7 @@ dependencies = [
  "pyo3-async-runtimes",
  "pyo3-file",
  "pyo3-object_store",
+ "rayon",
  "thiserror 1.0.69",
  "tokio",
 ]
@@ -567,6 +568,31 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
 [[package]]
 name = "crunchy"
 version = "0.2.3"
@@ -2024,6 +2050,26 @@
 version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3"
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.10"
diff --git a/Cargo.toml b/Cargo.toml
index a9234e4..4a4ae48 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ pyo3-arrow = { path = "./pyo3-arrow" }
 pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
 pyo3-file = "0.12"
 pyo3-object_store = "0.4"
+rayon = "1.10.0"
 thiserror = "1.0.63"
 tokio = { version = "1.40", features = [
     "macros",
diff --git a/arro3-io/Cargo.toml b/arro3-io/Cargo.toml
index e1c22d5..067441e 100644
--- a/arro3-io/Cargo.toml
+++ b/arro3-io/Cargo.toml
@@ -46,5 +46,6 @@ pyo3-async-runtimes = { workspace = true, features = [
 ], optional = true }
 pyo3-file = { workspace = true }
 pyo3-object_store = { workspace = true, optional = true }
+rayon = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/arro3-io/src/parquet/reader/concurrency.rs b/arro3-io/src/parquet/reader/concurrency.rs
new file mode 100644
index 0000000..81a9042
--- /dev/null
+++ b/arro3-io/src/parquet/reader/concurrency.rs
@@ -0,0 +1,79 @@
+//! Experimental concurrent Parquet reader; not yet enabled in mod.rs.
+
+use arrow_array::RecordBatch;
+use futures::future::join_all;
+use futures::{StreamExt, TryStreamExt};
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+
+use crate::error::Arro3IoResult;
+use crate::parquet::reader::options::PyParquetOptions;
+
+pub(crate) async fn read_concurrent<T: AsyncFileReader + Clone + Unpin + Send + 'static>(
+    source: T,
+    meta: &ArrowReaderMetadata,
+    options: PyParquetOptions,
+) -> Arro3IoResult<Vec<RecordBatch>> {
+    let split_options = split_options(options);
+    let mut readers = split_options
+        .into_iter()
+        .map(|options| {
+            let async_reader_builder =
+                ParquetRecordBatchStreamBuilder::new_with_metadata(source.clone(), meta.clone());
+            options
+                .apply_to_reader_builder(async_reader_builder, meta)
+                .build()
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let futures = readers
+        .iter_mut()
+        .map(|stream| stream.try_collect::<Vec<_>>())
+        .collect::<Vec<_>>();
+    let batches = join_all(futures)
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()?
+        .into_iter()
+        .flatten()
+        .collect::<Vec<_>>();
+    Ok(batches)
+}
+
+fn split_options(options: PyParquetOptions) -> Vec<PyParquetOptions> {
+    if can_split_readers(&options) {
+        if let Some(row_groups) = options.row_groups.clone() {
+            let mut split_options = vec![];
+            let row_groups_per_reader = row_groups.len() / 2;
+            for i in 0..2 {
+                let start = i * row_groups_per_reader;
+                // The second reader takes any remainder so no row group is dropped.
+                let end = if i == 0 { row_groups_per_reader } else { row_groups.len() };
+                let mut new_options = options.clone();
+                new_options.row_groups = Some(row_groups[start..end].to_vec());
+                split_options.push(new_options);
+            }
+            return split_options;
+        }
+    }
+    // Options that can't be split safely fall back to a single reader.
+    vec![options]
+}
+
+fn can_split_readers(options: &PyParquetOptions) -> bool {
+    // Splitting requires an explicit selection of at least two row groups.
+    if !options
+        .row_groups
+        .as_ref()
+        .is_some_and(|row_groups| row_groups.len() > 1)
+    {
+        return false;
+    }
+    // A row limit applies to the file as a whole, so it can't be distributed
+    // across independent readers.
+    if options.limit.is_some() {
+        return false;
+    }
+    true
+}
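Reviewer note: with the compile errors fixed, `split_options` always yields exactly two reader configurations by halving the explicit row-group list (the second half absorbing any remainder), and it falls back to a single reader whenever `can_split_readers` says no. A standalone sketch of that arithmetic, independent of the crate's types:

```rust
fn main() {
    // Five selected row groups: len() / 2 == 2, so the first reader gets
    // [0, 1] and the second absorbs the remainder, [2, 3, 4].
    let row_groups: Vec<usize> = vec![0, 1, 2, 3, 4];
    let per_reader = row_groups.len() / 2;
    assert_eq!(&row_groups[..per_reader], &[0, 1]);
    assert_eq!(&row_groups[per_reader..], &[2, 3, 4]);
}
```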
diff --git a/arro3-io/src/parquet/reader/file.rs b/arro3-io/src/parquet/reader/file.rs
index 8997408..b82549c 100644
--- a/arro3-io/src/parquet/reader/file.rs
+++ b/arro3-io/src/parquet/reader/file.rs
@@ -177,7 +177,7 @@ impl ParquetFile {
                 let record_batch_reader = options
                     .apply_to_reader_builder(sync_reader_builder, &self.meta)
                     .build()?;
                 Ok(PyRecordBatchReader::new(Box::new(record_batch_reader)).into())
             }
             ParquetSource::Async(async_source) => {
                 let async_reader_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
@@ -220,6 +220,36 @@ impl ParquetFile {
         }
     }
 
+    #[pyo3(signature = (**kwargs))]
+    fn read_table_async(
+        &self,
+        kwargs: Option<PyParquetOptions>,
+    ) -> Arro3IoResult<PyRecordBatchStream> {
+        let options = kwargs.unwrap_or_default();
+        match &self.source {
+            ParquetSource::Sync(sync_source) => {
+                let async_reader_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                    Box::new(sync_source.try_clone()?) as _,
+                    self.meta.clone(),
+                );
+                let record_batch_stream = options
+                    .apply_to_reader_builder(async_reader_builder, &self.meta)
+                    .build()?;
+                Ok(PyRecordBatchStream::new(record_batch_stream))
+            }
+            ParquetSource::Async(async_source) => {
+                let async_reader_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                    Box::new(async_source.clone()) as _,
+                    self.meta.clone(),
+                );
+                let record_batch_stream = options
+                    .apply_to_reader_builder(async_reader_builder, &self.meta)
+                    .build()?;
+                Ok(PyRecordBatchStream::new(record_batch_stream))
+            }
+        }
+    }
+
     #[getter]
     fn schema_arrow(&self) -> Arro3Schema {
         self.meta.schema().clone().into()
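Reviewer note: both arms of `read_table_async` use `Box::new(...) as _` so the sync and async sources coerce to the same boxed `AsyncFileReader` trait object, letting the two builders produce one concrete stream type. A minimal standalone sketch of that coercion, using `std::fmt::Display` purely for illustration:

```rust
use std::fmt::Display;

fn main() {
    // `as _` lets the compiler infer the trait-object target, so two
    // different concrete types unify behind one Box<dyn Display>.
    let values: Vec<Box<dyn Display>> = vec![Box::new(1) as _, Box::new("two") as _];
    for value in &values {
        println!("{value}");
    }
}
```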
diff --git a/arro3-io/src/parquet/reader/mod.rs b/arro3-io/src/parquet/reader/mod.rs
index 480d65e..cc55467 100644
--- a/arro3-io/src/parquet/reader/mod.rs
+++ b/arro3-io/src/parquet/reader/mod.rs
@@ -1,7 +1,9 @@
+// mod concurrency;
 mod file;
 mod functional;
 mod options;
 mod stream;
+mod thread_pool;
 
 pub(crate) use file::ParquetFile;
 pub(crate) use functional::{read_parquet, read_parquet_async};
diff --git a/arro3-io/src/parquet/reader/stream.rs b/arro3-io/src/parquet/reader/stream.rs
index 1308d10..3862213 100644
--- a/arro3-io/src/parquet/reader/stream.rs
+++ b/arro3-io/src/parquet/reader/stream.rs
@@ -7,9 +7,12 @@
 use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration};
 use pyo3::prelude::*;
 use pyo3_arrow::export::{Arro3RecordBatch, Arro3Table};
 use pyo3_arrow::PyTable;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::ThreadPool;
 use tokio::sync::Mutex;
 
 use crate::error::Arro3IoError;
+use crate::parquet::reader::thread_pool::get_default_pool;
 
 #[pyclass(name = "RecordBatchStream", frozen)]
 pub(crate) struct PyRecordBatchStream {
@@ -41,8 +44,12 @@ impl PyRecordBatchStream {
     }
 
     fn collect_async<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let pool = get_default_pool(py)?;
         let stream = self.stream.clone();
-        pyo3_async_runtimes::tokio::future_into_py(py, collect_stream(stream, self.schema.clone()))
+        pyo3_async_runtimes::tokio::future_into_py(
+            py,
+            collect_stream(stream, self.schema.clone(), pool),
+        )
     }
 }
@@ -69,16 +76,30 @@ async fn next_stream(
 async fn collect_stream(
     stream: Arc<Mutex<ParquetRecordBatchStream<Box<dyn AsyncFileReader>>>>,
     schema: SchemaRef,
+    pool: Arc<ThreadPool>,
 ) -> PyResult<Arro3Table> {
     let mut stream = stream.lock().await;
-    let mut batches: Vec<_> = vec![];
-    loop {
-        match stream.next().await {
-            Some(Ok(batch)) => {
-                batches.push(batch);
-            }
-            Some(Err(err)) => return Err(Arro3IoError::ParquetError(err).into()),
-            None => return Ok(PyTable::try_new(batches, schema)?.into()),
-        };
-    }
+
+    // Drain the stream's row groups into synchronous readers first, then
+    // decode them in parallel on the rayon pool.
+    let mut readers = vec![];
+    while let Some(reader) = stream
+        .next_row_group()
+        .await
+        .map_err(Arro3IoError::ParquetError)?
+    {
+        readers.push(reader);
+    }
+
+    let batches = pool.install(|| {
+        let batches = readers
+            .into_par_iter()
+            .map(|reader| reader.collect::<Result<Vec<_>, _>>())
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(Arro3IoError::ArrowError)?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        Ok::<_, PyErr>(batches)
+    })?;
+
+    Ok(PyTable::try_new(batches, schema)?.into())
 }
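Reviewer note: `pool.install(...)` combined with `into_par_iter()` is rayon's standard way to scope parallel work to a specific pool rather than the implicit global one. A self-contained sketch of the pattern (the two-thread pool and the numbers are arbitrary):

```rust
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::ThreadPoolBuilder;

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    // Work spawned inside install() runs on this pool's threads, mirroring
    // how collect_stream decodes each row group in parallel above.
    let squares: Vec<u64> = pool.install(|| (0..8u64).into_par_iter().map(|i| i * i).collect());
    assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49]);
}
```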
diff --git a/arro3-io/src/parquet/reader/thread_pool.rs b/arro3-io/src/parquet/reader/thread_pool.rs
new file mode 100644
index 0000000..9e0e037
--- /dev/null
+++ b/arro3-io/src/parquet/reader/thread_pool.rs
@@ -0,0 +1,19 @@
+use std::sync::Arc;
+
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+use pyo3::sync::GILOnceCell;
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+static DEFAULT_POOL: GILOnceCell<Arc<ThreadPool>> = GILOnceCell::new();
+
+/// Return the process-wide rayon thread pool, building it on first use.
+pub fn get_default_pool(py: Python<'_>) -> PyResult<Arc<ThreadPool>> {
+    let pool = DEFAULT_POOL.get_or_try_init(py, || {
+        let pool = ThreadPoolBuilder::new().build().map_err(|err| {
+            PyValueError::new_err(format!("Could not create rayon thread pool: {err}"))
+        })?;
+        Ok::<_, PyErr>(Arc::new(pool))
+    })?;
+    Ok(pool.clone())
+}
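Reviewer note: `GILOnceCell` gives once-per-process initialization coordinated with the Python runtime, so `get_default_pool` builds the rayon pool lazily and every later call returns the cached `Arc`. A standalone sketch of the same caching pattern with a plain value (hypothetical, unrelated to this crate):

```rust
use pyo3::prelude::*;
use pyo3::sync::GILOnceCell;

static EXPENSIVE: GILOnceCell<u64> = GILOnceCell::new();

fn get_expensive(py: Python<'_>) -> u64 {
    // The closure runs at most once; later calls return the cached value,
    // just as get_default_pool reuses its Arc<ThreadPool>.
    *EXPENSIVE.get_or_init(py, || 40 + 2)
}

fn main() {
    pyo3::prepare_freethreaded_python();
    Python::with_gil(|py| {
        assert_eq!(get_expensive(py), 42);
        assert_eq!(get_expensive(py), 42);
    });
}
```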