From 9109f6a02f4b5757873b6d7d57d956f70ad470cc Mon Sep 17 00:00:00 2001 From: martin barker Date: Wed, 29 Oct 2025 08:43:26 -0700 Subject: [PATCH 1/5] add back option for loading into memory --- Rhapso/fusion/multiscale_worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Rhapso/fusion/multiscale_worker.py b/Rhapso/fusion/multiscale_worker.py index d782ae2..05194c5 100644 --- a/Rhapso/fusion/multiscale_worker.py +++ b/Rhapso/fusion/multiscale_worker.py @@ -71,8 +71,12 @@ def run(): logger.info(f"Dataset size: {total_size_gb:.2f} GB") # Use dask array directly instead of computing (don't load into memory) + # Original implementation (loads entire array into memory - causes OOM for large datasets): + array = dataset.compute() + + # New implementation (keeps lazy evaluation for memory efficiency): logger.info("Using Dask array for lazy/chunked processing (not loading into memory)") - array = dataset + # array = dataset compressor_kwargs = { From 4c661edc4924e12e02e9f5644a65bdfa12d826c1 Mon Sep 17 00:00:00 2001 From: martin barker Date: Wed, 29 Oct 2025 15:56:57 -0700 Subject: [PATCH 2/5] debug for loop multiscale --- .../compress/zarr_writer.py | 45 +++++----- .../array_to_zarr.py | 15 +++- .../aind_z1_radial_correction/todo.md | 84 +++++++++++++++++++ 3 files changed, 119 insertions(+), 25 deletions(-) create mode 100644 Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md diff --git a/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py b/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py index dd4290a..91f822c 100644 --- a/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py +++ b/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py @@ -188,18 +188,19 @@ def _slice_along_dim(dim: int) -> Generator: @staticmethod def store( - in_array: da.Array, out_array: ArrayLike, block_shape: tuple + in_array: ArrayLike, out_array: ArrayLike, block_shape: tuple ) -> None: """ - Partitions the last 3 dimensions of a Dask array + Partitions the last 3 dimensions of an array into non-overlapping blocks and writes them sequentially to a Zarr array. This is meant to reduce the scheduling burden for massive (terabyte-scale) arrays. - :param in_array: The input Dask array + :param in_array: The input array (can be dask array or numpy array) :param block_shape: Tuple of (block_depth, block_height, block_width) :param out_array: The output array """ + import sys logger = logging.getLogger(__name__) # Calculate total number of blocks for progress tracking @@ -208,40 +209,42 @@ def store( total_blocks *= (arr_dim + block_dim - 1) // block_dim logger.info(f" Writing {total_blocks} blocks (block shape: {block_shape})...") + sys.stdout.flush() # Iterate through the input array in # steps equal to the block shape dimensions block_idx = 0 - log_interval = max(1, total_blocks // 10) # Log ~10 times total - for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): + logger.info(f" Progress: {block_idx}/{total_blocks} blocks ({(block_idx/total_blocks)*100:.1f}%)") + block_idx += 1 + + # Read block from input array block = in_array[sl] - da.store( - block, - out_array, - regions=sl, - lock=False, - compute=True, - return_stored=False, - ) - block_idx += 1 - if block_idx % log_interval == 0 or block_idx == total_blocks: - progress_pct = (block_idx / total_blocks) * 100 - logger.info(f" Progress: {block_idx}/{total_blocks} blocks ({progress_pct:.1f}%)") + # If it's a dask array, compute it to get numpy array + if hasattr(block, 'compute'): + block = block.compute() + + # Write block directly to output array (writes to S3) + out_array[sl] = block + + + + logger.info(f" ✓ All {total_blocks} blocks written successfully!") + sys.stdout.flush() @staticmethod - def get_block_shape(arr, target_size_mb=409600, mode="cycle", chunks=None): + def get_block_shape(arr, target_block_size_mb=409600, mode="cycle", chunks=None): """ Given the shape and chunk size of a pre-chunked array, determine the optimal block shape closest - to target_size. Expanded block dimensions are + to target block size. Expanded block dimensions are an integer multiple of the chunk dimension to ensure optimal access patterns. Args: arr: the input array - target_size_mb: target block size in megabytes, + target_block_size_mb: target block size in megabytes, default is 409600 mode: strategy. Must be one of "cycle", or "iso" @@ -259,7 +262,7 @@ def get_block_shape(arr, target_size_mb=409600, mode="cycle", chunks=None): return expand_chunks( chunks, arr.shape[-3:], - target_size_mb * 1024**2, + target_block_size_mb * 1024**2, arr.itemsize, mode, ) diff --git a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py index 39ff588..7bedcc2 100644 --- a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py +++ b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py @@ -61,7 +61,7 @@ def convert_array_to_zarr( "clevel": 3, "shuffle": Blosc.SHUFFLE, }, - target_size_mb: Optional[int] = 24000, + target_block_size_mb: Optional[int] = 24000, ): """ Converts an array to zarr format @@ -197,7 +197,7 @@ def convert_array_to_zarr( block_shape = list( BlockedArrayWriter.get_block_shape( arr=previous_scale, - target_size_mb=target_size_mb, + target_block_size_mb=target_block_size_mb, chunks=chunk_size, ) ) @@ -238,8 +238,15 @@ def convert_array_to_zarr( ) logger.info(f"Level {level}/{n_lvls-1}: Writing to storage...") - BlockedArrayWriter.store(array_to_write, pyramid_group, block_shape) - logger.info(f"Level {level}/{n_lvls-1}: ✓ Complete ({level+1}/{n_lvls} levels done)") + import sys + sys.stdout.flush() + try: + BlockedArrayWriter.store(array_to_write, pyramid_group, block_shape) + logger.info(f"Level {level}/{n_lvls-1}: ✓ Complete ({level+1}/{n_lvls} levels done)") + except Exception as e: + logger.error(f"Level {level}/{n_lvls-1}: FAILED with error: {e}") + logger.exception("Full traceback:") + raise if __name__ == "__main__": BASE_PATH = "/data" diff --git a/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md b/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md new file mode 100644 index 0000000..c1884ef --- /dev/null +++ b/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md @@ -0,0 +1,84 @@ +# 10.28.25 multiscale edits / changes v2.0 + +change1: multiscale addition +'do this first, then comment out zero resolution thing and point to this' + +--- ! new ! --- +1. remove dask from +``` + for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): + block = in_array[sl] + da.store( + block, + out_array, + regions=sl, + lock=False, + compute=True, + return_stored=False, + ) +``` +- confirm still works without dask +2. add custom fetch/write inside just like fusion (will have to pass in dataset, two vars, instead of output_volume) +- tricky: +at this point` for level in range(0, n_lvls):` will have to pass in dask group info per level +- can get from 'write_ome_ngff_metadata', can pull out zarr group into and pass into ray distirbute dpart + +3. iteralte for loop approach, look at output in s3 + - dont remove, comment out when done +4. ray locally distributed 'for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape):' +5. aws distribute + +================= +change2: initial multiscale dataset improvement: + +write_ome_ngff_metadata +- writing zarr groups for each level +- later: pull out zero rez metadata + +- pyramid_group = new_channel_group.create_dataset( +- skip this step, unecesary duplication, copying full Resolution +- always write to the original folder + +- comment out 'pyramid_group' and uses +- add pre-fetch step: +- instead of pyramid_group creation, assign pyramid_group to preexisitng full res zarr data in s3 +- make pyramid_group an object that has access to the s3 data, pointer to group, dask array(?) +- zarr root/group point to existing s3 localization +- in fusion: we save to existing s3 location +- copy implementation of save to s3 in fusion for multiscale + +- we added prestep fetch of data in fusion, reference this +- in fusion: instead of passing in output volume, we initialize object we can pass + +```python +store = output_volume.store + write_root = getattr(store, "root", None) or getattr(store, "path", None) + write_ds = output_volume.path +``` +in fusion: passing in output volume, +pre existing zarr data in s3, we assign pointer to it in multiscale so we can write to it + +-- next steps: reuse objects, swap out this function create_dataset +``` + # # Writing first multiscale by default + # pyramid_group = new_channel_group.create_dataset( + # name="0", + # shape=dataset_shape, + # chunks=chunk_size, + # dtype=array.dtype, + # compressor=compressor, + # dimension_separator="/", + # overwrite=True, + # ) + # fetch from s3 pointer to full rez data (full res zarr data) +``` +================= + +change3: +- divide installs in setup.py && update readme +- remove unnecesary code + +temp roadmap: +- distribute multiscale +- fix exaSpim / validate +- add cache \ No newline at end of file From cfb4960d5ddfa324a7df88db6fca26a4ddf563c5 Mon Sep 17 00:00:00 2001 From: martin barker Date: Fri, 31 Oct 2025 13:05:47 -0700 Subject: [PATCH 3/5] multiscale_distributed local ray working --- .../compress/zarr_writer.py | 132 +++++++++++++++--- .../array_to_zarr.py | 4 +- Rhapso/fusion/multiscale_worker.py | 102 ++++++++++++-- 3 files changed, 202 insertions(+), 36 deletions(-) diff --git a/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py b/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py index 91f822c..f7ebfa7 100644 --- a/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py +++ b/Rhapso/fusion/multiscale/aind_hcr_data_transformation/compress/zarr_writer.py @@ -12,6 +12,34 @@ import numpy as np from numpy.typing import ArrayLike +try: + import ray + RAY_AVAILABLE = True + + @ray.remote + def _process_block_remote(in_array, out_array, sl, block_idx, total_blocks): + """ + Ray remote function to process a single block. + Reads block from input, computes if lazy, writes to output. + + Returns the block index for progress tracking. + """ + # Read block from input array + block = in_array[sl] + + # If it's a dask array, compute it to get numpy array + if hasattr(block, 'compute'): + block = block.compute() + + # Write block directly to output array (writes to S3) + out_array[sl] = block + + return block_idx + +except ImportError: + RAY_AVAILABLE = False + _process_block_remote = None # Not available + def _get_size(shape: Tuple[int, ...], itemsize: int) -> int: """ @@ -188,17 +216,18 @@ def _slice_along_dim(dim: int) -> Generator: @staticmethod def store( - in_array: ArrayLike, out_array: ArrayLike, block_shape: tuple + in_array: ArrayLike, out_array: ArrayLike, block_shape: tuple, use_ray: bool = True, ray_num_cpus: int = None ) -> None: """ Partitions the last 3 dimensions of an array - into non-overlapping blocks and writes them sequentially - to a Zarr array. This is meant to reduce the - scheduling burden for massive (terabyte-scale) arrays. + into non-overlapping blocks and writes them to a Zarr array. + Can use Ray for parallel processing or sequential processing. :param in_array: The input array (can be dask array or numpy array) :param block_shape: Tuple of (block_depth, block_height, block_width) :param out_array: The output array + :param use_ray: If True, use Ray for parallel processing. If False, use sequential processing. + :param ray_num_cpus: Number of CPUs to use for Ray. If None, uses all available CPUs. """ import sys logger = logging.getLogger(__name__) @@ -208,29 +237,90 @@ def store( for arr_dim, block_dim in zip(in_array.shape, block_shape): total_blocks *= (arr_dim + block_dim - 1) // block_dim - logger.info(f" Writing {total_blocks} blocks (block shape: {block_shape})...") - sys.stdout.flush() + # Check if Ray should be used + use_ray = use_ray and RAY_AVAILABLE - # Iterate through the input array in - # steps equal to the block shape dimensions - block_idx = 0 - for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): - logger.info(f" Progress: {block_idx}/{total_blocks} blocks ({(block_idx/total_blocks)*100:.1f}%)") - block_idx += 1 + if use_ray: + # Initialize Ray if not already initialized + if not ray.is_initialized(): + logger.info(" Initializing Ray for parallel processing...") + + # Configure Ray with memory limits and object spilling + import os + + # Set environment variables for Ray memory management BEFORE initialization + # These must be set before ray.init() is called + os.environ.setdefault("RAY_memory_monitor_refresh_ms", "250") + os.environ.setdefault("RAY_memory_usage_threshold", "0.85") # Kill workers at 85% memory usage + + ray_config = { + "ignore_reinit_error": True, + "object_store_memory": int(8 * 1024 * 1024 * 1024), # 8 GB for object store + } + + # Set number of CPUs if specified + if ray_num_cpus is not None: + ray_config["num_cpus"] = ray_num_cpus + logger.info(f" Limiting Ray to {ray_num_cpus} CPUs to prevent OOM") + else: + ray_config["num_cpus"] = None # Use all available CPUs + + ray.init(**ray_config) + actual_cpus = ray.cluster_resources().get('CPU', 0) + logger.info(f" Ray initialized with {actual_cpus} CPUs and 8 GB object store") + logger.info(f" Memory monitor will kill workers at 85% memory usage") - # Read block from input array - block = in_array[sl] + logger.info(f" Writing {total_blocks} blocks IN PARALLEL using Ray (block shape: {block_shape})...") + sys.stdout.flush() - # If it's a dask array, compute it to get numpy array - if hasattr(block, 'compute'): - block = block.compute() + # Submit all blocks as Ray tasks + futures = [] + block_idx = 0 + for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): + future = _process_block_remote.remote(in_array, out_array, sl, block_idx, total_blocks) + futures.append(future) + block_idx += 1 - # Write block directly to output array (writes to S3) - out_array[sl] = block + # Wait for all tasks to complete and show progress + completed = 0 + while futures: + # Wait for at least one task to complete + done, futures = ray.wait(futures, num_returns=1, timeout=1.0) + completed += len(done) + if done: + progress_pct = (completed / total_blocks) * 100 + logger.info(f" Progress: {completed}/{total_blocks} blocks ({progress_pct:.1f}%)") + sys.stdout.flush() + logger.info(f" ✓ All {total_blocks} blocks written successfully using Ray!") - - logger.info(f" ✓ All {total_blocks} blocks written successfully!") + else: + # Sequential processing (original implementation) + if not RAY_AVAILABLE: + logger.warning(" Ray not available, falling back to sequential processing") + else: + logger.info(" Using sequential processing (Ray disabled)") + + logger.info(f" Writing {total_blocks} blocks SEQUENTIALLY (block shape: {block_shape})...") + sys.stdout.flush() + + block_idx = 0 + for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): + logger.info(f" Progress: {block_idx}/{total_blocks} blocks ({(block_idx/total_blocks)*100:.1f}%)") + block_idx += 1 + + # Read block from input array + block = in_array[sl] + + # If it's a dask array, compute it to get numpy array + if hasattr(block, 'compute'): + block = block.compute() + + # Write block directly to output array (writes to S3) + out_array[sl] = block + + logger.info(f" ✓ All {total_blocks} blocks written successfully!") + sys.stdout.flush() @staticmethod diff --git a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py index 7bedcc2..7c28bc0 100644 --- a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py +++ b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py @@ -62,6 +62,8 @@ def convert_array_to_zarr( "shuffle": Blosc.SHUFFLE, }, target_block_size_mb: Optional[int] = 24000, + use_ray: Optional[bool] = True, + ray_num_cpus: Optional[int] = None, ): """ Converts an array to zarr format @@ -241,7 +243,7 @@ def convert_array_to_zarr( import sys sys.stdout.flush() try: - BlockedArrayWriter.store(array_to_write, pyramid_group, block_shape) + BlockedArrayWriter.store(array_to_write, pyramid_group, block_shape, use_ray=use_ray, ray_num_cpus=ray_num_cpus) logger.info(f"Level {level}/{n_lvls-1}: ✓ Complete ({level+1}/{n_lvls} levels done)") except Exception as e: logger.error(f"Level {level}/{n_lvls-1}: FAILED with error: {e}") diff --git a/Rhapso/fusion/multiscale_worker.py b/Rhapso/fusion/multiscale_worker.py index 05194c5..b00d8b5 100644 --- a/Rhapso/fusion/multiscale_worker.py +++ b/Rhapso/fusion/multiscale_worker.py @@ -4,6 +4,7 @@ import os import sys +import time from pathlib import Path import dask.array as da import logging @@ -22,10 +23,12 @@ def run(): """ Main run function for multiscale conversion """ + # Start timing + start_time = time.time() # Input and output paths input_zarr_path = "s3://martin-test-bucket/output7/channel_488.zarr" - output_zarr_path = "s3://martin-test-bucket/output7/multiscale_channel_488.zarr" + output_zarr_path = "s3://martin-test-bucket/output10/multiscale_channel_488.zarr" # Set parameters for multiscale conversion # Adjust these parameters based on your data characteristics @@ -33,6 +36,9 @@ def run(): voxel_size = [1.0, 1.0, 1.0] # Voxel size in micrometers (adjust if known) n_lvls = 6 # Number of pyramid levels scale_factor = [2, 2, 2] # Downsampling factor per level + target_block_size_mb = 512 # Target size for each processing block in MB (reduced from 512 to reduce memory pressure) + use_ray = True # Use Ray for parallel processing (set to False for sequential processing) + ray_num_cpus = 12 # Limit Ray to 4 CPUs to prevent OOM (reduced from 16) logger.info(f"Starting multiscale conversion") logger.info(f"Input: {input_zarr_path}") @@ -72,11 +78,11 @@ def run(): # Use dask array directly instead of computing (don't load into memory) # Original implementation (loads entire array into memory - causes OOM for large datasets): - array = dataset.compute() + # array = dataset.compute() # New implementation (keeps lazy evaluation for memory efficiency): logger.info("Using Dask array for lazy/chunked processing (not loading into memory)") - # array = dataset + array = dataset compressor_kwargs = { @@ -92,25 +98,93 @@ def run(): logger.info(f" Voxel size: {voxel_size}") logger.info(f" Number of levels: {n_lvls}") logger.info(f" Scale factor: {scale_factor}") + logger.info(f" Target block size: {target_block_size_mb} MB") + logger.info(f" → Each block will be ~{target_block_size_mb} MB in memory") + logger.info(f" → Larger blocks = fewer blocks but slower per block") + logger.info(f" → Smaller blocks = more blocks but faster per block") + logger.info(f" → Estimated blocks for level 0: ~{int(total_size_gb * 1024 / target_block_size_mb)}") + if use_ray: + logger.info(f" Parallel processing: ENABLED (Ray with {ray_num_cpus} CPUs)") + logger.info(f" → Max parallel memory usage: ~{ray_num_cpus * target_block_size_mb} MB") + else: + logger.info(f" Parallel processing: DISABLED (Sequential)") logger.info("=" * 60) sys.stdout.flush() - # Convert to multiscale zarr - convert_array_to_zarr( - array=array, - chunk_size=chunk_size, - output_path=output_zarr_path, - voxel_size=voxel_size, - n_lvls=n_lvls, - scale_factor=scale_factor, - compressor_kwargs=compressor_kwargs, - target_size_mb=24000, - ) + # Convert to multiscale zarr with comprehensive error handling + try: + convert_array_to_zarr( + array=array, + chunk_size=chunk_size, + output_path=output_zarr_path, + voxel_size=voxel_size, + n_lvls=n_lvls, + scale_factor=scale_factor, + compressor_kwargs=compressor_kwargs, + target_block_size_mb=target_block_size_mb, + use_ray=use_ray, + ray_num_cpus=ray_num_cpus, + ) + except MemoryError as e: + elapsed_seconds = time.time() - start_time + logger.error("=" * 60) + logger.error("MEMORY ERROR: Out of memory!") + logger.error("This typically happens when target_block_size_mb is too large.") + logger.error(f"Try reducing target_block_size_mb (currently {target_block_size_mb} MB).") + logger.error(f"Error details: {e}") + logger.error(f"Failed after: {elapsed_seconds:.1f} seconds ({elapsed_seconds/60:.2f} minutes)") + logger.error("=" * 60) + sys.stdout.flush() + raise + except TimeoutError as e: + elapsed_seconds = time.time() - start_time + logger.error("=" * 60) + logger.error("TIMEOUT ERROR: Operation timed out!") + logger.error("This can happen with very large blocks or slow S3 connections.") + logger.error(f"Current target_block_size_mb: {target_block_size_mb} MB") + logger.error(f"Error details: {e}") + logger.error(f"Failed after: {elapsed_seconds:.1f} seconds ({elapsed_seconds/60:.2f} minutes)") + logger.error("=" * 60) + sys.stdout.flush() + raise + except Exception as e: + elapsed_seconds = time.time() - start_time + logger.error("=" * 60) + logger.error("CONVERSION FAILED!") + logger.error(f"Error type: {type(e).__name__}") + logger.error(f"Error message: {e}") + logger.exception("Full traceback:") + logger.error("=" * 60) + logger.error("Troubleshooting tips:") + logger.error(" 1. Check if S3 bucket is accessible") + logger.error(" 2. Verify AWS credentials are valid") + logger.error(f" 3. Try reducing target_block_size_mb if blocks are too large (currently {target_block_size_mb} MB)") + logger.error(" 4. Check available memory and disk space") + logger.error(f"Failed after: {elapsed_seconds:.1f} seconds ({elapsed_seconds/60:.2f} minutes)") + logger.error("=" * 60) + sys.stdout.flush() + raise + + # Calculate elapsed time + end_time = time.time() + elapsed_seconds = end_time - start_time + elapsed_minutes = elapsed_seconds / 60 + elapsed_hours = elapsed_minutes / 60 + + # Format elapsed time nicely + if elapsed_hours >= 1: + time_str = f"{elapsed_hours:.2f} hours ({elapsed_minutes:.1f} minutes)" + elif elapsed_minutes >= 1: + time_str = f"{elapsed_minutes:.2f} minutes ({elapsed_seconds:.1f} seconds)" + else: + time_str = f"{elapsed_seconds:.2f} seconds" logger.info("=" * 60) logger.info("MULTISCALE CONVERSION COMPLETED SUCCESSFULLY!") logger.info(f"Output written to: {output_zarr_path}") + logger.info(f"Total time: {time_str}") logger.info("=" * 60) + sys.stdout.flush() if __name__ == "__main__": From d6f8569c3cae6fcab2f70f80505d254f8a8ff3dd Mon Sep 17 00:00:00 2001 From: martin barker Date: Wed, 5 Nov 2025 10:42:52 -0800 Subject: [PATCH 4/5] add ray local and optional dont copy fullrez to same folder multiscale output --- .../array_to_zarr.py | 72 +++++++++++++------ Rhapso/fusion/multiscale_worker.py | 8 ++- Rhapso/fusion/neuroglancer_link_gen_worker.py | 2 +- 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py index 7c28bc0..e2316bb 100644 --- a/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py +++ b/Rhapso/fusion/multiscale/aind_z1_radial_correction/array_to_zarr.py @@ -64,6 +64,7 @@ def convert_array_to_zarr( target_block_size_mb: Optional[int] = 24000, use_ray: Optional[bool] = True, ray_num_cpus: Optional[int] = None, + dont_copy_fullscale: Optional[bool] = False, ): """ Converts an array to zarr format @@ -155,11 +156,24 @@ def convert_array_to_zarr( scale_factor = [int(s) for s in scale_factor] voxel_size = [float(v) for v in voxel_size] - new_channel_group = root_group.create_group( - name=stack_name, overwrite=True - ) + # If dont_copy_fullscale is True, try to open existing group instead of overwriting + if dont_copy_fullscale: + try: + # Try to open existing group in read-write mode + new_channel_group = root_group[stack_name] + logger.info(f"Opened existing channel group: {stack_name}") + except KeyError: + # Group doesn't exist, create it + new_channel_group = root_group.create_group( + name=stack_name, overwrite=True + ) + logger.info(f"Created new channel group: {stack_name}") + else: + new_channel_group = root_group.create_group( + name=stack_name, overwrite=True + ) - # Writing OME-NGFF metadata + # Writing OME-NGFF metadata (always write to ensure metadata is up to date) write_ome_ngff_metadata( group=new_channel_group, arr_shape=dataset_shape, @@ -176,25 +190,14 @@ def convert_array_to_zarr( origin = [0,0,0] ) - # Writing first multiscale by default - pyramid_group = new_channel_group.create_dataset( - name="0", - shape=dataset_shape, - chunks=chunk_size, - dtype=array.dtype, - compressor=compressor, - dimension_separator="/", - overwrite=True, - ) - # Writing multiscales # Handle both numpy arrays and dask arrays if isinstance(array, da.Array): # Already a dask array, rechunk if needed - previous_scale = da.rechunk(array, chunks=pyramid_group.chunks) + previous_scale = da.rechunk(array, chunks=chunk_size) else: # Convert numpy array to dask array - previous_scale = da.from_array(array, pyramid_group.chunks) + previous_scale = da.from_array(array, chunk_size) block_shape = list( BlockedArrayWriter.get_block_shape( @@ -205,14 +208,41 @@ def convert_array_to_zarr( ) block_shape = extra_axes + tuple(block_shape) - logger.info(f"Writing {n_lvls} pyramid levels...") + # Determine start level based on dont_copy_fullscale flag + start_level = 1 if dont_copy_fullscale else 0 + levels_to_write = n_lvls - start_level + + if dont_copy_fullscale: + logger.info(f"Skipping level 0 (fullscale) - will only write levels 1-{n_lvls-1}") + # Open existing level 0 dataset to read from for computing level 1 + try: + existing_level0 = new_channel_group["0"] + pyramid_group = existing_level0 + logger.info(f"Using existing level 0 from {output_path} for downsampling") + except KeyError: + logger.error(f"Level 0 not found at {output_path}/0. Cannot skip fullscale copy if level 0 doesn't exist.") + raise ValueError(f"Level 0 not found at {output_path}/0. Set dont_copy_fullscale=False or ensure level 0 exists.") + else: + logger.info(f"Writing {n_lvls} pyramid levels...") + # Writing first multiscale by default + pyramid_group = new_channel_group.create_dataset( + name="0", + shape=dataset_shape, + chunks=chunk_size, + dtype=array.dtype, + compressor=compressor, + dimension_separator="/", + overwrite=True, + ) - for level in range(0, n_lvls): - if not level: + for level in range(start_level, n_lvls): + if level == 0: + # Only reached if dont_copy_fullscale is False array_to_write = previous_scale logger.info(f"Level {level}/{n_lvls-1}: Writing full resolution - shape {array_to_write.shape}") else: + # Read from the previous level (either written level 0 or existing level 0) previous_scale = da.from_zarr(pyramid_group, pyramid_group.chunks) new_scale_factor = ( [1] * (len(previous_scale.shape) - len(scale_factor)) @@ -244,7 +274,7 @@ def convert_array_to_zarr( sys.stdout.flush() try: BlockedArrayWriter.store(array_to_write, pyramid_group, block_shape, use_ray=use_ray, ray_num_cpus=ray_num_cpus) - logger.info(f"Level {level}/{n_lvls-1}: ✓ Complete ({level+1}/{n_lvls} levels done)") + logger.info(f"Level {level}/{n_lvls-1}: ✓ Complete ({level-start_level+1}/{levels_to_write} levels done)") except Exception as e: logger.error(f"Level {level}/{n_lvls-1}: FAILED with error: {e}") logger.exception("Full traceback:") diff --git a/Rhapso/fusion/multiscale_worker.py b/Rhapso/fusion/multiscale_worker.py index b00d8b5..478024d 100644 --- a/Rhapso/fusion/multiscale_worker.py +++ b/Rhapso/fusion/multiscale_worker.py @@ -28,7 +28,10 @@ def run(): # Input and output paths input_zarr_path = "s3://martin-test-bucket/output7/channel_488.zarr" - output_zarr_path = "s3://martin-test-bucket/output10/multiscale_channel_488.zarr" + output_zarr_path = "s3://martin-test-bucket/output7/channel_488.zarr" + + # Set dont_copy_fullscale to True to skip writing level 0 when input == output + dont_copy_fullscale = input_zarr_path == output_zarr_path # Set parameters for multiscale conversion # Adjust these parameters based on your data characteristics @@ -43,6 +46,8 @@ def run(): logger.info(f"Starting multiscale conversion") logger.info(f"Input: {input_zarr_path}") logger.info(f"Output: {output_zarr_path}") + if dont_copy_fullscale: + logger.info(f"dont_copy_fullscale=True: Will skip writing level 0 and only write levels 1-{n_lvls-1}") # Load the zarr dataset # Assuming the data is in the root or scale "0" of the zarr @@ -124,6 +129,7 @@ def run(): target_block_size_mb=target_block_size_mb, use_ray=use_ray, ray_num_cpus=ray_num_cpus, + dont_copy_fullscale=dont_copy_fullscale, ) except MemoryError as e: elapsed_seconds = time.time() - start_time diff --git a/Rhapso/fusion/neuroglancer_link_gen_worker.py b/Rhapso/fusion/neuroglancer_link_gen_worker.py index a2041e3..87fd471 100644 --- a/Rhapso/fusion/neuroglancer_link_gen_worker.py +++ b/Rhapso/fusion/neuroglancer_link_gen_worker.py @@ -5,7 +5,7 @@ from Rhapso.fusion.neuroglancer_link_gen.main import generate_neuroglancer_link # Hard-coded parameters -ZARR_PATH = "s3://martin-test-bucket/output7/multiscale_channel_488.zarr" +ZARR_PATH = "s3://martin-test-bucket/output7/channel_488.zarr" VMIN = 90 VMAX = 400 JSON_UPLOAD_BUCKET = "martin-test-bucket" From 782dde5cdb8e9e7430f209435903086d4a44a8db Mon Sep 17 00:00:00 2001 From: Martin Barker Date: Wed, 5 Nov 2025 14:49:11 -0800 Subject: [PATCH 5/5] remove todo.md --- .../aind_z1_radial_correction/todo.md | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md diff --git a/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md b/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md deleted file mode 100644 index c1884ef..0000000 --- a/Rhapso/fusion/multiscale/aind_z1_radial_correction/todo.md +++ /dev/null @@ -1,84 +0,0 @@ -# 10.28.25 multiscale edits / changes v2.0 - -change1: multiscale addition -'do this first, then comment out zero resolution thing and point to this' - ---- ! new ! --- -1. remove dask from -``` - for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape): - block = in_array[sl] - da.store( - block, - out_array, - regions=sl, - lock=False, - compute=True, - return_stored=False, - ) -``` -- confirm still works without dask -2. add custom fetch/write inside just like fusion (will have to pass in dataset, two vars, instead of output_volume) -- tricky: -at this point` for level in range(0, n_lvls):` will have to pass in dask group info per level -- can get from 'write_ome_ngff_metadata', can pull out zarr group into and pass into ray distirbute dpart - -3. iteralte for loop approach, look at output in s3 - - dont remove, comment out when done -4. ray locally distributed 'for sl in BlockedArrayWriter.gen_slices(in_array.shape, block_shape):' -5. aws distribute - -================= -change2: initial multiscale dataset improvement: - -write_ome_ngff_metadata -- writing zarr groups for each level -- later: pull out zero rez metadata - -- pyramid_group = new_channel_group.create_dataset( -- skip this step, unecesary duplication, copying full Resolution -- always write to the original folder - -- comment out 'pyramid_group' and uses -- add pre-fetch step: -- instead of pyramid_group creation, assign pyramid_group to preexisitng full res zarr data in s3 -- make pyramid_group an object that has access to the s3 data, pointer to group, dask array(?) -- zarr root/group point to existing s3 localization -- in fusion: we save to existing s3 location -- copy implementation of save to s3 in fusion for multiscale - -- we added prestep fetch of data in fusion, reference this -- in fusion: instead of passing in output volume, we initialize object we can pass - -```python -store = output_volume.store - write_root = getattr(store, "root", None) or getattr(store, "path", None) - write_ds = output_volume.path -``` -in fusion: passing in output volume, -pre existing zarr data in s3, we assign pointer to it in multiscale so we can write to it - --- next steps: reuse objects, swap out this function create_dataset -``` - # # Writing first multiscale by default - # pyramid_group = new_channel_group.create_dataset( - # name="0", - # shape=dataset_shape, - # chunks=chunk_size, - # dtype=array.dtype, - # compressor=compressor, - # dimension_separator="/", - # overwrite=True, - # ) - # fetch from s3 pointer to full rez data (full res zarr data) -``` -================= - -change3: -- divide installs in setup.py && update readme -- remove unnecesary code - -temp roadmap: -- distribute multiscale -- fix exaSpim / validate -- add cache \ No newline at end of file