feat: Implement support for multiple storage backends #99
base: main
@@ -32,14 +32,55 @@ jobs:
      - name: pre-commit
        run: pixi run pre-commit-run --color=always --show-diff-on-failure

  unit-tests:
  unit-tests-linux:
    name: Unit Tests (${{ matrix.os == 'ubuntu-latest' && 'Linux' || 'Windows' }}) - ${{ matrix.environment }}
    timeout-minutes: 30
    runs-on: ${{ matrix.os }}
Review comment: Do we need the matrix here at all?
    strategy:
      fail-fast: true
      matrix:
        os: [ubuntu-latest, windows-latest]
        os: [ubuntu-latest]
        environment: [py310, py311, py312, py313]
    services:
Review comment: Services are only supported on Linux in GitHub CI. I didn't find a way to DRY this up, so I just created two jobs.
Review comment: Out of curiosity: did you try using …
      minio:
        # we use the bitnami/minio image to avoid the need to set up the minio server
        # through a 'command' section, which is not supported by GitHub Actions.
        image: bitnami/minio:latest
        ports:
          - 9000:9000
        env:
          MINIO_ROOT_USER: minioadmin
          MINIO_ROOT_PASSWORD: minioadmin
        options: >-
          --name minio
    steps:
      - name: Checkout branch
        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
      - name: Set up pixi
        uses: prefix-dev/setup-pixi@14c8aabd75893f83f4ab30c03e7cf853c8208961 # v0.8.10
        with:
          environments: ${{ matrix.environment }}
      - name: Install repository
        run: pixi run -e ${{ matrix.environment }} postinstall
      - name: Run pytest
        run: pixi run -e ${{ matrix.environment }} test-coverage --color=yes
        env:
          AWS_SECRET_ACCESS_KEY: minioadmin
          AWS_ACCESS_KEY_ID: minioadmin
          AWS_ENDPOINT_URL: http://127.0.0.1:9000
      - name: Upload codecov
        uses: codecov/codecov-action@18283e04ce6e62d37312384ff67231eb8fd56d24 # v5.4.3
        with:
          files: ./coverage.xml
          token: ${{ secrets.CODECOV_TOKEN }}

  unit-tests-windows:
    name: Unit Tests (Windows) - ${{ matrix.environment }}
    timeout-minutes: 30
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: true
      matrix:
        os: [windows-latest]
        environment: [py310, py311, py312, py313]
    steps:
      - name: Checkout branch
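For context on the MinIO service above: the AWS_* variables exported to the pytest step are what fsspec's S3 backend (s3fs) picks up, so S3-style reads and writes in the tests land on the local MinIO container instead of real S3. A minimal smoke-test sketch of that wiring, assuming s3fs is installed in the test environment; the bucket name is only an illustrative placeholder and this snippet is not part of the PR:

```python
import os

import fsspec

# Resolve an S3 filesystem that talks to the MinIO container started by the
# job's `services:` block. Credentials and endpoint mirror the env vars set
# for the pytest step; "test-bucket" is a placeholder.
fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    client_kwargs={"endpoint_url": os.environ["AWS_ENDPOINT_URL"]},
)
fs.mkdirs("test-bucket", exist_ok=True)
fs.pipe("test-bucket/hello.txt", b"hello from CI")
assert fs.cat("test-bucket/hello.txt") == b"hello from CI"
```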
@@ -14,13 +14,47 @@ permissions:

jobs:
  unit-tests:
    name: Unit Tests (${{ matrix.os == 'ubuntu-latest' && 'Linux' || 'Windows' }})
    name: Unit Tests (Linuxs)
Review comment: Suggested change
    timeout-minutes: 30
    runs-on: ${{ matrix.os }}
    runs-on: ubuntu-latest
    services:
      minio:
        # we use the bitnami/minio image to avoid the need to set up the minio server
        # through a 'command' section, which is not supported by GitHub Actions.
        image: bitnami/minio:latest
        ports:
          - 9000:9000
        env:
          MINIO_ROOT_USER: minioadmin
          MINIO_ROOT_PASSWORD: minioadmin
        options: >-
          --name minio
    strategy:
      fail-fast: true
    steps:
      - name: Checkout branch
        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
      - name: Set up pixi
        uses: prefix-dev/setup-pixi@14c8aabd75893f83f4ab30c03e7cf853c8208961 # v0.8.10
        with:
          environments: nightly
      - name: Install polars nightly
        run: pixi run -e nightly install-polars-nightly
      - name: Install repository
        run: pixi run -e nightly postinstall
      - name: Run pytest
        run: pixi run -e nightly test-coverage --color=yes
        env:
          AWS_SECRET_ACCESS_KEY: minioadmin
          AWS_ACCESS_KEY_ID: minioadmin
          AWS_ENDPOINT_URL: http://127.0.0.1:9000

  unit-tests-windows:
    name: Unit Tests (Windows)
    timeout-minutes: 30
    runs-on: windows-latest
    strategy:
      fail-fast: true
      matrix:
        os: [ubuntu-latest, windows-latest]
    steps:
      - name: Checkout branch
        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -10,6 +10,7 @@
from pathlib import Path
from typing import Annotated, Any, cast

import fsspec
import polars as pl
import polars.exceptions as plexc
@@ -678,15 +679,21 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None:
        self._to_parquet(directory, sink=True, **kwargs)

    def _to_parquet(self, directory: str | Path, *, sink: bool, **kwargs: Any) -> None:
        path = Path(directory) if isinstance(directory, str) else directory
        path.mkdir(parents=True, exist_ok=True)
        with open(path / "schema.json", "w") as f:
        directory = str(directory)
        fs, _ = fsspec.url_to_fs(directory)
        assert isinstance(fs, fsspec.AbstractFileSystem)
        directory = directory.rstrip(fs.sep)
        fs.makedirs(directory, exist_ok=True)

        with fs.open(f"{directory}{fs.sep}schema.json", "w") as f:
Review comment: I think we need to ensure that …
Review comment: Done with …
            f.write(self.serialize())

        member_schemas = self.member_schemas()
        for key, lf in self.to_dict().items():
            destination = (
                path / key if "partition_by" in kwargs else path / f"{key}.parquet"
                f"{directory}{fs.sep}{key}"
                if "partition_by" in kwargs
                else f"{directory}{fs.sep}{key}.parquet"
            )
            if sink:
                member_schemas[key].sink_parquet(
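The core of this change is that `fsspec.url_to_fs` resolves both a plain local path and a URL such as `s3://bucket/prefix` to the right filesystem implementation, so the same write logic covers every backend. A rough standalone sketch of the pattern used above; `write_schema` is a hypothetical helper and the `s3://my-bucket/collection` URL is purely illustrative:

```python
import fsspec

def write_schema(directory: str, payload: str) -> None:
    # Resolve the filesystem from the path/URL: LocalFileSystem for plain
    # paths, S3FileSystem for s3:// URLs (given s3fs is installed), etc.
    fs, _ = fsspec.url_to_fs(directory)
    directory = directory.rstrip(fs.sep)
    fs.makedirs(directory, exist_ok=True)
    with fs.open(f"{directory}{fs.sep}schema.json", "w") as f:
        f.write(payload)

# The same function works for a local directory ...
write_schema("/tmp/collection", '{"members": []}')
# ... and, with s3fs and credentials configured, for an object-store prefix:
# write_schema("s3://my-bucket/collection", '{"members": []}')
```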
@@ -751,9 +758,8 @@ def read_parquet(
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
        """
        path = Path(directory)
        data = cls._from_parquet(path, scan=True, **kwargs)
        if not cls._requires_validation_for_reading_parquets(path, validation):
        data = cls._from_parquet(directory, scan=True, **kwargs)
        if not cls._requires_validation_for_reading_parquets(directory, validation):
            cls._validate_input_keys(data)
            return cls._init(data)
        return cls.validate(data, cast=True)

@@ -812,20 +818,19 @@ def scan_parquet(
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
        """
        path = Path(directory)
        data = cls._from_parquet(path, scan=True, **kwargs)
        if not cls._requires_validation_for_reading_parquets(path, validation):
        data = cls._from_parquet(directory, scan=True, **kwargs)
        if not cls._requires_validation_for_reading_parquets(directory, validation):
            cls._validate_input_keys(data)
            return cls._init(data)
        return cls.validate(data, cast=True)

    @classmethod
    def _from_parquet(
        cls, path: Path, scan: bool, **kwargs: Any
        cls, path: str | Path, scan: bool, **kwargs: Any
    ) -> dict[str, pl.LazyFrame]:
        data = {}
        for key in cls.members():
            if (source_path := cls._member_source_path(path, key)) is not None:
            if (source_path := cls._member_source_path(str(path), key)) is not None:
                data[key] = (
                    pl.scan_parquet(source_path, **kwargs)
                    if scan
@@ -834,29 +839,36 @@ def _from_parquet(
        return data

    @classmethod
    def _member_source_path(cls, base_path: Path, name: str) -> Path | None:
        if (path := base_path / name).exists() and base_path.is_dir():
    def _member_source_path(cls, base_path: str, name: str) -> str | None:
        fs, _ = fsspec.url_to_fs(base_path)
        base_path = base_path.rstrip(fs.sep)
        assert isinstance(fs, fsspec.AbstractFileSystem)

        if fs.exists(path := f"{base_path}{fs.sep}{name}") and fs.isdir(path):
            # We assume that the member is stored as a hive-partitioned dataset
            return path
        if (path := base_path / f"{name}.parquet").exists():
        if fs.exists(path := f"{base_path}{fs.sep}{name}.parquet"):
            # We assume that the member is stored as a single parquet file
            return path
        return None

    @classmethod
    def _requires_validation_for_reading_parquets(
        cls,
        directory: Path,
        directory: str | Path,
        validation: Validation,
    ) -> bool:
        if validation == "skip":
            return False

        directory = str(directory)
        fs, _ = fsspec.url_to_fs(directory)
        directory = directory.rstrip(fs.sep)
        assert isinstance(fs, fsspec.AbstractFileSystem)
        # First, we check whether the path provides the serialization of the collection.
        # If it does, we check whether it matches this collection. If it does, we assume
        # that the data adheres to the collection and we do not need to run validation.
        if (json_serialization := directory / "schema.json").exists():
            metadata = json_serialization.read_text()
        if fs.exists(json_serialization := f"{directory}{fs.sep}schema.json"):
Review comment: s.a. I think we need to make sure that …
Review comment: Same as above.
            metadata = fs.read_text(json_serialization)
            serialized_collection = deserialize_collection(metadata)
            if cls.matches(serialized_collection):
                return False
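On the read side, the same `url_to_fs` resolution backs both the member lookup (a directory is treated as a hive-partitioned dataset, a `<name>.parquet` object as a single file) and the optional validation skip when a matching `schema.json` is found. A simplified, standalone sketch of the lookup; the function name, the member name `trades`, and the paths are placeholders rather than part of the PR:

```python
import fsspec

def member_source_path(base_path: str, name: str) -> str | None:
    # Mirrors the lookup in the diff above: prefer a directory (assumed to be
    # a hive-partitioned dataset), then fall back to "<name>.parquet".
    fs, _ = fsspec.url_to_fs(base_path)
    base_path = base_path.rstrip(fs.sep)
    if fs.exists(path := f"{base_path}{fs.sep}{name}") and fs.isdir(path):
        return path
    if fs.exists(path := f"{base_path}{fs.sep}{name}.parquet"):
        return path
    return None

# Works identically for local directories and object-store prefixes:
print(member_source_path("/tmp/collection", "trades"))
# print(member_source_path("s3://my-bucket/collection", "trades"))
```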