Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"]
12 changes: 12 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,18 @@ class PyStore:
def supports_deletes(self) -> bool: ...
async def set(self, key: str, value: bytes) -> None: ...
async def set_if_not_exists(self, key: str, value: bytes) -> None: ...
def get_virtual_ref(
self, key: str
) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ...
async def get_virtual_ref_async(
self, key: str
) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ...
def all_virtual_refs(
self,
) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ...
async def all_virtual_refs_async(
self,
) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ...
def set_virtual_ref(
self,
key: str,
Expand Down
1 change: 1 addition & 0 deletions icechunk-python/python/icechunk/session.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import warnings
from datetime import datetime
from collections.abc import AsyncIterator, Callable, Generator, Iterable, Sequence
from typing import Any, NoReturn, Self

Expand Down
90 changes: 90 additions & 0 deletions icechunk-python/python/icechunk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,96 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
"""
return await self._store.set_if_not_exists(key, value.to_bytes())

def get_virtual_ref(
self, key: str
) -> tuple[str, int, int, str | datetime | None] | None:
"""Get a virtual reference for a chunk.

Parameters
----------
key : str
The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0'

Returns
-------
tuple[str, int, int, str | datetime | None] | None
A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise.
The checksum will be either an etag string or a datetime object.
"""
result = self._store.get_virtual_ref(key)
if result is None:
return None
location, offset, length, etag, last_modified = result
checksum = etag if etag is not None else last_modified
return (location, offset, length, checksum)

async def get_virtual_ref_async(
self, key: str
) -> tuple[str, int, int, str | datetime | None] | None:
"""Get a virtual reference for a chunk asynchronously.

Parameters
----------
key : str
The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0'

Returns
-------
tuple[str, int, int, str | datetime | None] | None
A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise.
The checksum will be either an etag string or a datetime object.
"""
result = await self._store.get_virtual_ref_async(key)
if result is None:
return None
location, offset, length, etag, last_modified = result
checksum = etag if etag is not None else last_modified
return (location, offset, length, checksum)

def all_virtual_refs(
self,
) -> list[tuple[str, str, int, int, str | datetime | None]]:
"""
Return all virtual references in the store.

Returns
-------
list[tuple[str, str, int, int, str | datetime | None]]
A list of tuples containing:
- zarr_key: The full zarr key (e.g., "array/c/0/0/1")
- location: The storage location URL
- offset: Byte offset in the file
- length: Length in bytes
- checksum: Either an etag string or datetime, or None
"""
result = self._store.all_virtual_refs()
return [
(key, location, offset, length, etag if etag is not None else last_modified)
for key, location, offset, length, etag, last_modified in result
]

async def all_virtual_refs_async(
self,
) -> list[tuple[str, str, int, int, str | datetime | None]]:
"""
Return all virtual references in the store (async version).

Returns
-------
list[tuple[str, str, int, int, str | datetime | None]]
A list of tuples containing:
- zarr_key: The full zarr key (e.g., "array/c/0/0/1")
- location: The storage location URL
- offset: Byte offset in the file
- length: Length in bytes
- checksum: Either an etag string or datetime, or None
"""
result = await self._store.all_virtual_refs_async()
return [
(key, location, offset, length, etag if etag is not None else last_modified)
for key, location, offset, length, etag, last_modified in result
]

def set_virtual_ref(
self,
key: str,
Expand Down
3 changes: 2 additions & 1 deletion icechunk-python/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use async_stream::try_stream;
use futures::{StreamExt, TryStreamExt};
use icechunk::{
Store,
format::{ChunkIndices, Path, manifest::ChunkPayload},
format::manifest::ChunkPayload,
format::{ChunkIndices, Path},
session::{Session, SessionErrorKind},
store::{StoreError, StoreErrorKind},
};
Expand Down
140 changes: 140 additions & 0 deletions icechunk-python/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ use crate::{
};

type KeyRanges = Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>;
type VirtualRefResult = Option<(
String,
ChunkOffset,
ChunkLength,
Option<String>,
Option<chrono::DateTime<Utc>>,
)>;
type VirtualRefTuple =
(String, String, u64, u64, Option<String>, Option<chrono::DateTime<Utc>>);

#[derive(FromPyObject, Clone, Debug)]
enum ChecksumArgument {
Expand Down Expand Up @@ -304,6 +313,137 @@ impl PyStore {
})
}

fn get_virtual_ref(
&self,
py: Python<'_>,
key: String,
) -> PyIcechunkStoreResult<VirtualRefResult> {
py.detach(move || {
let store = Arc::clone(&self.0);

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let vref = store
.get_virtual_ref(&key)
.await
.map_err(PyIcechunkStoreError::from)?;

Ok(vref.map(|vref| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(location, vref.offset, vref.length, etag, last_modified)
}))
})
})
}

fn get_virtual_ref_async<'py>(
&'py self,
py: Python<'py>,
key: String,
) -> PyResult<Bound<'py, PyAny>> {
let store = Arc::clone(&self.0);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let vref =
store.get_virtual_ref(&key).await.map_err(PyIcechunkStoreError::from)?;

Ok(vref.map(|vref| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(location, vref.offset, vref.length, etag, last_modified)
}))
})
}

fn all_virtual_refs(
&self,
py: Python<'_>,
) -> PyIcechunkStoreResult<Vec<VirtualRefTuple>> {
py.detach(move || {
let store = Arc::clone(&self.0);

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let refs =
store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?;

let res: Vec<_> = refs
.into_iter()
.map(|(key, vref)| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(key, location, vref.offset, vref.length, etag, last_modified)
})
.collect();

Ok(res)
})
})
}

fn all_virtual_refs_async<'py>(
&'py self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let store = Arc::clone(&self.0);
pyo3_async_runtimes::tokio::future_into_py::<_, Vec<VirtualRefTuple>>(
py,
async move {
let refs =
store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?;

let res: Vec<_> = refs
.into_iter()
.map(|(key, vref)| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(key, location, vref.offset, vref.length, etag, last_modified)
})
.collect();

Ok(res)
},
)
}

#[allow(clippy::too_many_arguments)]
fn set_virtual_ref(
&self,
Expand Down
58 changes: 58 additions & 0 deletions icechunk-python/tests/test_virtual_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ async def test_write_minio_virtual_refs(
validate_container=True,
)

vref_0_0_0 = await store.get_virtual_ref_async("c/0/0/0")
assert vref_0_0_0 is not None
assert vref_0_0_0[0] == f"s3://testbucket/{prefix}/chunk-1"
assert vref_0_0_0[1] == 0
assert vref_0_0_0[2] == 4

vref_0_0_1 = await store.get_virtual_ref_async("c/0/0/1")
assert vref_0_0_1 is not None
assert vref_0_0_1[0] == f"s3://testbucket/{prefix}/chunk-2"
assert vref_0_0_1[1] == 1
assert vref_0_0_1[2] == 4

vref_none = await store.get_virtual_ref_async("c/0/0/3")
assert vref_none is None

all_refs = await store.all_virtual_refs_async()
all_refs_dict = {
key: (location, offset, length) for key, location, offset, length, _ in all_refs
}

assert len(all_refs_dict) >= 2
assert all_refs_dict.get("c/0/0/0") == (f"s3://testbucket/{prefix}/chunk-1", 0, 4)
assert all_refs_dict.get("c/0/0/1") == (f"s3://testbucket/{prefix}/chunk-2", 1, 4)

buffer_prototype = zarr.core.buffer.default_buffer_prototype()

first = await store.get("c/0/0/0", prototype=buffer_prototype)
Expand Down Expand Up @@ -287,6 +311,27 @@ async def test_public_virtual_refs(
length=288,
)

if use_async:
vref = await store.get_virtual_ref_async("year/c/0")
else:
vref = store.get_virtual_ref("year/c/0")

assert vref is not None
assert vref[0].endswith("/netcdf/oscar_vel2018.nc")
assert vref[1] == 22306
assert vref[2] == 288

if use_async:
all_refs = await store.all_virtual_refs_async()
else:
all_refs = store.all_virtual_refs()

assert len(all_refs) == 1
assert all_refs[0][0] == "year/c/0"
assert all_refs[0][1].endswith("/netcdf/oscar_vel2018.nc")
assert all_refs[0][2] == 22306
assert all_refs[0][3] == 288

nodes = [n async for n in store.list()]
assert "year/c/0" in nodes

Expand Down Expand Up @@ -400,6 +445,12 @@ def test_error_on_nonexisting_virtual_chunk_container(
],
)

vref = store.get_virtual_ref("c/0")
assert vref is not None
assert vref[0] == "file:///foo"
assert vref[1] == 0
assert vref[2] == 4

with pytest.raises(
IcechunkError, match=r"file:///foo.* edit the repository configuration"
):
Expand Down Expand Up @@ -439,6 +490,13 @@ def test_error_on_non_authorized_virtual_chunk_container(
],
)

# don't need authorization just to view what the virtual chunk is, only to fetch from that location
vref = store.get_virtual_ref("c/0")
assert vref is not None
assert vref[0] == "file:///foo/bar"
assert vref[1] == 0
assert vref[2] == 4

with pytest.raises(IcechunkError, match=r"file:///foo.*authorize"):
array[0]

Expand Down
Loading
Loading