diff --git a/scripts/register_mip_tiles_antspy.py b/scripts/register_mip_tiles_antspy.py index 103278f..10fdcdc 100755 --- a/scripts/register_mip_tiles_antspy.py +++ b/scripts/register_mip_tiles_antspy.py @@ -10,9 +10,7 @@ Transforms are estimated against a fixed anchor position using only expected left-right overlap regions to suppress spurious global matches. -uv run scripts/register_mip_tiles_antspy.py --input-dir /endosome/archive/bioinformatics/Danuser_lab/Dean/dean/2026-03-12-dvOPM-liver/ - │ CH00_000000.n5_mip_export/latest --projection xy --time-index 0 --registration-channel 0 --overlap-fraction 0.05 --aff-iterations 50x25x10x0 --aff-shrink-factors 8x4x2x1 - │ --aff-smoothing-sigmas 3x2x1x0 --aff-random-sampling-rate 0.2 +uv run scripts/register_mip_tiles_antspy.py --input-dir /endosome/archive/bioinformatics/Danuser_lab/Dean/dean/2026-03-12-dvOPM-liver/CH00_000000.n5_mip_export/latest --projection xy --time-index 0 --registration-channel 0 --overlap-fraction 0.05 --aff-iterations 50x25x10x0 --aff-shrink-factors 8x4x2x1 --aff-smoothing-sigmas 3x2x1x0 --aff-random-sampling-rate 0.2 """ from __future__ import annotations diff --git a/src/clearex/gui/app.py b/src/clearex/gui/app.py index 3cbe0c7..68c9abf 100644 --- a/src/clearex/gui/app.py +++ b/src/clearex/gui/app.py @@ -86,6 +86,7 @@ DEFAULT_ZARR_PYRAMID_PTCZYX, DEFAULT_SLURM_CLUSTER_JOB_EXTRA_DIRECTIVES, DaskBackendConfig, + ExecutionPolicy, LocalClusterRecommendation, LocalClusterConfig, PTCZYX_AXES, @@ -96,17 +97,23 @@ ZarrSaveConfig, analysis_chainable_output_component, analysis_operation_for_output_component, + calibration_profile_from_dict, + calibration_profile_to_dict, collect_analysis_input_references, dask_backend_from_dict, dask_backend_to_dict, default_analysis_operation_parameters, - format_dask_backend_summary, + execution_policy_from_dict, + execution_policy_to_dict, + format_execution_plan_summary, + format_execution_policy_summary, format_local_cluster_recommendation_summary, 
format_pyramid_levels, format_zarr_chunks_ptczyx, format_zarr_pyramid_ptczyx, normalize_analysis_operation_parameters, parse_pyramid_levels, + plan_execution, recommend_local_cluster_config, resolve_analysis_input_component, validate_analysis_input_references, @@ -190,6 +197,10 @@ class GuiUnavailableError(RuntimeError): _GUI_APP_ICON = "icon.png" _CLEAREX_SETTINGS_DIR_NAME = ".clearex" _CLEAREX_DASK_BACKEND_SETTINGS_FILE = "dask_backend_settings.json" +_CLEAREX_EXECUTION_POLICY_SETTINGS_FILE = "execution_policy_settings.json" +_CLEAREX_EXECUTION_CALIBRATION_PROFILES_FILE = ( + "execution_calibration_profiles.json" +) _CLEAREX_ZARR_SAVE_SETTINGS_FILE = "zarr_save_settings.json" _CLEAREX_EXPERIMENT_LIST_FORMAT = "clearex-experiment-list/v1" _CLEAREX_EXPERIMENT_LIST_FILE_SUFFIX = ".clearex-experiment-list.json" @@ -840,6 +851,30 @@ def _resolve_dask_backend_settings_path( return directory / _CLEAREX_DASK_BACKEND_SETTINGS_FILE +def _resolve_execution_policy_settings_path( + settings_directory: Optional[Path] = None, +) -> Path: + """Resolve the user settings JSON path for persisted execution policy.""" + directory = ( + settings_directory + if settings_directory is not None + else _resolve_clearex_settings_directory() + ) + return directory / _CLEAREX_EXECUTION_POLICY_SETTINGS_FILE + + +def _resolve_execution_calibration_profiles_path( + settings_directory: Optional[Path] = None, +) -> Path: + """Resolve the user settings JSON path for persisted calibration profiles.""" + directory = ( + settings_directory + if settings_directory is not None + else _resolve_clearex_settings_directory() + ) + return directory / _CLEAREX_EXECUTION_CALIBRATION_PROFILES_FILE + + def _resolve_zarr_save_settings_path( settings_directory: Optional[Path] = None, ) -> Path: @@ -959,6 +994,80 @@ def _load_last_used_dask_backend_config( return dask_backend_from_dict(payload) +def _load_last_used_execution_policy( + settings_path: Optional[Path] = None, +) -> Optional[ExecutionPolicy]: + """Load 
the last-used execution policy from JSON.""" + path = ( + settings_path + if settings_path is not None + else _resolve_execution_policy_settings_path() + ) + resolved = path.expanduser() + if not resolved.exists(): + return None + + try: + raw_text = resolved.read_text(encoding="utf-8") + except Exception as exc: + logging.getLogger(__name__).warning( + "Failed to read execution policy settings %s: %s", + resolved, + exc, + ) + return None + + if not raw_text.strip(): + return None + + try: + payload = json.loads(raw_text) + except json.JSONDecodeError as exc: + logging.getLogger(__name__).warning( + "Failed to decode execution policy settings %s: %s", + resolved, + exc, + ) + return None + + if isinstance(payload, dict) and not payload: + return None + return execution_policy_from_dict(payload) + + +def _load_execution_calibration_profiles( + settings_path: Optional[Path] = None, +) -> Dict[str, Any]: + """Load persisted execution calibration profiles from JSON.""" + path = ( + settings_path + if settings_path is not None + else _resolve_execution_calibration_profiles_path() + ) + resolved = path.expanduser() + if not resolved.exists(): + return {} + try: + raw_text = resolved.read_text(encoding="utf-8") + except Exception: + return {} + if not raw_text.strip(): + return {} + try: + payload = json.loads(raw_text) + except json.JSONDecodeError: + return {} + if not isinstance(payload, dict): + return {} + profiles: Dict[str, Any] = {} + for key, value in payload.items(): + profile = calibration_profile_from_dict(value) + if profile is None: + continue + profiles[str(key)] = profile + return profiles + + def _load_last_used_zarr_save_config( settings_path: Optional[Path] = None, ) -> Optional[ZarrSaveConfig]: @@ -1071,6 +1180,63 @@ def _save_last_used_dask_backend_config( return True +def _save_last_used_execution_policy( + config: ExecutionPolicy, + settings_path: Optional[Path] = None, +) -> bool: + """Persist the most recently used execution policy.""" + path = ( 
+ settings_path + if settings_path is not None + else _resolve_execution_policy_settings_path() + ) + resolved = path.expanduser() + _ensure_clearex_settings_directory(resolved.parent) + + try: + payload = execution_policy_to_dict(config) + serialized = json.dumps(payload, indent=2, sort_keys=True) + resolved.write_text(f"{serialized}\n", encoding="utf-8") + except Exception as exc: + logging.getLogger(__name__).warning( + "Failed to save execution policy settings %s: %s", + resolved, + exc, + ) + return False + return True + + +def _save_execution_calibration_profiles( + profiles: Mapping[str, Any], + settings_path: Optional[Path] = None, +) -> bool: + """Persist execution calibration profiles.""" + path = ( + settings_path + if settings_path is not None + else _resolve_execution_calibration_profiles_path() + ) + resolved = path.expanduser() + _ensure_clearex_settings_directory(resolved.parent) + + try: + payload = { + str(key): calibration_profile_to_dict(profile) + for key, profile in profiles.items() + } + serialized = json.dumps(payload, indent=2, sort_keys=True) + resolved.write_text(f"{serialized}\n", encoding="utf-8") + except Exception as exc: + logging.getLogger(__name__).warning( + "Failed to save execution calibration profiles %s: %s", + resolved, + exc, + ) + return False + return True + + def _save_last_used_zarr_save_config( config: ZarrSaveConfig, settings_path: Optional[Path] = None, @@ -1135,6 +1301,15 @@ def _should_apply_persisted_dask_backend(initial: Optional[WorkflowConfig]) -> b return initial.dask_backend == DaskBackendConfig() +def _should_apply_persisted_execution_policy( + initial: Optional[WorkflowConfig], +) -> bool: + """Return whether persisted execution policy should override GUI defaults.""" + if initial is None: + return True + return initial.execution_policy == ExecutionPolicy() + + def _should_apply_persisted_zarr_save(initial: Optional[WorkflowConfig]) -> bool: """Return whether persisted Zarr save settings should override 
defaults. @@ -3649,6 +3824,262 @@ def _on_apply(self) -> None: self.accept() + class ExecutionPolicyDialog(QDialog): + """Dialog for configuring automatic execution planning.""" + + def __init__( + self, + *, + initial_policy: ExecutionPolicy, + initial_backend: DaskBackendConfig, + workload: str, + summary_workflow_factory: Callable[ + [ExecutionPolicy, DaskBackendConfig], WorkflowConfig + ], + recommendation_shape_tpczyx: Optional[ + Tuple[int, int, int, int, int, int] + ] = None, + recommendation_chunks_tpczyx: Optional[ + Tuple[int, int, int, int, int, int] + ] = None, + recommendation_dtype_itemsize: Optional[int] = None, + parent: Optional[QDialog] = None, + ) -> None: + """Initialize execution-policy dialog state.""" + super().__init__(parent) + self.setWindowTitle("Execution Planning") + self.result_policy: Optional[ExecutionPolicy] = None + self.result_backend: Optional[DaskBackendConfig] = None + self._advanced_backend = initial_backend + self._workload = str(workload).strip().lower() or "analysis" + self._summary_workflow_factory = summary_workflow_factory + self._recommendation_shape_tpczyx = recommendation_shape_tpczyx + self._recommendation_chunks_tpczyx = recommendation_chunks_tpczyx + self._recommendation_dtype_itemsize = recommendation_dtype_itemsize + self._refresh_calibration_once = False + self._build_ui() + self._hydrate(initial_policy) + self.setStyleSheet(_popup_dialog_stylesheet()) + _apply_initial_dialog_geometry( + self, + minimum_size=_DASK_BACKEND_DIALOG_MINIMUM_SIZE, + preferred_size=_DASK_BACKEND_DIALOG_PREFERRED_SIZE, + content_size_hint=(self.sizeHint().width(), self.sizeHint().height()), + ) + + def _build_ui(self) -> None: + """Construct dialog controls and wire signals.""" + root = QVBoxLayout(self) + apply_popup_root_spacing(root) + + overview = QLabel( + "Choose whether ClearEx plans worker resources automatically " + "or uses the advanced backend override." 
+ ) + overview.setWordWrap(True) + root.addWidget(overview) + + form = QFormLayout() + apply_form_spacing(form) + self._mode_combo = QComboBox() + self._mode_combo.addItem("Auto", "auto") + self._mode_combo.addItem("Advanced", "advanced") + form.addRow("Mode", self._mode_combo) + + self._max_workers_input = QLineEdit() + self._max_workers_input.setPlaceholderText("blank = auto") + form.addRow("Max workers", self._max_workers_input) + + self._memory_per_worker_input = QLineEdit() + self._memory_per_worker_input.setPlaceholderText("auto") + form.addRow("Memory per worker", self._memory_per_worker_input) + root.addLayout(form) + + button_row = QHBoxLayout() + apply_row_spacing(button_row) + self._calibrate_button = _configure_fixed_height_button( + QPushButton("Calibrate") + ) + self._advanced_button = _configure_fixed_height_button( + QPushButton("Advanced Backend") + ) + button_row.addWidget(self._calibrate_button) + button_row.addWidget(self._advanced_button) + button_row.addStretch(1) + root.addLayout(button_row) + + self._summary_label = QLabel("") + self._summary_label.setWordWrap(True) + self._summary_label.setObjectName("metadataFieldValue") + root.addWidget(self._summary_label) + + footer = QHBoxLayout() + apply_footer_row_spacing(footer) + self._defaults_button = _configure_fixed_height_button( + QPushButton("Reset Defaults") + ) + self._cancel_button = _configure_fixed_height_button( + QPushButton("Cancel") + ) + self._apply_button = _configure_fixed_height_button( + QPushButton("Apply") + ) + self._apply_button.setObjectName("runButton") + footer.addWidget(self._defaults_button) + footer.addStretch(1) + footer.addWidget(self._cancel_button) + footer.addWidget(self._apply_button) + root.addLayout(footer) + + self._mode_combo.currentIndexChanged.connect(self._on_mode_changed) + self._max_workers_input.textChanged.connect(self._refresh_summary) + self._memory_per_worker_input.textChanged.connect(self._refresh_summary) + 
self._calibrate_button.clicked.connect(self._on_calibrate) + self._advanced_button.clicked.connect(self._on_edit_advanced_backend) + self._defaults_button.clicked.connect(self._on_reset_defaults) + self._cancel_button.clicked.connect(self.reject) + self._apply_button.clicked.connect(self._on_apply) + + def _parse_optional_positive_int( + self, + text: str, + *, + field_name: str, + ) -> Optional[int]: + """Parse optional positive integers from line-edit text.""" + stripped = text.strip() + if not stripped: + return None + try: + value = int(stripped) + except ValueError as exc: + raise ValueError(f"{field_name} must be an integer.") from exc + if value <= 0: + raise ValueError(f"{field_name} must be greater than zero.") + return value + + def _current_policy( + self, + *, + force_refresh: bool, + ) -> ExecutionPolicy: + """Build an execution policy from current widget state.""" + return ExecutionPolicy( + mode=str(self._mode_combo.currentData()), + max_workers=self._parse_optional_positive_int( + self._max_workers_input.text(), + field_name="Max workers", + ), + memory_per_worker_limit=( + self._memory_per_worker_input.text().strip() or "auto" + ), + calibration_policy=( + "refresh" + if force_refresh + else "use_if_available" + ), + ) + + def _refresh_summary(self) -> None: + """Refresh the execution-plan summary for current controls.""" + try: + policy = self._current_policy( + force_refresh=self._refresh_calibration_once + ) + workflow = self._summary_workflow_factory( + policy, + self._advanced_backend, + ) + profiles = ( + {} + if self._refresh_calibration_once + else _load_execution_calibration_profiles() + ) + plan = plan_execution( + workflow, + workload=self._workload, + shape_tpczyx=self._recommendation_shape_tpczyx, + chunks_tpczyx=self._recommendation_chunks_tpczyx, + dtype_itemsize=self._recommendation_dtype_itemsize, + calibration_profiles=profiles, + ) + except Exception as exc: + self._summary_label.setText( + f"Could not derive execution plan: 
{type(exc).__name__}: {exc}" + ) + return + + text = ( + f"Policy: {format_execution_policy_summary(policy)}\n" + f"Plan: {format_execution_plan_summary(plan)}" + ) + self._summary_label.setText(text) + self._summary_label.setToolTip(text) + + def _hydrate(self, initial_policy: ExecutionPolicy) -> None: + """Populate controls from an initial execution policy.""" + index = self._mode_combo.findData(initial_policy.mode) + if index < 0: + index = 0 + self._mode_combo.setCurrentIndex(index) + self._max_workers_input.setText( + "" + if initial_policy.max_workers is None + else str(initial_policy.max_workers) + ) + self._memory_per_worker_input.setText( + str(initial_policy.memory_per_worker_limit) + ) + self._refresh_calibration_once = False + self._on_mode_changed(index) + self._refresh_summary() + + def _on_mode_changed(self, _: int) -> None: + """Update enabled state after policy mode changes.""" + auto_mode = str(self._mode_combo.currentData()) == "auto" + self._max_workers_input.setEnabled(auto_mode) + self._memory_per_worker_input.setEnabled(auto_mode) + self._calibrate_button.setEnabled(auto_mode) + self._refresh_summary() + + def _on_calibrate(self) -> None: + """Mark the next execution to refresh the cached profile.""" + self._refresh_calibration_once = True + self._refresh_summary() + + def _on_edit_advanced_backend(self) -> None: + """Open the advanced backend dialog and store its result.""" + dialog = DaskBackendConfigDialog( + initial=self._advanced_backend, + recommendation_shape_tpczyx=self._recommendation_shape_tpczyx, + recommendation_chunks_tpczyx=self._recommendation_chunks_tpczyx, + recommendation_dtype_itemsize=self._recommendation_dtype_itemsize, + parent=self, + ) + if dialog.exec() != QDialog.DialogCode.Accepted: + return + if dialog.result_config is None: + return + self._advanced_backend = dialog.result_config + self._refresh_summary() + + def _on_reset_defaults(self) -> None: + """Reset controls to default execution policy values.""" + 
self._advanced_backend = DaskBackendConfig() + self._hydrate(ExecutionPolicy()) + + def _on_apply(self) -> None: + """Validate current state and accept the dialog.""" + try: + self.result_policy = self._current_policy( + force_refresh=self._refresh_calibration_once + ) + except ValueError as exc: + QMessageBox.warning(self, "Invalid Execution Planning", str(exc)) + return + self.result_backend = self._advanced_backend + self.accept() + class DataStoreMaterializationWorker(QThread): """Background worker that materializes canonical store data. @@ -3682,6 +4113,7 @@ def __init__( *, experiment: NavigateExperiment, source_data_path: Path, + execution_policy: ExecutionPolicy, dask_backend: DaskBackendConfig, zarr_save: ZarrSaveConfig, ) -> None: @@ -3706,6 +4138,7 @@ def __init__( super().__init__() self._experiment = experiment self._source_data_path = source_data_path + self._execution_policy = execution_policy self._dask_backend = dask_backend self._zarr_save = zarr_save @@ -3745,8 +4178,25 @@ def run(self) -> None: """ try: with ExitStack() as exit_stack: + profiles = _load_execution_calibration_profiles() + planning_workflow = WorkflowConfig( + execution_policy=self._execution_policy, + dask_backend=self._dask_backend, + zarr_save=self._zarr_save, + ) + plan = plan_execution( + planning_workflow, + workload="io", + chunks_tpczyx=self._zarr_save.chunks_tpczyx(), + calibration_profiles=profiles, + ) + if plan.calibration_profile is not None: + profiles[plan.calibration_profile.profile_key] = ( + plan.calibration_profile + ) + _save_execution_calibration_profiles(profiles) client = _configure_dask_backend_client( - self._dask_backend, + plan.backend_config, exit_stack=exit_stack, ) result = materialize_experiment_data_store( @@ -3799,6 +4249,7 @@ def __init__( self, *, requests: Sequence[ExperimentStorePreparationRequest], + execution_policy: ExecutionPolicy, dask_backend: DaskBackendConfig, zarr_save: ZarrSaveConfig, force_rebuild: bool = False, @@ -3825,6 +4276,7 @@ 
def __init__( """ super().__init__() self._requests = list(requests) + self._execution_policy = execution_policy self._dask_backend = dask_backend self._zarr_save = zarr_save self._force_rebuild = bool(force_rebuild) @@ -3921,8 +4373,25 @@ def run(self) -> None: try: with ExitStack() as exit_stack: + profiles = _load_execution_calibration_profiles() + planning_workflow = WorkflowConfig( + execution_policy=self._execution_policy, + dask_backend=self._dask_backend, + zarr_save=self._zarr_save, + ) + plan = plan_execution( + planning_workflow, + workload="io", + chunks_tpczyx=self._zarr_save.chunks_tpczyx(), + calibration_profiles=profiles, + ) + if plan.calibration_profile is not None: + profiles[plan.calibration_profile.profile_key] = ( + plan.calibration_profile + ) + _save_execution_calibration_profiles(profiles) client = _configure_dask_backend_client( - self._dask_backend, + plan.backend_config, exit_stack=exit_stack, ) for index, request in enumerate(self._requests): @@ -4391,6 +4860,7 @@ def __init__(self, initial: WorkflowConfig) -> None: self._opener = ImageOpener() self.result_config: Optional[WorkflowConfig] = None self._metadata_labels: Dict[str, QLabel] = {} + self._execution_policy: ExecutionPolicy = initial.execution_policy self._dask_backend_config: DaskBackendConfig = initial.dask_backend self._zarr_save_config: ZarrSaveConfig = initial.zarr_save self._chunks = initial.chunks @@ -4590,7 +5060,7 @@ def _build_ui(self) -> None: zarr_layout.addLayout(zarr_button_row) root.addWidget(zarr_group) - dask_backend_group = QGroupBox("Dask Backend") + dask_backend_group = QGroupBox("Execution Planning") dask_backend_layout = QVBoxLayout(dask_backend_group) apply_stack_spacing(dask_backend_layout) dask_backend_layout.setContentsMargins(10, 8, 10, 10) @@ -4604,7 +5074,7 @@ def _build_ui(self) -> None: dask_backend_button_row = QHBoxLayout() apply_row_spacing(dask_backend_button_row) dask_backend_button_row.addStretch(1) - self._dask_backend_button = 
QPushButton("Edit Dask Backend") + self._dask_backend_button = QPushButton("Edit Execution Planning") dask_backend_button_row.addWidget(self._dask_backend_button) dask_backend_layout.addLayout(dask_backend_button_row) root.addWidget(dask_backend_group) @@ -4714,7 +5184,28 @@ def _refresh_dask_backend_summary(self) -> None: None Summary labels are updated in-place. """ - summary = format_dask_backend_summary(self._dask_backend_config) + try: + workflow = WorkflowConfig( + execution_policy=self._execution_policy, + dask_backend=self._dask_backend_config, + zarr_save=self._zarr_save_config, + chunks=self._chunks, + ) + profiles = _load_execution_calibration_profiles() + plan = plan_execution( + workflow, + workload="io", + shape_tpczyx=self._current_local_cluster_shape_tpczyx(), + chunks_tpczyx=self._zarr_save_config.chunks_tpczyx(), + dtype_itemsize=self._current_dtype_itemsize(), + calibration_profiles=profiles, + ) + summary = ( + f"Policy: {format_execution_policy_summary(self._execution_policy)}\n" + f"Plan: {format_execution_plan_summary(plan)}" + ) + except Exception as exc: + summary = f"Could not derive execution plan: {type(exc).__name__}: {exc}" self._dask_backend_summary.setText(summary) self._dask_backend_summary.setToolTip(summary) @@ -4746,19 +5237,32 @@ def _on_edit_dask_backend(self) -> None: None Selected backend values are stored in-place. 
""" - dialog = DaskBackendConfigDialog( - initial=self._dask_backend_config, + dialog = ExecutionPolicyDialog( + initial_policy=self._execution_policy, + initial_backend=self._dask_backend_config, + workload="io", + summary_workflow_factory=lambda policy, backend: WorkflowConfig( + execution_policy=policy, + dask_backend=backend, + zarr_save=self._zarr_save_config, + chunks=self._chunks, + ), recommendation_shape_tpczyx=self._current_local_cluster_shape_tpczyx(), recommendation_chunks_tpczyx=self._zarr_save_config.chunks_tpczyx(), recommendation_dtype_itemsize=self._current_dtype_itemsize(), parent=self, ) result = dialog.exec() - if result != QDialog.DialogCode.Accepted or dialog.result_config is None: + if ( + result != QDialog.DialogCode.Accepted + or dialog.result_policy is None + or dialog.result_backend is None + ): return - self._dask_backend_config = dialog.result_config + self._execution_policy = dialog.result_policy + self._dask_backend_config = dialog.result_backend self._refresh_dask_backend_summary() - self._set_status("Updated Dask backend settings.") + self._set_status("Updated execution planning settings.") def _on_edit_zarr_settings(self) -> None: """Open Zarr settings dialog and apply selected configuration. 
@@ -6152,6 +6656,7 @@ def _accept_with_store_path( ), analysis_apply_to_all=False, prefer_dask=True, + execution_policy=self._execution_policy, dask_backend=self._dask_backend_config, chunks=self._chunks, flatfield=False, @@ -6163,6 +6668,12 @@ def _accept_with_store_path( mip_export=False, zarr_save=self._zarr_save_config, ) + _save_last_used_execution_policy( + replace( + self._execution_policy, + calibration_policy="use_if_available", + ) + ) _save_last_used_dask_backend_config(self._dask_backend_config) _save_last_used_zarr_save_config(self._zarr_save_config) self.accept() @@ -6276,6 +6787,7 @@ def _on_next(self) -> None: worker = BatchDataStoreMaterializationWorker( requests=pending_requests, + execution_policy=self._execution_policy, dask_backend=self._dask_backend_config, zarr_save=self._zarr_save_config, force_rebuild=rebuild_requested, @@ -6805,6 +7317,7 @@ def __init__(self, initial: WorkflowConfig) -> None: _analysis_targets_for_workflow(initial) ) self._active_analysis_target: Optional[AnalysisTarget] = None + self._execution_policy: ExecutionPolicy = initial.execution_policy self._dask_backend_config: DaskBackendConfig = initial.dask_backend self.result_config: Optional[WorkflowConfig] = None self._analysis_scope_combo: Optional[QComboBox] = None @@ -7661,7 +8174,7 @@ def _build_ui(self) -> None: max(28, int(self._status_label.fontMetrics().height()) + 10) ) status_stack.addWidget(self._status_label) - self._dask_backend_summary_label = QLabel("Dask backend: n/a") + self._dask_backend_summary_label = QLabel("Execution planning: n/a") self._dask_backend_summary_label.setObjectName("statusLabel") self._dask_backend_summary_label.setWordWrap(True) self._dask_backend_summary_label.setTextInteractionFlags( @@ -7675,7 +8188,7 @@ def _build_ui(self) -> None: ) status_stack.addWidget(self._dask_backend_summary_label) footer.addLayout(status_stack, 1) - self._dask_backend_button = QPushButton("Edit Dask Backend") + self._dask_backend_button = QPushButton("Edit 
Execution Planning") self._dask_dashboard_button = QPushButton("Open Dask Dashboard") self._cancel_button = QPushButton("Cancel") self._run_button = QPushButton("Run") @@ -11064,7 +11577,7 @@ def _set_parameter_help(self, text: str) -> None: self._parameter_help_label.setText(str(text)) def _refresh_dask_backend_summary(self) -> None: - """Refresh footer summary text for active Dask backend settings. + """Refresh footer summary text for active execution planning. Parameters ---------- @@ -11082,8 +11595,46 @@ def _refresh_dask_backend_summary(self) -> None: """ if self._dask_backend_summary_label is None: return - summary = format_dask_backend_summary(self._dask_backend_config) - text = f"Dask backend: {summary}" + workflow = WorkflowConfig( + file=self._base_config.file, + analysis_targets=self._analysis_targets, + analysis_selected_experiment_path=( + self._base_config.analysis_selected_experiment_path + ), + analysis_apply_to_all=bool( + self._analysis_apply_to_all_checkbox.isChecked() + if self._analysis_apply_to_all_checkbox is not None + else False + ), + prefer_dask=self._base_config.prefer_dask, + execution_policy=self._execution_policy, + dask_backend=self._dask_backend_config, + chunks=self._base_config.chunks, + flatfield=self._operation_checkboxes["flatfield"].isChecked(), + deconvolution=self._operation_checkboxes["deconvolution"].isChecked(), + shear_transform=self._operation_checkboxes["shear_transform"].isChecked(), + particle_detection=self._operation_checkboxes["particle_detection"].isChecked(), + usegment3d=self._operation_checkboxes["usegment3d"].isChecked(), + registration=self._operation_checkboxes["registration"].isChecked(), + visualization=self._operation_checkboxes["visualization"].isChecked(), + mip_export=self._operation_checkboxes["mip_export"].isChecked(), + zarr_save=self._base_config.zarr_save, + analysis_parameters=normalize_analysis_operation_parameters( + self._base_config.analysis_parameters + ), + ) + plan = plan_execution( + 
workflow, + workload="analysis", + shape_tpczyx=self._analysis_store_shape_tpczyx(), + chunks_tpczyx=self._base_config.zarr_save.chunks_tpczyx(), + dtype_itemsize=self._analysis_store_dtype_itemsize(), + calibration_profiles=_load_execution_calibration_profiles(), + ) + text = ( + f"Policy: {format_execution_policy_summary(self._execution_policy)}\n" + f"Plan: {format_execution_plan_summary(plan)}" + ) self._dask_backend_summary_label.setText(text) self._dask_backend_summary_label.setToolTip(text) self._refresh_dask_dashboard_button_state() @@ -11162,7 +11713,7 @@ def _analysis_store_dtype_itemsize(self) -> Optional[int]: return None def _on_edit_dask_backend(self) -> None: - """Open backend settings dialog and apply selected configuration. + """Open execution-planning dialog and apply selected configuration. Parameters ---------- @@ -11178,20 +11729,61 @@ def _on_edit_dask_backend(self) -> None: None Validation and persistence errors are handled internally. """ - dialog = DaskBackendConfigDialog( - initial=self._dask_backend_config, + dialog = ExecutionPolicyDialog( + initial_policy=self._execution_policy, + initial_backend=self._dask_backend_config, + workload="analysis", + summary_workflow_factory=lambda policy, backend: WorkflowConfig( + file=self._base_config.file, + analysis_targets=self._analysis_targets, + analysis_selected_experiment_path=( + self._base_config.analysis_selected_experiment_path + ), + analysis_apply_to_all=bool( + self._analysis_apply_to_all_checkbox.isChecked() + if self._analysis_apply_to_all_checkbox is not None + else False + ), + prefer_dask=self._base_config.prefer_dask, + execution_policy=policy, + dask_backend=backend, + chunks=self._base_config.chunks, + flatfield=self._operation_checkboxes["flatfield"].isChecked(), + deconvolution=self._operation_checkboxes["deconvolution"].isChecked(), + shear_transform=self._operation_checkboxes["shear_transform"].isChecked(), + 
particle_detection=self._operation_checkboxes["particle_detection"].isChecked(), + usegment3d=self._operation_checkboxes["usegment3d"].isChecked(), + registration=self._operation_checkboxes["registration"].isChecked(), + visualization=self._operation_checkboxes["visualization"].isChecked(), + mip_export=self._operation_checkboxes["mip_export"].isChecked(), + zarr_save=self._base_config.zarr_save, + analysis_parameters=normalize_analysis_operation_parameters( + self._base_config.analysis_parameters + ), + ), recommendation_shape_tpczyx=self._analysis_store_shape_tpczyx(), recommendation_chunks_tpczyx=self._base_config.zarr_save.chunks_tpczyx(), recommendation_dtype_itemsize=self._analysis_store_dtype_itemsize(), parent=self, ) result = dialog.exec() - if result != QDialog.DialogCode.Accepted or dialog.result_config is None: + if ( + result != QDialog.DialogCode.Accepted + or dialog.result_policy is None + or dialog.result_backend is None + ): return - self._dask_backend_config = dialog.result_config + self._execution_policy = dialog.result_policy + self._dask_backend_config = dialog.result_backend + _save_last_used_execution_policy( + replace( + self._execution_policy, + calibration_policy="use_if_available", + ) + ) _save_last_used_dask_backend_config(self._dask_backend_config) self._refresh_dask_backend_summary() - self._set_status("Updated Dask backend settings.") + self._set_status("Updated execution planning settings.") @staticmethod def _normalize_dashboard_url( @@ -11328,6 +11920,8 @@ def _resolve_dask_dashboard_url(self) -> Optional[str]: None Parsing failures are handled internally. 
""" + if self._execution_policy.mode != "advanced": + return None mode = str(self._dask_backend_config.mode).strip().lower() if mode == DASK_BACKEND_LOCAL_CLUSTER: return self._normalize_dashboard_url("127.0.0.1:8787") @@ -13897,6 +14491,7 @@ def _on_run(self) -> None: else False ), "prefer_dask": self._base_config.prefer_dask, + "execution_policy": self._execution_policy, "dask_backend": self._dask_backend_config, "chunks": self._base_config.chunks, "flatfield": selected_flags["flatfield"], @@ -13914,6 +14509,12 @@ def _on_run(self) -> None: workflow_kwargs["usegment3d"] = selected_flags["usegment3d"] self.result_config = WorkflowConfig(**workflow_kwargs) self._persist_analysis_gui_state_for_target(selected_target) + _save_last_used_execution_policy( + replace( + self._execution_policy, + calibration_policy="use_if_available", + ) + ) _save_last_used_dask_backend_config(self._dask_backend_config) sequence = self._selected_operations_in_sequence() sequence_text = " -> ".join( @@ -14010,12 +14611,26 @@ def launch_gui( settings_directory = _ensure_clearex_settings_directory() settings_path = _resolve_dask_backend_settings_path(settings_directory) + execution_policy_settings_path = _resolve_execution_policy_settings_path( + settings_directory + ) zarr_settings_path = _resolve_zarr_save_settings_path(settings_directory) effective_initial = initial or WorkflowConfig() + persisted_execution_policy = _load_last_used_execution_policy( + settings_path=execution_policy_settings_path + ) persisted_backend = _load_last_used_dask_backend_config(settings_path=settings_path) persisted_zarr_save = _load_last_used_zarr_save_config( settings_path=zarr_settings_path ) + if ( + persisted_execution_policy is not None + and _should_apply_persisted_execution_policy(initial) + ): + effective_initial = replace( + effective_initial, + execution_policy=persisted_execution_policy, + ) if persisted_backend is not None and _should_apply_persisted_dask_backend(initial): effective_initial = 
replace(effective_initial, dask_backend=persisted_backend) if persisted_zarr_save is not None and _should_apply_persisted_zarr_save(initial): @@ -14111,7 +14726,9 @@ def _reset_analysis_selection_for_next_run(workflow: WorkflowConfig) -> Workflow ), "analysis_apply_to_all": workflow.analysis_apply_to_all, "prefer_dask": workflow.prefer_dask, + "execution_policy": workflow.execution_policy, "dask_backend": workflow.dask_backend, + "execution_plan": None, "chunks": workflow.chunks, "flatfield": False, "deconvolution": False, diff --git a/src/clearex/io/cli.py b/src/clearex/io/cli.py index c87f366..f93497c 100644 --- a/src/clearex/io/cli.py +++ b/src/clearex/io/cli.py @@ -232,6 +232,32 @@ def create_parser() -> argparse.ArgumentParser: default=None, help="Chunk spec for Dask, e.g. '256,256,64' or single int", ) + parser.add_argument( + "--execution-mode", + type=str, + choices=("auto", "advanced"), + default=None, + help="Execution planning mode for Dask resources", + ) + parser.add_argument( + "--max-workers", + type=int, + default=None, + help="Maximum worker count for automatic execution planning", + ) + parser.add_argument( + "--memory-per-worker", + type=str, + default=None, + help="Preferred per-worker memory limit for automatic execution planning", + ) + parser.add_argument( + "--calibrate", + required=False, + default=False, + action="store_true", + help="Refresh the cached execution profile before planning", + ) parser.add_argument( "--gui", diff --git a/src/clearex/io/provenance.py b/src/clearex/io/provenance.py index 374156b..b022676 100644 --- a/src/clearex/io/provenance.py +++ b/src/clearex/io/provenance.py @@ -51,8 +51,12 @@ from clearex.workflow import ( WorkflowConfig, dask_backend_to_dict, + execution_plan_to_dict, + execution_policy_to_dict, format_dask_backend_summary, format_chunks, + format_execution_plan_summary, + format_execution_policy_summary, format_zarr_chunks_ptczyx, format_zarr_pyramid_ptczyx, ) @@ -359,6 +363,22 @@ def 
_default_steps(workflow: WorkflowConfig) -> list[Dict[str, Any]]: "name": "load_data", "parameters": { "prefer_dask": workflow.prefer_dask, + "execution_policy_summary": format_execution_policy_summary( + workflow.execution_policy + ), + "execution_policy": execution_policy_to_dict( + workflow.execution_policy + ), + "execution_plan_summary": ( + format_execution_plan_summary(workflow.execution_plan) + if workflow.execution_plan is not None + else None + ), + "execution_plan": ( + execution_plan_to_dict(workflow.execution_plan) + if workflow.execution_plan is not None + else None + ), "chunks": format_chunks(workflow.chunks) or None, "dask_backend_summary": format_dask_backend_summary( workflow.dask_backend @@ -982,6 +1002,20 @@ def persist_run_provenance( workflow_payload = { "file": workflow.file, "prefer_dask": workflow.prefer_dask, + "execution_policy_summary": format_execution_policy_summary( + workflow.execution_policy + ), + "execution_policy": execution_policy_to_dict(workflow.execution_policy), + "execution_plan_summary": ( + format_execution_plan_summary(workflow.execution_plan) + if workflow.execution_plan is not None + else None + ), + "execution_plan": ( + execution_plan_to_dict(workflow.execution_plan) + if workflow.execution_plan is not None + else None + ), "dask_backend_summary": format_dask_backend_summary(workflow.dask_backend), "dask_backend": dask_backend_to_dict(workflow.dask_backend), "chunks": format_chunks(workflow.chunks) or None, diff --git a/src/clearex/main.py b/src/clearex/main.py index 6a088c8..f8423f2 100644 --- a/src/clearex/main.py +++ b/src/clearex/main.py @@ -28,8 +28,9 @@ # Standard Library Imports from contextlib import ExitStack from datetime import datetime, timezone +import inspect from pathlib import Path -from typing import Any, Callable, Dict, Optional, Sequence +from typing import Any, Callable, Dict, Mapping, Optional, Sequence import argparse import json import logging @@ -120,17 +121,26 @@ def 
run_usegment3d_analysis(*, zarr_path, parameters, client, progress_callback) DASK_BACKEND_SLURM_CLUSTER, DASK_BACKEND_SLURM_RUNNER, AnalysisInputReference, + CalibrationProfile, DaskBackendConfig, - LocalClusterConfig, + ExecutionPolicy, WorkflowConfig, WorkflowExecutionCancelled, analysis_chainable_output_component, + calibration_profile_from_dict, + calibration_profile_to_dict, collect_analysis_input_references, dask_backend_from_dict, dask_backend_to_dict, + execution_plan_to_dict, + execution_policy_from_dict, + execution_policy_to_dict, format_dask_backend_summary, format_chunks, + format_execution_plan_summary, + format_execution_policy_summary, normalize_analysis_operation_parameters, + plan_execution, recommend_local_cluster_config, resolve_analysis_input_component, resolve_analysis_execution_sequence, @@ -142,6 +152,10 @@ def run_usegment3d_analysis(*, zarr_path, parameters, client, progress_callback) _CLEAREX_SETTINGS_DIR_NAME = ".clearex" _CLEAREX_DASK_BACKEND_SETTINGS_FILE = "dask_backend_settings.json" +_CLEAREX_EXECUTION_POLICY_SETTINGS_FILE = "execution_policy_settings.json" +_CLEAREX_EXECUTION_CALIBRATION_PROFILES_FILE = ( + "execution_calibration_profiles.json" +) _ANALYSIS_OPERATIONS_REQUIRING_DASK_CLIENT = frozenset( { @@ -476,10 +490,43 @@ def _build_workflow_config(args: argparse.Namespace) -> WorkflowConfig: ) persisted_dask_backend = _load_persisted_dask_backend_config() + persisted_execution_policy = _load_persisted_execution_policy() + effective_execution_policy = ( + persisted_execution_policy + if persisted_execution_policy is not None + else ExecutionPolicy() + ) + execution_mode_arg = getattr(args, "execution_mode", None) + max_workers_arg = getattr(args, "max_workers", None) + memory_per_worker_arg = getattr(args, "memory_per_worker", None) + refresh_calibration = bool(getattr(args, "calibrate", False)) + effective_execution_policy = ExecutionPolicy( + mode=( + str(execution_mode_arg).strip().lower() + if execution_mode_arg is not None 
and str(execution_mode_arg).strip() + else effective_execution_policy.mode + ), + max_workers=( + int(max_workers_arg) + if max_workers_arg is not None + else effective_execution_policy.max_workers + ), + memory_per_worker_limit=( + str(memory_per_worker_arg).strip() + if memory_per_worker_arg is not None and str(memory_per_worker_arg).strip() + else effective_execution_policy.memory_per_worker_limit + ), + calibration_policy=( + "refresh" + if refresh_calibration + else effective_execution_policy.calibration_policy + ), + ) return WorkflowConfig( file=args.file, prefer_dask=args.dask, + execution_policy=effective_execution_policy, dask_backend=( persisted_dask_backend if persisted_dask_backend is not None @@ -552,6 +599,77 @@ def _load_persisted_dask_backend_config() -> Optional[DaskBackendConfig]: return dask_backend_from_dict(payload) +def _resolve_persisted_execution_policy_settings_path() -> Path: + """Resolve the user settings JSON path for persisted execution policy.""" + return ( + Path.home() + / _CLEAREX_SETTINGS_DIR_NAME + / _CLEAREX_EXECUTION_POLICY_SETTINGS_FILE + ).expanduser() + + +def _resolve_persisted_execution_calibration_profiles_path() -> Path: + """Resolve the user settings JSON path for persisted calibration profiles.""" + return ( + Path.home() + / _CLEAREX_SETTINGS_DIR_NAME + / _CLEAREX_EXECUTION_CALIBRATION_PROFILES_FILE + ).expanduser() + + +def _load_persisted_execution_policy() -> Optional[ExecutionPolicy]: + """Load persisted execution policy for CLI/headless execution.""" + settings_path = _resolve_persisted_execution_policy_settings_path() + if not settings_path.exists(): + return None + try: + payload = json.loads(settings_path.read_text(encoding="utf-8")) + except Exception: + return None + if not isinstance(payload, dict) or not payload: + return None + return execution_policy_from_dict(payload) + + +def _load_persisted_execution_calibration_profiles() -> Dict[str, CalibrationProfile]: + """Load persisted execution calibration 
profiles.""" + settings_path = _resolve_persisted_execution_calibration_profiles_path() + if not settings_path.exists(): + return {} + try: + payload = json.loads(settings_path.read_text(encoding="utf-8")) + except Exception: + return {} + if not isinstance(payload, dict): + return {} + profiles: Dict[str, CalibrationProfile] = {} + for key, value in payload.items(): + profile = calibration_profile_from_dict(value) + if profile is None: + continue + profiles[str(key)] = profile + return profiles + + +def _save_persisted_execution_calibration_profiles( + profiles: Mapping[str, CalibrationProfile], +) -> None: + """Persist execution calibration profiles best-effort.""" + settings_path = _resolve_persisted_execution_calibration_profiles_path() + try: + settings_path.parent.mkdir(parents=True, exist_ok=True) + payload = { + str(key): calibration_profile_to_dict(profile) + for key, profile in profiles.items() + } + settings_path.write_text( + json.dumps(payload, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + except Exception: + return + + def _extract_axis_map(info: ImageInfo) -> Dict[str, int]: """Map axis labels to corresponding dimension sizes. @@ -760,6 +878,8 @@ def _configure_dask_backend( exit_stack: ExitStack, *, workload: str = "io", + shape_tpczyx: Optional[tuple[int, int, int, int, int, int]] = None, + dtype_itemsize: Optional[int] = None, ) -> Optional[Any]: """Initialize and register the configured Dask backend. @@ -787,107 +907,97 @@ def _configure_dask_backend( Backend initialization errors are converted into warnings and the workflow continues without a distributed client. This keeps local/headless paths operational even when optional Dask distributed backends are unavailable. - When LocalCluster ``n_workers`` is unset, runtime applies aggressive - host/data-aware defaults from - :func:`clearex.workflow.recommend_local_cluster_config`, including worker - count and, when left at defaults, thread and memory settings. 
""" if not workflow.prefer_dask: logger.info("Dask lazy loading disabled; skipping backend startup.") return None - backend = workflow.dask_backend workload_name = workload.strip().lower() + calibration_profiles = _load_persisted_execution_calibration_profiles() + execution_plan = plan_execution( + workflow, + workload=workload_name, + shape_tpczyx=shape_tpczyx, + dtype_itemsize=dtype_itemsize, + calibration_profiles=calibration_profiles, + ) + workflow.execution_plan = execution_plan + backend = execution_plan.backend_config + if execution_plan.calibration_profile is not None: + calibration_profiles[execution_plan.calibration_profile.profile_key] = ( + execution_plan.calibration_profile + ) + _save_persisted_execution_calibration_profiles(calibration_profiles) + + logger.info( + "Execution policy: %s", + format_execution_policy_summary(workflow.execution_policy), + ) logger.info( - "Dask backend selection: " - f"{format_dask_backend_summary(backend)} (workload={workload_name})" + "Execution plan: %s", + format_execution_plan_summary(execution_plan), ) try: if backend.mode == DASK_BACKEND_LOCAL_CLUSTER: local_cfg = backend.local_cluster - requested_processes = workload_name == "analysis" - default_local_cfg = LocalClusterConfig() - effective_n_workers = local_cfg.n_workers - effective_threads_per_worker = local_cfg.threads_per_worker - effective_memory_limit = local_cfg.memory_limit - if effective_n_workers is None: + detected_gpu_count = int(execution_plan.environment.gpu_count) + legacy_local_worker_cap = ( + int(workflow.dask_backend.local_cluster.n_workers) + if workflow.dask_backend.mode == DASK_BACKEND_LOCAL_CLUSTER + and workflow.dask_backend.local_cluster.n_workers is not None + else None + ) + if execution_plan.worker_kind == "gpu_process": recommendation = recommend_local_cluster_config( + shape_tpczyx=shape_tpczyx, chunks_tpczyx=workflow.zarr_save.chunks_tpczyx(), + dtype_itemsize=dtype_itemsize, ) - effective_n_workers = recommendation.config.n_workers 
- if local_cfg.threads_per_worker == default_local_cfg.threads_per_worker: - effective_threads_per_worker = ( - recommendation.config.threads_per_worker - ) - if ( - str(local_cfg.memory_limit).strip().lower() - == str(default_local_cfg.memory_limit).strip().lower() - ): - effective_memory_limit = recommendation.config.memory_limit - logger.info( - "Auto-selected aggressive LocalCluster settings from " - "host/data recommendation: " - f"workers={effective_n_workers}, " - f"threads_per_worker={effective_threads_per_worker}, " - f"memory_limit={effective_memory_limit}, " - f"gpus={recommendation.detected_gpu_count}." + detected_gpu_count = max( + detected_gpu_count, + int(recommendation.detected_gpu_count), ) - - if workload_name == "analysis": - gpu_worker_cap: Optional[int] = None - use_gpu_local_cluster = False - if bool(getattr(workflow, "usegment3d", False)): - try: - normalized_params = normalize_analysis_operation_parameters( - workflow.analysis_parameters - ) - except Exception: - normalized_params = {} - usegment3d_params = dict(normalized_params.get("usegment3d", {})) - gpu_requested = bool( - usegment3d_params.get("gpu", False) - or usegment3d_params.get("require_gpu", False) - ) - if gpu_requested: - gpu_recommendation = recommend_local_cluster_config( - chunks_tpczyx=workflow.zarr_save.chunks_tpczyx(), - ) - detected_gpu_count = int(gpu_recommendation.detected_gpu_count) - if detected_gpu_count > 0: - gpu_worker_cap = max(1, detected_gpu_count) - use_gpu_local_cluster = True - - if gpu_worker_cap is not None: - requested_workers = ( - int(effective_n_workers) - if effective_n_workers is not None - else int(gpu_worker_cap) - ) - if requested_workers > int(gpu_worker_cap): - logger.info( - "GPU-aware LocalCluster cap applied for analysis: " - f"requested_workers={requested_workers}, " - f"capped_workers={int(gpu_worker_cap)}." 
- ) - effective_n_workers = int(gpu_worker_cap) - else: - use_gpu_local_cluster = False - - effective_worker_count = ( - int(effective_n_workers) if effective_n_workers is not None else 1 + use_gpu_local_cluster = ( + execution_plan.worker_kind == "gpu_process" + and detected_gpu_count > 0 ) - use_processes = bool(requested_processes or effective_worker_count > 1) - if not requested_processes and use_processes: + effective_worker_count = int(local_cfg.n_workers or execution_plan.workers) + if ( + workload_name != "analysis" + and legacy_local_worker_cap is not None + and workflow.execution_policy.max_workers is None + ): + effective_worker_count = int(legacy_local_worker_cap) + elif use_gpu_local_cluster: + effective_worker_cap = int( + workflow.execution_policy.max_workers + if workflow.execution_policy.max_workers is not None + else ( + legacy_local_worker_cap + if legacy_local_worker_cap is not None + else effective_worker_count + ) + ) + effective_worker_count = min( + max(1, effective_worker_cap), + max(1, detected_gpu_count), + ) + use_processes = True + if execution_plan.worker_kind == "thread": + use_processes = False + elif workload_name != "analysis" and effective_worker_count <= 1: + use_processes = False + elif workload_name != "analysis" and effective_worker_count > 1: logger.info( "Using process-based LocalCluster for multi-worker I/O " "execution (memory isolation enabled)." 
) client = create_dask_client( - n_workers=effective_n_workers, - threads_per_worker=effective_threads_per_worker, + n_workers=effective_worker_count, + threads_per_worker=int(local_cfg.threads_per_worker), processes=use_processes, - memory_limit=effective_memory_limit, + memory_limit=local_cfg.memory_limit, local_directory=local_cfg.local_directory, gpu_enabled=use_gpu_local_cluster, ) @@ -930,16 +1040,14 @@ def _configure_dask_backend( cluster_cfg = backend.slurm_cluster if ( - workload.strip().lower() == "analysis" + workload_name == "analysis" and int(cluster_cfg.processes) == 1 and int(cluster_cfg.cores) > 1 ): logger.warning( "SLURMCluster is configured with processes=1 and cores=%s. " - "CPU-bound Python analyses (for example shear transform) may " - "underutilize allocated CPUs with this layout. " - "For maximum process-level parallelism, increase processes " - "toward cores in the Dask backend configuration.", + "CPU-bound Python analyses may underutilize allocated CPUs " + "with this layout.", cluster_cfg.cores, ) extra_directives = [ @@ -996,6 +1104,43 @@ def _configure_dask_backend( return None +def _callable_accepts_keyword_argument( + callback: Callable[..., Any], + *, + keyword: str, +) -> bool: + """Return whether a callable accepts one keyword argument. + + Parameters + ---------- + callback : callable + Callable to inspect. + keyword : str + Keyword argument name. + + Returns + ------- + bool + ``True`` when the callable explicitly accepts the keyword or a + ``**kwargs`` catch-all. + + Notes + ----- + Inspection failures default to ``True`` so opaque callables remain + callable through this compatibility check. 
+ """ + try: + signature = inspect.signature(callback) + except (TypeError, ValueError): + return True + if keyword in signature.parameters: + return True + return any( + parameter.kind is inspect.Parameter.VAR_KEYWORD + for parameter in signature.parameters.values() + ) + + def _run_workflow( workflow: WorkflowConfig, logger: logging.Logger, @@ -1173,6 +1318,14 @@ def _emit_analysis_progress(percent: int, message: str) -> None: "parameters": { "source_path": input_path, "prefer_dask": workflow.prefer_dask, + "execution_policy": execution_policy_to_dict( + workflow.execution_policy + ), + "execution_plan": ( + execution_plan_to_dict(workflow.execution_plan) + if workflow.execution_plan is not None + else None + ), "chunks": format_chunks(workflow.chunks) or None, "dask_backend": dask_backend_to_dict(workflow.dask_backend), }, @@ -1258,17 +1411,28 @@ def _emit_analysis_progress(percent: int, message: str) -> None: ) _emit_analysis_progress(100, str(first_issue.message)) - analysis_client = ( - _configure_dask_backend( - workflow=workflow, - logger=logger, - exit_stack=analysis_stack, - workload="analysis", - ) - if failure_exc is None + analysis_client = None + if ( + failure_exc is None and _analysis_execution_requires_dask_client(execution_sequence) - else None - ) + ): + configure_kwargs: Dict[str, Any] = { + "workflow": workflow, + "logger": logger, + "exit_stack": analysis_stack, + "workload": "analysis", + } + dtype_itemsize = ( + int(getattr(image_info.dtype, "itemsize", 0)) + if image_info is not None + else None + ) + if dtype_itemsize is not None and _callable_accepts_keyword_argument( + _configure_dask_backend, + keyword="dtype_itemsize", + ): + configure_kwargs["dtype_itemsize"] = dtype_itemsize + analysis_client = _configure_dask_backend(**configure_kwargs) produced_components: Dict[str, str] = {"data": "data"} total_operations = max(1, len(execution_sequence)) @@ -2379,7 +2543,9 @@ def _mip_export_progress(percent: int, message: str) -> None: 
provenance_workflow = WorkflowConfig( file=input_path, prefer_dask=workflow.prefer_dask, + execution_policy=workflow.execution_policy, dask_backend=workflow.dask_backend, + execution_plan=workflow.execution_plan, chunks=workflow.chunks, flatfield=workflow.flatfield, deconvolution=workflow.deconvolution, diff --git a/src/clearex/workflow.py b/src/clearex/workflow.py index eed8382..085c9cf 100644 --- a/src/clearex/workflow.py +++ b/src/clearex/workflow.py @@ -26,10 +26,14 @@ from copy import deepcopy from dataclasses import dataclass, field +import hashlib +from importlib.metadata import PackageNotFoundError, version +import json import math import os +import re import subprocess -from typing import Any, Collection, Dict, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Collection, Dict, Literal, Mapping, Optional, Sequence, Tuple, Union, cast ChunkSpec = Optional[Union[int, Tuple[int, ...]]] @@ -2761,6 +2765,26 @@ def zarr_save_from_dict(payload: Any) -> ZarrSaveConfig: DASK_BACKEND_SLURM_CLUSTER: "SLURMCluster", } +EXECUTION_POLICY_AUTO = "auto" +EXECUTION_POLICY_ADVANCED = "advanced" +ExecutionPolicyMode = Literal["auto", "advanced"] + +EXECUTION_CALIBRATION_USE_IF_AVAILABLE = "use_if_available" +EXECUTION_CALIBRATION_REFRESH = "refresh" +ExecutionCalibrationPolicy = Literal["use_if_available", "refresh"] + +EXECUTION_GPU_MODE_NEVER = "never" +EXECUTION_GPU_MODE_OPTIONAL = "optional" +EXECUTION_GPU_MODE_REQUIRED = "required" +ExecutionGpuMode = Literal["never", "optional", "required"] + +EXECUTION_WORKER_KIND_THREAD = "thread" +EXECUTION_WORKER_KIND_PROCESS = "process" +EXECUTION_WORKER_KIND_GPU_PROCESS = "gpu_process" +ExecutionWorkerKind = Literal["thread", "process", "gpu_process"] + +EXECUTION_PLAN_MODEL_VERSION = "1" + DEFAULT_SLURM_CLUSTER_JOB_EXTRA_DIRECTIVES: Tuple[str, ...] 
= ( "--nodes=1", "--ntasks=1", @@ -3218,6 +3242,316 @@ def __post_init__(self) -> None: object.__setattr__(self, "mode", mode) +@dataclass(frozen=True) +class ExecutionPolicy: + """Operator-facing execution-planning policy.""" + + mode: ExecutionPolicyMode = EXECUTION_POLICY_AUTO + max_workers: Optional[int] = None + memory_per_worker_limit: str = "auto" + calibration_policy: ExecutionCalibrationPolicy = ( + EXECUTION_CALIBRATION_USE_IF_AVAILABLE + ) + + def __post_init__(self) -> None: + """Validate execution-policy values.""" + mode = str(self.mode).strip().lower() + if mode not in {EXECUTION_POLICY_AUTO, EXECUTION_POLICY_ADVANCED}: + raise ValueError("Execution policy mode must be 'auto' or 'advanced'.") + object.__setattr__(self, "mode", mode) + object.__setattr__( + self, + "max_workers", + _normalize_optional_positive_int( + self.max_workers, + field_name="ExecutionPolicy max_workers", + ), + ) + memory_limit = ( + str(self.memory_per_worker_limit).strip() + if self.memory_per_worker_limit is not None + else "auto" + ) + object.__setattr__( + self, + "memory_per_worker_limit", + memory_limit or "auto", + ) + calibration_policy = str(self.calibration_policy).strip().lower() + if calibration_policy not in { + EXECUTION_CALIBRATION_USE_IF_AVAILABLE, + EXECUTION_CALIBRATION_REFRESH, + }: + raise ValueError( + "Execution policy calibration_policy must be " + "'use_if_available' or 'refresh'." 
+ ) + object.__setattr__(self, "calibration_policy", calibration_policy) + + +@dataclass(frozen=True) +class AnalysisResourceDescriptor: + """Backend-agnostic resource model for one analysis operation.""" + + operation_name: str + chunk_basis: str + uses_overlap: bool + seed_memory_multiplier: float + seed_cpu_intensity: float + io_intensity: float + gpu_mode: ExecutionGpuMode = EXECUTION_GPU_MODE_NEVER + preferred_worker_kind: ExecutionWorkerKind = EXECUTION_WORKER_KIND_PROCESS + supports_chunk_calibration: bool = False + + def __post_init__(self) -> None: + """Validate descriptor values.""" + if float(self.seed_memory_multiplier) <= 0: + raise ValueError( + "AnalysisResourceDescriptor seed_memory_multiplier must be > 0." + ) + if float(self.seed_cpu_intensity) <= 0: + raise ValueError( + "AnalysisResourceDescriptor seed_cpu_intensity must be > 0." + ) + if float(self.io_intensity) < 0: + raise ValueError("AnalysisResourceDescriptor io_intensity cannot be negative.") + gpu_mode = str(self.gpu_mode).strip().lower() + if gpu_mode not in { + EXECUTION_GPU_MODE_NEVER, + EXECUTION_GPU_MODE_OPTIONAL, + EXECUTION_GPU_MODE_REQUIRED, + }: + raise ValueError("AnalysisResourceDescriptor gpu_mode is invalid.") + object.__setattr__(self, "gpu_mode", gpu_mode) + worker_kind = str(self.preferred_worker_kind).strip().lower() + if worker_kind not in { + EXECUTION_WORKER_KIND_THREAD, + EXECUTION_WORKER_KIND_PROCESS, + EXECUTION_WORKER_KIND_GPU_PROCESS, + }: + raise ValueError( + "AnalysisResourceDescriptor preferred_worker_kind is invalid." 
+ ) + object.__setattr__(self, "preferred_worker_kind", worker_kind) + + +@dataclass(frozen=True) +class EnvironmentCapabilities: + """Detected execution-environment capabilities.""" + + cpu_count: int + memory_bytes: int + gpu_count: int + gpu_memory_bytes: Optional[int] + attached_scheduler_file: Optional[str] = None + scheduler_mode: str = "local" + + def __post_init__(self) -> None: + """Normalize environment-capability values.""" + object.__setattr__(self, "cpu_count", max(1, int(self.cpu_count))) + object.__setattr__(self, "memory_bytes", max(1 << 30, int(self.memory_bytes))) + object.__setattr__(self, "gpu_count", max(0, int(self.gpu_count))) + if self.gpu_memory_bytes is not None: + object.__setattr__( + self, + "gpu_memory_bytes", + max(1, int(self.gpu_memory_bytes)), + ) + object.__setattr__( + self, + "attached_scheduler_file", + _normalize_optional_text(self.attached_scheduler_file), + ) + scheduler_mode = str(self.scheduler_mode).strip().lower() or "local" + object.__setattr__(self, "scheduler_mode", scheduler_mode) + + +@dataclass(frozen=True) +class CalibrationProfile: + """Versioned execution-calibration profile.""" + + profile_key: str + operation_names: Tuple[str, ...] 
+ parameter_signature: str + chunk_shape_tpczyx: Tuple[int, int, int, int, int, int] + dtype_itemsize: int + sample_chunk_count: int + estimated_peak_memory_bytes: int + estimated_seconds_per_chunk: float + cpu_utilization: float + source: str = "geometry_estimate" + confidence: float = 0.35 + environment_fingerprint: str = "" + software_version: str = "" + model_version: str = EXECUTION_PLAN_MODEL_VERSION + + def __post_init__(self) -> None: + """Validate profile values.""" + object.__setattr__(self, "profile_key", str(self.profile_key).strip()) + object.__setattr__( + self, + "operation_names", + tuple( + str(name).strip() for name in self.operation_names if str(name).strip() + ), + ) + object.__setattr__( + self, + "parameter_signature", + str(self.parameter_signature).strip(), + ) + object.__setattr__( + self, + "chunk_shape_tpczyx", + tuple(int(v) for v in self.chunk_shape_tpczyx), + ) + object.__setattr__(self, "dtype_itemsize", max(1, int(self.dtype_itemsize))) + object.__setattr__( + self, + "sample_chunk_count", + max(1, int(self.sample_chunk_count)), + ) + object.__setattr__( + self, + "estimated_peak_memory_bytes", + max(1, int(self.estimated_peak_memory_bytes)), + ) + object.__setattr__( + self, + "estimated_seconds_per_chunk", + max(0.01, float(self.estimated_seconds_per_chunk)), + ) + object.__setattr__( + self, + "cpu_utilization", + max(0.05, float(self.cpu_utilization)), + ) + object.__setattr__(self, "source", str(self.source).strip() or "geometry_estimate") + object.__setattr__( + self, + "confidence", + max(0.0, min(1.0, float(self.confidence))), + ) + object.__setattr__( + self, + "environment_fingerprint", + str(self.environment_fingerprint).strip(), + ) + object.__setattr__( + self, + "software_version", + str(self.software_version).strip(), + ) + object.__setattr__( + self, + "model_version", + str(self.model_version).strip() or EXECUTION_PLAN_MODEL_VERSION, + ) + + +@dataclass(frozen=True) +class WorkerEnvelope: + """Generic worker capacity 
envelope.""" + + cpus: int + memory_bytes: int + gpus: int = 0 + gpu_memory_bytes: Optional[int] = None + scratch_directory: Optional[str] = None + + def __post_init__(self) -> None: + """Normalize envelope values.""" + object.__setattr__(self, "cpus", max(1, int(self.cpus))) + object.__setattr__(self, "memory_bytes", max(1 << 30, int(self.memory_bytes))) + object.__setattr__(self, "gpus", max(0, int(self.gpus))) + if self.gpu_memory_bytes is not None: + object.__setattr__( + self, + "gpu_memory_bytes", + max(1, int(self.gpu_memory_bytes)), + ) + object.__setattr__( + self, + "scratch_directory", + _normalize_optional_text(self.scratch_directory), + ) + + +@dataclass(frozen=True) +class ExecutionPlan: + """Effective execution plan derived from workflow context.""" + + policy_mode: ExecutionPolicyMode + workload: str + selected_operations: Tuple[str, ...] + worker_kind: ExecutionWorkerKind + backend_config: DaskBackendConfig + workers: int + threads_per_worker: int + memory_per_worker_limit: str + estimated_chunk_bytes: int + estimated_working_set_bytes: int + estimated_chunk_count: Optional[int] + requires_gpu: bool + environment: EnvironmentCapabilities + calibration_profile: Optional[CalibrationProfile] = None + + def __post_init__(self) -> None: + """Normalize execution-plan values.""" + object.__setattr__( + self, + "policy_mode", + str(self.policy_mode).strip().lower() or EXECUTION_POLICY_AUTO, + ) + object.__setattr__( + self, + "workload", + str(self.workload).strip().lower() or "analysis", + ) + object.__setattr__( + self, + "selected_operations", + tuple( + str(name).strip() for name in self.selected_operations if str(name).strip() + ), + ) + worker_kind = str(self.worker_kind).strip().lower() + if worker_kind not in { + EXECUTION_WORKER_KIND_THREAD, + EXECUTION_WORKER_KIND_PROCESS, + EXECUTION_WORKER_KIND_GPU_PROCESS, + }: + raise ValueError("ExecutionPlan worker_kind is invalid.") + object.__setattr__(self, "worker_kind", worker_kind) + 
object.__setattr__(self, "workers", max(1, int(self.workers))) + object.__setattr__( + self, + "threads_per_worker", + max(1, int(self.threads_per_worker)), + ) + object.__setattr__( + self, + "memory_per_worker_limit", + str(self.memory_per_worker_limit).strip() or "auto", + ) + object.__setattr__( + self, + "estimated_chunk_bytes", + max(1, int(self.estimated_chunk_bytes)), + ) + object.__setattr__( + self, + "estimated_working_set_bytes", + max(1, int(self.estimated_working_set_bytes)), + ) + if self.estimated_chunk_count is not None: + object.__setattr__( + self, + "estimated_chunk_count", + max(1, int(self.estimated_chunk_count)), + ) + + @dataclass(frozen=True) class LocalClusterRecommendation: """Recommended LocalCluster settings derived from host and data context. @@ -3711,6 +4045,436 @@ def format_local_cluster_recommendation_summary( return " | ".join(parts) +_MEMORY_TEXT_PATTERN = re.compile( + r"^\s*(?P\d+(?:\.\d+)?)\s*(?P[kmgt]?i?b)?\s*$", + re.IGNORECASE, +) + + +def _parse_memory_limit_bytes(value: Optional[str]) -> Optional[int]: + """Parse a human-readable memory limit into bytes.""" + if value is None: + return None + text = str(value).strip() + if not text or text.lower() == "auto": + return None + match = _MEMORY_TEXT_PATTERN.match(text) + if match is None: + return None + scalar = float(match.group("value")) + unit = str(match.group("unit") or "b").lower() + multipliers = { + "b": 1, + "kb": 1000, + "mb": 1000**2, + "gb": 1000**3, + "tb": 1000**4, + "kib": 1 << 10, + "mib": 1 << 20, + "gib": 1 << 30, + "tib": 1 << 40, + } + multiplier = multipliers.get(unit) + if multiplier is None: + return None + return max(1, int(scalar * multiplier)) + + +def _clearex_software_version() -> str: + """Return a best-effort ClearEx software version string.""" + try: + return str(version("clearex")).strip() + except PackageNotFoundError: + return "unknown" + except Exception: + return "unknown" + + +def _environment_fingerprint(capabilities: 
def _environment_fingerprint(capabilities: EnvironmentCapabilities) -> str:
    """Build a stable environment fingerprint for profile keys.

    The fingerprint is a deterministic ``|``-joined ``key=value`` string
    covering CPU, memory, GPU and scheduler facts, so equal environments
    always hash to the same calibration-profile key.
    """
    parts = [
        f"cpu={capabilities.cpu_count}",
        f"memory={capabilities.memory_bytes}",
        f"gpu={capabilities.gpu_count}",
        # None GPU memory is normalized to 0 so the string stays stable.
        f"gpu_memory={capabilities.gpu_memory_bytes or 0}",
        f"scheduler={capabilities.scheduler_mode}",
    ]
    if capabilities.attached_scheduler_file:
        parts.append(f"scheduler_file={capabilities.attached_scheduler_file}")
    return "|".join(parts)


def detect_environment_capabilities(
    *,
    scheduler_file: Optional[str] = None,
) -> EnvironmentCapabilities:
    """Detect generic execution-environment capabilities.

    Parameters
    ----------
    scheduler_file : str, optional
        Explicit Dask scheduler file; when omitted, the
        ``DASK_SCHEDULER_FILE`` environment variable is consulted.

    Returns
    -------
    EnvironmentCapabilities
        Host CPU/memory/GPU facts plus the scheduler mode:
        ``"attached_scheduler"`` when a scheduler file is present,
        otherwise ``"local"``.
    """
    detected_gpu_count, detected_gpu_memory = _detect_local_gpu_info()
    attached_scheduler_file = _normalize_optional_text(
        scheduler_file or os.environ.get("DASK_SCHEDULER_FILE")
    )
    scheduler_mode = "attached_scheduler" if attached_scheduler_file else "local"
    return EnvironmentCapabilities(
        cpu_count=_detect_local_cpu_count(),
        memory_bytes=_detect_local_memory_bytes(),
        gpu_count=detected_gpu_count,
        gpu_memory_bytes=detected_gpu_memory,
        attached_scheduler_file=attached_scheduler_file,
        scheduler_mode=scheduler_mode,
    )


def default_analysis_resource_descriptors() -> Dict[str, AnalysisResourceDescriptor]:
    """Return seeded analysis resource descriptors keyed by operation name.

    Each entry combines hand-tuned seeds (CPU intensity, I/O intensity,
    GPU mode, preferred worker kind) with the operation's normalized
    default parameters (chunk basis, overlap usage, memory overhead).
    Operations absent from the override table fall back to neutral seeds.
    """
    # Hand-tuned per-operation seeds; values are heuristics, not measurements.
    descriptor_defaults: Dict[str, Dict[str, Any]] = {
        "flatfield": {
            "seed_cpu_intensity": 0.9,
            "io_intensity": 0.4,
            "gpu_mode": EXECUTION_GPU_MODE_NEVER,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
        "deconvolution": {
            "seed_cpu_intensity": 1.2,
            "io_intensity": 0.35,
            "gpu_mode": EXECUTION_GPU_MODE_OPTIONAL,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
        "shear_transform": {
            "seed_cpu_intensity": 1.1,
            "io_intensity": 0.25,
            "gpu_mode": EXECUTION_GPU_MODE_NEVER,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
        "particle_detection": {
            "seed_cpu_intensity": 0.85,
            "io_intensity": 0.2,
            "gpu_mode": EXECUTION_GPU_MODE_NEVER,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
        "usegment3d": {
            "seed_cpu_intensity": 1.0,
            "io_intensity": 0.3,
            "gpu_mode": EXECUTION_GPU_MODE_OPTIONAL,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_GPU_PROCESS,
        },
        "registration": {
            "seed_cpu_intensity": 0.9,
            "io_intensity": 0.35,
            "gpu_mode": EXECUTION_GPU_MODE_NEVER,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
        "visualization": {
            "seed_cpu_intensity": 0.4,
            "io_intensity": 0.6,
            "gpu_mode": EXECUTION_GPU_MODE_OPTIONAL,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_THREAD,
        },
        "mip_export": {
            "seed_cpu_intensity": 0.6,
            "io_intensity": 0.8,
            "gpu_mode": EXECUTION_GPU_MODE_NEVER,
            "preferred_worker_kind": EXECUTION_WORKER_KIND_PROCESS,
        },
    }
    descriptors: Dict[str, AnalysisResourceDescriptor] = {}
    # Normalized defaults supply chunk basis / overlap / memory overhead.
    normalized_defaults = normalize_analysis_operation_parameters(None)
    for operation_name in ANALYSIS_OPERATION_ORDER:
        params = dict(normalized_defaults.get(operation_name, {}))
        descriptor_overrides = descriptor_defaults.get(operation_name, {})
        descriptors[operation_name] = AnalysisResourceDescriptor(
            operation_name=operation_name,
            chunk_basis=str(params.get("chunk_basis", "3d")).strip() or "3d",
            uses_overlap=bool(params.get("use_map_overlap", False)),
            seed_memory_multiplier=float(params.get("memory_overhead_factor", 1.0)),
            seed_cpu_intensity=float(
                descriptor_overrides.get("seed_cpu_intensity", 1.0)
            ),
            io_intensity=float(descriptor_overrides.get("io_intensity", 0.25)),
            gpu_mode=str(
                descriptor_overrides.get("gpu_mode", EXECUTION_GPU_MODE_NEVER)
            ),
            preferred_worker_kind=str(
                descriptor_overrides.get(
                    "preferred_worker_kind",
                    EXECUTION_WORKER_KIND_PROCESS,
                )
            ),
            # Seeded descriptors never enable calibration; it is opted into
            # elsewhere when a measured profile exists.
            supports_chunk_calibration=False,
        )
    return descriptors
def _selected_analysis_parameter_signature(
    operation_names: Sequence[str],
    analysis_parameters: Optional[Mapping[str, Mapping[str, Any]]],
) -> str:
    """Return a stable parameter signature for selected operations.

    Serializes the normalized parameters of exactly the selected operations
    as compact, key-sorted JSON so equal configurations always produce an
    identical signature string.
    """
    payload: Dict[str, Any] = {}
    normalized = normalize_analysis_operation_parameters(analysis_parameters)
    for operation_name in operation_names:
        payload[str(operation_name)] = normalized.get(str(operation_name), {})
    # default=str keeps non-JSON values (paths, enums) serializable.
    return json.dumps(payload, sort_keys=True, default=str, separators=(",", ":"))


def build_execution_calibration_profile(
    *,
    operation_names: Sequence[str],
    analysis_parameters: Optional[Mapping[str, Mapping[str, Any]]],
    chunks_tpczyx: Tuple[int, int, int, int, int, int],
    dtype_itemsize: int,
    capabilities: EnvironmentCapabilities,
    descriptors: Optional[Mapping[str, AnalysisResourceDescriptor]] = None,
    estimated_chunk_count: Optional[int] = None,
) -> CalibrationProfile:
    """Build a versioned execution profile from dataset geometry and defaults.

    The profile is a geometry-based *estimate* (source ``"geometry_estimate"``,
    confidence 0.35): peak memory is the chunk size scaled by the worst-case
    descriptor memory multiplier and overlap inflation; runtime per chunk is
    a heuristic derived from that memory figure and CPU intensity. The
    profile key is a SHA-256 over model version, operations, parameter
    signature, chunk geometry, environment fingerprint and software version.
    """
    descriptor_map = (
        dict(descriptors) if descriptors is not None else default_analysis_resource_descriptors()
    )
    normalized = normalize_analysis_operation_parameters(analysis_parameters)
    selected_operations = tuple(
        str(name).strip() for name in operation_names if str(name).strip()
    )
    if not selected_operations:
        # Keep the profile well-formed even with an empty selection.
        selected_operations = ("analysis",)
    itemsize = max(1, int(dtype_itemsize))
    effective_chunks = tuple(int(v) for v in chunks_tpczyx)
    estimated_chunk_bytes = max(1, math.prod(effective_chunks) * itemsize)

    # Aggregate worst-case factors across the selected operations.
    memory_multiplier = 1.0
    cpu_intensity = 0.5
    overlap_factor = 1.0
    for operation_name in selected_operations:
        params = dict(normalized.get(operation_name, {}))
        descriptor = descriptor_map.get(operation_name)
        if descriptor is None:
            continue
        memory_multiplier = max(
            memory_multiplier,
            float(descriptor.seed_memory_multiplier),
        )
        cpu_intensity = max(cpu_intensity, float(descriptor.seed_cpu_intensity))
        if bool(params.get("use_map_overlap", descriptor.uses_overlap)):
            overlap_zyx = params.get("overlap_zyx", [0, 0, 0])
            if isinstance(overlap_zyx, Collection) and len(overlap_zyx) == 3:
                # Overlap inflates each spatial axis by 2*overlap, capped at
                # 4x per axis to keep pathological overlaps bounded.
                zyx_chunks = effective_chunks[3:]
                candidate_factor = 1.0
                for chunk_value, overlap_value in zip(
                    zyx_chunks,
                    overlap_zyx,
                    strict=False,
                ):
                    chunk_size = max(1, int(chunk_value))
                    overlap_size = max(0, int(overlap_value))
                    candidate_factor *= min(
                        4.0,
                        (chunk_size + (2 * overlap_size)) / chunk_size,
                    )
                overlap_factor = max(overlap_factor, candidate_factor)

    estimated_peak_memory_bytes = max(
        estimated_chunk_bytes,
        int(math.ceil(estimated_chunk_bytes * memory_multiplier * overlap_factor)),
    )
    # Heuristic: time scales with working set (in 256 MiB units) times CPU
    # intensity; floored at 50 ms per chunk.
    estimated_seconds_per_chunk = max(
        0.05,
        round(
            (estimated_peak_memory_bytes / float(256 << 20))
            * max(0.25, float(cpu_intensity)),
            3,
        ),
    )
    # Calibrate on at most 10 chunks (fewer when the dataset is smaller).
    sample_chunk_count = max(
        1,
        min(10, int(estimated_chunk_count or 10)),
    )
    parameter_signature = _selected_analysis_parameter_signature(
        selected_operations,
        normalized,
    )
    fingerprint = _environment_fingerprint(capabilities)
    key_material = {
        "model_version": EXECUTION_PLAN_MODEL_VERSION,
        "operations": list(selected_operations),
        "parameter_signature": parameter_signature,
        "chunks_tpczyx": list(effective_chunks),
        "dtype_itemsize": itemsize,
        "environment": fingerprint,
        "software_version": _clearex_software_version(),
    }
    profile_key = hashlib.sha256(
        json.dumps(key_material, sort_keys=True, separators=(",", ":")).encode(
            "utf-8"
        )
    ).hexdigest()
    return CalibrationProfile(
        profile_key=profile_key,
        operation_names=selected_operations,
        parameter_signature=parameter_signature,
        chunk_shape_tpczyx=effective_chunks,
        dtype_itemsize=itemsize,
        sample_chunk_count=sample_chunk_count,
        estimated_peak_memory_bytes=estimated_peak_memory_bytes,
        estimated_seconds_per_chunk=estimated_seconds_per_chunk,
        cpu_utilization=min(1.0, max(0.1, cpu_intensity / 1.25)),
        source="geometry_estimate",
        confidence=0.35,
        environment_fingerprint=fingerprint,
        software_version=_clearex_software_version(),
        model_version=EXECUTION_PLAN_MODEL_VERSION,
    )
def execution_policy_to_dict(config: ExecutionPolicy) -> Dict[str, Any]:
    """Serialize execution policy into a JSON-friendly mapping."""
    payload: Dict[str, Any] = {}
    payload["mode"] = config.mode
    payload["max_workers"] = config.max_workers
    payload["memory_per_worker_limit"] = config.memory_per_worker_limit
    payload["calibration_policy"] = config.calibration_policy
    return payload


def execution_policy_from_dict(payload: Any) -> ExecutionPolicy:
    """Deserialize an execution-policy mapping.

    Any malformed payload (non-mapping input or values the
    ``ExecutionPolicy`` constructor rejects) falls back to defaults.
    """
    defaults = ExecutionPolicy()
    if not isinstance(payload, Mapping):
        return defaults
    try:
        # Mode is normalized to lowercase; an empty result falls back too.
        raw_mode = str(payload.get("mode", defaults.mode)).strip().lower()
        return ExecutionPolicy(
            mode=raw_mode or defaults.mode,
            max_workers=payload.get("max_workers", defaults.max_workers),
            memory_per_worker_limit=payload.get(
                "memory_per_worker_limit",
                defaults.memory_per_worker_limit,
            ),
            calibration_policy=payload.get(
                "calibration_policy",
                defaults.calibration_policy,
            ),
        )
    except Exception:
        return defaults


def calibration_profile_to_dict(profile: CalibrationProfile) -> Dict[str, Any]:
    """Serialize a calibration profile."""
    # Tuples become lists so the payload round-trips through JSON cleanly.
    return dict(
        profile_key=profile.profile_key,
        operation_names=list(profile.operation_names),
        parameter_signature=profile.parameter_signature,
        chunk_shape_tpczyx=list(profile.chunk_shape_tpczyx),
        dtype_itemsize=profile.dtype_itemsize,
        sample_chunk_count=profile.sample_chunk_count,
        estimated_peak_memory_bytes=profile.estimated_peak_memory_bytes,
        estimated_seconds_per_chunk=profile.estimated_seconds_per_chunk,
        cpu_utilization=profile.cpu_utilization,
        source=profile.source,
        confidence=profile.confidence,
        environment_fingerprint=profile.environment_fingerprint,
        software_version=profile.software_version,
        model_version=profile.model_version,
    )
def calibration_profile_from_dict(payload: Any) -> Optional[CalibrationProfile]:
    """Deserialize a calibration profile mapping.

    Returns ``None`` for non-mapping payloads or when construction fails
    for any reason (corrupt settings files must never break startup).
    """
    if not isinstance(payload, Mapping):
        return None

    def _text(key: str, fallback: str = "") -> str:
        # Shared "stringify and strip" treatment for text-valued fields.
        return str(payload.get(key, fallback)).strip()

    try:
        model_version = _text("model_version", EXECUTION_PLAN_MODEL_VERSION)
        return CalibrationProfile(
            profile_key=_text("profile_key"),
            operation_names=tuple(payload.get("operation_names", tuple())),
            parameter_signature=_text("parameter_signature"),
            chunk_shape_tpczyx=tuple(
                payload.get("chunk_shape_tpczyx", (1, 1, 1, 256, 256, 256))
            ),
            dtype_itemsize=payload.get("dtype_itemsize", 2),
            sample_chunk_count=payload.get("sample_chunk_count", 1),
            estimated_peak_memory_bytes=payload.get(
                "estimated_peak_memory_bytes", 1
            ),
            estimated_seconds_per_chunk=payload.get(
                "estimated_seconds_per_chunk", 0.1
            ),
            cpu_utilization=payload.get("cpu_utilization", 0.5),
            source=_text("source", "geometry_estimate"),
            confidence=payload.get("confidence", 0.35),
            environment_fingerprint=_text("environment_fingerprint"),
            software_version=_text("software_version"),
            # An all-whitespace stored version still falls back to current.
            model_version=model_version or EXECUTION_PLAN_MODEL_VERSION,
        )
    except Exception:
        return None


def environment_capabilities_to_dict(
    capabilities: EnvironmentCapabilities,
) -> Dict[str, Any]:
    """Serialize environment capabilities."""
    field_names = (
        "cpu_count",
        "memory_bytes",
        "gpu_count",
        "gpu_memory_bytes",
        "attached_scheduler_file",
        "scheduler_mode",
    )
    return {name: getattr(capabilities, name) for name in field_names}
def execution_plan_to_dict(plan: ExecutionPlan) -> Dict[str, Any]:
    """Serialize an execution plan."""
    profile = plan.calibration_profile
    payload: Dict[str, Any] = {
        "policy_mode": plan.policy_mode,
        "workload": plan.workload,
        "selected_operations": list(plan.selected_operations),
        "worker_kind": plan.worker_kind,
        "workers": plan.workers,
        "threads_per_worker": plan.threads_per_worker,
        "memory_per_worker_limit": plan.memory_per_worker_limit,
        "estimated_chunk_bytes": plan.estimated_chunk_bytes,
        "estimated_working_set_bytes": plan.estimated_working_set_bytes,
        "estimated_chunk_count": plan.estimated_chunk_count,
        "requires_gpu": plan.requires_gpu,
        "backend_config": dask_backend_to_dict(plan.backend_config),
        "environment": environment_capabilities_to_dict(plan.environment),
    }
    # Absent profiles serialize explicitly as null for stable payload shape.
    payload["calibration_profile"] = (
        None if profile is None else calibration_profile_to_dict(profile)
    )
    return payload


def format_execution_policy_summary(config: ExecutionPolicy) -> str:
    """Format a compact execution-policy summary."""
    if config.max_workers is None:
        max_workers_text = "auto"
    else:
        max_workers_text = str(config.max_workers)
    details = ", ".join(
        [
            f"max_workers={max_workers_text}",
            f"memory_per_worker={config.memory_per_worker_limit}",
            f"calibration={config.calibration_policy}",
        ]
    )
    return f"{config.mode} ({details})"


def format_execution_plan_summary(plan: ExecutionPlan) -> str:
    """Format a compact execution-plan summary."""
    working_set_text = _format_binary_size(plan.estimated_working_set_bytes)
    parts = [
        format_dask_backend_summary(plan.backend_config),
        f"workers={plan.workers}",
        f"threads={plan.threads_per_worker}",
        f"memory={plan.memory_per_worker_limit}",
        f"~{working_set_text} working set/chunk",
    ]
    if plan.requires_gpu:
        parts.append("gpu=yes")
    profile = plan.calibration_profile
    if profile is not None:
        parts.append(f"profile={profile.source}:{profile.confidence:.2f}")
    return " | ".join(parts)
@@ -3903,10 +4667,13 @@ def dask_backend_from_dict(payload: Any) -> DaskBackendConfig: slurm_cluster = defaults.slurm_cluster mode_value = str(payload.get("mode", defaults.mode)).strip().lower() - mode = ( - mode_value - if mode_value in DASK_BACKEND_MODE_LABELS - else DASK_BACKEND_LOCAL_CLUSTER + mode = cast( + DaskBackendMode, + ( + mode_value + if mode_value in DASK_BACKEND_MODE_LABELS + else DASK_BACKEND_LOCAL_CLUSTER + ), ) try: @@ -4045,8 +4812,12 @@ class WorkflowConfig: ``analysis_targets`` instead of only the selected one. prefer_dask : bool Whether to open data using lazy Dask-backed arrays when supported. + execution_policy : ExecutionPolicy + Operator-facing execution-planning policy used for automatic sizing. dask_backend : DaskBackendConfig - Backend orchestration mode and runtime settings for Dask execution. + Advanced backend orchestration override and persisted scheduler hints. + execution_plan : ExecutionPlan, optional + Effective execution plan derived at runtime. chunks : int or tuple of int, optional Chunking configuration used for Dask reads. flatfield : bool @@ -4076,7 +4847,9 @@ class WorkflowConfig: analysis_selected_experiment_path: Optional[str] = None analysis_apply_to_all: bool = False prefer_dask: bool = True + execution_policy: ExecutionPolicy = field(default_factory=ExecutionPolicy) dask_backend: DaskBackendConfig = field(default_factory=DaskBackendConfig) + execution_plan: Optional[ExecutionPlan] = None chunks: ChunkSpec = None flatfield: bool = False deconvolution: bool = False @@ -4108,6 +4881,8 @@ def __post_init__(self) -> None: ValueError If analysis parameter mappings are invalid. 
""" + if not isinstance(self.execution_policy, ExecutionPolicy): + self.execution_policy = execution_policy_from_dict(self.execution_policy) self.analysis_targets = normalize_analysis_targets(self.analysis_targets) selected_experiment_path = ( str(self.analysis_selected_experiment_path).strip() @@ -4196,6 +4971,454 @@ def selected_analysis_target(self) -> Optional[AnalysisTarget]: return None +def _selected_operations_for_execution_plan( + workflow: WorkflowConfig, + *, + workload: str, + analysis_parameters: Optional[Dict[str, Dict[str, Any]]] = None, +) -> tuple[str, ...]: + """Return ordered operations relevant to an execution plan.""" + workload_name = str(workload).strip().lower() or "analysis" + if workload_name != "analysis": + return ("io",) + return tuple( + resolve_analysis_execution_sequence( + flatfield=workflow.flatfield, + deconvolution=workflow.deconvolution, + shear_transform=workflow.shear_transform, + particle_detection=workflow.particle_detection, + usegment3d=workflow.usegment3d, + registration=workflow.registration, + visualization=workflow.visualization, + mip_export=workflow.mip_export, + analysis_parameters=analysis_parameters or workflow.analysis_parameters, + ) + ) + + +def _effective_analysis_descriptor( + operation_name: str, + parameters: Optional[Mapping[str, Any]], + *, + seeded_descriptors: Optional[Mapping[str, AnalysisResourceDescriptor]] = None, +) -> AnalysisResourceDescriptor: + """Return one effective descriptor for a selected analysis operation.""" + descriptor_map = ( + dict(seeded_descriptors) + if seeded_descriptors is not None + else default_analysis_resource_descriptors() + ) + base = descriptor_map.get( + str(operation_name), + AnalysisResourceDescriptor( + operation_name=str(operation_name), + chunk_basis="3d", + uses_overlap=False, + seed_memory_multiplier=1.0, + seed_cpu_intensity=1.0, + io_intensity=0.25, + ), + ) + params = dict(parameters or {}) + gpu_mode = base.gpu_mode + preferred_worker_kind = 
def _effective_analysis_descriptor(
    operation_name: str,
    parameters: Optional[Mapping[str, Any]],
    *,
    seeded_descriptors: Optional[Mapping[str, AnalysisResourceDescriptor]] = None,
) -> AnalysisResourceDescriptor:
    """Return one effective descriptor for a selected analysis operation.

    Starts from the seeded descriptor for *operation_name* (neutral
    fallbacks when unknown), then applies operation-specific parameter
    overrides that change GPU mode and worker kind:

    * ``usegment3d``: ``require_gpu`` forces GPU-required; ``gpu`` makes it
      optional; otherwise GPU is disabled entirely.
    * ``visualization``: disabling ``require_gpu_rendering`` turns GPU off.
    * ``deconvolution``: ``gpu_job`` opts into optional GPU processing.

    Chunk basis, overlap usage and memory overhead come from *parameters*
    when present, else from the seeded descriptor.
    """
    descriptor_map = (
        dict(seeded_descriptors)
        if seeded_descriptors is not None
        else default_analysis_resource_descriptors()
    )
    # Neutral fallback descriptor for operations without a seeded entry.
    base = descriptor_map.get(
        str(operation_name),
        AnalysisResourceDescriptor(
            operation_name=str(operation_name),
            chunk_basis="3d",
            uses_overlap=False,
            seed_memory_multiplier=1.0,
            seed_cpu_intensity=1.0,
            io_intensity=0.25,
        ),
    )
    params = dict(parameters or {})
    gpu_mode = base.gpu_mode
    preferred_worker_kind = base.preferred_worker_kind
    if str(operation_name) == "usegment3d":
        if bool(params.get("require_gpu", False)):
            gpu_mode = EXECUTION_GPU_MODE_REQUIRED
            preferred_worker_kind = EXECUTION_WORKER_KIND_GPU_PROCESS
        elif bool(params.get("gpu", False)):
            gpu_mode = EXECUTION_GPU_MODE_OPTIONAL
            preferred_worker_kind = EXECUTION_WORKER_KIND_GPU_PROCESS
        else:
            gpu_mode = EXECUTION_GPU_MODE_NEVER
            preferred_worker_kind = EXECUTION_WORKER_KIND_PROCESS
    elif str(operation_name) == "visualization" and not bool(
        params.get("require_gpu_rendering", True)
    ):
        gpu_mode = EXECUTION_GPU_MODE_NEVER
    elif str(operation_name) == "deconvolution" and bool(params.get("gpu_job", False)):
        gpu_mode = EXECUTION_GPU_MODE_OPTIONAL
        preferred_worker_kind = EXECUTION_WORKER_KIND_GPU_PROCESS
    return AnalysisResourceDescriptor(
        operation_name=str(operation_name),
        # Parameter overrides win; blank strings fall back to the seed.
        chunk_basis=str(params.get("chunk_basis", base.chunk_basis)).strip()
        or base.chunk_basis,
        uses_overlap=bool(params.get("use_map_overlap", base.uses_overlap)),
        seed_memory_multiplier=float(
            params.get("memory_overhead_factor", base.seed_memory_multiplier)
        ),
        seed_cpu_intensity=float(base.seed_cpu_intensity),
        io_intensity=float(base.io_intensity),
        gpu_mode=gpu_mode,
        preferred_worker_kind=preferred_worker_kind,
        supports_chunk_calibration=base.supports_chunk_calibration,
    )
def _aggregate_execution_descriptor(
    *,
    workload: str,
    operation_names: Sequence[str],
    analysis_parameters: Optional[Dict[str, Dict[str, Any]]],
) -> AnalysisResourceDescriptor:
    """Aggregate selected operations into one planning descriptor.

    Non-analysis workloads collapse to a fixed I/O-heavy descriptor. For
    analysis, the per-operation effective descriptors are merged
    pessimistically: numeric intensities take the max; GPU mode and worker
    kind follow a precedence lattice (required > optional > never;
    gpu_process > process > thread); chunk basis is 3d if any operation
    needs it; overlap is set if any operation uses it.
    """
    workload_name = str(workload).strip().lower() or "analysis"
    if workload_name != "analysis":
        # Generic I/O workload: memory headroom for buffering, modest CPU,
        # maximal I/O weight, always CPU-only process workers.
        return AnalysisResourceDescriptor(
            operation_name="io",
            chunk_basis="3d",
            uses_overlap=False,
            seed_memory_multiplier=1.5,
            seed_cpu_intensity=0.65,
            io_intensity=1.0,
            gpu_mode=EXECUTION_GPU_MODE_NEVER,
            preferred_worker_kind=EXECUTION_WORKER_KIND_PROCESS,
            supports_chunk_calibration=False,
        )

    normalized = normalize_analysis_operation_parameters(analysis_parameters)
    effective_descriptors = [
        _effective_analysis_descriptor(
            operation_name,
            normalized.get(str(operation_name), {}),
        )
        for operation_name in operation_names
    ]
    if not effective_descriptors:
        # Neutral fallback when no operations are selected.
        return AnalysisResourceDescriptor(
            operation_name="analysis",
            chunk_basis="3d",
            uses_overlap=False,
            seed_memory_multiplier=1.0,
            seed_cpu_intensity=1.0,
            io_intensity=0.25,
            gpu_mode=EXECUTION_GPU_MODE_NEVER,
            preferred_worker_kind=EXECUTION_WORKER_KIND_PROCESS,
            supports_chunk_calibration=False,
        )

    # Merge GPU mode and worker kind by precedence across all operations.
    gpu_mode = EXECUTION_GPU_MODE_NEVER
    preferred_worker_kind = EXECUTION_WORKER_KIND_THREAD
    for descriptor in effective_descriptors:
        if descriptor.gpu_mode == EXECUTION_GPU_MODE_REQUIRED:
            gpu_mode = EXECUTION_GPU_MODE_REQUIRED
        elif (
            descriptor.gpu_mode == EXECUTION_GPU_MODE_OPTIONAL
            and gpu_mode != EXECUTION_GPU_MODE_REQUIRED
        ):
            gpu_mode = EXECUTION_GPU_MODE_OPTIONAL
        if descriptor.preferred_worker_kind == EXECUTION_WORKER_KIND_GPU_PROCESS:
            preferred_worker_kind = EXECUTION_WORKER_KIND_GPU_PROCESS
        elif (
            descriptor.preferred_worker_kind == EXECUTION_WORKER_KIND_PROCESS
            and preferred_worker_kind != EXECUTION_WORKER_KIND_GPU_PROCESS
        ):
            preferred_worker_kind = EXECUTION_WORKER_KIND_PROCESS

    return AnalysisResourceDescriptor(
        operation_name="analysis_sequence",
        chunk_basis=(
            "3d"
            if any(desc.chunk_basis == "3d" for desc in effective_descriptors)
            else "2d"
        ),
        uses_overlap=any(desc.uses_overlap for desc in effective_descriptors),
        seed_memory_multiplier=max(
            float(desc.seed_memory_multiplier) for desc in effective_descriptors
        ),
        seed_cpu_intensity=max(
            float(desc.seed_cpu_intensity) for desc in effective_descriptors
        ),
        io_intensity=max(float(desc.io_intensity) for desc in effective_descriptors),
        gpu_mode=gpu_mode,
        preferred_worker_kind=preferred_worker_kind,
        supports_chunk_calibration=any(
            desc.supports_chunk_calibration for desc in effective_descriptors
        ),
    )
supports_chunk_calibration=any( + desc.supports_chunk_calibration for desc in effective_descriptors + ), + ) + + +def _estimate_chunk_count( + shape_tpczyx: Optional[Tuple[int, int, int, int, int, int]], + chunks_tpczyx: Tuple[int, int, int, int, int, int], +) -> Optional[int]: + """Estimate the number of chunks in a canonical dataset.""" + if shape_tpczyx is None: + return None + return max( + 1, + math.prod( + [ + max(1, math.ceil(int(dim) / max(1, int(chunk)))) + for dim, chunk in zip(shape_tpczyx, chunks_tpczyx, strict=False) + ] + ), + ) + + +def plan_execution( + workflow: WorkflowConfig, + *, + workload: str = "analysis", + shape_tpczyx: Optional[Tuple[int, int, int, int, int, int]] = None, + chunks_tpczyx: Optional[Tuple[int, int, int, int, int, int]] = None, + dtype_itemsize: Optional[int] = None, + calibration_profiles: Optional[Mapping[str, CalibrationProfile]] = None, +) -> ExecutionPlan: + """Derive an execution plan for the requested workflow context.""" + workload_name = str(workload).strip().lower() or "analysis" + effective_chunks = cast( + Tuple[int, int, int, int, int, int], + tuple( + int(v) + for v in ( + chunks_tpczyx + if chunks_tpczyx is not None + else workflow.zarr_save.chunks_tpczyx() + ) + ), + ) + itemsize = max(1, int(dtype_itemsize or 2)) + estimated_chunk_bytes = max(1, math.prod(effective_chunks) * itemsize) + normalized_parameters = normalize_analysis_operation_parameters( + workflow.analysis_parameters + ) + selected_operations = _selected_operations_for_execution_plan( + workflow, + workload=workload_name, + analysis_parameters=normalized_parameters, + ) + descriptor = _aggregate_execution_descriptor( + workload=workload_name, + operation_names=selected_operations, + analysis_parameters=normalized_parameters, + ) + capabilities = detect_environment_capabilities( + scheduler_file=workflow.dask_backend.slurm_runner.scheduler_file + ) + estimated_chunk_count = _estimate_chunk_count(shape_tpczyx, effective_chunks) + 
calibration_profile = build_execution_calibration_profile( + operation_names=selected_operations, + analysis_parameters=normalized_parameters, + chunks_tpczyx=effective_chunks, + dtype_itemsize=itemsize, + capabilities=capabilities, + estimated_chunk_count=estimated_chunk_count, + ) + if ( + workflow.execution_policy.calibration_policy + == EXECUTION_CALIBRATION_USE_IF_AVAILABLE + and calibration_profiles is not None + ): + cached_profile = calibration_profiles.get(calibration_profile.profile_key) + if cached_profile is not None: + calibration_profile = cached_profile + + if workflow.execution_policy.mode == EXECUTION_POLICY_ADVANCED: + backend_config = workflow.dask_backend + workers = 1 + threads_per_worker = 1 + memory_per_worker_limit = "auto" + if backend_config.mode == DASK_BACKEND_LOCAL_CLUSTER: + if backend_config.local_cluster.n_workers is None: + recommendation = recommend_local_cluster_config( + shape_tpczyx=shape_tpczyx, + chunks_tpczyx=effective_chunks, + dtype_itemsize=itemsize, + cpu_count=capabilities.cpu_count, + memory_bytes=capabilities.memory_bytes, + gpu_count=capabilities.gpu_count, + gpu_memory_bytes=capabilities.gpu_memory_bytes, + ) + workers = int(recommendation.config.n_workers or 1) + threads_per_worker = int(recommendation.config.threads_per_worker) + memory_per_worker_limit = str(recommendation.config.memory_limit) + backend_config = DaskBackendConfig( + mode=DASK_BACKEND_LOCAL_CLUSTER, + local_cluster=LocalClusterConfig( + n_workers=workers, + threads_per_worker=threads_per_worker, + memory_limit=memory_per_worker_limit, + local_directory=backend_config.local_cluster.local_directory, + ), + slurm_runner=backend_config.slurm_runner, + slurm_cluster=backend_config.slurm_cluster, + ) + else: + workers = int(backend_config.local_cluster.n_workers or 1) + threads_per_worker = int(backend_config.local_cluster.threads_per_worker) + memory_per_worker_limit = str(backend_config.local_cluster.memory_limit) + elif backend_config.mode == 
DASK_BACKEND_SLURM_CLUSTER: + workers = int(backend_config.slurm_cluster.workers) + threads_per_worker = int( + max( + 1, + int(backend_config.slurm_cluster.cores) + // max(1, int(backend_config.slurm_cluster.processes)), + ) + ) + memory_per_worker_limit = str(backend_config.slurm_cluster.memory) + elif backend_config.mode == DASK_BACKEND_SLURM_RUNNER: + workers = max( + 1, + int(backend_config.slurm_runner.wait_for_workers or 1), + ) + threads_per_worker = 1 + memory_per_worker_limit = workflow.execution_policy.memory_per_worker_limit + return ExecutionPlan( + policy_mode=workflow.execution_policy.mode, + workload=workload_name, + selected_operations=selected_operations, + worker_kind=descriptor.preferred_worker_kind, + backend_config=backend_config, + workers=workers, + threads_per_worker=threads_per_worker, + memory_per_worker_limit=memory_per_worker_limit, + estimated_chunk_bytes=estimated_chunk_bytes, + estimated_working_set_bytes=calibration_profile.estimated_peak_memory_bytes, + estimated_chunk_count=estimated_chunk_count, + requires_gpu=descriptor.gpu_mode == EXECUTION_GPU_MODE_REQUIRED, + environment=capabilities, + calibration_profile=calibration_profile, + ) + + legacy_local_worker_cap = ( + int(workflow.dask_backend.local_cluster.n_workers) + if workflow.dask_backend.mode == DASK_BACKEND_LOCAL_CLUSTER + and workflow.dask_backend.local_cluster.n_workers is not None + else None + ) + requested_max_workers = ( + int(workflow.execution_policy.max_workers) + if workflow.execution_policy.max_workers is not None + else ( + int(legacy_local_worker_cap) + if legacy_local_worker_cap is not None + else 64 + ) + ) + reserve_bytes = min( + max(2 << 30, capabilities.memory_bytes // 10), + max(1 << 30, capabilities.memory_bytes // 6), + ) + usable_bytes = max(1 << 30, capabilities.memory_bytes - reserve_bytes) + minimum_safe_memory_bytes = max( + 1 << 30, + int(math.ceil(calibration_profile.estimated_peak_memory_bytes * 1.3)), + ) + requested_memory_bytes = 
_parse_memory_limit_bytes( + workflow.execution_policy.memory_per_worker_limit + ) + if requested_memory_bytes is None and ( + workflow.dask_backend.mode == DASK_BACKEND_LOCAL_CLUSTER + ): + requested_memory_bytes = _parse_memory_limit_bytes( + workflow.dask_backend.local_cluster.memory_limit + ) + + if descriptor.preferred_worker_kind == EXECUTION_WORKER_KIND_THREAD: + workers = 1 + threads_per_worker = max( + 1, + min(capabilities.cpu_count, requested_max_workers, 8), + ) + worker_memory_bytes = max( + minimum_safe_memory_bytes, + requested_memory_bytes or usable_bytes, + ) + else: + threads_per_worker = 1 + if capabilities.cpu_count >= 32 and estimated_chunk_bytes < (16 << 20): + threads_per_worker = 2 + workers_by_cpu = max(1, capabilities.cpu_count // threads_per_worker) + worker_memory_bytes = max( + minimum_safe_memory_bytes, + requested_memory_bytes or minimum_safe_memory_bytes, + ) + workers_by_memory = max(1, usable_bytes // max(1, worker_memory_bytes)) + workers_by_chunk_count = ( + max(1, int(estimated_chunk_count)) + if estimated_chunk_count is not None + else requested_max_workers + ) + if descriptor.gpu_mode == EXECUTION_GPU_MODE_NEVER: + workers_by_gpu = requested_max_workers + else: + workers_by_gpu = max(1, capabilities.gpu_count or 1) + workers = max( + 1, + min( + requested_max_workers, + workers_by_cpu, + workers_by_memory, + workers_by_chunk_count, + workers_by_gpu, + ), + ) + if threads_per_worker > 1 and workers * threads_per_worker > capabilities.cpu_count: + threads_per_worker = max(1, capabilities.cpu_count // max(1, workers)) + worker_memory_bytes = max( + worker_memory_bytes, + usable_bytes // max(1, workers), + ) + + requires_gpu = descriptor.gpu_mode == EXECUTION_GPU_MODE_REQUIRED + use_gpu_local_cluster = ( + descriptor.preferred_worker_kind == EXECUTION_WORKER_KIND_GPU_PROCESS + and capabilities.gpu_count > 0 + ) + memory_per_worker_limit = _format_worker_memory_limit(worker_memory_bytes) + if capabilities.attached_scheduler_file: + 
backend_config = DaskBackendConfig( + mode=DASK_BACKEND_SLURM_RUNNER, + slurm_runner=SlurmRunnerConfig( + scheduler_file=capabilities.attached_scheduler_file, + wait_for_workers=workers, + ), + ) + else: + backend_config = DaskBackendConfig( + mode=DASK_BACKEND_LOCAL_CLUSTER, + local_cluster=LocalClusterConfig( + n_workers=workers, + threads_per_worker=threads_per_worker, + memory_limit=memory_per_worker_limit, + local_directory=workflow.dask_backend.local_cluster.local_directory, + ), + slurm_runner=workflow.dask_backend.slurm_runner, + slurm_cluster=workflow.dask_backend.slurm_cluster, + ) + if use_gpu_local_cluster and capabilities.gpu_count > 0: + workers = min(workers, max(1, capabilities.gpu_count)) + backend_config = DaskBackendConfig( + mode=DASK_BACKEND_LOCAL_CLUSTER, + local_cluster=LocalClusterConfig( + n_workers=workers, + threads_per_worker=threads_per_worker, + memory_limit=memory_per_worker_limit, + local_directory=workflow.dask_backend.local_cluster.local_directory, + ), + slurm_runner=workflow.dask_backend.slurm_runner, + slurm_cluster=workflow.dask_backend.slurm_cluster, + ) + return ExecutionPlan( + policy_mode=workflow.execution_policy.mode, + workload=workload_name, + selected_operations=selected_operations, + worker_kind=descriptor.preferred_worker_kind, + backend_config=backend_config, + workers=workers, + threads_per_worker=threads_per_worker, + memory_per_worker_limit=memory_per_worker_limit, + estimated_chunk_bytes=estimated_chunk_bytes, + estimated_working_set_bytes=calibration_profile.estimated_peak_memory_bytes, + estimated_chunk_count=estimated_chunk_count, + requires_gpu=requires_gpu, + environment=capabilities, + calibration_profile=calibration_profile, + ) + + def parse_chunks(chunks: Optional[str]) -> ChunkSpec: """Parse chunk spec from CLI/GUI text. 
diff --git a/tests/test_main.py b/tests/test_main.py index 2a47529..5a1c507 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,7 +17,7 @@ import clearex.main as main_module from clearex.io.provenance import persist_run_provenance from clearex.io.read import ImageInfo -from clearex.workflow import WorkflowConfig, WorkflowExecutionCancelled +from clearex.workflow import ExecutionPolicy, WorkflowConfig, WorkflowExecutionCancelled from clearex.workflow import DaskBackendConfig, LocalClusterConfig @@ -108,6 +108,7 @@ def _fake_create_dask_client(**kwargs): workflow = WorkflowConfig( prefer_dask=True, + execution_policy=ExecutionPolicy(mode="advanced"), dask_backend=DaskBackendConfig( local_cluster=LocalClusterConfig( n_workers=4, @@ -144,6 +145,7 @@ def _fake_create_dask_client(**kwargs): workflow = WorkflowConfig( prefer_dask=True, + execution_policy=ExecutionPolicy(mode="advanced"), dask_backend=DaskBackendConfig( local_cluster=LocalClusterConfig( n_workers=1, @@ -186,6 +188,7 @@ def _fake_recommend_local_cluster_config(**kwargs): workflow = WorkflowConfig( prefer_dask=True, + execution_policy=ExecutionPolicy(mode="advanced"), usegment3d=True, analysis_parameters={ "usegment3d": { @@ -241,6 +244,7 @@ def _unexpected_recommendation(**kwargs): workflow = WorkflowConfig( prefer_dask=True, + execution_policy=ExecutionPolicy(mode="advanced"), usegment3d=True, analysis_parameters={ "usegment3d": {