Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 164 additions & 26 deletions src/cellink/_core/donordata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass

import h5py
import numpy as np
import pandas as pd
import scanpy as sc
import zarr
Expand All @@ -22,6 +23,19 @@

logger = logging.getLogger(__name__)


def _has_dask_X(adata) -> bool:
"""Check if an AnnData has a dask-backed X matrix (lazy loading).

Safe for MuData / non-AnnData inputs (returns False).
"""
try:
import dask.array as da
except ImportError:
return False
return isinstance(getattr(adata, "X", None), da.Array)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to also check for CSR and CSC data sets



HIGHLIGHT_COLOR = "bold deep_pink2"


Expand Down Expand Up @@ -54,19 +68,38 @@ def __init__(
):
if donor_id not in C.obs.columns:
raise ValueError(f"'{donor_id}' not found in C.obs")
if donor_id not in G.obs.columns and donor_id != G.obs.index.name:
raise ValueError(f"'{donor_id}' must be in gdata.obs or set as index")
if donor_id != G.obs.index.name:
G.obs = G.obs.set_index(donor_id)

if isinstance(G.obs, pd.DataFrame):
if donor_id not in G.obs.columns and donor_id != G.obs.index.name:
raise ValueError(f"'{donor_id}' must be in gdata.obs or set as index")
if donor_id != G.obs.index.name:
G.obs = G.obs.set_index(donor_id)
else:
# Lazy AnnData (e.g. from read_lazy) — obs is xarray-backed.
# donor_id must already be the obs index (obs_names).
logger.debug("G.obs is not a pandas DataFrame; assuming donor_id is the obs index.")

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have never seen an example where G.obs is not a pd.DataFrame? Or is it only a pd.Index then? If not, what else? Should we cast it to a pd.Index then?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have an entire zarr based ann data and you don't load everything into mem (i.e., lazy loading and eager_obs_var = False) then it won't be a pandas data frame but just an xarray data set


self._var_dims_to_sync = [] if var_dims_to_sync is None else var_dims_to_sync
self.donor_id = donor_id
# Deferred obs filter for lazy G: applying fancy row-indexing on a dask array
# before var-slicing forces materialising a large intermediate. We store the
# desired donor subset here and apply it in-memory inside to_memory(), AFTER
# the cheap var-slice has already narrowed the columns.
self._lazy_G_obs_filter: pd.Index | None = None
# Same deferral idea for lazy C — but stores the *donors to keep*; the
# cell-level boolean mask & sort are computed at to_memory() time, after
# the var-slice has narrowed C.
self._lazy_C_obs_filter: pd.Index | None = None
self._match_donors(G, C)
self.uns = uns

def _match_donors(self, G: AnnData | MuData, C: AnnData | MuData) -> None:
G_idx = G.obs.index
C_idx = pd.Index(C.obs[self.donor_id].unique())
G_idx = pd.Index(G.obs_names)
# C.obs[donor_id] may be a pandas.Series (eager) or an xarray.DataArray
# (when C comes from anndata.experimental.read_lazy). Coerce to numpy.
donor_col = C.obs[self.donor_id]
donor_arr = np.asarray(donor_col.values if hasattr(donor_col, "values") else donor_col)
C_idx = pd.Index(np.unique(donor_arr))
keep_donors = G_idx.intersection(C_idx)

if len(keep_donors) == 0:
Expand All @@ -77,27 +110,115 @@ def _match_donors(self, G: AnnData | MuData, C: AnnData | MuData) -> None:
),
self.donor_id,
)
if not keep_donors.equals(G.obs_names):
G = G[keep_donors]

keep_cells = C.obs[self.donor_id].isin(keep_donors)
if not keep_cells.all():
C = C[keep_cells]
# --- G obs filter ---
if not keep_donors.equals(G_idx):
if _has_dask_X(G):
# Defer obs-filtering for lazy G. Applying a fancy row-index on a
# dask array before a column-slice forces dask to materialise the
# full row-filtered intermediate (all columns) before selecting the
# target columns — potentially reading gigabytes of data.
# We record keep_donors and apply the filter cheaply in to_memory()
# once only the needed columns have been loaded.
self._lazy_G_obs_filter = keep_donors
else:
G = G[keep_donors]

# Sort cells by donor order
sorted_cells = C.obs.iloc[
pd.Categorical(C.obs[self.donor_id], categories=keep_donors, ordered=True).argsort()
].index
if not sorted_cells.equals(C.obs_names):
C = C[sorted_cells]
# --- C cell filter & sort ---
if _has_dask_X(C):
# Mirror the G deferral. Store keep_donors; the cell mask & donor-order
# sort are computed in to_memory() once C.X has been narrowed (var-slice).
self._lazy_C_obs_filter = keep_donors
else:
keep_cells = C.obs[self.donor_id].isin(keep_donors)
if not keep_cells.all():
C = C[keep_cells]
# Sort cells by donor order (only C — G order is kept as-is).
# Stable sort so within-donor cell order is preserved deterministically.
sorted_cells = C.obs.iloc[
pd.Categorical(C.obs[self.donor_id], categories=keep_donors, ordered=True).argsort(kind="stable")
].index
if not sorted_cells.equals(C.obs_names):
C = C[sorted_cells]

self._C = C
self._G = G

@property
def _lazy_G(self) -> bool:
"""Whether G has a dask-backed X (lazy loading)."""
return _has_dask_X(self._G)

@property
def _lazy_C(self) -> bool:
"""Whether C has a dask-backed X (lazy loading)."""
return _has_dask_X(self._C)

@property
def is_lazy(self) -> bool:
"""Whether G or C has a dask-backed X matrix (lazy loading)."""
return self._lazy_G or self._lazy_C

def to_memory(self) -> DonorData:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to have an inplace flag here and enable inplace materialization?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe good to have an alias function compute() since dask also uses compute()?

def compute(self) -> DonorData:
    return self.to_memory()

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that! Will add it

"""Return a new DonorData with both G and C materialised into memory.

If neither side is lazy, returns ``self`` unchanged. For lazy
(dask-backed) G or C, calls ``.to_memory()`` on the respective side.

When obs filters were deferred (to avoid expensive dask fancy row-indexing
before column-slicing), they are applied here in-memory after the var-slice
has already narrowed the columns.
"""
if not self.is_lazy:
return self

# --- G side ---
G = self._G
if self._lazy_G:
G = G.to_memory() if hasattr(G, "to_memory") else G.copy()
# read_lazy → to_memory() can lose the obs index name; restore it
if G.obs.index.name is None and self.donor_id is not None:
G.obs.index.name = self.donor_id
if self._lazy_G_obs_filter is not None:
G = G[self._lazy_G_obs_filter]

# --- C side ---
C = self._C
if self._lazy_C:
C = C.to_memory() if hasattr(C, "to_memory") else C.copy()
if self._lazy_C_obs_filter is not None:
keep_donors = self._lazy_C_obs_filter
# Cell-level filter & donor-order sort, mirroring the eager path
# in _match_donors(). Now that obs is a pandas DataFrame, .isin()
# and Categorical-argsort behave normally.
if self.donor_id in C.obs.columns:
keep_cells = C.obs[self.donor_id].isin(keep_donors)
if not keep_cells.all():
C = C[keep_cells]
sorted_cells = C.obs.iloc[
pd.Categorical(C.obs[self.donor_id], categories=keep_donors, ordered=True).argsort(
kind="stable"
)
].index
if not sorted_cells.equals(C.obs_names):
C = C[sorted_cells]
if C.is_view:
C = C.copy()

return DonorData(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not convinced of this. This calls __init__ again, which calls _match_donors again. But at this point G and C are already correctly filtered, so _match_donors is redundant. Also, this could be dangerous. It recomputes keep_donors from scratch on the already-filtered data, which should give the same result, but if anything about the obs index changed during materialisation it could silently re-filter in an unexpected way.
How about we skip __init__ and build the result object directly:

@classmethod
def _from_parts(cls, G, C, donor_id, var_dims_to_sync, uns) -> DonorData:
    """Construct a DonorData directly from already-matched parts, bypassing _match_donors."""
    result = object.__new__(cls)
    result._G = G
    result._C = C
    result.donor_id = donor_id
    result._var_dims_to_sync = var_dims_to_sync
    result.uns = uns
    result._lazy_G_obs_filter = None
    result._lazy_C_obs_filter = None
    return result

and in in_memory():

return DonorData._from_parts(
    G=G,
    C=C,
    donor_id=self.donor_id,
    var_dims_to_sync=self._var_dims_to_sync,
    uns=self.uns,
)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. Implemented this

G=G,
C=C,
donor_id=self.donor_id,
var_dims_to_sync=self._var_dims_to_sync,
uns=self.uns,
)

def copy(self) -> DonorData:
if self._G.is_view:
# For lazy sides, don't materialise — leave them alone.
# For eager sides, copy if it's currently a view.
if not self._lazy_G and self._G.is_view:
self._G = self._G.copy()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But now it mutates self._G, so it's not a clean copy? The original dd object is changed, which shouldn't happen in a function called "copy".

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for noticing! should be fixed now!

if self._C.is_view:
if not self._lazy_C and self._C.is_view:
self._C = self._C.copy()
return self

Expand Down Expand Up @@ -196,6 +317,7 @@ def C(self) -> AnnData:
def C(self, value: AnnData | MuData) -> None:
if not isinstance(value, AnnData | MuData):
raise ValueError("C must be an AnnData or MuData object")
self._lazy_C_obs_filter = None # reset — new C takes over
self._C = value
self._match_donors(self._G, self._C)

Expand All @@ -207,6 +329,7 @@ def G(self) -> AnnData:
def G(self, value: AnnData | MuData) -> None:
if not isinstance(value, AnnData | MuData):
raise ValueError("G must be an AnnData or MuData object")
self._lazy_G_obs_filter = None # reset — new G takes over
self._G = value
self._match_donors(self._G, self._C)

Expand All @@ -231,7 +354,7 @@ def sel(
C_obs: slice = slice(None),
C_var: slice = slice(None),
):
_G = self.G[G_obs]
_G = self._G[G_obs]
_G = _G[:, G_var]
_C = self.C[C_obs]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why C and not _C here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

_C = _C[:, C_var]
Expand All @@ -251,8 +374,9 @@ def __getitem__(self, key):
(idx,) if isinstance(idx, str) else idx for idx in key
) # needed because Mudata[str] looks up modalities
key = key + (slice(None),) * (4 - len(key))
# Only slice if key is not slice(None)
_G = self.G[key[0]] if key[0] is not slice(None) else self.G
# Slice self._G directly (not self.G) so the deferred obs-filter view is never
# embedded into a new lazy dask chain — that would recreate the bottleneck.
_G = self._G[key[0]] if key[0] is not slice(None) else self._G
_G = _G[:, key[1]] if key[1] is not slice(None) else _G
_C = self.C[key[2]] if key[2] is not slice(None) else self.C

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above. Why C and not _C?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

_C = _C[:, key[3]] if key[3] is not slice(None) else _C
Expand Down Expand Up @@ -319,6 +443,12 @@ def aggregate(
verbose:
Whether to print verbose output. Defaults to False.
"""
if self._lazy_C:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only in case of lazy C and not lazy G?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this function only uses C. adata used below is self.C. So G is irrelevant here

raise RuntimeError(
"Cannot aggregate on a lazy C. "
"Call dd.to_memory() first or subset with dd[..., :, var_subset].to_memory()."
)

if filter_key is not None:
assert filter_value is not None, "filter_value must be provided if filter_key is provided"

Expand Down Expand Up @@ -374,9 +504,10 @@ def aggregate(

def prep_repr(self) -> str:
"""String representation of DonorData showing side-by-side dd.G and dd.C views."""
n_G_obs, n_G_vars, n_C_obs, n_C_vars = self.shape
# Split the representations into lines
G_lines, G_highlight = _anndata_repr(self.G, self.G.n_obs, self.G.n_vars, self._var_dims_to_sync)
C_lines, C_highlight = _anndata_repr(self.C, self.C.n_obs, self.C.n_vars, [self.donor_id])
G_lines, G_highlight = _anndata_repr(self.G, n_G_obs, n_G_vars, self._var_dims_to_sync)
C_lines, C_highlight = _anndata_repr(self.C, n_C_obs, n_C_vars, [self.donor_id])

def pad_lists(l1, l2):
max_lines = max(len(l1), len(l2))
Expand Down Expand Up @@ -412,7 +543,7 @@ def highlight_lines(lines, highlights):
n_cells_per_donor = self.C.obs[self.donor_id].value_counts()
min_n_cells, max_n_cells = n_cells_per_donor.min(), n_cells_per_donor.max()
header_line = Text(
f"DonorData(n_donors={self.G.shape[0]:,}, "
f"DonorData(n_donors={n_G_obs:,}, "
f"n_cells_per_donor=[{min_n_cells:,}-{max_n_cells:,}], "
f"donor_id='{self.donor_id}')",
style=HIGHLIGHT_COLOR,
Expand Down Expand Up @@ -441,7 +572,14 @@ def __str__(self) -> str:

@property
def shape(self) -> tuple[tuple[int, int], tuple[int, int]]:
return *self.G.shape, *self.C.shape
# When obs filters are deferred for lazy G/C, self._G/self._C still hold all
# rows; report the filtered counts so dd.shape stays user-meaningful.
n_G_obs = len(self._lazy_G_obs_filter) if self._lazy_G_obs_filter is not None else self._G.shape[0]
# For lazy C, _lazy_C_obs_filter stores donors (not cells), so we can't
# report exact cell counts without reading C.obs[donor_id]; fall back to
# the unfiltered count. Worst-case overestimate; matches the pre-filter shape.
n_C_obs = self._C.shape[0]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should raise a warning then if self._lazy_C_obs_filter.

Maybe offer a flag, whether C.obs[donor_id] is supposed to be computed. Maybe also easier to substract cells from filtered patients from full matrix?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the warning. I don't fully get your second point but maybe its sufficient with just the warning for now?

return n_G_obs, self._G.shape[1], n_C_obs, self._C.shape[1]


def _anndata_repr(adata, n_obs, n_vars, highlight_keys=None) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/cellink/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._export import to_plink, write_variants_to_vcf
from ._readwrite import read_dd, read_h5_dd, read_zarr_dd
from ._readwrite import read_dd, read_h5_dd, read_lazy_dd, read_zarr_dd
from ._sgkit import from_sgkit_dataset, read_bgen, read_plink, read_sgkit_zarr
from ._pgen import stream_pgen_to_zarr, read_pgen_zarr
Loading
Loading