add HF dataset plugin#992
Conversation
Add a new plugin that provides native support for HuggingFace datasets.Dataset as a Flyte DataFrame type, enabling seamless serialization/deserialization through Parquet format. Features: - DataFrameEncoder/Decoder for datasets.Dataset <-> Parquet - Cloud storage support (S3, GCS, Azure) via fsspec storage options - Anonymous S3 fallback for public datasets - Column filtering on both encode and decode - Auto-registration via flyte.plugins.types entry point Signed-off-by: André Ahlert <[email protected]>
Signed-off-by: André Ahlert <[email protected]>
… infra - Use storage.get_configured_fsspec_kwargs() instead of get_storage() (fix review) - Add [tool.uv.sources] flyte editable for dev (match Anthropic/OpenAI) - Conftest: use LocalDB._get_db_path and reset _conn (match Polars after main) - Tests: patch flyte.storage._storage.get_storage; run.outputs()[0]; skip empty dataset to avoid CI flakiness Signed-off-by: André Ahlert <[email protected]>
Signed-off-by: André Ahlert <[email protected]>
…, DataFrame Signed-off-by: André Ahlert <[email protected]>
Renamed get_hf_storage_options to _get_storage_options (no public API exposure). Removed column filtering from encode to match the Polars plugin pattern. Removed misleading backwards-compatibility comment on module-level registration. Synced conftest cache isolation to use LocalDB._get_db_path. Signed-off-by: André Ahlert <[email protected]>
Replace datasets.Dataset.from_parquet with pq.read_table + datasets.Dataset(table). from_parquet routes through HuggingFace's DownloadManager which caches files locally under ~/.cache/huggingface/datasets/ before reading. For Flyte-managed storage this is wasteful (double I/O) and bypasses the fsspec filesystem we already have configured. Using pq.read_table with the Flyte filesystem reads directly from storage with no intermediate cache, removes the NoCredentialsError anonymous fallback, and avoids relying on storage_options flowing through **kwargs in from_parquet. Signed-off-by: André Ahlert <[email protected]>
…lesystem Both encode and decode now use pq.write_table/pq.read_table with the Flyte filesystem directly, removing the asymmetry where encode went through HuggingFace's to_parquet (fsspec.open) and decode used pyarrow. Removes _get_storage_options entirely along with its 8 unit tests. Enables the empty dataset test that was previously skipped due to from_parquet not handling empty parquet files. Signed-off-by: André Ahlert <[email protected]>
Signed-off-by: André Ahlert <[email protected]>
flyte-sdk 2.0 is stable, no need for pre-release flag. Signed-off-by: Andre Ahlert <[email protected]> Signed-off-by: André Ahlert <[email protected]>
Signed-off-by: Andre Ahlert <[email protected]> Signed-off-by: André Ahlert <[email protected]>
lazy_module was added to flyte.extend in main, resolving the review comment about avoiding _ imports. Signed-off-by: Andre Ahlert <[email protected]> Signed-off-by: André Ahlert <[email protected]>
…xample Add hf_dataset() prefetch function that streams parquet files from HuggingFace Hub directly to Flyte remote storage, following the same pattern as flyte.prefetch.hf_model(). Includes fallback to snapshot download if streaming fails. Switch encoder to batched parquet write via pq.ParquetWriter to avoid materializing the entire table in memory during write. Add workflow example showing prefetch + dataset processing between tasks. Signed-off-by: André Ahlert <[email protected]>
- ParquetWriter now wrapped in try/finally to prevent resource leak on error - Fix temp dir leak in _download_dataset_to_local (flat_dir now managed via TemporaryDirectory context manager) - Remove unused commit SHA fetch from _stream_dataset_to_remote - Add input validation for name and split params in hf_dataset() - Narrow except clause from bare Exception to (OSError, FileNotFoundError) - Rename store_hf_dataset_task to _store_hf_dataset_task (private convention) - Add asyncio_mode = auto to pytest config Signed-off-by: André Ahlert <[email protected]>
- Fix multi-split filename collision: when split=None, parquet files from different splits now go into separate subdirectories (train/, test/, etc.) instead of all writing to the root and overwriting each other - Rework example to use Dir.walk() + pq.read_table inside a task instead of calling data_dir.path directly, demonstrating the proper prefetch-to-task flow - Add HF_HUB_ENABLE_HF_TRANSFER=1 env var to prefetch image (was installing hf-transfer but never activating it) - Add HF_TOKEN warning when token is not set - Rename _store_hf_dataset_task back to store_hf_dataset_task to match the hf_model prefetch convention - Expand test coverage: streaming single/multi split, download fallback, store_hf_dataset_task integration - Update README to document both prefetch and type transformer Signed-off-by: André Ahlert <[email protected]>
…multi-split collision - Remove revision parameter from all signatures and HuggingFaceDatasetInfo. The plugin always uses refs/convert/parquet (HF's auto-converted branch), so the param was silently ignored. - Fix _download_dataset_to_local to preserve relative directory structure instead of flattening, preventing filename collision when split=None downloads train/0000.parquet and test/0000.parquet. - Add test_download_dataset_multi_split_no_collision to verify the fix. Signed-off-by: André Ahlert <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
andreahlert
left a comment
There was a problem hiding this comment.
a few questions. But after all, it has an overall better DX than my version. tks!
| readme = "README.md" | ||
| authors = [{ name = "André Ahlert", email = "[email protected]" }, { name = "Samhita Alla", email = "[email protected]" }] | ||
| requires-python = ">=3.10" | ||
| dependencies = [ |
There was a problem hiding this comment.
dropping hf-transfer is fine (HfFileSystem doesn't use it anyway), but the hf_hub_download fallback went with it. was killing the snapshot-download escapehatch intentional?
There was a problem hiding this comment.
since we're using HFS, we don't need it anymore.
| HuggingFaceDatasetToParquetEncodingHandler, | ||
| HuggingFaceIterableDatasetToParquetEncodingHandler, | ||
| ParquetToHuggingFaceDatasetDecodingHandler, | ||
| ParquetToHuggingFaceIterableDatasetDecodingHandler, |
There was a problem hiding this comment.
triple registration, entry point + both init.pys. lru_cache saves it but any of the three can go, right?
| ParquetToHuggingFaceIterableDatasetDecodingHandler, |
There was a problem hiding this comment.
thanks for the catch! we have init registration as a fallback to support backward compatibility, but i think we can remove it in this case.
| cache_key = hf_source_cache_key(source, shards) | ||
| expected_manifest = hf_cache_manifest(source, shards, cache_key) | ||
|
|
||
| if source.cache_root is None: |
There was a problem hiding this comment.
with cache_root=None the shards/sha256/manifest work runs and gets thrown away, every decode re-downloads to a new local dir. was cache-root meant to be strictly opt-in, or would defaulting to a raw_data remote path make sense?
There was a problem hiding this comment.
opt-in for now as we don't support artifacts in v2 yet. even if we default to a raw_data remote path, it won't persist across runs. so it isn't really helpful. hope that makes sense.
| ) | ||
| registry_record = await read_registry_record(source, cache_key) | ||
| remote_path = ( | ||
| registry_record.get("artifact_uri", default_remote_path) if registry_record is not None else default_remote_path |
There was a problem hiding this comment.
artifact_uri only matters on hit, the miss branch resets to default_remote_path at 504. migration case in mind, but i really dont know.. maybe can the indirection come out
There was a problem hiding this comment.
you're right. we don't really need artifact_uri. removed it.
| import typing | ||
| from pathlib import Path | ||
|
|
||
| import datasets |
There was a problem hiding this comment.
imho back to lazy_module("datasets")? entry-point discovery triggers this and drags pyarrow/numpy/fsspec in for users who never touch the plugin. Is there any other reason i'm not seeing here?
There was a problem hiding this comment.
moved imports inside register_huggingface_dataset_transformers() to avoid eager imports. think that should do.
| ) | ||
|
|
||
| parquet_files = await list_parquet_files(uri, filesystem) | ||
| parquet_files = await _localize_parquet_files(parquet_files) |
There was a problem hiding this comment.
localizing full shards before pyarrow kills projection/row-group pushdown (pq.read_table(..., filesystem=fs, columns=...) range-requests only what it needs). was there a case where fsspec+pyarrow broke down for you?
this one is not a problem. Just questioning to for learning purposes :)
There was a problem hiding this comment.
actually, i could use fsspec with pyarrow. i initially ran into remote storage auth-related errors, so i thought this would make things easier. but you’re right, i could use flyte’s storage abstraction to get the filesystem and read remote parquet files directly.
| from flyte._image import PythonWheels | ||
| from flyte.io import DataFrame | ||
|
|
||
| from flyteplugins.huggingface import from_hf |
There was a problem hiding this comment.
is there ever a world where we also want to support download models? In that case would be future-proof to rename this dataset_from_hf or expose a module flyteplugins.huggingface.datasets.from_hf so that we can also do flyteplugins.huggingface.models.from_hf
There was a problem hiding this comment.
makes sense! it’s now exposed in the module so we avoid the risk of collisions, and each namespace owns its own registration.
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
This PR adds a Hugging Face datasets plugin for Flyte. It lets tasks consume Hub datasets directly via
from_hf(...)and transparently materializes them asdatasets.Datasetordatasets.IterableDatasetwith optional shared caching throughcache_root.The plugin streams converted parquet shards from the Hub, stores them as sharded parquet artifacts, and reuses cached materializations across runs when available.
You can also return and pass real
datasets.Datasetobjects between tasks: