diff --git a/everybody_codes/runner.py b/everybody_codes/runner.py index 1d5a578..d54ded9 100755 --- a/everybody_codes/runner.py +++ b/everybody_codes/runner.py @@ -9,34 +9,14 @@ import requests import resource import time +import typing import click import inotify_simple # type: ignore +from lib import helpers -class LogFormatter(logging.Formatter): - def __init__(self, day: int = 0, *args, **kwargs): - super().__init__(*args, **kwargs) - self.last_call = time.perf_counter_ns() - self.day = day - self.part = 0 - - def set_part(self, part: int) -> None: - self.part = part - - def format(self, record) -> str: - msg = super().format(record) - if "DAYPART" in msg and self.day and self.part: - msg = msg.replace("DAYPART", f"{self.day}.{self.part}") - return msg - - def formatTime(self, record, datefmt=None): - if datefmt: - return super().formatTime(record, datefmt) - call_time = time.perf_counter_ns() - delta = call_time - self.last_call - self.last_call = call_time - return datetime.datetime.now().strftime(f"%H:%M:%S ({delta // 1_000_000:5}ms)") +T = typing.TypeVar("T") def get_solutions(day: int) -> list[int] | None: @@ -61,26 +41,6 @@ def get_solutions(day: int) -> list[int] | None: return want_raw.split("\t")[1:] -def format_ns(ns: float) -> str: - units = [("ns", 1000), ("µs", 1000), ("ms", 1000), ("s", 60), ("mn", 60)] - for unit, shift in units: - if ns < shift: - break - ns /= shift - else: - unit = "hr" - return f"{ns:>7.3f} {unit:>2}" - - -def timed(func, *args, **kwargs): - params = inspect.signature(func).parameters - kwargs = {k: v for k, v in kwargs.items() if k in params} - start = time.perf_counter_ns() - got = func(*args, **kwargs) - end = time.perf_counter_ns() - return format_ns(end - start), got - - def run_day(day: int, check: bool, solve: bool, test: bool, parts: tuple[int], formatter) -> None: module = importlib.import_module(f"quest_{day:02}") module = importlib.reload(module) @@ -90,7 +50,7 @@ def run_day(day: int, check: bool, solve: bool, test: bool, parts: tuple[int], f for test_no, (test_part, test_data, test_want) in enumerate(module.TESTS, 1): if test_part != part: continue - time_s, got = timed(module.solve, part=part, data=test_data, testing=True) + time_s, got = helpers.timed(module.solve, part=part, data=test_data, testing=True) if got == test_want: print(f"TEST {day:02}.{part} {time_s} PASS (test {test_no})") else: @@ -107,7 +67,7 @@ def run_day(day: int, check: bool, solve: bool, test: bool, parts: tuple[int], f print(f"SOLVE No input data found for day {day} part {part}") continue data = data_path.read_text().rstrip() - time_s, got = timed(module.solve, part=part, data=data, testing=False) + time_s, got = helpers.timed(module.solve, part=part, data=data, testing=False) print(f"SOLVE {day:02}.{part} {time_s} ---> {got}") if check: want = get_solutions(day) @@ -118,7 +78,7 @@ def run_day(day: int, check: bool, solve: bool, test: bool, parts: tuple[int], f formatter.set_part(part) data_path = pathlib.Path(f"inputs/{day:02}.{part}.txt") data = data_path.read_text().rstrip() - time_s, got = timed(module.solve, part=part, data=data, testing=False) + time_s, got = helpers.timed(module.solve, part=part, data=data, testing=False) if str(got) == want[part - 1]: print(f"CHECK {day:02}.{part} {time_s} PASS") else: @@ -134,17 +94,8 @@ def run_day(day: int, check: bool, solve: bool, test: bool, parts: tuple[int], f @click.option("--live", "-l", is_flag=True) @click.option("--verbose", "-v", count=True) def main(day: int, check: bool, solve: bool, test: bool, live: bool, parts: tuple[int], verbose: int) -> None: - os.nice(19) - log_level = [logging.WARN, logging.INFO, logging.DEBUG][min(2, verbose)] - handler = logging.StreamHandler() - formatter = LogFormatter(day, fmt="%(asctime)s [DAYPART|%(funcName)s():L%(lineno)s] %(message)s") - handler.setFormatter(formatter) - logging.getLogger().addHandler(handler) - logging.getLogger().setLevel(log_level) - try: - resource.setrlimit(resource.RLIMIT_RSS, (int(10e9), int(100e9))) - except ValueError: - pass + formatter = helpers.setup_logging(day, verbose) + helpers.setup_resources() run_day(day, check, solve, test, parts, formatter) if not live: return diff --git a/pylib/aoc.py b/pylib/aoc.py index 05cc97b..a6e1207 100644 --- a/pylib/aoc.py +++ b/pylib/aoc.py @@ -106,17 +106,6 @@ def print_point_set(board: set[complex]) -> None: print() -def format_ns(ns: float) -> str: - units = [("ns", 1000), ("µs", 1000), ("ms", 1000), ("s", 60), ("mn", 60)] - for unit, shift in units: - if ns < shift: - break - ns /= shift - else: - unit = "hr" - return f"{ns:>7.3f} {unit:>2}" - - class CachedIterable(Iterable[T]): """Cached Iterable by phy1729.""" diff --git a/pylib/helpers.py b/pylib/helpers.py index 7816434..cb29dd8 100644 --- a/pylib/helpers.py +++ b/pylib/helpers.py @@ -3,8 +3,14 @@ import collections import collections.abc import dataclasses +import datetime +import inspect +import logging import operator +import os import re +import resource +import time import typing COLOR_SOLID = '█' @@ -59,6 +65,76 @@ Interval = tuple[int, int] +class LogFormatter(logging.Formatter): + """Custom log formatter to insert time deltas and part numbers.""" + def __init__(self, day: int = 0, *args, **kwargs): + super().__init__(*args, **kwargs) + self.last_call = time.perf_counter_ns() + self.day = day + self.part = 0 + + def set_part(self, part: int) -> None: + """Set the part number.""" + self.part = part + + def format(self, record) -> str: + """Format a log record, replacing DAYPART with the day.part.""" + msg = super().format(record) + if "DAYPART" in msg and self.day and self.part: + msg = msg.replace("DAYPART", f"{self.day}.{self.part}") + return msg + + def formatTime(self, record, datefmt=None) -> str: + """Format the time, including milliseconds since the last log call.""" + if datefmt: + return super().formatTime(record, datefmt) + call_time = time.perf_counter_ns() + delta = call_time - self.last_call + self.last_call = call_time + return datetime.datetime.now().strftime(f"%H:%M:%S ({format_ns(delta)})") + + +def setup_logging(day: int, verbose: int) -> logging.Formatter: + """Configure logging.""" + log_level = [logging.WARN, logging.INFO, logging.DEBUG][min(2, verbose)] + handler = logging.StreamHandler() + formatter = LogFormatter(day, fmt="%(asctime)s [DAYPART|%(funcName)s():L%(lineno)s] %(message)s") + handler.setFormatter(formatter) + logging.getLogger().addHandler(handler) + logging.getLogger().setLevel(log_level) + return formatter + +def setup_resources() -> None: + """Configure resources to make runner play nicely.""" + os.nice(19) + try: + resource.setrlimit(resource.RLIMIT_RSS, (int(10e9), int(100e9))) + except ValueError: + pass + + +def format_ns(ns: float) -> str: + """Convert nanosecond duration into human time.""" + units = [("ns", 1000), ("µs", 1000), ("ms", 1000), ("s", 60), ("mn", 60)] + for unit, shift in units: + if ns < shift: + break + ns /= shift + else: + unit = "hr" + return f"{ns:>7.3f} {unit:>2}" + + +def timed(func: collections.abc.Callable[..., T], *args, **kwargs) -> tuple[str, T]: + """Run a function. Return how long it took along with the result.""" + params = inspect.signature(func).parameters + kwargs = {k: v for k, v in kwargs.items() if k in params} + start = time.perf_counter_ns() + got = func(*args, **kwargs) + end = time.perf_counter_ns() + return format_ns(end - start), got + + def neighbors(point: complex, directions: collections.abc.Sequence[complex] = STRAIGHT_NEIGHBORS) -> collections.abc.Iterable[complex]: """Return the 4/8 neighbors of a point.""" return (point + offset for offset in directions) @@ -132,6 +208,7 @@ def neighbors(self, point: complex, directions: collections.abc.Sequence[complex """Return neighboring points and values which are in the map.""" return {n: self.chars[n] for n in neighbors(point, directions) if n in self.all_coords} + def render_char_map(chars: dict[complex, str], height: int, width: int) -> str: """Render the map as a string.""" lines = [] diff --git a/runner.py b/runner.py index 0455be5..e8d1d63 100755 --- a/runner.py +++ b/runner.py @@ -23,6 +23,7 @@ import inotify_simple # type: ignore from pylib import aoc +from pylib import helpers from pylib import site EST = zoneinfo.ZoneInfo("America/New_York") @@ -374,15 +375,15 @@ def show_parsers(): @click.option("--year", type=int, required=False, help="AoC year") @click.option("--waitlive", is_flag=True, help="Wait for midnight then live solve.") @click.option("--december", is_flag=True, help="Live solve all days.") -@click.option("--live", is_flag=True, help="Live solve one day: setup, watch, test, submit.") +@click.option("--live", "-l", is_flag=True, help="Live solve one day: setup, watch, test, submit.") @click.option("--test", "-t", is_flag=True, help="Test if the sample input/solution works.") -@click.option("--solve", is_flag=True, help="Generate the solution.") +@click.option("--solve", "-s", is_flag=True, help="Generate the solution.") @click.option( "--check", "-c", is_flag=True, help="Check if the results in solution.txt match with the generated solution.", ) @click.option("--submit", is_flag=True, help="Submit the next part on AoC website.") -@click.option("--part", type=int, multiple=True, default=(1, 2), help="Which parts to run.") +@click.option("--part", "-p", type=int, multiple=True, default=(1, 2), help="Which parts to run.") @click.option("--watch", is_flag=True, help="If set, loop and repeat the action when the file is saved.") @click.option("--benchmark", is_flag=True, help="Time the solution.") @click.option("--all-days", is_flag=True, help="Run action for all days.") @@ -411,18 +412,8 @@ def main( verbose: int, ) -> None: """Run the code in some fashion.""" - os.nice(19) - - log_level = [logging.WARN, logging.INFO, logging.DEBUG][min(2, verbose)] - handler = logging.StreamHandler() - handler.setFormatter(LogFormatter(fmt="%(asctime)s [%(funcName)s():L%(lineno)s] %(message)s")) - logging.getLogger().addHandler(handler) - logging.getLogger().setLevel(log_level) - - try: - resource.setrlimit(resource.RLIMIT_RSS, (int(10e9), int(100e9))) - except ValueError: - pass + formatter = helpers.setup_logging(day, verbose) + helpers.setup_resources() dotenv.load_dotenv() if cookie: site.Website(0, 0, False).set_cookie(cookie)