From d7f5cf37b6a6ffa12273134f97b2a2b9df4c395c Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Thu, 2 May 2024 22:21:15 +0200 Subject: [PATCH 1/7] session: yield futures once they are completed --- mlonmcu/session/session.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index 6c25167de..ff9259016 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -173,13 +173,13 @@ def process_runs( assert num_workers > 0, "num_workers can not be < 1" workers = [] # results = [] - workers = [] pbar = None # Outer progress bar pbar2 = None # Inner progress bar num_runs = len(self.runs) num_failures = 0 stage_failures = {} - worker_run_idx = [] + # worker_run_idx = [] + worker_run_idx = {} def _init_progress(total, msg="Processing..."): """Helper function to initialize a progress bar for the session.""" @@ -210,13 +210,15 @@ def _join_workers(workers): """Helper function to collect all worker threads.""" nonlocal num_failures results = [] - for i, w in enumerate(workers): + # for i, w in enumerate(workers): + for w in concurrent.futures.as_completed(workers): try: results.append(w.result()) except Exception as e: logger.exception(e) logger.error("An exception was thrown by a worker during simulation") - run_index = worker_run_idx[i] + # run_index = worker_run_idx[i] + run_index = worker_run_idx[w] run = self.runs[run_index] if run.failing: num_failures += 1 @@ -270,11 +272,12 @@ def _used_stages(runs, until): if run.failing: logger.warning("Skiping stage '%s' for failed run", run_stage) else: - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=stage, skip=skipped_stages)) + w = executor.submit(_process, pbar, run, until=stage, skip=skipped_stages) + workers.append(w) + worker_run_idx[w] = i _join_workers(workers) workers = [] - worker_run_idx = [] + worker_run_idx = {} if progress: _update_progress(pbar2) if progress: @@ -306,8 +309,9 @@ def _used_stages(runs, until): total_threads, cpu_count, ) - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=until, skip=skipped_stages)) + w = executor.submit(_process, pbar, run, until=until, skip=skipped_stages) + workers.append(w) + worker_run_idx[w] = i _join_workers(workers) if num_failures == 0: logger.info("All runs completed successfuly!") From 48c132cbafc603cdad15ca0d0bca32eeb5eb24c1 Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Thu, 2 May 2024 22:38:02 +0200 Subject: [PATCH 2/7] session: update pbar in _join_workers --- mlonmcu/session/session.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index ff9259016..d204007a5 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -203,20 +203,21 @@ def _close_progress(pbar): def _process(pbar, run, until, skip): """Helper function to invoke the run.""" run.process(until=until, skip=skip, export=export) - if progress: - _update_progress(pbar) - def _join_workers(workers): + def _join_workers(pbar, workers): """Helper function to collect all worker threads.""" nonlocal num_failures results = [] # for i, w in enumerate(workers): + # for w in workers: for w in concurrent.futures.as_completed(workers): try: results.append(w.result()) except Exception as e: logger.exception(e) logger.error("An exception was thrown by a worker during simulation") + if progress: + _update_progress(pbar) # run_index = 
worker_run_idx[i] run_index = worker_run_idx[w] run = self.runs[run_index] @@ -275,7 +276,7 @@ def _used_stages(runs, until): w = executor.submit(_process, pbar, run, until=stage, skip=skipped_stages) workers.append(w) worker_run_idx[w] = i - _join_workers(workers) + _join_workers(pbar, workers) workers = [] worker_run_idx = {} if progress: From 3d1568f7fff6206f358b56899eae3ffb642b6233 Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Sat, 4 May 2024 09:10:18 +0200 Subject: [PATCH 3/7] session: move pbar utilities to progress.py --- mlonmcu/session/progress.py | 44 +++++++++++++++++++++++++++++++++++++ mlonmcu/session/session.py | 34 +++++++--------------------- 2 files changed, 52 insertions(+), 26 deletions(-) create mode 100644 mlonmcu/session/progress.py diff --git a/mlonmcu/session/progress.py b/mlonmcu/session/progress.py new file mode 100644 index 000000000..a7cb421ee --- /dev/null +++ b/mlonmcu/session/progress.py @@ -0,0 +1,44 @@ +# +# Copyright (c) 2024 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Progress bar utilities for MLonMCU session.""" +from tqdm import tqdm + +from mlonmcu.logging import get_logger + +logger = get_logger() + + +def init_progress(total, msg="Processing..."): + """Helper function to initialize a progress bar for the session.""" + return tqdm( + total=total, + desc=msg, + ncols=100, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", + leave=None, + ) + +def update_progress(pbar, count=1): + """Helper function to update the progress bar for the session.""" + pbar.update(count) + +def close_progress(pbar): + """Helper function to close the session progressbar, if available.""" + if pbar: + pbar.close() diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index d204007a5..701684494 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -36,6 +36,7 @@ from .postprocess.postprocess import SessionPostprocess from .run import RunStage +from .progress import init_progress, update_progress, close_progress logger = get_logger() # TODO: rename to get_mlonmcu_logger @@ -181,25 +182,6 @@ def process_runs( # worker_run_idx = [] worker_run_idx = {} - def _init_progress(total, msg="Processing..."): - """Helper function to initialize a progress bar for the session.""" - return tqdm( - total=total, - desc=msg, - ncols=100, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", - leave=None, - ) - - def _update_progress(pbar, count=1): - """Helper function to update the progress bar for the session.""" - pbar.update(count) - - def _close_progress(pbar): - """Helper function to close the session progressbar, if available.""" - if pbar: - pbar.close() - def _process(pbar, run, until, skip): """Helper function to invoke the run.""" run.process(until=until, skip=skip, export=export) @@ -217,7 +199,7 @@ def _join_workers(pbar, workers): logger.exception(e) 
logger.error("An exception was thrown by a worker during simulation") if progress: - _update_progress(pbar) + update_progress(pbar) # run_index = worker_run_idx[i] run_index = worker_run_idx[w] run = self.runs[run_index] @@ -229,7 +211,7 @@ def _join_workers(pbar, workers): else: stage_failures[failed_stage] = [run_index] if progress: - _close_progress(pbar) + close_progress(pbar) return results def _used_stages(runs, until): @@ -247,11 +229,11 @@ def _used_stages(runs, until): with concurrent.futures.ThreadPoolExecutor(num_workers) as executor: if per_stage: if progress: - pbar2 = _init_progress(len(used_stages), msg="Processing stages") + pbar2 = init_progress(len(used_stages), msg="Processing stages") for stage in used_stages: run_stage = RunStage(stage).name if progress: - pbar = _init_progress(len(self.runs), msg=f"Processing stage {run_stage}") + pbar = init_progress(len(self.runs), msg=f"Processing stage {run_stage}") else: logger.info("%s Processing stage %s", self.prefix, run_stage) for i, run in enumerate(self.runs): @@ -280,12 +262,12 @@ def _used_stages(runs, until): workers = [] worker_run_idx = {} if progress: - _update_progress(pbar2) + update_progress(pbar2) if progress: - _close_progress(pbar2) + close_progress(pbar2) else: if progress: - pbar = _init_progress(len(self.runs), msg="Processing all runs") + pbar = init_progress(len(self.runs), msg="Processing all runs") else: logger.info(self.prefix + "Processing all stages") for i, run in enumerate(self.runs): From 001fbcbffb5d135309a82de8778b8e034f0eb44f Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Tue, 7 May 2024 06:54:59 +0200 Subject: [PATCH 4/7] session.py: move lots of code to schedule.py --- mlonmcu/session/schedule.py | 252 ++++++++++++++++++++++++++++++++++++ mlonmcu/session/session.py | 166 ++---------------------- 2 files changed, 260 insertions(+), 158 deletions(-) create mode 100644 mlonmcu/session/schedule.py diff --git a/mlonmcu/session/schedule.py b/mlonmcu/session/schedule.py new file mode 100644 index 000000000..996282b23 --- /dev/null +++ b/mlonmcu/session/schedule.py @@ -0,0 +1,252 @@ +# +# Copyright (c) 2024 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Definition of MLonMCU session schedulers.""" +import os +import shutil +import filelock +import tempfile +import multiprocessing +from datetime import datetime +from enum import Enum +from pathlib import Path +import concurrent.futures +from typing import List, Optional + +from tqdm import tqdm + +from mlonmcu.session.run import Run +from mlonmcu.logging import get_logger +from mlonmcu.report import Report +from mlonmcu.config import filter_config + +from .postprocess.postprocess import SessionPostprocess +from .run import RunStage +from .progress import init_progress, update_progress, close_progress + +logger = get_logger() # TODO: rename to get_mlonmcu_logger + + +def handle_executor(name: str): + # TODO: handle (thread_pool, process_pool, remote, hybrid) + EXECUTOR_LOOKUP = { + "thread_pool": concurrent.futures.ThreadPoolExecutor, + } + ret = EXECUTOR_LOOKUP.get(name, None) + assert ret is not None, f"Executor not found: {name}" + return ret + + +def _process(pbar, run, until, skip, export): + """Helper function to invoke the run.""" + run.process(until=until, skip=skip, export=export) + + +def _used_stages(runs, until): + """Determines the stages which are used by at least one run.""" + used = [] + for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: + stage = RunStage(stage_index) + if any(run.has_stage(stage) for run in runs): + used.append(stage) + return used + +class SessionScheduler: + """TODO""" + + def __init__(self, runs: List[Run], until: RunStage = RunStage.DONE, per_stage: bool = False, progress: bool = False, executor: str = "thread_pool", num_workers: int = 1): + self.runs = runs + self.until = until + self.per_stage = per_stage + self.progress = progress + self._executor_cls = handle_executor(executor) + self._executor_args = [num_workers] + self.num_workers = num_workers + self._futures = [] + # TODO: contextmanager? 
+ self.num_failures = 0 + self.stage_failures = {} + # worker_run_idx = [] + self._future_run_idx = {} + self.used_stages, self.skipped_stages = self.prepare() + + def prepare(self): + used_stages = _used_stages(self.runs, self.until) + skipped_stages = [stage for stage in RunStage if stage not in used_stages] + return used_stages, skipped_stages + + @property + def num_runs(self): + return len(self.runs) + + @property + def num_success(self): + return self.num_runs - self.num_failures + + def reset(self): + raise NotImplementedError(".reset() not implemented") + + def _join_futures(self, pbar): + """Helper function to collect all worker threads.""" + results = [] + for f in concurrent.futures.as_completed(self._futures): + try: + results.append(f.result()) + except Exception as e: + logger.exception(e) + logger.error("An exception was thrown by a worker during simulation") + if self.progress: + update_progress(pbar) + run_index = self._future_run_idx[f] + run = self.runs[run_index] + if run.failing: + self.num_failures += 1 + failed_stage = RunStage(run.next_stage).name + if failed_stage in self.stage_failures: + self.stage_failures[failed_stage].append(run_index) + else: + self.stage_failures[failed_stage] = [run_index] + if self.progress: + close_progress(pbar) + return results + + def process( + self, + export=False, + context=None, + ): + pbar = None # Outer progress bar + pbar2 = None # Inner progress bar + + with self._executor_cls(*self._executor_args) as executor: + if self.per_stage: + if self.progress: + pbar2 = init_progress(len(self.used_stages), msg="Processing stages") + for stage in self.used_stages: + run_stage = RunStage(stage).name + if self.progress: + pbar = init_progress(len(self.runs), msg=f"Processing stage {run_stage}") + else: + logger.info("%s Processing stage %s", self.prefix, run_stage) + for i, run in enumerate(self.runs): + if i == 0: + total_threads = min(self.num_runs, self.num_workers) + cpu_count = multiprocessing.cpu_count() + if (stage == RunStage.COMPILE) and run.compile_platform: + total_threads *= run.compile_platform.num_threads + if total_threads > 2 * cpu_count: + if pbar2: + print() + logger.warning( + "The chosen configuration leads to a maximum of %d threads being" + + " processed which heavily exceeds the available CPU resources (%d)." 
+ + " It is recommended to lower the value of 'mlif.num_threads'!", + total_threads, + cpu_count, + ) + if run.failing: + logger.warning("Skiping stage '%s' for failed run", run_stage) + else: + f = executor.submit(_process, pbar, run, until=stage, skip=self.skipped_stages, export=export) + self._futures.append(f) + self._future_run_idx[f] = i + self._join_futures(pbar) + self._futures = [] + self._future_run_idx = {} + if self.progress: + update_progress(pbar2) + if self.progress: + close_progress(pbar2) + else: + if self.progress: + pbar = init_progress(self.num_runs, msg="Processing all runs") + else: + logger.info(self.prefix + "Processing all stages") + for i, run in enumerate(self.runs): + if i == 0: + total_threads = min(len(self.runs), self.num_workers) + cpu_count = multiprocessing.cpu_count() + if ( + (self.until >= RunStage.COMPILE) + and run.compile_platform is not None + and run.compile_platform.name == "mlif" + ): + total_threads *= ( + run.compile_platform.num_threads + ) # TODO: This should also be used for non-mlif platforms + if total_threads > 2 * cpu_count: + if pbar2: + print() + logger.warning( + "The chosen configuration leads to a maximum of %d being processed which" + + " heavily exceeds the available CPU resources (%d)." + + " It is recommended to lower the value of 'mlif.num_threads'!", + total_threads, + cpu_count, + ) + f = executor.submit(_process, pbar, run, until=self.until, skip=self.skipped_stages) + self._futures.append(f) + self._future_run_idx[f] = i + self._join_futures(pbar) + # return num_failures == 0 + + def postprocess(self, dest): + report = self.get_reports() + logger.info("Postprocessing session report") + # Warning: currently we only support one instance of the same type of postprocess, + # also it will be applied to all rows! + session_postprocesses = [] + for run in self.runs: + for postprocess in run.postprocesses: + if isinstance(postprocess, SessionPostprocess): + if postprocess.name not in [p.name for p in session_postprocesses]: + session_postprocesses.append(postprocess) + if self.progress: + pbar = init_progress(len(session_postprocesses), msg="Postprocessing session") + for postprocess in session_postprocesses: + try: + artifacts = postprocess.post_session(report) + except Exception as e: + logger.exception(e) + self.num_failing += 1 + break + if self.progress: + update_progress(pbar) + if artifacts is not None: + for artifact in artifacts: + # Postprocess has an artifact: write to disk! 
+ logger.debug("Writing postprocess artifact to disk: %s", artifact.name) + artifact.export(dest) + if self.progress: + close_progress(pbar) + + def print_summary(self): + if self.num_failures == 0: + logger.info("All runs completed successfuly!") + elif self.num_failures == self.num_runs: + logger.error("All runs have failed to complete!") + else: + logger.warning("%d out or %d runs completed successfully!", self.num_success, self.num_runs) + summary = "\n".join( + [ + f"\t{stage}: \t{len(failed)} failed run(s): " + " ".join([str(idx) for idx in failed]) + for stage, failed in self.stage_failures.items() + if len(failed) > 0 + ] + ) + logger.info("Summary:\n%s", summary) diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index 701684494..f75292a50 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -37,6 +37,7 @@ from .postprocess.postprocess import SessionPostprocess from .run import RunStage from .progress import init_progress, update_progress, close_progress +from .schedule import SessionScheduler logger = get_logger() # TODO: rename to get_mlonmcu_logger @@ -172,163 +173,12 @@ def process_runs( self.enumerate_runs() self.report = None assert num_workers > 0, "num_workers can not be < 1" - workers = [] - # results = [] - pbar = None # Outer progress bar - pbar2 = None # Inner progress bar - num_runs = len(self.runs) - num_failures = 0 - stage_failures = {} - # worker_run_idx = [] - worker_run_idx = {} - - def _process(pbar, run, until, skip): - """Helper function to invoke the run.""" - run.process(until=until, skip=skip, export=export) - - def _join_workers(pbar, workers): - """Helper function to collect all worker threads.""" - nonlocal num_failures - results = [] - # for i, w in enumerate(workers): - # for w in workers: - for w in concurrent.futures.as_completed(workers): - try: - results.append(w.result()) - except Exception as e: - logger.exception(e) - logger.error("An exception was thrown by a worker during simulation") - if progress: - update_progress(pbar) - # run_index = worker_run_idx[i] - run_index = worker_run_idx[w] - run = self.runs[run_index] - if run.failing: - num_failures += 1 - failed_stage = RunStage(run.next_stage).name - if failed_stage in stage_failures: - stage_failures[failed_stage].append(run_index) - else: - stage_failures[failed_stage] = [run_index] - if progress: - close_progress(pbar) - return results - - def _used_stages(runs, until): - """Determines the stages which are used by at least one run.""" - used = [] - for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: - stage = RunStage(stage_index) - if any(run.has_stage(stage) for run in runs): - used.append(stage) - return used - - used_stages = _used_stages(self.runs, until) - skipped_stages = [stage for stage in RunStage if stage not in used_stages] - - with concurrent.futures.ThreadPoolExecutor(num_workers) as executor: - if per_stage: - if progress: - pbar2 = init_progress(len(used_stages), msg="Processing stages") - for stage in used_stages: - run_stage = RunStage(stage).name - if progress: - pbar = init_progress(len(self.runs), msg=f"Processing stage {run_stage}") - else: - logger.info("%s Processing stage %s", self.prefix, run_stage) - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if (stage == RunStage.COMPILE) and run.compile_platform: - total_threads *= run.compile_platform.num_threads - if total_threads > 2 * cpu_count: - if 
pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d threads being" - + " processed which heavily exceeds the available CPU resources (%d)." - + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - if run.failing: - logger.warning("Skiping stage '%s' for failed run", run_stage) - else: - w = executor.submit(_process, pbar, run, until=stage, skip=skipped_stages) - workers.append(w) - worker_run_idx[w] = i - _join_workers(pbar, workers) - workers = [] - worker_run_idx = {} - if progress: - update_progress(pbar2) - if progress: - close_progress(pbar2) - else: - if progress: - pbar = init_progress(len(self.runs), msg="Processing all runs") - else: - logger.info(self.prefix + "Processing all stages") - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if ( - (until >= RunStage.COMPILE) - and run.compile_platform is not None - and run.compile_platform.name == "mlif" - ): - total_threads *= ( - run.compile_platform.num_threads - ) # TODO: This should also be used for non-mlif platforms - if total_threads > 2 * cpu_count: - if pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d being processed which" - + " heavily exceeds the available CPU resources (%d)." - + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - w = executor.submit(_process, pbar, run, until=until, skip=skipped_stages) - workers.append(w) - worker_run_idx[w] = i - _join_workers(workers) - if num_failures == 0: - logger.info("All runs completed successfuly!") - elif num_failures == num_runs: - logger.error("All runs have failed to complete!") - else: - num_success = num_runs - num_failures - logger.warning("%d out or %d runs completed successfully!", num_success, num_runs) - summary = "\n".join( - [ - f"\t{stage}: \t{len(failed)} failed run(s): " + " ".join([str(idx) for idx in failed]) - for stage, failed in stage_failures.items() - if len(failed) > 0 - ] - ) - logger.info("Summary:\n%s", summary) - - report = self.get_reports() - logger.info("Postprocessing session report") - # Warning: currently we only support one instance of the same type of postprocess, - # also it will be applied to all rows! - session_postprocesses = [] - for run in self.runs: - for postprocess in run.postprocesses: - if isinstance(postprocess, SessionPostprocess): - if postprocess.name not in [p.name for p in session_postprocesses]: - session_postprocesses.append(postprocess) - for postprocess in session_postprocesses: - artifacts = postprocess.post_session(report) - if artifacts is not None: - for artifact in artifacts: - # Postprocess has an artifact: write to disk! 
- logger.debug("Writting postprocess artifact to disk: %s", artifact.name) - artifact.export(self.dir) + executor = "thread_pool" + scheduler = SessionScheduler(self.runs, until, executor=executor, per_stage=per_stage, progress=progress, num_workers=num_workers) + self.runs = scheduler.process(export=export, context=context) + report = self.get_reports + scheduler.print_summary() + report = scheduler.postprocess(report, dest=self.dir) report_file = Path(self.dir) / f"report.{self.report_fmt}" report.export(report_file) results_dir = context.environment.paths["results"].path @@ -339,7 +189,7 @@ def _used_stages(runs, until): if print_report: logger.info("Report:\n%s", str(report.df)) - return num_failures == 0 + return scheduler.num_failures == 0 def discard(self): """Discard a run and remove its directory.""" From 5543f662f4495a0432300245fef1ffc489dda192 Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Tue, 7 May 2024 07:10:04 +0200 Subject: [PATCH 5/7] mlonmcuscheduler fixes --- mlonmcu/session/schedule.py | 5 +++-- mlonmcu/session/session.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mlonmcu/session/schedule.py b/mlonmcu/session/schedule.py index 996282b23..e3a5d6ca8 100644 --- a/mlonmcu/session/schedule.py +++ b/mlonmcu/session/schedule.py @@ -203,10 +203,10 @@ def process( self._futures.append(f) self._future_run_idx[f] = i self._join_futures(pbar) + return self.runs # return num_failures == 0 - def postprocess(self, dest): - report = self.get_reports() + def postprocess(self, report, dest): logger.info("Postprocessing session report") # Warning: currently we only support one instance of the same type of postprocess, # also it will be applied to all rows! @@ -234,6 +234,7 @@ def postprocess(self, dest): artifact.export(dest) if self.progress: close_progress(pbar) + return report def print_summary(self): if self.num_failures == 0: diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index f75292a50..2c8f34240 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -176,7 +176,7 @@ def process_runs( executor = "thread_pool" scheduler = SessionScheduler(self.runs, until, executor=executor, per_stage=per_stage, progress=progress, num_workers=num_workers) self.runs = scheduler.process(export=export, context=context) - report = self.get_reports + report = self.get_reports() scheduler.print_summary() report = scheduler.postprocess(report, dest=self.dir) report_file = Path(self.dir) / f"report.{self.report_fmt}" From 26a895906cd37865dea797fafe2c103b48cfbbba Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Mon, 13 May 2024 00:01:55 +0200 Subject: [PATCH 6/7] schedule.py: add comments --- mlonmcu/session/schedule.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mlonmcu/session/schedule.py b/mlonmcu/session/schedule.py index e3a5d6ca8..ae7202249 100644 --- a/mlonmcu/session/schedule.py +++ b/mlonmcu/session/schedule.py @@ -56,6 +56,8 @@ def _process(pbar, run, until, skip, export): """Helper function to invoke the run.""" run.process(until=until, skip=skip, export=export) +# TODO: alternative _process functions + def _used_stages(runs, until): """Determines the stages which are used by at least one run.""" From c796801b2e8008fd4ff085ef2b03f0fac47ad6df Mon Sep 17 00:00:00 2001 From: Philipp van Kempen Date: Mon, 13 May 2024 00:05:42 +0200 Subject: [PATCH 7/7] lint --- mlonmcu/session/progress.py | 2 ++ mlonmcu/session/schedule.py | 16 ++++++++++++++-- mlonmcu/session/session.py | 4 +++- 3 files changed, 19 
insertions(+), 3 deletions(-) diff --git a/mlonmcu/session/progress.py b/mlonmcu/session/progress.py index a7cb421ee..8431b2306 100644 --- a/mlonmcu/session/progress.py +++ b/mlonmcu/session/progress.py @@ -34,10 +34,12 @@ def init_progress(total, msg="Processing..."): leave=None, ) + def update_progress(pbar, count=1): """Helper function to update the progress bar for the session.""" pbar.update(count) + def close_progress(pbar): """Helper function to close the session progressbar, if available.""" if pbar: diff --git a/mlonmcu/session/schedule.py b/mlonmcu/session/schedule.py index ae7202249..34df2ef23 100644 --- a/mlonmcu/session/schedule.py +++ b/mlonmcu/session/schedule.py @@ -56,6 +56,7 @@ def _process(pbar, run, until, skip, export): """Helper function to invoke the run.""" run.process(until=until, skip=skip, export=export) + # TODO: alternative _process functions @@ -68,10 +69,19 @@ def _used_stages(runs, until): used.append(stage) return used + class SessionScheduler: """TODO""" - def __init__(self, runs: List[Run], until: RunStage = RunStage.DONE, per_stage: bool = False, progress: bool = False, executor: str = "thread_pool", num_workers: int = 1): + def __init__( + self, + runs: List[Run], + until: RunStage = RunStage.DONE, + per_stage: bool = False, + progress: bool = False, + executor: str = "thread_pool", + num_workers: int = 1, + ): self.runs = runs self.until = until self.per_stage = per_stage @@ -164,7 +174,9 @@ def process( if run.failing: logger.warning("Skiping stage '%s' for failed run", run_stage) else: - f = executor.submit(_process, pbar, run, until=stage, skip=self.skipped_stages, export=export) + f = executor.submit( + _process, pbar, run, until=stage, skip=self.skipped_stages, export=export + ) self._futures.append(f) self._future_run_idx[f] = i self._join_futures(pbar) diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index 2c8f34240..6f3cb22d8 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -174,7 +174,9 @@ def process_runs( self.report = None assert num_workers > 0, "num_workers can not be < 1" executor = "thread_pool" - scheduler = SessionScheduler(self.runs, until, executor=executor, per_stage=per_stage, progress=progress, num_workers=num_workers) + scheduler = SessionScheduler( + self.runs, until, executor=executor, per_stage=per_stage, progress=progress, num_workers=num_workers + ) self.runs = scheduler.process(export=export, context=context) report = self.get_reports() scheduler.print_summary()
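
Note on the scheduling pattern introduced by this series: instead of collecting workers in a list and indexing results by submission order, each submitted future is stored as a key in a dict mapping it to its run index, and results are consumed with concurrent.futures.as_completed() so every run is handled (and attributed on failure) as soon as it finishes. A minimal standalone sketch of that pattern follows; process_run, the run counts, and the failing index are illustrative placeholders, not MLonMCU APIs.

    import concurrent.futures


    def process_run(index):
        """Stand-in for Run.process(); one index fails to show error attribution."""
        if index == 2:
            raise RuntimeError(f"run {index} failed")
        return f"run {index} ok"


    def schedule(num_runs=5, num_workers=2):
        results = []
        failures = []
        with concurrent.futures.ThreadPoolExecutor(num_workers) as executor:
            # Map each future back to its run index instead of relying on submission order.
            future_run_idx = {executor.submit(process_run, i): i for i in range(num_runs)}
            # as_completed() yields futures as soon as they finish, not in submission order.
            for future in concurrent.futures.as_completed(future_run_idx):
                run_index = future_run_idx[future]
                try:
                    results.append(future.result())
                except Exception:
                    failures.append(run_index)
        return results, failures


    if __name__ == "__main__":
        results, failures = schedule()
        print(results, failures)

Keying the dict by the future itself is what makes per-completion bookkeeping (progress updates, per-stage failure tracking) safe once results no longer arrive in submission order.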