kylebarron commented Mar 25, 2025

Change list

  • Add a ParquetFile class that can wrap either a sync or an async source.
  • Add ParquetFile.read, which reads from a local or remote source, always synchronously.
    • For local sources, it's easy to share the record batch stream via FFI. For remote (async) sources, I previously didn't know whether this was possible, but wrapping the async source in a BlockingAsyncParquetReader compiles correctly, so we should be able to expose an async source through a synchronous Python API. Each record batch is then fetched synchronously (though multiple concurrent reads may be in flight while fetching that batch).
  • Add ParquetFile.read_async, which reads from a local (TBD, but should be easy) or remote source and exposes an async iterator. This async iterator can't be exchanged via FFI, but Python can await these async functions, and the result of each future can then be exchanged to Python via FFI.
  • Expose the async ParquetRecordBatchStream as a blocking Python iterator (and actually via C FFI!!) so that we can provide a clean synchronous API for remote files; see the sketch after this list.
  • Add Python type hints for ParquetFile.
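As a rough sketch of what that synchronous path looks like over a remote source: `ParquetFile.open` here is a hypothetical constructor mirroring `open_async` (constructors are still a TODO below), the path is a placeholder, and the assumption that `read` returns a blocking record batch iterator follows from the change list above.

```python
# Sketch of the synchronous API over a remote source. `ParquetFile.open`
# is a hypothetical constructor mirroring `open_async` (constructors are
# still a TODO); the path is a placeholder.
from arro3.io import ParquetFile
from obstore.store import S3Store

store = S3Store("overturemaps-us-west-2", region="us-west-2", skip_signature=True)
file = ParquetFile.open("path/to/file.parquet", store=store)

# Each record batch is fetched synchronously: internally the
# BlockingAsyncParquetReader blocks on the underlying async reads.
for record_batch in file.read(batch_size=128_000):
    print(record_batch)
```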

TODO:

  • Implement constructors for ParquetFile (e7004d7 (#313))
  • Implement AsyncFileReader for SyncSource to allow read_async from local file (e03ab04 (#313))
  • Support RowFilter: prototype letting the user pass an Arrow UDF as an ArrowPredicate. Now that I've learned how to support these Python user callbacks in obstore, we can apply the same approach here. The user can then pass a list of these predicates as a RowFilter. (ba671ab (#313))
    • Do we even need a geo-specific GeoParquet reader library if we can efficiently implement RowFilter here...?
  • Settle on naming for projection vs columns ✅
  • Future read_table API that reads multiple row groups concurrently (see next_row_group and this example in the tests and this PR)
  • Support RowSelection (this can be left for a future PR)
  • Future: Implement ParquetDataset
  • Future: deprecate read_parquet_async?
  • Examples with polars and pyarrow creating predicates for predicate pushdown (see the sketch after this list)
  • Write documentation comparing our filters to pyarrow's filters.
  • Document that you must supply both row_groups and filters to get the best predicate pushdown.
  • API documentation
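For the predicate-pushdown examples, something like the following is the idea, reusing the file handle from the sketch above. This is a sketch only: the row_groups and filters parameter names and the callback contract (record batch in, boolean mask out) are assumptions, not settled API.

```python
# Sketch of user-defined predicate pushdown. The `row_groups` and
# `filters` parameters and the callback signature are assumptions.
import pyarrow.compute as pc

def taller_than_10m(batch):
    # An ArrowPredicate as a plain Python callback: takes a record batch
    # of the projected columns and returns a boolean mask.
    return pc.greater(batch["height"], 10)

stream = file.read(
    row_groups=[0, 1],          # first prune whole row groups via statistics...
    filters=[taller_than_10m],  # ...then filter rows within each group
)
```

This mirrors the documentation point above: row_groups prunes at the row-group level, filters prunes at the row level, and you want both for the best pushdown.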

Example

```python
from arro3.io import ParquetFile
from obstore.store import S3Store

store = S3Store("overturemaps-us-west-2", region="us-west-2", skip_signature=True)

path = "release/2024-03-12-alpha.0/theme=buildings/type=building/part-00217-4dfc75cd-2680-4d52-b5e0-f4cc9f36b267-c000.zstd.parquet"
file = await ParquetFile.open_async(path, store=store)

# Stream record batches from S3, 128k rows at a time
stream = file.read_async(batch_size=128000)
i = 0
async for record_batch in stream:
    i += 1
    print(f"iteration: {i}")
    print(record_batch)
```
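Per the read_async note in the change list, the stream itself can't cross the FFI boundary, but each awaited batch can. As an (assumed) illustration: recent pyarrow versions can construct a RecordBatch from any object implementing __arrow_c_array__, so each yielded batch should be exchangeable roughly like this:

```python
# The async iterator stays in Python; each awaited batch crosses to
# pyarrow via the Arrow PyCapsule interface (assuming the yielded
# batches implement __arrow_c_array__).
import pyarrow as pa

async for record_batch in file.read_async(batch_size=128000):
    pa_batch = pa.record_batch(record_batch)  # FFI exchange, no copy
    print(pa_batch.num_rows)
```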

Ref #258 (comment) for an earlier example.

Closes #258, closes #195
