Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/pyodide-wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ on:
python:
description: "Python version"
required: true
default: "3.12"
default: "3.13"
type: choice
options:
- 3.12
- 3.13
- 3.14
- 3.15
- "3.12"
- "3.13"
- "3.14"
- "3.15"
pyodide:
description: "New Pyodide version to build for"
required: true
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
./emsdk activate ${PYODIDE_EMSCRIPTEN_VERSION}
source emsdk_env.sh
cd ..
RUSTUP_TOOLCHAIN=nightly maturin build --release -o dist --target wasm32-unknown-emscripten -i python${{ inputs.python }} --manifest-path ${{ matrix.module }}/Cargo.toml
RUSTUP_TOOLCHAIN=nightly maturin build --release -o dist --target wasm32-unknown-emscripten -i python${{ inputs.python }} --manifest-path ${{ matrix.module }}/Cargo.toml --no-default-features

- name: Get info from built wheel file
run: |
Expand Down
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.11
3.13
11 changes: 5 additions & 6 deletions DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ rustup target add --toolchain nightly wasm32-unknown-emscripten

Install maturin and pyodide-build (choose a specific version of pyodide-build if desired)

```bash
pip install -U maturin
pip install pyodide-build
```bashu
pip install -U maturin pyodide-build
```

Clone emsdk. I clone this into a specific path at `~/github/emscripten-core/emsdk` so that it can be shared across projects.
Expand All @@ -37,7 +36,7 @@ Clone emsdk. I clone this into a specific path at `~/github/emscripten-core/emsd
mkdir -p ~/github/emscripten-core/
git clone https://github.com/emscripten-core/emsdk.git ~/github/emscripten-core/emsdk
# Or, set this manually
PYODIDE_EMSCRIPTEN_VERSION=$(pyodide config get emscripten_version)
PYODIDE_EMSCRIPTEN_VERSION=$(uv run -p 3.13 pyodide config get emscripten_version)
~/github/emscripten-core/emsdk/emsdk install ${PYODIDE_EMSCRIPTEN_VERSION}
~/github/emscripten-core/emsdk/emsdk activate ${PYODIDE_EMSCRIPTEN_VERSION}
source ~/github/emscripten-core/emsdk/emsdk_env.sh
Expand All @@ -47,10 +46,10 @@ Build `arro3-core`:

```bash
RUSTUP_TOOLCHAIN=nightly \
maturin build \
uv run maturin build \
--release \
-o dist \
-m arro3-core/Cargo.toml \
--target wasm32-unknown-emscripten \
-i python3.11
-i python3.13
```
2 changes: 2 additions & 0 deletions arro3-io/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Arro3IoError {
ArrowError(#[from] arrow_schema::ArrowError),

/// A wrapped [object_store::Error]
#[cfg(feature = "async")]
#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),

Expand All @@ -31,6 +32,7 @@ impl From<Arro3IoError> for PyErr {
match error {
Arro3IoError::PyErr(err) => err,
Arro3IoError::ArrowError(err) => PyException::new_err(err.to_string()),
#[cfg(feature = "async")]
Arro3IoError::ObjectStoreError(err) => PyException::new_err(err.to_string()),
Arro3IoError::ParquetError(err) => PyException::new_err(err.to_string()),
}
Expand Down
3 changes: 3 additions & 0 deletions arro3-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ fn _io(py: Python, m: &Bound<PyModule>) -> PyResult<()> {

m.add_wrapped(wrap_pyfunction!(___version))?;

#[cfg(feature = "async")]
pyo3_object_store::register_store_module(py, m, "arro3.io", "store")?;
#[cfg(feature = "async")]
pyo3_object_store::register_exceptions_module(py, m, "arro3.io", "exceptions")?;

m.add_wrapped(wrap_pyfunction!(csv::infer_csv_schema))?;
Expand All @@ -57,6 +59,7 @@ fn _io(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(ipc::write_ipc_stream))?;

m.add_wrapped(wrap_pyfunction!(parquet::read_parquet))?;
#[cfg(feature = "async")]
m.add_wrapped(wrap_pyfunction!(parquet::read_parquet_async))?;
m.add_wrapped(wrap_pyfunction!(parquet::write_parquet))?;

Expand Down
42 changes: 42 additions & 0 deletions arro3-io/src/parquet/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::Arc;

use parquet::arrow::async_reader::ParquetObjectReader;
use pyo3::prelude::*;
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::PyTable;
use pyo3_object_store::PyObjectStore;

use crate::error::Arro3IoResult;

#[pyfunction]
#[pyo3(signature = (path, *, store))]
pub fn read_parquet_async(
py: Python,
path: String,
store: PyObjectStore,
) -> PyArrowResult<PyObject> {
let fut = pyo3_async_runtimes::tokio::future_into_py(py, async move {
Ok(read_parquet_async_inner(store.into_inner(), path).await?)
})?;

Ok(fut.into())
}

async fn read_parquet_async_inner(
store: Arc<dyn object_store::ObjectStore>,
path: String,
) -> Arro3IoResult<PyTable> {
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

let object_reader = ParquetObjectReader::new(store, path.into());
let builder = ParquetRecordBatchStreamBuilder::new(object_reader).await?;

let metadata = builder.schema().metadata().clone();
let reader = builder.build()?;

let arrow_schema = Arc::new(reader.schema().as_ref().clone().with_metadata(metadata));

let batches = reader.try_collect::<Vec<_>>().await?;
Ok(PyTable::try_new(batches, arrow_schema)?)
}
7 changes: 7 additions & 0 deletions arro3-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#[cfg(feature = "async")]
mod r#async;
mod sync;

#[cfg(feature = "async")]
pub(crate) use r#async::read_parquet_async;
pub(crate) use sync::{read_parquet, write_parquet};
38 changes: 1 addition & 37 deletions arro3-io/src/parquet.rs → arro3-io/src/parquet/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::{WriterProperties, WriterVersion};
Expand All @@ -16,10 +15,8 @@ use pyo3::prelude::*;
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::export::Arro3RecordBatchReader;
use pyo3_arrow::input::AnyRecordBatch;
use pyo3_arrow::{PyRecordBatchReader, PyTable};
use pyo3_object_store::PyObjectStore;
use pyo3_arrow::PyRecordBatchReader;

use crate::error::Arro3IoResult;
use crate::utils::{FileReader, FileWriter};

#[pyfunction]
Expand All @@ -44,39 +41,6 @@ pub fn read_parquet(file: FileReader) -> PyArrowResult<Arro3RecordBatchReader> {
Ok(PyRecordBatchReader::new(iter).into())
}

#[pyfunction]
#[pyo3(signature = (path, *, store))]
pub fn read_parquet_async(
py: Python,
path: String,
store: PyObjectStore,
) -> PyArrowResult<PyObject> {
let fut = pyo3_async_runtimes::tokio::future_into_py(py, async move {
Ok(read_parquet_async_inner(store.into_inner(), path).await?)
})?;

Ok(fut.into())
}

async fn read_parquet_async_inner(
store: Arc<dyn object_store::ObjectStore>,
path: String,
) -> Arro3IoResult<PyTable> {
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

let object_reader = ParquetObjectReader::new(store, path.into());
let builder = ParquetRecordBatchStreamBuilder::new(object_reader).await?;

let metadata = builder.schema().metadata().clone();
let reader = builder.build()?;

let arrow_schema = Arc::new(reader.schema().as_ref().clone().with_metadata(metadata));

let batches = reader.try_collect::<Vec<_>>().await?;
Ok(PyTable::try_new(batches, arrow_schema)?)
}

pub(crate) struct PyWriterVersion(WriterVersion);

impl<'py> FromPyObject<'py> for PyWriterVersion {
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dev-dependencies = [
"pandas>=2.2.3",
"pip>=24.2",
"pyarrow>=17.0.0",
"pyodide-build>=0.25.1",
"pytest>=8.3.3",
]

Expand Down
Loading
Loading