Skip to content

Commit

Permalink
Improve logging during pooled concurrency in the image converter
Browse files Browse the repository at this point in the history
Slightly shift how concurrency is divided between sources and tiles.
  • Loading branch information
manthey committed Aug 28, 2024
1 parent 5e1c132 commit 487a33f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Add a utility function to minimize caching ([#1617](../../pull/1617))
- Allow passing through converter options when writing a zarr sink to a non-zarr format ([#1618](../../pull/1618))
- Do less calculations when applying affine transforms in the multi-source ([#1619](../../pull/1619))
- Improve logging in the image converter ([#1621](../../pull/1621))

### Bug Fixes

Expand Down
57 changes: 47 additions & 10 deletions utilities/converter/large_image_converter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _data_from_large_image(path, outputPath, **kwargs):
_pool_add(tasks, (pool.submit(
_convert_via_vips, img, savePath, outputPath, mime=mime, forTiled=False), ))
results['images'][key] = savePath
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'associated images')
return results


Expand Down Expand Up @@ -218,7 +218,7 @@ def _generate_multiframe_tiff(inputPath, outputPath, tempPath, lidata, **kwargs)
_pool_add(tasks, (pool.submit(
_convert_via_vips, subInputPath, savePath, tempPath, False), ))
extraImages[key] = savePath
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'subpage')
_output_tiff(outputList, outputPath, tempPath, lidata, extraImages, **kwargs)


Expand Down Expand Up @@ -349,20 +349,45 @@ def _concurrency_to_value(_concurrency=None, **kwargs):
return max(1, large_image.config.cpu_count(logical=True) + _concurrency)


def _get_thread_pool(memoryLimit=None, **kwargs):
def _get_thread_pool(memoryLimit=None, parentConcurrency=None, numItems=None, **kwargs):
    """
    Allocate a thread pool based on the specific concurrency.

    :param memoryLimit: if not None, limit the concurrency to no more than
        one worker per memoryLimit bytes of total memory.
    :param parentConcurrency: if set, the amount of concurrency used by the
        parent task; this pool is scaled down accordingly so that nested
        pools do not oversubscribe the machine.
    :param numItems: if set, the number of items that will be processed;
        never allocate more workers than there are items.
    """
    workers = _concurrency_to_value(**kwargs)
    # Nested pools share the machine: give each parent worker an equal
    # share of the available concurrency, rounded up.
    if parentConcurrency and parentConcurrency > 1 and workers > 1:
        workers = max(1, int(math.ceil(workers / parentConcurrency)))
    if memoryLimit:
        # Each parent worker may hold a pool of its own, so the memory
        # budget is spread across the parent's concurrency.
        effectiveLimit = memoryLimit * parentConcurrency if parentConcurrency else memoryLimit
        workers = min(workers, large_image.config.total_memory() // effectiveLimit)
    if numItems and numItems >= 1:
        workers = min(workers, numItems)
    return concurrent.futures.ThreadPoolExecutor(max_workers=max(1, workers))


def _pool_log(left, total, label):
    """
    Log progress while processing tasks in a pool, at most once every ten
    seconds.

    :param left: units left to process.
    :param total: total units to process.
    :param label: label to log describing what is being processed.
    """
    # Sample the clock once so the throttle check, elapsed time, and stored
    # timestamp are all consistent within this call.
    now = time.time()
    # Throttle/start state is stashed on the module logger so it is shared
    # by every pool in the process.
    if not hasattr(logger, '_pool_log_starttime'):
        logger._pool_log_starttime = now
    if not hasattr(logger, '_pool_log_lastlog'):
        logger._pool_log_lastlog = now
    # Emit at most one progress message every ten seconds.
    if now - logger._pool_log_lastlog < 10:
        return
    elapsed = now - logger._pool_log_starttime
    logger.debug('%d/%d %s left %4.2fs', left, total, label, elapsed)
    logger._pool_log_lastlog = now


def _pool_add(tasks, newtask):
"""
Add a new task to a pool, then drain any finished tasks at the start of the
Expand All @@ -381,21 +406,24 @@ def _pool_add(tasks, newtask):
tasks.pop(0)


def _drain_pool(pool, tasks):
def _drain_pool(pool, tasks, label=''):
    """
    Wait for all tasks in a pool to complete, then shutdown the pool.

    :param pool: a concurrent futures pool.
    :param tasks: a list containing either lists or tuples, the last element
        of which is a task submitted to the pool.  Altered.
    :param label: a label describing what is being processed; passed through
        to the progress log.
    """
    numtasks = len(tasks)
    _pool_log(len(tasks), numtasks, label)
    try:
        while len(tasks):
            # Use a short result timeout so a SIGTERM can interrupt promptly.
            try:
                tasks[0][-1].result(0.1)
            except concurrent.futures.TimeoutError:
                continue
            tasks.pop(0)
            _pool_log(len(tasks), numtasks, label)
    finally:
        # Shut the pool down even if a task raised, so that worker threads
        # are not leaked when an exception propagates to the caller.
        pool.shutdown(wait=False)


Expand Down Expand Up @@ -507,7 +535,8 @@ def _convert_large_image_tile(tilelock, strips, tile):
strips[ty] = strips[ty].insert(vimg, x, 0, expand=True)


def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath, **kwargs):
def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath,
parentConcurrency=None, **kwargs):
"""
Convert a single frame from a large_image source. This parallelizes tile
reads. Once all tiles are converted to a composited vips image, a tiff
Expand All @@ -518,18 +547,24 @@ def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath,
:param ts: the open tile source.
:param frameOutputPath: the destination name for the tiff file.
:param tempPath: a temporary file in a temporary directory.
:param parentConcurrency: amount of concurrency used by parent task.
"""
# The iterator tile size is a balance between memory use and fewer calls
# and file handles.
_iterTileSize = 4096
logger.info('Processing frame %d/%d', frame + 1, numFrames)
strips = []
pool = _get_thread_pool(**kwargs)
pool = _get_thread_pool(
memoryLimit=FrameMemoryEstimate,
# allow multiple tiles even if we are using all the cores, as it
# balances I/O and computation
parentConcurrency=(parentConcurrency // 2),
**kwargs)
tasks = []
tilelock = threading.Lock()
for tile in ts.tileIterator(tile_size=dict(width=_iterTileSize), frame=frame):
_pool_add(tasks, (pool.submit(_convert_large_image_tile, tilelock, strips, tile), ))
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, f'tiles from frame {frame + 1}/{numFrames}')
minbands = min(strip.bands for strip in strips)
maxbands = max(strip.bands for strip in strips)
if minbands != maxbands:
Expand All @@ -556,20 +591,21 @@ def _convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs):
numFrames = len(lidata['metadata'].get('frames', [0]))
outputList = []
tasks = []
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate, **kwargs)
startFrame = 0
endFrame = numFrames
if kwargs.get('onlyFrame') is not None and str(kwargs.get('onlyFrame')):
startFrame = int(kwargs.get('onlyFrame'))
endFrame = startFrame + 1
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate,
numItems=endFrame - startFrame, **kwargs)
for frame in range(startFrame, endFrame):
frameOutputPath = tempPath + '-%d-%s.tiff' % (
frame + 1, time.strftime('%Y%m%d-%H%M%S'))
_pool_add(tasks, (pool.submit(
_convert_large_image_frame, frame, numFrames, ts, frameOutputPath,
tempPath, **kwargs), ))
tempPath, pool._max_workers, **kwargs), ))
outputList.append(frameOutputPath)
_drain_pool(pool, tasks)
_drain_pool(pool, tasks, 'frames')
_output_tiff(outputList, outputPath, tempPath, lidata, **kwargs)


Expand Down Expand Up @@ -891,6 +927,7 @@ def convert(inputPath, outputPath=None, **kwargs): # noqa: C901
:returns: outputPath if successful
"""
logger._pool_log_starttime = time.time()
if kwargs.get('_concurrency'):
os.environ['VIPS_CONCURRENCY'] = str(_concurrency_to_value(**kwargs))
geospatial = kwargs.get('geospatial')
Expand Down

0 comments on commit 487a33f

Please sign in to comment.