diff --git a/loki/batch/__init__.py b/loki/batch/__init__.py index 88ee90d9b..2b25a67dd 100644 --- a/loki/batch/__init__.py +++ b/loki/batch/__init__.py @@ -15,6 +15,7 @@ """ from loki.batch.configure import * # noqa +from loki.batch.executor import * # noqa from loki.batch.item import * # noqa from loki.batch.item_factory import * # noqa from loki.batch.pipeline import * # noqa diff --git a/loki/batch/executor.py b/loki/batch/executor.py new file mode 100644 index 000000000..53524e583 --- /dev/null +++ b/loki/batch/executor.py @@ -0,0 +1,44 @@ +# (C) Copyright 2018- ECMWF. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +""" +A dummy "executor" utility that allows parallel non-threaded execution +under the same API as thread or ProcessPoolExecutors. +""" + +from concurrent.futures import Executor, Future + +__all__ = ['SerialExecutor'] + + +class SerialExecutor(Executor): + """ + A dummy "executor" utility that allows parallel non-threaded + execution with the same API as a ``ProcessPoolExecutors``. + """ + + def submit(self, fn, *args, **kwargs): # pylint: disable=arguments-differ + """ + Executes the callable, *fn* as ``fn(*args, **kwargs)`` + and wraps the return value in a *Future* object. + """ + f = Future() + try: + # Execute function on given args + result = fn(*args, **kwargs) + except BaseException as e: + f.set_exception(e) + else: + f.set_result(result) + + return f + + def map(self, fn, *args, **kwargs): + """ + Maps the callable *fn* via ``map(fn, *args, **kwargs)``. 
+ """ + return map(fn, *args, **kwargs) diff --git a/loki/batch/item_factory.py b/loki/batch/item_factory.py index 970ce0afe..637c4c62e 100644 --- a/loki/batch/item_factory.py +++ b/loki/batch/item_factory.py @@ -15,7 +15,6 @@ from loki.logging import warning from loki.module import Module from loki.subroutine import Subroutine -from loki.sourcefile import Sourcefile from loki.tools import CaseInsensitiveDict, as_tuple from loki.types import BasicType @@ -313,39 +312,6 @@ def get_or_create_item_from_item(self, name, item, config=None): raise RuntimeError(f'Failed to clone item {item.name} as {name}') - def get_or_create_file_item_from_path(self, path, config, frontend_args=None): - """ - Utility method to create a :any:`FileItem` for a given path - - This is used to instantiate items for the first time during the scheduler's - discovery phase. It will use a cached item if it exists, or parse the source - file using the given :data:`frontend_args`. - - Parameters - ---------- - path : str or pathlib.Path - The file path of the source file - config : :any:`SchedulerConfig` - The config object from which the item configuration will be derived - frontend_args : dict, optional - Frontend arguments that are given to :any:`Sourcefile.from_file` when - parsing the file - """ - item_name = str(path).lower() - if file_item := self.item_cache.get(item_name): - return file_item - - if not frontend_args: - frontend_args = {} - if config: - frontend_args = config.create_frontend_args(path, frontend_args) - - source = Sourcefile.from_file(path, **frontend_args) - item_conf = config.create_item_config(item_name) if config else None - file_item = FileItem(item_name, source=source, config=item_conf) - self.item_cache[item_name] = file_item - return file_item - def get_or_create_file_item_from_source(self, source, config): """ Utility method to create a :any:`FileItem` corresponding to a given source object diff --git a/loki/batch/scheduler.py b/loki/batch/scheduler.py index 
b51328e22..570693512 100644 --- a/loki/batch/scheduler.py +++ b/loki/batch/scheduler.py @@ -6,11 +6,13 @@ # nor does it submit to any jurisdiction. from enum import Enum, auto +from concurrent.futures import ProcessPoolExecutor from os.path import commonpath from pathlib import Path from codetiming import Timer from loki.batch.configure import SchedulerConfig +from loki.batch.executor import SerialExecutor from loki.batch.item import ( FileItem, ModuleItem, ProcedureItem, ProcedureBindingItem, InterfaceItem, TypeDefItem, ExternalItem @@ -22,6 +24,7 @@ from loki.batch.transformation import Transformation from loki.frontend import FP, REGEX, RegexParserClass +from loki.sourcefile import Sourcefile from loki.tools import as_tuple, CaseInsensitiveDict, flatten from loki.logging import info, perf, warning, error @@ -150,6 +153,9 @@ class Scheduler: full_parse: bool, optional Flag indicating whether a full parse of all sourcefiles is required. By default a full parse is executed, use this flag to suppress. + num_workers : int, default: 0 + Number of processes to use for parallel processing. Use the default + value ``0`` to bypass parallel processing and process serially. frontend : :any:`Frontend`, optional Frontend to use for full parse of source files (default :any:`FP`). output_dir : str or path @@ -159,9 +165,12 @@ class Scheduler: # TODO: Should be user-definable! 
source_suffixes = ['.f90', '.F90', '.f', '.F'] - def __init__(self, paths, config=None, seed_routines=None, preprocess=False, - includes=None, defines=None, definitions=None, xmods=None, - omni_includes=None, full_parse=True, frontend=FP, output_dir=None): + def __init__( + self, paths, config=None, seed_routines=None, preprocess=False, + includes=None, defines=None, definitions=None, xmods=None, + omni_includes=None, full_parse=True, num_workers=0, + frontend=FP, output_dir=None + ) # Derive config from file or dict if isinstance(config, SchedulerConfig): self.config = config @@ -172,6 +181,13 @@ def __init__(self, paths, config=None, seed_routines=None, preprocess=False, self.full_parse = full_parse + if num_workers > 0: + # Create the parallel pool executor for parallel processing + self.executor = ProcessPoolExecutor(max_workers=num_workers) + else: + # Create a dummy executor that mimics the concurrent Executor API + self.executor = SerialExecutor() + # Build-related arguments to pass to the sources self.paths = [Path(p) for p in as_tuple(paths)] self.seeds = tuple( @@ -202,6 +218,27 @@ def __init__(self, paths, config=None, seed_routines=None, preprocess=False, # Attach interprocedural call-tree information self._enrich() + def __del__(self): + # Shut down the parallel process pool + self.executor.shutdown() + + @staticmethod + def _parse_source(source, frontend_args): + """ + Utility function that exposes the parsing step for one + :any:`Sourcefile` as a pure function for the parallel + executor. 
+ + Parameters + ---------- + source : :any:`Sourcefile` + The sourcefile object to trigger full parse on + frontend_args : dict + Dict of arguments to pass to :meth:`make_complete` + """ + source.make_complete(**frontend_args) + return source + @Timer(logger=info, text='[Loki::Scheduler] Performed initial source scan in {:.2f}s') def _discover(self): """ @@ -223,8 +260,18 @@ def _discover(self): path_list = list(set(flatten(path_list))) # Filter duplicates and flatten # Instantiate FileItem instances for all files in the search path - for path in path_list: - self.item_factory.get_or_create_file_item_from_path(path, self.config, frontend_args) + # TODO: This is essentially item-cache creation, and should live on ItemFactory + path_fargs = tuple( + (path, self.config.create_frontend_args(path, frontend_args)) for path in path_list + ) + with Timer(logger=info, text='[Loki::Scheduler] Scheduler:: Initial file parse in {:.2f}s'): + src_futures = tuple( + self.executor.submit(Sourcefile.from_file, path, **fargs) + for path, fargs in path_fargs + ) + sources = tuple(src.result() for src in src_futures) + for source in sources: + self.item_factory.get_or_create_file_item_from_source(source, self.config) # Instantiate the basic list of items for files and top-level program units # in each file, i.e., modules and subroutines @@ -314,14 +361,27 @@ def _parse_items(self): # Force the parsing of the routines default_frontend_args = self.build_args.copy() default_frontend_args['definitions'] = as_tuple(default_frontend_args['definitions']) + self.definitions - for item in SFilter(self.file_graph, reverse=True): - frontend_args = self.config.create_frontend_args(item.name, default_frontend_args) - item.source.make_complete(**frontend_args) + + # Get the iteration order from the Sfilter + items = SFilter(self.file_graph, reverse=True) + + # Build the arguments for the parser function and call parallel map + with Timer(logger=perf, text='[Loki::Scheduler] Performed the actual 
parse loop in {:.2f}s'): + sources = tuple(item.source for item in items) + fargs = tuple( + self.config.create_frontend_args(item.name, default_frontend_args) + for item in items + ) + f_sources = self.executor.map(self._parse_source, sources, fargs) + + # Set the "completed" Sourcefile on the item + for item, source in zip(items, f_sources): + item.source = source # Re-build the SGraph after parsing to pick up all new connections self._sgraph = SGraph.from_seed(self.seeds, self.item_factory, self.config) - @Timer(logger=perf, text='[Loki::Scheduler] Enriched call tree in {:.2f}s') + @Timer(logger=info, text='[Loki::Scheduler] Enriched call tree in {:.2f}s') def _enrich(self): """ For items that have a specific enrichment list provided as part of their diff --git a/loki/batch/tests/test_batch.py b/loki/batch/tests/test_batch.py index f2df683b8..7d281a1d8 100644 --- a/loki/batch/tests/test_batch.py +++ b/loki/batch/tests/test_batch.py @@ -713,7 +713,8 @@ def test_procedure_item_from_item1(testdir, default_config): # A file with a single subroutine definition that calls a routine via interface block item_factory = ItemFactory() scheduler_config = SchedulerConfig.from_dict(default_config) - file_item = item_factory.get_or_create_file_item_from_path(proj/'source/comp1.F90', config=scheduler_config) + source = Sourcefile.from_file(proj/'source/comp1.F90') + file_item = item_factory.get_or_create_file_item_from_source(source, config=scheduler_config) item = file_item.create_definition_items(item_factory=item_factory, config=scheduler_config)[0] assert item.name == '#comp1' assert isinstance(item, ProcedureItem) @@ -746,7 +747,8 @@ def test_procedure_item_from_item2(testdir, default_config): # where the type is imported via an import statement in the module scope item_factory = ItemFactory() scheduler_config = SchedulerConfig.from_dict(default_config) - file_item = item_factory.get_or_create_file_item_from_path(proj/'module/other_mod.F90', config=scheduler_config) + 
source = Sourcefile.from_file(proj/'module/other_mod.F90') + file_item = item_factory.get_or_create_file_item_from_source(source, config=scheduler_config) mod_item = file_item.create_definition_items(item_factory=item_factory, config=scheduler_config)[0] assert mod_item.name == 'other_mod' assert isinstance(mod_item, ModuleItem) diff --git a/loki/cli/loki_transform.py b/loki/cli/loki_transform.py index 0d0320959..b41a7c948 100644 --- a/loki/cli/loki_transform.py +++ b/loki/cli/loki_transform.py @@ -39,8 +39,10 @@ @click.option('--log-level', '-l', default='info', envvar='LOKI_LOGGING', type=click.Choice(['debug', 'detail', 'perf', 'info', 'warning', 'error']), help='Log level to output during batch processing') +@click.option('--num-workers', type=int, default=0, envvar='LOKI_NUM_WORKERS', + help='Number of worker processes to use for parallel processing steps.') def convert( - frontend_opts, scheduler_opts, mode, config, plan_file, callgraph, root, log_level + frontend_opts, scheduler_opts, mode, config, plan_file, callgraph, root, log_level, num_workers ): """ Batch-processing mode for Fortran-to-Fortran transformations that @@ -85,7 +87,8 @@ def convert( full_parse = processing_strategy == ProcessingStrategy.DEFAULT scheduler = Scheduler( paths=paths, config=config, full_parse=full_parse, - definitions=definitions, output_dir=scheduler_opts.build, **frontend_opts.asdict + definitions=definitions, output_dir=scheduler_opts.build, + num_workers=num_workers, **frontend_opts.asdict ) # If requested, apply a custom pipeline from the scheduler config @@ -132,6 +135,8 @@ def convert( @click.option('--log-level', '-l', default='info', envvar='LOKI_LOGGING', type=click.Choice(['debug', 'detail', 'perf', 'info', 'warning', 'error']), help='Log level to output during batch processing') +@click.option('--num-workers', type=int, default=0, envvar='LOKI_NUM_WORKERS', + help='Number of worker processes to use for parallel processing steps.') @click.pass_context def plan(ctx, 
*_args, **_kwargs): """ diff --git a/loki/expression/operations.py b/loki/expression/operations.py index e6ae2ebb5..4af5184f9 100644 --- a/loki/expression/operations.py +++ b/loki/expression/operations.py @@ -134,15 +134,17 @@ class Cast(StrCompareMixin, pmbl.Call): Internal representation of a data type cast. """ - init_arg_names = ('name', 'expression', 'kind') + init_arg_names = ('function', 'expression', 'kind') - def __init__(self, name, expression, kind=None, **kwargs): + def __getinitargs__(self): + return (self.function, self.expression, self.kind) + + def __init__(self, function, expression, kind=None, **kwargs): + if not isinstance(function, pmbl.Expression): + function = pmbl.make_variable(function) assert kind is None or isinstance(kind, pmbl.Expression) self.kind = kind - super().__init__(pmbl.make_variable(name), as_tuple(expression), **kwargs) - - def __getinitargs__(self): - return (self.name, self.expression, self.kind) + super().__init__(function, as_tuple(expression), **kwargs) mapper_method = intern('map_cast') @@ -154,6 +156,10 @@ def name(self): def expression(self): return self.parameters + @expression.setter + def expression(self, expr): + self.parameters = expr + class Reference(StrCompareMixin, pmbl.Expression): """ diff --git a/loki/expression/tests/test_symbols.py b/loki/expression/tests/test_symbols.py index 34d6e9b4f..221c03bad 100644 --- a/loki/expression/tests/test_symbols.py +++ b/loki/expression/tests/test_symbols.py @@ -179,7 +179,7 @@ def test_symbol_recreation(): exprs.append( sym.LoopRange((sym.IntLiteral(1), i)) ) exprs.append( sym.RangeIndex((sym.IntLiteral(1), i)) ) - exprs.append( sym.Cast(name='int', expression=b, kind=i) ) + exprs.append( sym.Cast(function='int', expression=b, kind=i) ) exprs.append( sym.Reference(expression=b) ) exprs.append( sym.Dereference(expression=b) ) diff --git a/loki/module.py b/loki/module.py index d2ff143aa..a0eaa9fb4 100644 --- a/loki/module.py +++ b/loki/module.py @@ -249,7 +249,8 @@ def 
__getstate__(self): s = self.__dict__.copy() # TODO: We need to remove the AST, as certain AST types # (eg. FParser) are not pickle-safe. - del s['_ast'] + if '_ast' in s: + del s['_ast'] return s def __setstate__(self, s): diff --git a/loki/tests/test_pickle.py b/loki/tests/test_pickle.py index 7e1d1d044..dd9420cba 100644 --- a/loki/tests/test_pickle.py +++ b/loki/tests/test_pickle.py @@ -306,3 +306,30 @@ def test_pickle_scheduler_item(here, frontend, tmp_path): assert loads(dumps(item_a.source.ir)) == item_a.source.ir assert loads(dumps(item_a.source)) == item_a.source assert loads(dumps(item_a)) == item_a + + +@pytest.mark.parametrize('frontend', available_frontends()) +def test_pickle_expressions(frontend): + + fcode = """ + subroutine my_routine(n, a, b, c) + integer, intent(in) :: n + real, intent(in) :: a(n), b(n), c(n) + real :: x, y, z + integer :: i, j + + a(:) = b(:) + c(:) ! Test arrays + x = max(x, a(1)) ! Test scalar and intrinsic + + i = real(x, 4) ! Test casts + end subroutine my_routine +""" + routine = Subroutine.from_source(fcode, frontend=frontend) + + # Ensure equivalence after pickle-cyle with scope-level replication + routine_new = loads(dumps(routine)) + assert routine_new.spec == routine.spec + assert routine_new.body == routine.body + assert routine_new.contains == routine.contains + assert routine_new.symbol_attrs == routine.symbol_attrs + assert routine_new == routine diff --git a/loki/transformations/single_column/scc_cuf.py b/loki/transformations/single_column/scc_cuf.py index 3232f1138..b13f28df5 100644 --- a/loki/transformations/single_column/scc_cuf.py +++ b/loki/transformations/single_column/scc_cuf.py @@ -472,13 +472,14 @@ def driver_launch_configuration(self, routine, block_dim, targets=None): # GRIDDIM lhs = routine.variable_map["griddim"] - rhs = sym.InlineCall(function=func_dim3, parameters=(sym.IntLiteral(1), sym.IntLiteral(1), - sym.InlineCall(function=func_ceiling, - parameters=as_tuple( - sym.Cast(name="REAL", - 
expression=upper) / - sym.Cast(name="REAL", - expression=step))))) + rhs = sym.InlineCall( + function=func_dim3, parameters=( + sym.IntLiteral(1), sym.IntLiteral(1), + sym.InlineCall(function=func_ceiling, parameters=as_tuple( + sym.Cast("REAL", expression=upper) / sym.Cast("REAL", expression=step) + )) + ) + ) griddim_assignment = ir.Assignment(lhs=lhs, rhs=rhs) mapper[call] = (blockdim_assignment, griddim_assignment, ir.Comment(""), call.clone(chevron=(routine.variable_map["GRIDDIM"], routine.variable_map["BLOCKDIM"]),), @@ -493,11 +494,12 @@ def driver_launch_configuration(self, routine, block_dim, targets=None): blockdim_assignment = ir.Assignment(lhs=lhs, rhs=rhs) # GRIDDIM lhs = griddim_var - rhs = sym.InlineCall(function=func_dim3, parameters=(sym.InlineCall(function=func_ceiling, - parameters=as_tuple( - sym.Cast(name="REAL", expression=upper) / - sym.Cast(name="REAL", expression=step))), - sym.IntLiteral(1), sym.IntLiteral(1))) + rhs = sym.InlineCall(function=func_dim3, parameters=( + sym.InlineCall(function=func_ceiling, parameters=as_tuple( + sym.Cast("REAL", expression=upper) / sym.Cast("REAL", expression=step) + )), + sym.IntLiteral(1), sym.IntLiteral(1)) + ) griddim_assignment = ir.Assignment(lhs=lhs, rhs=rhs) routine.body = Transformer(mapper=mapper).visit(routine.body) diff --git a/loki/transformations/temporaries/pool_allocator.py b/loki/transformations/temporaries/pool_allocator.py index 2604516ba..da48ff484 100644 --- a/loki/transformations/temporaries/pool_allocator.py +++ b/loki/transformations/temporaries/pool_allocator.py @@ -439,7 +439,7 @@ def _get_stack_storage_and_size_var(self, routine, stack_size): _kind = routine.symbol_map.get('REAL64', None) or Variable(name='REAL64') # Convert stack_size from bytes to integer - stack_type_bytes = Cast(name='REAL', expression=Literal(1), kind=_kind) + stack_type_bytes = Cast('REAL', expression=Literal(1), kind=_kind) stack_type_bytes = InlineCall(Variable(name='C_SIZEOF'), 
parameters=as_tuple(stack_type_bytes)) stack_size_assign = Assignment(lhs=stack_size_var, rhs=stack_size) @@ -577,15 +577,15 @@ def _get_c_sizeof_arg(self, arr): """ if arr.type.dtype == BasicType.REAL: - param = Cast(name='REAL', expression=IntLiteral(1)) + param = Cast('REAL', expression=IntLiteral(1)) elif arr.type.dtype == BasicType.INTEGER: - param = Cast(name='INT', expression=IntLiteral(1)) + param = Cast('INT', expression=IntLiteral(1)) elif arr.type.dtype == BasicType.CHARACTER: - param = Cast(name='CHAR', expression=IntLiteral(1)) + param = Cast('CHAR', expression=IntLiteral(1)) elif arr.type.dtype == BasicType.LOGICAL: - param = Cast(name='LOGICAL', expression=LogicLiteral('.TRUE.')) + param = Cast('LOGICAL', expression=LogicLiteral('.TRUE.')) elif arr.type.dtype == BasicType.COMPLEX: - param = Cast(name='CMPLX', expression=(IntLiteral(1), IntLiteral(1))) + param = Cast('CMPLX', expression=(IntLiteral(1), IntLiteral(1))) param.kind = getattr(arr.type, 'kind', None) # pylint: disable=possibly-used-before-assignment @@ -871,7 +871,7 @@ def create_pool_allocator(self, routine, stack_size): lhs=stack_end, rhs=Sum((stack_ptr, stack_size_var)) ) else: - _real_size_bytes = Cast(name='REAL', expression=Literal(1), kind=_kind) + _real_size_bytes = Cast('REAL', expression=Literal(1), kind=_kind) _real_size_bytes = InlineCall(Variable(name='C_SIZEOF'), parameters=as_tuple(_real_size_bytes)) stack_incr = Assignment(