Skip to content

Commit

Permalink
Improve the algorithm progression example with zarr multiprocess
Browse files Browse the repository at this point in the history
This writes a single value at the maximum location along every axis before
fanning out to multiple processes.
  • Loading branch information
manthey committed Jan 2, 2025
1 parent 96c5cea commit 421fe71
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions examples/algorithm_progression.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, algorithm, input_filename, input_params, param_order,

self.combos = list(itertools.product(*[p['range'] for p in input_params.values()]))

def getOverallSink(self, maxValues=None):
    """
    Return the sink that collects the results of every run.

    This base implementation is only a placeholder: it always raises, so a
    subclass has to supply the actual sink.

    :param maxValues: optional dictionary of per-axis values; it is not used
        by this placeholder and exists so that every override shares one
        signature.
    :raises NotImplementedError: always.  NotImplementedError is a subclass
        of Exception, so callers that catch Exception are unaffected.
    """
    msg = 'Not implemented'
    raise NotImplementedError(msg)

Expand Down Expand Up @@ -118,7 +118,10 @@ def applyAlgorithm(self, sink, params):

def run(self):
starttime = time.time()
sink = self.getOverallSink()
source = large_image.open(self.input_filename)
maxValues = {'x': source.sizeX, 'y': source.sizeY, 's': source.metadata['bandCount']}
maxValues.update({p['axis']: len(p['range']) for p in self.param_order.values()})
sink = self.getOverallSink(maxValues)

print(f'Beginning {len(self.combos)} runs on {self.max_workers} workers...')
num_done = 0
Expand Down Expand Up @@ -149,7 +152,7 @@ def run(self):


class SweepAlgorithmMulti(SweepAlgorithm):
def getOverallSink(self):
def getOverallSink(self, maxValues=None):
os.makedirs(os.path.splitext(self.output_filename)[0], exist_ok=True)
algorithm_name = self.algorithm.__name__.replace('_', ' ').title()
self.yaml_dict = {
Expand Down Expand Up @@ -270,10 +273,20 @@ def addTile(self, tilesink, *args, **kwargs):


class SweepAlgorithmZarr(SweepAlgorithm):
def getOverallSink(self, maxValues=None):
    """
    Create and return the zarr sink that holds the entire output image.

    If maxValues is specified, a single black pixel is added at the maximum
    location for all axes before the sink is returned.  This causes the
    array of axes values in the zarr sink to be allocated and tracked,
    whereas, otherwise, they could be ignored.

    :param maxValues: optional dictionary mapping an axis name to the number
        of entries along that axis.  The ``s`` entry, if present, sets the
        size of the last dimension of the seed pixel and defaults to 1;
        every other entry is reduced by one and passed to ``addTile`` as a
        keyword argument.  If None or empty, nothing is written.
    :returns: the newly created sink.
    """
    import large_image_source_zarr

    sink = large_image_source_zarr.new()
    if maxValues:
        # A 1 x 1 tile of zeros; its last dimension is the 's' value.
        seed = np.zeros((1, 1, maxValues.get('s', 1)))
        # The values are counts, so the final valid index is count - 1.
        lastIndex = {
            axis: size - 1 for axis, size in maxValues.items() if axis != 's'
        }
        sink.addTile(seed, **lastIndex)
    return sink

def writeOverallSink(self, sink):
sink.write(self.output_filename, lossy=self.lossy)
Expand Down

0 comments on commit 421fe71

Please sign in to comment.