Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/source/runtime/ingestion-and-canonical-store.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,27 @@ to convert the source into canonical ``*.ome.zarr``.
If stale legacy ClearEx groups such as ``data`` or ``results`` exist inside the
source ``.n5`` tree, they are ignored for source selection.

Materialization Execution Model
-------------------------------

Pressing ``Next`` in the setup flow starts
``materialize_experiment_data_store(...)`` and enters the Dask-backed ingestion
path. Setup metadata selection itself does not materialize TIFF payloads.

Current execution behavior is format-dependent:

- TIFF/OME-TIFF, HDF5, ``.npy``, and ``.npz`` sources are opened as lazy Dask
arrays and written in bounded parallel batches, but the write graph currently
executes through Dask's local threaded scheduler.
- Generic Zarr and canonical OME-Zarr sources can use the active distributed
Dask ``Client`` for canonical writes.
- Navigate BDV ``.n5`` sources remain Dask-parallel through TensorStore-backed
reads and the active client path.

This distinction is intentional in the current implementation because some
file-backed graphs, especially TIFF/HDF5-backed graphs with locks or
non-serializable handles, do not reliably serialize to distributed workers.

Canonical Store Path Policy
---------------------------

Expand Down
8 changes: 6 additions & 2 deletions src/clearex/flatfield/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2401,11 +2401,15 @@ def _fit_profile_tiled(
flatfield_sum[
y_read_start:y_read_stop,
x_read_start:x_read_stop,
] += flatfield_tile.astype(np.float64) * blend_weights_64
] += (
flatfield_tile.astype(np.float64) * blend_weights_64
)
darkfield_sum[
y_read_start:y_read_stop,
x_read_start:x_read_stop,
] += darkfield_tile.astype(np.float64) * blend_weights_64
] += (
darkfield_tile.astype(np.float64) * blend_weights_64
)
weight_sum[
y_read_start:y_read_stop,
x_read_start:x_read_stop,
Expand Down
9 changes: 9 additions & 0 deletions src/clearex/gui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@ This folder owns the PyQt6 UX in `app.py`.
- `Create Experiment List` for recursive folder scans or saved-list reloads,
- drag and drop of experiments, folders, or `.clearex-experiment-list.json` files
- Add/remove experiment entries from the list and persist the list for reuse
- the setup experiment list supports `Delete` / `Backspace` to remove the
current selection without using the `Remove Selected` button
- Auto-load metadata when the current list selection changes
- for Navigate BDV ``file_type: N5``, source metadata must come from the
Navigate experiment context plus TensorStore-backed BDV source summary,
not from sending the raw ``.n5`` path through the generic Zarr reader
- Setup metadata/source-resolution contexts are cached per experiment path to
avoid repeated recursive TIFF candidate scans and source re-opens when
reselecting experiments in the same session; cache entries are invalidated
on explicit reload, source-directory override changes, and list removal
- Navigate TIFF metadata verification should use `tifffile.TiffFile(...)`
header parsing instead of constructing a Dask-backed source array during
setup-screen metadata refresh
- Configure Dask backend and OME-Zarr save options
- Configure `Spatial Calibration` for the currently selected experiment:
- map world `z/y/x` to Navigate stage `X/Y/Z/F` or `none`,
Expand Down
230 changes: 219 additions & 11 deletions src/clearex/gui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
QIcon,
QImage,
QImageReader,
QKeyEvent,
QPixmap,
)
from PyQt6.QtWidgets import (
Expand Down Expand Up @@ -2070,6 +2071,54 @@ def _apply_experiment_overrides(
return out


def _path_signature(path: Path) -> Optional[tuple[int, int]]:
"""Return a lightweight `(mtime_ns, size)` signature for cache validation.

Parameters
----------
path : pathlib.Path
Path to stat.

Returns
-------
tuple[int, int], optional
Signature tuple when stat succeeds, otherwise ``None``.
"""
try:
stat_result = path.stat()
except OSError:
return None
return int(stat_result.st_mtime_ns), int(stat_result.st_size)


@dataclass(frozen=True)
class ExperimentMetadataCacheEntry:
    """Cache one resolved setup metadata context for an experiment path.

    An entry snapshots the parsed experiment, its resolved source path, and
    the metadata summary shown in the setup panel, together with the
    filesystem signatures and source-directory override that were in effect
    when the entry was built. Consumers compare those captured values against
    the live filesystem/override state before reuse and discard the entry on
    any mismatch.

    Parameters
    ----------
    experiment : NavigateExperiment
        Parsed experiment metadata.
    source_data_path : pathlib.Path
        Resolved acquisition source path.
    image_info : ImageInfo
        Source metadata summary loaded for the setup panel.
    experiment_signature : tuple[int, int], optional
        Cached ``(mtime_ns, size)`` signature of the experiment descriptor
        file; ``None`` when the descriptor could not be stat-ed.
    source_signature : tuple[int, int], optional
        Cached ``(mtime_ns, size)`` signature of the resolved source path;
        ``None`` when the source could not be stat-ed.
    override_directory : pathlib.Path, optional
        Source-directory override active at cache creation time, or ``None``
        when no override was set.
    """

    # Parsed experiment descriptor captured at cache time.
    experiment: NavigateExperiment
    # Resolved (expanduser + resolve) source path the metadata was read from.
    source_data_path: Path
    # Setup-panel metadata summary for the source.
    image_info: ImageInfo
    # Filesystem signature of the descriptor file when cached (or None).
    experiment_signature: Optional[tuple[int, int]]
    # Filesystem signature of the source path when cached (or None).
    source_signature: Optional[tuple[int, int]]
    # Source-directory override in effect when cached (or None).
    override_directory: Optional[Path]


@dataclass(frozen=True)
class ExperimentStorePreparationRequest:
"""Describe one experiment store that may need canonical materialization.
Expand Down Expand Up @@ -4876,6 +4925,9 @@ def __init__(self, initial: WorkflowConfig) -> None:
self._experiment_list_file_path: Optional[Path] = None
self._experiment_list_dirty = False
self._source_data_directory_overrides: Dict[Path, Path] = {}
self._experiment_metadata_cache: Dict[
Path, ExperimentMetadataCacheEntry
] = {}
self._spatial_calibration_drafts: Dict[Path, SpatialCalibrationConfig] = {}
self._current_spatial_calibration: SpatialCalibrationConfig = (
initial.spatial_calibration
Expand Down Expand Up @@ -5774,6 +5826,128 @@ def _experiment_list_paths(self) -> list[Path]:
experiment_paths.append(path)
return experiment_paths

def _prune_experiment_metadata_cache(self, valid_paths: Sequence[Path]) -> None:
"""Drop cached metadata entries no longer present in the list.

Parameters
----------
valid_paths : sequence[pathlib.Path]
Experiment paths that should remain cache-eligible.

Returns
-------
None
Cache is pruned in-place.
"""
keep = {Path(path).expanduser().resolve() for path in valid_paths}
for cached_path in list(self._experiment_metadata_cache):
if cached_path not in keep:
self._experiment_metadata_cache.pop(cached_path, None)

def _invalidate_experiment_metadata_cache_entry(
self,
experiment_path: Path,
) -> None:
"""Invalidate one cached experiment metadata entry.

Parameters
----------
experiment_path : pathlib.Path
Experiment path whose cache entry should be removed.

Returns
-------
None
Cache is updated in-place.
"""
resolved_path = Path(experiment_path).expanduser().resolve()
self._experiment_metadata_cache.pop(resolved_path, None)

def _cached_experiment_metadata_context(
self,
experiment_path: Path,
) -> Optional[tuple[Path, NavigateExperiment, Path, ImageInfo]]:
"""Return a valid cached metadata context when available.

Parameters
----------
experiment_path : pathlib.Path
Experiment descriptor path to resolve.

Returns
-------
tuple[pathlib.Path, NavigateExperiment, pathlib.Path, ImageInfo], optional
Cached context tuple when signatures and overrides still match.
"""
resolved_experiment_path = Path(experiment_path).expanduser().resolve()
entry = self._experiment_metadata_cache.get(resolved_experiment_path)
if entry is None:
return None

current_override = self._source_data_directory_overrides.get(
resolved_experiment_path
)
if current_override != entry.override_directory:
self._experiment_metadata_cache.pop(resolved_experiment_path, None)
return None

if _path_signature(resolved_experiment_path) != entry.experiment_signature:
self._experiment_metadata_cache.pop(resolved_experiment_path, None)
return None

resolved_source_path = Path(entry.source_data_path).expanduser().resolve()
if _path_signature(resolved_source_path) != entry.source_signature:
self._experiment_metadata_cache.pop(resolved_experiment_path, None)
return None

return (
resolved_experiment_path,
entry.experiment,
resolved_source_path,
entry.image_info,
)

def _store_experiment_metadata_context(
self,
*,
experiment_path: Path,
experiment: NavigateExperiment,
source_data_path: Path,
info: ImageInfo,
) -> None:
"""Store one resolved metadata context in the in-memory cache.

Parameters
----------
experiment_path : pathlib.Path
Experiment descriptor path.
experiment : NavigateExperiment
Parsed experiment metadata.
source_data_path : pathlib.Path
Resolved source path for the experiment.
info : ImageInfo
Source metadata summary.

Returns
-------
None
Cache is updated in-place.
"""
resolved_experiment_path = Path(experiment_path).expanduser().resolve()
resolved_source_path = Path(source_data_path).expanduser().resolve()
self._experiment_metadata_cache[resolved_experiment_path] = (
ExperimentMetadataCacheEntry(
experiment=experiment,
source_data_path=resolved_source_path,
image_info=info,
experiment_signature=_path_signature(resolved_experiment_path),
source_signature=_path_signature(resolved_source_path),
override_directory=self._source_data_directory_overrides.get(
resolved_experiment_path
),
)
)

def _current_selected_experiment_path(self) -> Optional[Path]:
"""Return the current experiment selection path.

Expand Down Expand Up @@ -5810,6 +5984,7 @@ def _set_experiment_list_paths(
Widget state is updated in-place.
"""
normalized_paths = _deduplicate_resolved_paths(experiment_paths)
self._prune_experiment_metadata_cache(normalized_paths)
prior_block_state = self._experiment_list.blockSignals(True)
self._experiment_list.clear()
for experiment_path in normalized_paths:
Expand Down Expand Up @@ -6003,7 +6178,7 @@ def _clear_loaded_experiment_context(self) -> None:
self._loaded_target_store_path = None

def eventFilter(self, watched: QObject, event: QEvent) -> bool:
"""Handle drag/drop events routed through the experiment list.
"""Handle list keyboard removal and drag/drop routed through the list.

Parameters
----------
Expand All @@ -6019,6 +6194,15 @@ def eventFilter(self, watched: QObject, event: QEvent) -> bool:
event filter result.
"""
if watched in {self._experiment_list, self._experiment_list.viewport()}:
if (
watched is self._experiment_list
and isinstance(event, QKeyEvent)
and event.type() == QEvent.Type.KeyPress
and event.key() in {Qt.Key.Key_Delete, Qt.Key.Key_Backspace}
):
self._on_remove_selected_experiments()
event.accept()
return True
if isinstance(event, QDragEnterEvent):
if self._can_accept_experiment_drop(event.mimeData()):
event.acceptProposedAction()
Expand Down Expand Up @@ -6342,6 +6526,7 @@ def _on_remove_selected_experiments(self) -> None:

for removed_path in selected_paths:
self._source_data_directory_overrides.pop(removed_path, None)
self._invalidate_experiment_metadata_cache_entry(removed_path)
if (
self._experiment_list_file_path is not None
and remaining_paths != self._experiment_list_paths()
Expand Down Expand Up @@ -6543,9 +6728,25 @@ def _load_metadata_for_experiment_path(
return

try:
loaded_path, experiment, source_data_path, info = (
self._load_experiment_context(path=resolved_experiment_path)
cached_context = (
None
if force_reload
else self._cached_experiment_metadata_context(
resolved_experiment_path
)
)
if cached_context is None:
loaded_path, experiment, source_data_path, info = (
self._load_experiment_context(path=resolved_experiment_path)
)
self._store_experiment_metadata_context(
experiment_path=loaded_path,
experiment=experiment,
source_data_path=source_data_path,
info=info,
)
else:
loaded_path, experiment, source_data_path, info = cached_context
except Exception as exc:
logging.getLogger(__name__).exception(
"Failed to load experiment metadata from %s.",
Expand Down Expand Up @@ -6649,6 +6850,7 @@ def _prompt_for_source_data_directory(
self._source_data_directory_overrides[
experiment.path.expanduser().resolve()
] = override_directory
self._invalidate_experiment_metadata_cache_entry(experiment.path)
return source_data_path

def _load_experiment_context(
Expand Down Expand Up @@ -6715,11 +6917,17 @@ def _resolve_store_preparation_request(
experiment = self._loaded_experiment
source_data_path = self._loaded_source_data_path
else:
_loaded_path, experiment, source_data_path = (
self._resolve_experiment_source_context(
path=resolved_experiment_path
)
cached_context = self._cached_experiment_metadata_context(
resolved_experiment_path
)
if cached_context is not None:
_loaded_path, experiment, source_data_path, _info = cached_context
else:
_loaded_path, experiment, source_data_path = (
self._resolve_experiment_source_context(
path=resolved_experiment_path
)
)
target_store = resolve_data_store_path(experiment, source_data_path)
return ExperimentStorePreparationRequest(
experiment_path=resolved_experiment_path,
Expand Down Expand Up @@ -10045,9 +10253,7 @@ def _build_registration_parameter_rows(self, form: QFormLayout) -> None:
)
form.addRow(fusion_section)

perf_section, perf_form = self._build_parameter_section_card(
"Performance"
)
perf_section, perf_form = self._build_parameter_section_card("Performance")
self._registration_max_pairwise_voxels_spin = QSpinBox()
self._registration_max_pairwise_voxels_spin.setRange(0, 100_000_000)
self._registration_max_pairwise_voxels_spin.setSingleStep(50_000)
Expand Down Expand Up @@ -10959,7 +11165,9 @@ def _read_component_combo_value(combo: QComboBox) -> str:
selected_data = combo.currentData()
return str(selected_data).strip() if selected_data is not None else ""

def _available_channels_for_component(component_value: str) -> tuple[int, ...]:
def _available_channels_for_component(
component_value: str,
) -> tuple[int, ...]:
component = str(component_value).strip() or "data"
resolved_component = resolve_analysis_input_component(component)
if store_root is None:
Expand Down
Loading
Loading