Skip to content

Commit

Permalink
Merge pull request #1621 from girder/converter-pool-logging
Browse files Browse the repository at this point in the history
Improve logging during pooled concurrency in the image converter
  • Loading branch information
manthey authored Aug 28, 2024
2 parents 5e1c132 + 487a33f commit 528834a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Add a utility function to minimize caching ([#1617](../../pull/1617))
- Allow passing through converter options when writing a zarr sink to a non-zarr format ([#1618](../../pull/1618))
- Do fewer calculations when applying affine transforms in the multi-source ([#1619](../../pull/1619))
- Improve logging in the image converter ([#1621](../../pull/1621))

### Bug Fixes

Expand Down
57 changes: 47 additions & 10 deletions utilities/converter/large_image_converter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _data_from_large_image(path, outputPath, **kwargs):
_pool_add(tasks, (pool.submit(
_convert_via_vips, img, savePath, outputPath, mime=mime, forTiled=False), ))
results['images'][key] = savePath
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'associated images')
return results


Expand Down Expand Up @@ -218,7 +218,7 @@ def _generate_multiframe_tiff(inputPath, outputPath, tempPath, lidata, **kwargs)
_pool_add(tasks, (pool.submit(
_convert_via_vips, subInputPath, savePath, tempPath, False), ))
extraImages[key] = savePath
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'subpage')
_output_tiff(outputList, outputPath, tempPath, lidata, extraImages, **kwargs)


Expand Down Expand Up @@ -349,20 +349,45 @@ def _concurrency_to_value(_concurrency=None, **kwargs):
return max(1, large_image.config.cpu_count(logical=True) + _concurrency)


def _get_thread_pool(memoryLimit=None, parentConcurrency=None, numItems=None, **kwargs):
    """
    Create a thread pool sized from the requested concurrency.

    :param memoryLimit: if not None, limit the concurrency to no more than one
        process per memoryLimit bytes of total memory.
    :param parentConcurrency: if truthy, the concurrency of the task that is
        creating this pool.  When both it and the requested concurrency are
        greater than 1, the requested concurrency is divided by it (rounded
        up).  Whenever it is truthy, memoryLimit is multiplied by it before
        the memory check.
    :param numItems: if at least 1, the pool never has more workers than
        this.
    :param kwargs: passed to _concurrency_to_value to get the requested
        concurrency.
    :returns: a concurrent.futures.ThreadPoolExecutor with at least one
        worker.
    """
    workers = _concurrency_to_value(**kwargs)
    hasParallelParent = bool(parentConcurrency) and parentConcurrency > 1
    if hasParallelParent and workers > 1:
        # Share the requested concurrency between the parent's workers
        workers = max(1, int(math.ceil(workers / parentConcurrency)))
    if memoryLimit:
        # Each parent worker may hold its own pool, so scale the per-worker
        # memory estimate accordingly
        perWorker = memoryLimit * parentConcurrency if parentConcurrency else memoryLimit
        workers = min(workers, large_image.config.total_memory() // perWorker)
    if numItems and 1 <= numItems < workers:
        workers = numItems
    return concurrent.futures.ThreadPoolExecutor(max_workers=max(1, workers))


def _pool_log(left, total, label):
    """
    Emit a rate-limited debug message about progress within a pool.

    Timing state is stored as attributes on the module logger.  A message is
    only emitted if at least 10 seconds have passed since the last one (or
    since the first call, if nothing has been logged yet).

    :param left: units left to process.
    :param total: total number of units being processed.
    :param label: label to log describing what is being processed.
    """
    # Lazily create the timestamps the first time they are needed
    for attr in ('_pool_log_starttime', '_pool_log_lastlog'):
        if not hasattr(logger, attr):
            setattr(logger, attr, time.time())
    sinceLastLog = time.time() - logger._pool_log_lastlog
    if sinceLastLog >= 10:
        elapsed = time.time() - logger._pool_log_starttime
        logger.debug('%d/%d %s left %4.2fs', left, total, label, elapsed)
        logger._pool_log_lastlog = time.time()


def _pool_add(tasks, newtask):
"""
Add a new task to a pool, then drain any finished tasks at the start of the
Expand All @@ -381,21 +406,24 @@ def _pool_add(tasks, newtask):
tasks.pop(0)


def _drain_pool(pool, tasks, label=''):
    """
    Wait for all tasks in a pool to complete, then shut down the pool.

    The pool is shut down even if a task raises; the task's exception is
    re-raised to the caller.

    :param pool: a concurrent futures pool.
    :param tasks: a list containing either lists or tuples, the last element
        of which is a task submitted to the pool.  Altered.
    :param label: label passed to _pool_log describing what is being
        processed.
    """
    numtasks = len(tasks)
    try:
        _pool_log(len(tasks), numtasks, label)
        while tasks:
            # Polling with a short timeout allows better stopping on a
            # SIGTERM than a single blocking wait
            try:
                tasks[0][-1].result(0.1)
            except concurrent.futures.TimeoutError:
                continue
            tasks.pop(0)
            _pool_log(len(tasks), numtasks, label)
    finally:
        # Always release the executor; wait=False returns immediately rather
        # than blocking on any tasks that are still running
        pool.shutdown(wait=False)


Expand Down Expand Up @@ -507,7 +535,8 @@ def _convert_large_image_tile(tilelock, strips, tile):
strips[ty] = strips[ty].insert(vimg, x, 0, expand=True)


def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath, **kwargs):
def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath,
parentConcurrency=None, **kwargs):
"""
Convert a single frame from a large_image source. This parallelizes tile
reads. Once all tiles are converted to a composited vips image, a tiff
Expand All @@ -518,18 +547,24 @@ def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath,
:param ts: the open tile source.
:param frameOutputPath: the destination name for the tiff file.
:param tempPath: a temporary file in a temporary directory.
:param parentConcurrency: amount of concurrency used by parent task.
"""
# The iterator tile size is a balance between memory use and fewer calls
# and file handles.
_iterTileSize = 4096
logger.info('Processing frame %d/%d', frame + 1, numFrames)
strips = []
pool = _get_thread_pool(**kwargs)
pool = _get_thread_pool(
memoryLimit=FrameMemoryEstimate,
# allow multiple tiles even if we are using all the cores, as it
# balances I/O and computation
parentConcurrency=(parentConcurrency // 2),
**kwargs)
tasks = []
tilelock = threading.Lock()
for tile in ts.tileIterator(tile_size=dict(width=_iterTileSize), frame=frame):
_pool_add(tasks, (pool.submit(_convert_large_image_tile, tilelock, strips, tile), ))
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, f'tiles from frame {frame + 1}/{numFrames}')
minbands = min(strip.bands for strip in strips)
maxbands = max(strip.bands for strip in strips)
if minbands != maxbands:
Expand All @@ -556,20 +591,21 @@ def _convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs):
numFrames = len(lidata['metadata'].get('frames', [0]))
outputList = []
tasks = []
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate, **kwargs)
startFrame = 0
endFrame = numFrames
if kwargs.get('onlyFrame') is not None and str(kwargs.get('onlyFrame')):
startFrame = int(kwargs.get('onlyFrame'))
endFrame = startFrame + 1
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate,
numItems=endFrame - startFrame, **kwargs)
for frame in range(startFrame, endFrame):
frameOutputPath = tempPath + '-%d-%s.tiff' % (
frame + 1, time.strftime('%Y%m%d-%H%M%S'))
_pool_add(tasks, (pool.submit(
_convert_large_image_frame, frame, numFrames, ts, frameOutputPath,
tempPath, **kwargs), ))
tempPath, pool._max_workers, **kwargs), ))
outputList.append(frameOutputPath)
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'frames')
_output_tiff(outputList, outputPath, tempPath, lidata, **kwargs)


Expand Down Expand Up @@ -891,6 +927,7 @@ def convert(inputPath, outputPath=None, **kwargs): # noqa: C901
:returns: outputPath if successful
"""
logger._pool_log_starttime = time.time()
if kwargs.get('_concurrency'):
os.environ['VIPS_CONCURRENCY'] = str(_concurrency_to_value(**kwargs))
geospatial = kwargs.get('geospatial')
Expand Down

0 comments on commit 528834a

Please sign in to comment.