From 487a33f88b7cb8bffd5afd05a955b5ef99a51a5f Mon Sep 17 00:00:00 2001 From: David Manthey Date: Wed, 28 Aug 2024 12:50:57 -0400 Subject: [PATCH] Improve logging during pooled concurrency in the image converter Slightly shift how concurrency is divided between sources and tiles. --- CHANGELOG.md | 1 + .../large_image_converter/__init__.py | 57 +++++++++++++++---- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd180677c..7623c1b12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Add a utility function to minimize caching ([#1617](../../pull/1617)) - Allow passing through converter options when writing a zarr sink to a non-zarr format ([#1618](../../pull/1618)) - Do less calculations when applying affine transforms in the multi-source ([#1619](../../pull/1619)) +- Improve logging in the image converter ([#1621](../../pull/1621)) ### Bug Fixes diff --git a/utilities/converter/large_image_converter/__init__.py b/utilities/converter/large_image_converter/__init__.py index f8490afa9..adaf94a7e 100644 --- a/utilities/converter/large_image_converter/__init__.py +++ b/utilities/converter/large_image_converter/__init__.py @@ -109,7 +109,7 @@ def _data_from_large_image(path, outputPath, **kwargs): _pool_add(tasks, (pool.submit( _convert_via_vips, img, savePath, outputPath, mime=mime, forTiled=False), )) results['images'][key] = savePath - _drain_pool(pool, tasks) + _drain_pool(pool, tasks, 'associated images') return results @@ -218,7 +218,7 @@ def _generate_multiframe_tiff(inputPath, outputPath, tempPath, lidata, **kwargs) _pool_add(tasks, (pool.submit( _convert_via_vips, subInputPath, savePath, tempPath, False), )) extraImages[key] = savePath - _drain_pool(pool, tasks) + _drain_pool(pool, tasks, 'subpage') _output_tiff(outputList, outputPath, tempPath, lidata, extraImages, **kwargs) @@ -349,7 +349,7 @@ def _concurrency_to_value(_concurrency=None, **kwargs): return max(1, 
large_image.config.cpu_count(logical=True) + _concurrency) -def _get_thread_pool(memoryLimit=None, **kwargs): +def _get_thread_pool(memoryLimit=None, parentConcurrency=None, numItems=None, **kwargs): """ Allocate a thread pool based on the specific concurrency. @@ -357,12 +357,37 @@ def _get_thread_pool(memoryLimit=None, **kwargs): process per memoryLimit bytes of total memory. """ concurrency = _concurrency_to_value(**kwargs) + if parentConcurrency and parentConcurrency > 1 and concurrency > 1: + concurrency = max(1, int(math.ceil(concurrency / parentConcurrency))) if memoryLimit: + if parentConcurrency: + memoryLimit *= parentConcurrency concurrency = min(concurrency, large_image.config.total_memory() // memoryLimit) + if numItems and numItems >= 1 and concurrency > numItems: + concurrency = numItems concurrency = max(1, concurrency) return concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) +def _pool_log(left, total, label): + """ + Log processing within a pool. + + :param left: units left to process. + :param total: total number of units to process. + :param label: label to log describing what is being processed. + """ + if not hasattr(logger, '_pool_log_starttime'): + logger._pool_log_starttime = time.time() + if not hasattr(logger, '_pool_log_lastlog'): + logger._pool_log_lastlog = time.time() + if time.time() - logger._pool_log_lastlog < 10: + return + elapsed = time.time() - logger._pool_log_starttime + logger.debug('%d/%d %s left %4.2fs', left, total, label, elapsed) + logger._pool_log_lastlog = time.time() + + def _pool_add(tasks, newtask): """ Add a new task to a pool, then drain any finished tasks at the start of the @@ -381,7 +406,7 @@ def _pool_add(tasks, newtask): tasks.pop(0) -def _drain_pool(pool, tasks): +def _drain_pool(pool, tasks, label=''): """ Wait for all tasks in a pool to complete, then shutdown the pool. 
@@ -389,6 +414,8 @@ def _drain_pool(pool, tasks): :param tasks: a list containing either lists or tuples, the last element of which is a task submitted to the pool. Altered. """ + numtasks = len(tasks) + _pool_log(len(tasks), numtasks, label) while len(tasks): # This allows better stopping on a SIGTERM try: @@ -396,6 +423,7 @@ def _drain_pool(pool, tasks): except concurrent.futures.TimeoutError: continue tasks.pop(0) + _pool_log(len(tasks), numtasks, label) pool.shutdown(False) @@ -507,7 +535,8 @@ def _convert_large_image_tile(tilelock, strips, tile): strips[ty] = strips[ty].insert(vimg, x, 0, expand=True) -def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath, **kwargs): +def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath, + parentConcurrency=None, **kwargs): """ Convert a single frame from a large_image source. This parallelizes tile reads. Once all tiles are converted to a composited vips image, a tiff @@ -518,18 +547,24 @@ def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath, :param ts: the open tile source. :param frameOutputPath: the destination name for the tiff file. :param tempPath: a temporary file in a temporary directory. + :param parentConcurrency: amount of concurrency used by parent task. """ # The iterator tile size is a balance between memory use and fewer calls # and file handles. 
_iterTileSize = 4096 logger.info('Processing frame %d/%d', frame + 1, numFrames) strips = [] - pool = _get_thread_pool(**kwargs) + pool = _get_thread_pool( + memoryLimit=FrameMemoryEstimate, + # allow multiple tiles even if we are using all the cores, as it + # balances I/O and computation + parentConcurrency=(parentConcurrency // 2), + **kwargs) tasks = [] tilelock = threading.Lock() for tile in ts.tileIterator(tile_size=dict(width=_iterTileSize), frame=frame): _pool_add(tasks, (pool.submit(_convert_large_image_tile, tilelock, strips, tile), )) - _drain_pool(pool, tasks) + _drain_pool(pool, tasks, f'tiles from frame {frame + 1}/{numFrames}') minbands = min(strip.bands for strip in strips) maxbands = max(strip.bands for strip in strips) if minbands != maxbands: @@ -556,20 +591,21 @@ def _convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs): numFrames = len(lidata['metadata'].get('frames', [0])) outputList = [] tasks = [] - pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate, **kwargs) startFrame = 0 endFrame = numFrames if kwargs.get('onlyFrame') is not None and str(kwargs.get('onlyFrame')): startFrame = int(kwargs.get('onlyFrame')) endFrame = startFrame + 1 + pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate, + numItems=endFrame - startFrame, **kwargs) for frame in range(startFrame, endFrame): frameOutputPath = tempPath + '-%d-%s.tiff' % ( frame + 1, time.strftime('%Y%m%d-%H%M%S')) _pool_add(tasks, (pool.submit( _convert_large_image_frame, frame, numFrames, ts, frameOutputPath, - tempPath, **kwargs), )) + tempPath, pool._max_workers, **kwargs), )) outputList.append(frameOutputPath) - _drain_pool(pool, tasks) + _drain_pool(pool, tasks, 'frames') _output_tiff(outputList, outputPath, tempPath, lidata, **kwargs) @@ -891,6 +927,7 @@ def convert(inputPath, outputPath=None, **kwargs): # noqa: C901 :returns: outputPath if successful """ + logger._pool_log_starttime = time.time() if kwargs.get('_concurrency'): os.environ['VIPS_CONCURRENCY'] = 
str(_concurrency_to_value(**kwargs)) geospatial = kwargs.get('geospatial')