diff --git a/lib/vsc/utils/__init__.py b/lib/vsc/utils/__init__.py index ebd2719..9eed26d 100644 --- a/lib/vsc/utils/__init__.py +++ b/lib/vsc/utils/__init__.py @@ -28,3 +28,31 @@ """ import pkg_resources pkg_resources.declare_namespace(__name__) + +import os +import warnings +from functools import wraps + +def _script_name(full_name): + """Return the script name without .py extension if any. This assumes that the script name does not contain a + dot in case of lacking an extension. + """ + (name, _) = os.path.splitext(full_name) + return os.path.basename(name) + + +def deprecated_class(message=None): + def decorator(cls): + class Wrapped(cls): + def __init__(self, *args, **kwargs): + warnings.warn( + f"{cls.__name__} is deprecated. {message or ''}", + category=DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) + Wrapped.__name__ = cls.__name__ + Wrapped.__doc__ = cls.__doc__ + Wrapped.__module__ = cls.__module__ + return Wrapped + return decorator diff --git a/lib/vsc/utils/nagios.py b/lib/vsc/utils/nagios.py index 156345d..52307bf 100644 --- a/lib/vsc/utils/nagios.py +++ b/lib/vsc/utils/nagios.py @@ -47,12 +47,14 @@ import sys import time +from vsc.utils import _script_name from vsc.utils.cache import FileCache from vsc.utils.fancylogger import getLogger log = getLogger(__name__) NAGIOS_CACHE_DIR = '/var/cache' +NAGIOS_CACHE_FILENAME = 'cache.nagios.json.gz' NAGIOS_CACHE_FILENAME_TEMPLATE = '%s.nagios.json.gz' NAGIOS_OK = 'OK' @@ -241,6 +243,72 @@ def alert(self, test): """ return not self.range_fn(test) +class NagiosStatusMixin: + """ + A mixin class providing methods for Nagios status codes. + + Note that these methods do not return, they exit the script. + + Options come from NAGIOS_MIXIN_OPTIONS. + """ + + NAGIOS_MIXIN_OPTIONS = { + 'nagios-report': ('print out nagios information', None, 'store_true', False, 'n'), + 'nagios-check-filename': + ('filename of where the nagios check data is stored', 'str', 'store', + os.path.join( + NAGIOS_CACHE_DIR, + NAGIOS_CACHE_FILENAME_TEMPLATE % (_script_name(sys.argv[0]),) + ) + ), + 'nagios-check-interval-threshold': ('threshold of nagios checks timing out', 'int', 'store', 0), + 'nagios-user': ('user nagios runs as', 'str', 'store', 'nrpe'), + 'nagios-world-readable-check': ('make the nagios check data file world readable', None, 'store_true', False), + } + + def nagios_prologue(self): + """ + This will set up the reporter, but exit immediately of the report is requested + """ + # bail if nagios report is requested + self.nagios = SimpleNagios( + _cache=self.options.nagios_check_filename, + _report_and_exit=self.options.nagios_report, + _threshold=self.options.nagios_check_interval_threshold, + _cache_user=self.options.nagios_user, + _world_readable=self.options.nagios_world_readable_check, + ) + + def nagios_epilogue(self, nagios_exit, nagios_message): + """ + This will write the result to the cache file + """ + self.nagios._exit(nagios_exit, nagios_message) + + def ok(self, msg): + """ + Convenience method that exits with Nagios OK exit code. + """ + exit_from_errorcode(0, msg) + + def warning(self, msg): + """ + Convenience method that exits with Nagios WARNING exit code. + """ + exit_from_errorcode(1, msg) + + def critical(self, msg): + """ + Convenience method that exits with Nagios CRITICAL exit code. + """ + exit_from_errorcode(2, msg) + + def unknown(self, msg): + """ + Convenience method that exits with Nagios UNKNOWN exit code. + """ + exit_from_errorcode(3, msg) + class NagiosReporter: """Reporting class for Nagios/Icinga reports. diff --git a/lib/vsc/utils/script_tools.py b/lib/vsc/utils/script_tools.py index d32bd35..280fea9 100644 --- a/lib/vsc/utils/script_tools.py +++ b/lib/vsc/utils/script_tools.py @@ -34,22 +34,30 @@ import os import sys +from configargparse import ArgParser from copy import deepcopy +from vsc.utils import deprecated_class, _script_name import logging +from lockfile.linklockfile import LockFailed from vsc.utils import fancylogger from vsc.utils.availability import proceed_on_ha_service from vsc.utils.generaloption import SimpleOption from vsc.utils.lock import lock_or_bork, release_or_bork, LOCKFILE_DIR, LOCKFILE_FILENAME_TEMPLATE from vsc.utils.nagios import ( - SimpleNagios, NAGIOS_CACHE_DIR, NAGIOS_CACHE_FILENAME_TEMPLATE, exit_from_errorcode, + NAGIOS_CRITICAL, SimpleNagios, NAGIOS_CACHE_DIR, NAGIOS_CACHE_FILENAME_TEMPLATE, exit_from_errorcode, NAGIOS_EXIT_OK, NAGIOS_EXIT_WARNING, NAGIOS_EXIT_CRITICAL, NAGIOS_EXIT_UNKNOWN, + NagiosStatusMixin +) +from vsc.utils.timestamp import ( + convert_timestamp, write_timestamp, retrieve_timestamp_with_default, TIMESTAMP_DIR, + TIMESTAMP_FILENAME_TEMPLATE ) -from vsc.utils.timestamp import convert_timestamp, write_timestamp, retrieve_timestamp_with_default from vsc.utils.timestamp_pid_lockfile import TimestampedPidLockfile DEFAULT_TIMESTAMP = "20140101000000Z" TIMESTAMP_FILE_OPTION = "timestamp_file" + DEFAULT_CLI_OPTIONS = { "start_timestamp": ("The timestamp form which to start, otherwise use the cached value", None, "store", None), TIMESTAMP_FILE_OPTION: ("Location to cache the start timestamp", None, "store", None), @@ -58,31 +66,360 @@ MAX_RTT = 2 * MAX_DELTA + 1 -def _script_name(full_name): - """Return the script name without .py extension if any. This assumes that the script name does not contain a - dot in case of lacking an extension. - """ - (name, _) = os.path.splitext(full_name) - return os.path.basename(name) - - DEFAULT_OPTIONS = { 'disable-locking': ('do NOT protect this script by a file-based lock', None, 'store_true', False), 'dry-run': ('do not make any updates whatsoever', None, 'store_true', False), 'ha': ('high-availability master IP address', None, 'store', None), - 'locking-filename': ('file that will serve as a lock', None, 'store', - os.path.join(LOCKFILE_DIR, - LOCKFILE_FILENAME_TEMPLATE % (_script_name(sys.argv[0]),))), + 'locking-filename': ( + 'file that will serve as a lock', None, 'store', + os.path.join( + LOCKFILE_DIR, + LOCKFILE_FILENAME_TEMPLATE % (_script_name(sys.argv[0])), + ) + ), 'nagios-report': ('print out nagios information', None, 'store_true', False, 'n'), - 'nagios-check-filename': ('filename of where the nagios check data is stored', 'string', 'store', + 'nagios-check-filename': ('filename of where the nagios check data is stored', 'str', 'store', os.path.join(NAGIOS_CACHE_DIR, NAGIOS_CACHE_FILENAME_TEMPLATE % (_script_name(sys.argv[0]),))), 'nagios-check-interval-threshold': ('threshold of nagios checks timing out', 'int', 'store', 0), - 'nagios-user': ('user nagios runs as', 'string', 'store', 'nrpe'), + 'nagios-user': ('user nagios runs as', 'str', 'store', 'nrpe'), 'nagios-world-readable-check': ('make the nagios check data file world readable', None, 'store_true', False), } +def populate_config_parser(parser, options): + """ + Populates or updates a ConfigArgParse parser with options from a dictionary. + + Args: + parser (configargparse.ArgParser): The parser to populate or update. + options (dict): A dictionary of options where each key is the argument name and the value is a tuple + containing (help, type, action, default, optional short flag). + + Returns: + configargparse.ArgParser: The populated or updated parser. + """ + existing_args = {action.dest: action for action in parser._actions} + + for arg_name, config in options.items(): + # Extract the tuple components with fallback to None for optional elements + help_text = config[0] + type_ = config[1] if len(config) > 1 else None + action = config[2] if len(config) > 2 else None + default = config[3] if len(config) > 3 else None + short_flag = f"-{config[4]}" if len(config) > 4 else None + + # Prepare argument details + kwargs = { + "help": help_text, + "default": default, + } + if type_: + kwargs["type"] = eval(type_) + if action: + kwargs["action"] = action + + long_flag = f"--{arg_name.replace('_', '-')}" + + # Check if the argument already exists + if arg_name in existing_args: + # Update existing argument + action = existing_args[arg_name] + if "help" in kwargs: + action.help = kwargs["help"] + if "default" in kwargs: + action.default = kwargs["default"] + if "type" in kwargs: + action.type = kwargs["type"] + if "action" in kwargs: + action.action = kwargs["action"] + else: + # Add new argument + if short_flag: + parser.add_argument(short_flag, long_flag, **kwargs) + else: + parser.add_argument(long_flag, **kwargs) + + return parser + +class HAException(Exception): + pass + +class LockException(Exception): + pass + +class NagiosException(Exception): + pass + +class TimestampException(Exception): + pass + +class HAMixin: + """ + A mixin class providing methods for high-availability check. + """ + HA_MIXIN_OPTIONS = { + 'ha': ('high-availability master IP address', None, 'store', None), + } + + def ha_prologue(self): + """ + Check if we are running on the HA master + """ + if not proceed_on_ha_service(self.options.ha): + raise HAException(f"Not running on the target host {self.options.ha} in the HA setup") + + def ha_epilogue(self): + """ + Nothing to do here + """ + + +class LockMixin: + """ + A mixin class providing methods for file locking. + """ + LOCK_MIXIN_OPTIONS = { + 'disable-locking': ('do NOT protect this script by a file-based lock', None, 'store_true', False), + 'locking-filename': + ( 'file that will serve as a lock', None, 'store', + os.path.join( + LOCKFILE_DIR, + LOCKFILE_FILENAME_TEMPLATE % (_script_name(sys.argv[0]),) + ) + ), + } + + def lock_prologue(self): + """ + Take a lock on the file + """ + self.lockfile = TimestampedPidLockfile( + self.options.locking_filename, threshold=self.options.nagios_check_interval_threshold * 2 + ) + try: + self.lockfile.acquire() + except LockFailed as err: + raise LockException(f"Failed to acquire lock on {self.options.locking_filename}") from err + + def lock_epilogue(self): + """ + Release the lock on the file + """ + try: + self.lockfile.release() + except Exception as err: + raise LockException("Failed to release lock") from err + + +class TimestampMixin: + """ + A mixin class providing methods for timestamp handling. + + Requires: + - The inheriting class must provide `self.options` with attributes: + - `start_timestamp` + - `TIMESTAMP_FILE_OPTION` + """ + TIMESTAMP_MIXIN_OPTIONS = { + "start_timestamp": ("The timestamp form which to start, otherwise use the cached value", None, "store", None), + "timestamp_file": ( + "Location to cache the start timestamp", None, "store", + os.path.join( + TIMESTAMP_DIR, + TIMESTAMP_FILENAME_TEMPLATE % (_script_name(sys.argv[0]),) + ) + ), + } + + def timestamp_prologue(self): + """ + Get start time (from commandline or cache), return current time + """ + try: + (start_timestamp, current_time) = retrieve_timestamp_with_default( + getattr(self.options, TIMESTAMP_FILE_OPTION), + start_timestamp=self.options.start_timestamp, + default_timestamp=DEFAULT_TIMESTAMP, + delta=-MAX_RTT, # make the default delta explicit, current_time = now - MAX_RTT seconds + ) + except Exception as err: + raise TimestampException("Failed to retrieve timestamp") from err + + logging.info("Using start timestamp %s", start_timestamp) + logging.info("Using current time %s", current_time) + self.start_timestamp = start_timestamp + self.current_time = current_time + + def timestamp_epilogue(self): + """ + Write the new timestamp to the file + """ + try: + write_timestamp(self.options.timestamp_file, self.current_time) + except Exception as err: + raise TimestampException("Failed to write timestamp") from err + +class LogMixin: + """ + A mixin class providing methods for logging. + """ + LOG_MIXIN_OPTIONS = { + 'debug': ("Enable debug log mode", None, "store_true", False), + 'info': ("Enable info log mode", None, "store_true", False), + 'quiet': ("Enable quiet/warning log mode", None, "store_true", False), + } + + def log_prologue(self): + """ + Set the log level + """ + if self.options.quiet: + logging.basicConfig(level=logging.WARNING) + elif self.options.info: + logging.basicConfig(level=logging.INFO) + elif self.options.debug: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.ERROR) + + def log_epilogue(self): + """ + Nothing to do here + """ + +class CLIBase: + + CLI_OPTIONS = {} + CLI_BASE_OPTIONS = { + 'dry-run': ('do not make any updates whatsoever', None, 'store_true', False), + 'configfiles': ('config file to read', 'str', 'store', None), + 'help': ('show this help message and exit', None, 'help', None), + 'ignoreconfigfiles': ('do not read any config files', None, 'store', None), + } + + def __init__(self, name=None): + self.name = name + # Set all the options + argparser = ArgParser() + argparser = populate_config_parser(argparser, self.__class__.CLI_BASE_OPTIONS) + + if isinstance(self, HAMixin): + argparser = populate_config_parser(argparser, self.__class__.HA_MIXIN_OPTIONS) + + if isinstance(self, LogMixin): + argparser = populate_config_parser(argparser, self.__class__.LOG_MIXIN_OPTIONS) + + if isinstance(self, TimestampMixin): + argparser = populate_config_parser(argparser, self.__class__.TIMESTAMP_MIXIN_OPTIONS) + + if isinstance(self, LockMixin): + argparser = populate_config_parser(argparser, self.__class__.LOCK_MIXIN_OPTIONS) + + if isinstance(self, NagiosStatusMixin): + argparser = populate_config_parser(argparser, self.__class__.NAGIOS_MIXIN_OPTIONS) + + argparser = populate_config_parser(argparser, self.get_options()) + + self.options = argparser.parse_args() + + def critical(self, msg): + if isinstance(self, NagiosStatusMixin): + self.nagios_epilogue(NAGIOS_CRITICAL, msg) + else: + logging.error(msg) + sys.exit(1) + + def get_options(self): + # Gather options from the current class and its hierarchy + options = {} + for cls in reversed(self.__class__.mro()): + if hasattr(cls, "CLI_OPTIONS"): + options.update(cls.CLI_OPTIONS) + return options + + def final(self): + """ + Run as finally block in main + """ + + def do(self, dryrun=False): # pylint: disable=unused-argument + """ + Method to add actual work to do. + The method is executed in main method in a generic try/except/finally block + You can return something, that, when it evals to true, is considered fatal + """ + logging.error("`do` method not implemented") + raise NotImplementedError("Not implemented") + return "Not Implemented" + + def main(self): + """ + The main method. + """ + + msg = self.name + if msg and self.options.dry_run: + msg += " (dry-run)" + logging.info("%s started.", msg) + + # Call mixin prologue methods + # We must first call the Nagios prologue, as it may exit the script immedoately when a report is asked + try: + if isinstance(self, NagiosStatusMixin): + self.nagios_prologue() + except NagiosException as err: + self.critical(str(err)) + + try: + if isinstance(self, LogMixin): + self.log_prologue() + + if isinstance(self, LockMixin): + self.lock_prologue() + + if isinstance(self, TimestampMixin): + self.timestamp_prologue() + + if isinstance(self, HAMixin): + self.ha_prologue() + + except (LockException, HAException, TimestampException) as err: + self.critical(str(err)) + + + try: + self.do(self.options.dry_run) + except Exception as err: + self.critical(f"Script failed in an unrecoverable way: {err}") + finally: + self.final() + # Call epilogue_unlock if LockMixin is inherited + if isinstance(self, LockMixin): + self.lock_epilogue() + + #self.post(errors) + + # Call mixin epilogue methods + if isinstance(self, TimestampMixin): + self.timestamp_epilogue() + + if isinstance(self, LockMixin): + self.lock_epilogue() + + if isinstance(self, NagiosStatusMixin): + self.nagios_epilogue() + + if isinstance(self, LogMixin): + self.log_epilogue() + + +class FullCLIBase(HAMixin, LockMixin, TimestampMixin, LogMixin, NagiosStatusMixin, CLIBase): + """ + A class for command line scripts with all mixins, i.e., what you usually want. + """ + + def _merge_options(options): """Merge the given set of options with the default options, updating default values where needed. @@ -102,6 +439,13 @@ def _merge_options(options): return opts +class CLI(FullCLIBase): + + def __init__(self, name=None, default_options=None): # pylint: disable=unused-argument + super().__init__(name) + + +@deprecated_class("Base your scripts on the CLIBase class instead") class ExtendedSimpleOption(SimpleOption): """ Extends the SimpleOption class to allow other checks to occur at script prologue and epilogue. @@ -227,7 +571,8 @@ def critical_exception_handler(self, tp, value, traceback): self.critical(message) -class CLI: +@deprecated_class("Base your scripts on the CLIBase class instead") +class OldCLI: """ Base class to implement cli tools that require timestamps, nagios checks, etc. """ @@ -408,30 +753,32 @@ def main(self): self.fulloptions.epilogue(f"{msg} complete", self.thresholds) +@deprecated_class("Base your scripts on the CLIBase class instead") class NrpeCLI(CLI): def __init__(self, name=None, default_options=None): super().__init__(name=name, default_options=default_options) - def ok(self, msg): - """ - Convenience method that exists with nagios OK exitcode - """ - exit_from_errorcode(0, msg) - - def warning(self, msg): - """ - Convenience method exists with nagios warning exitcode - """ - exit_from_errorcode(1, msg) - - def critical(self, msg): - """ - Convenience method that exists with nagios critical exitcode - """ - exit_from_errorcode(2, msg) - - def unknown(self, msg): - """ - Convenience method that exists with nagios unknown exitcode - """ - exit_from_errorcode(3, msg) +# def ok(self, msg): +# """ +# Convenience method that exists with nagios OK exitcode +# """ +# exit_from_errorcode(0, msg) +# +# def warning(self, msg): +# """ +# Convenience method exists with nagios warning exitcode +# """ +# exit_from_errorcode(1, msg) +# +# def critical(self, msg): +# """ +# Convenience method that exists with nagios critical exitcode +# """ +# exit_from_errorcode(2, msg) +# +# def unknown(self, msg): +# """ +# Convenience method that exists with nagios unknown exitcode +# """ +# exit_from_errorcode(3, msg) +# diff --git a/lib/vsc/utils/timestamp.py b/lib/vsc/utils/timestamp.py index 5eb6e42..27ba4ea 100644 --- a/lib/vsc/utils/timestamp.py +++ b/lib/vsc/utils/timestamp.py @@ -40,6 +40,8 @@ DEFAULT_TIMESTAMP = "20140101000000Z" +TIMESTAMP_DIR = "/var/cache" +TIMESTAMP_FILENAME_TEMPLATE = "%s.timestamp" def convert_to_datetime(timestamp=None): """ diff --git a/setup.py b/setup.py index e5b5d4b..0614ce8 100755 --- a/setup.py +++ b/setup.py @@ -38,10 +38,11 @@ 'lockfile >= 0.9.1', 'netifaces', 'jsonpickle', + 'configargparse', ] PACKAGE = { - 'version': '2.2.9', + 'version': '2.3.0', 'author': [ag, sdw], 'maintainer': [ag, sdw], 'excluded_pkgs_rpm': ['vsc', 'vsc.utils'], # vsc is default, vsc.utils is provided by vsc-base diff --git a/test/script_tools.py b/test/script_tools.py index 7bb19ee..2120908 100644 --- a/test/script_tools.py +++ b/test/script_tools.py @@ -30,15 +30,22 @@ """ import logging +import os import random import sys import tempfile import getpass import mock +from mock import MagicMock, patch from vsc.install.testing import TestCase -from vsc.utils.nagios import NAGIOS_EXIT_WARNING -from vsc.utils.script_tools import ExtendedSimpleOption, DEFAULT_OPTIONS, NrpeCLI, CLI + +from vsc.install.testing import TestCase +from vsc.utils.nagios import NAGIOS_EXIT_WARNING, NagiosStatusMixin +from vsc.utils.script_tools import ( + ExtendedSimpleOption, DEFAULT_OPTIONS, NrpeCLI, CLI, OldCLI, + CLIBase, LockMixin, HAMixin, TimestampMixin, LogMixin) + class TestExtendedSimpleOption(TestCase): """ @@ -96,16 +103,7 @@ def test_threshold_custom_setting(self, mock_proceed, _, mock_lockfile): magic = mock.MagicMock(name='magic') -class MyNrpeCLI(NrpeCLI): - TIMESTAMP_MANDATORY = False # mainly for testing, you really should need this in production - CLI_OPTIONS = { - 'magic': ('some magic', None, 'store', 'magicdef'), - } - def do(self, _): - return magic.go() - - -class MyCLI(CLI): +class MyOldCLI(OldCLI): TIMESTAMP_MANDATORY = False # mainly for testing, you really should need this in production TESTFILE = tempfile.mkstemp()[1] TESTFILE2 = tempfile.mkstemp()[1] @@ -119,16 +117,18 @@ class MyCLI(CLI): def do(self, _): return magic.go() -class TestNrpeCLI(TestCase): - """Tests for the CLI base class""" +class TestOldCLI(TestCase): + """Tests for the OldCLI base class""" @mock.patch('vsc.utils.script_tools.ExtendedSimpleOption.prologue') def test_opts(self, _): sys.argv = ['abc'] - ms = MyNrpeCLI() + ms = MyOldCLI() logging.debug("options %s %s %s", ms.options, dir(ms.options), vars(ms.options)) + + extsimpopts = { 'configfiles': None, 'debug': False, @@ -138,11 +138,11 @@ def test_opts(self, _): 'help': None, 'ignoreconfigfiles': None, 'info': False, - 'locking_filename': '/var/lock/setup.lock', - 'nagios_check_filename': '/var/cache/setup.nagios.json.gz', + 'locking_filename': ms.TESTFILE2, + 'nagios_check_filename': ms.TESTFILE, 'nagios_check_interval_threshold': 0, 'nagios_report': False, - 'nagios_user': 'nrpe', + 'nagios_user': getpass.getuser(), 'nagios_world_readable_check': False, 'quiet': False, } @@ -159,32 +159,118 @@ def test_opts(self, _): 'magic': 'magicdef', } myopts.update(extsimpopts) - ms = MyNrpeCLI(default_options={}) + ms = MyOldCLI(default_options={}) logging.debug("options wo default sync options %s", ms.options) self.assertEqual(ms.options.__dict__, myopts) + @mock.patch('vsc.utils.script_tools.lock_or_bork') + @mock.patch('vsc.utils.script_tools.release_or_bork') + def test_exit(self, locklock, releaselock): + + cli = MyOldCLI() + + fake_exit = mock.MagicMock() + with mock.patch('sys.exit', fake_exit): + cli.warning("be warned") + fake_exit.assert_called_with(1) + + +class TestNrpeCLI(TestCase): + """Tests for the NrpeCLI base class""" + + def setUp(self): + super().setUp() + + sys.argv = ["abc"] + class MyNrpeCLI(NrpeCLI): + TIMESTAMP_MANDATORY = False # mainly for testing, you really should need this in production + CLI_OPTIONS = { + 'magic': ('some magic', None, 'store', 'magicdef'), + } + + def do(self, dry_run): + return magic.go() + + self.cli = MyNrpeCLI(name="abc") + + @mock.patch('vsc.utils.script_tools.ExtendedSimpleOption.prologue') + def test_opts(self, _): + + logging.debug("options %s %s %s", self.cli.options, dir(self.cli.options), vars(self.cli.options)) + + extsimpopts = { + 'configfiles': None, + 'debug': False, + 'disable_locking': False, + 'dry_run': False, + 'ha': None, + 'help': None, + 'ignoreconfigfiles': None, + 'info': False, + 'locking_filename': '/var/lock/setup.lock', + 'nagios_check_filename': '/var/cache/setup.nagios.json.gz', + 'nagios_check_interval_threshold': 0, + 'nagios_report': False, + 'nagios_user': 'nrpe', + 'nagios_world_readable_check': False, + 'quiet': False, + } + + myopts = { + 'magic': 'magicdef', + 'start_timestamp': None, + 'timestamp_file': '/var/cache/setup.timestamp', + } + myopts.update(extsimpopts) + self.assertEqual(self.cli.options.__dict__, myopts) + @mock.patch('vsc.utils.script_tools.ExtendedSimpleOption.prologue') def test_exit(self, _): - cli = MyNrpeCLI() + self.original_argv = sys.argv + sys.argv = ["somecli"] fake_exit = mock.MagicMock() with mock.patch('vsc.utils.nagios._real_exit', fake_exit): - cli.warning("be warned") + self.cli.warning("be warned") fake_exit.assert_called_with("be warned", NAGIOS_EXIT_WARNING) class TestCLI(TestCase): """Tests for the CLI base class""" + def setUp(self): + super().setUp() + + sys.argv = ["abc"] + + class MyCLI(CLI): + TIMESTAMP_MANDATORY = False # mainly for testing, you really should need this in production + LOCKING_TESTFILE = tempfile.mkstemp()[1] + NAGIOS_TESTFILE = tempfile.mkstemp()[1] + + CLI_OPTIONS = { + 'magic': ('some magic', None, 'store', 'magicdef'), + 'nagios_check_filename': ('bla', None, 'store', NAGIOS_TESTFILE), + 'locking_filename': ('test', None, 'store', LOCKING_TESTFILE), + 'nagios_user': ('user nagios runs as', 'str', 'store', getpass.getuser()), + } + + def do(self, dry_run): + return magic.go() + + self.ms = MyCLI(name="abc") + @mock.patch('vsc.utils.script_tools.ExtendedSimpleOption.prologue') def test_opts(self, _): - sys.argv = ['abc'] - ms = MyCLI() - - logging.debug("options %s %s %s", ms.options, dir(ms.options), vars(ms.options)) + self.assertTrue(isinstance(self.ms, LogMixin)) + self.assertTrue(isinstance(self.ms, HAMixin)) + self.assertTrue(isinstance(self.ms, LockMixin)) + self.assertTrue(isinstance(self.ms, NagiosStatusMixin)) + self.assertTrue(isinstance(self.ms, TimestampMixin)) + logging.debug("options %s %s %s", self.ms.options, dir(self.ms.options), vars(self.ms.options)) extsimpopts = { 'configfiles': None, @@ -195,8 +281,8 @@ def test_opts(self, _): 'help': None, 'ignoreconfigfiles': None, 'info': False, - 'locking_filename': ms.TESTFILE2, - 'nagios_check_filename': ms.TESTFILE, + 'locking_filename': self.ms.LOCKING_TESTFILE, + 'nagios_check_filename': self.ms.NAGIOS_TESTFILE, 'nagios_check_interval_threshold': 0, 'nagios_report': False, 'nagios_user': getpass.getuser(), @@ -207,26 +293,205 @@ def test_opts(self, _): myopts = { 'magic': 'magicdef', 'start_timestamp': None, - 'timestamp_file': '/var/cache/abc.timestamp', + 'timestamp_file': '/var/cache/setup.timestamp', } myopts.update(extsimpopts) - self.assertEqual(ms.options.__dict__, myopts) + self.assertEqual(self.ms.options.__dict__, myopts) + + @mock.patch('vsc.utils.script_tools.lock_or_bork') + @mock.patch('vsc.utils.script_tools.release_or_bork') + def test_exit(self, locklock, releaselock): + + fake_exit = mock.MagicMock() + with mock.patch('vsc.utils.script_tools.sys.exit', fake_exit): + self.ms.warning("be warned") + fake_exit.assert_called_with(1) + + +class TestBaseNoTimestamp(TestCase): + + def setUp(self): + super().setUp() + + sys.argv = ["abc"] + + class NoTimeStampCLI(HAMixin, LockMixin, LogMixin, NagiosStatusMixin, CLIBase): + CLI_OPTIONS = { + 'magic': ('magicdef', None, 'store', 'magicdef'), + } + def do(self, dry_run): + return magic.go() + + self.ms = NoTimeStampCLI(name="abc") + + if isinstance(self.ms, LogMixin): + logging.warning("LogMixin is part of this instance") + else: + logging.warning("LogMixin is not part of this instance") + + + if isinstance(self.ms, TimestampMixin): + logging.warning("TimestampMixin is part of this instance") + else: + logging.warning("TimestampMixin is not part of this instance") + + def test_without_timestamp_mixin(self): + + self.assertTrue(isinstance(self.ms, LogMixin)) + self.assertTrue(isinstance(self.ms, HAMixin)) + self.assertTrue(isinstance(self.ms, LockMixin)) + self.assertTrue(isinstance(self.ms, NagiosStatusMixin)) + + extsimpopts = { + 'configfiles': None, + 'debug': False, + 'disable_locking': False, + 'dry_run': False, + 'ha': None, + 'help': None, + 'ignoreconfigfiles': None, + 'info': False, + 'locking_filename': '/var/lock/setup.lock', + 'nagios_check_filename': '/var/cache/setup.nagios.json.gz', + 'nagios_check_interval_threshold': 0, + 'nagios_report': False, + 'nagios_user': 'nrpe', + 'nagios_world_readable_check': False, + 'quiet': False, + } myopts = { 'magic': 'magicdef', } myopts.update(extsimpopts) - ms = MyCLI(default_options={}) - logging.debug("options wo default sync options %s", ms.options) - self.assertEqual(ms.options.__dict__, myopts) + logging.warning("options wo default sync options %s", self.ms.options) + self.assertEqual(self.ms.options.__dict__, myopts) @mock.patch('vsc.utils.script_tools.lock_or_bork') @mock.patch('vsc.utils.script_tools.release_or_bork') def test_exit(self, locklock, releaselock): - cli = MyCLI() - fake_exit = mock.MagicMock() - with mock.patch('sys.exit', fake_exit): - cli.warning("be warned") + with mock.patch('vsc.utils.script_tools.sys.exit', fake_exit): + self.ms.warning("be warned") fake_exit.assert_called_with(1) + + +class TestCLIBase(TestCase): + def setUp(self): + """ + Set up mock instances and common configurations. + """ + super().setUp() + + # Redirect stdout/stderr to prevent TestCase conflicts + self.orig_sys_stdout = sys.stdout + self.orig_sys_stderr = sys.stderr + sys.stdout = MagicMock() + sys.stderr = MagicMock() + + self.original_argv = sys.argv + sys.argv = ["somecli"] + + # Create a dummy subclass of CLIBase for testing + class TestCLI(CLIBase): + CLI_OPTIONS = { + 'test-option': ('Test description', None, 'store_true', False), + } + + def do(self, dryrun=False): + if dryrun: + return ["Dry run mode active."] + return [] + + self.cli = TestCLI(name="Test CLI") + + @patch('vsc.utils.script_tools.ArgParser.parse_args', return_value=MagicMock(dry_run=False)) + @patch('vsc.utils.script_tools.logging.info') + def test_main_basic(self, mock_logging_info, mock_parse_args): + """ + Test the main method without any mixins. + """ + self.cli.main() + self.assertEqual(self.cli.name, "Test CLI") + #mock_logging_info.assert_any_call("Test CLI started.") + + def test_get_options(self): + """ + Test the get_options method aggregates CLI options. + """ + options = self.cli.get_options() + self.assertIn('test-option', options) + + @patch('vsc.utils.script_tools.logging.error') + @patch('vsc.utils.script_tools.sys.exit') + def test_critical_no_nagios(self, mock_sys_exit, mock_logging_error): + """ + Test critical method behavior without NagiosStatusMixin. + """ + self.cli.critical("Critical error") + mock_logging_error.assert_called_with("Critical error") + mock_sys_exit.assert_called_with(1) + + @patch('vsc.utils.script_tools.logging.info') + @patch('vsc.utils.script_tools.ArgParser.parse_args', return_value=MagicMock(dry_run=False)) + def test_main_with_dry_run(self, mock_parse_args, mock_logging_info): + """ + Test the main method in dry-run mode. + """ + self.cli.main() + #mock_logging_info.assert_any_call("Test CLI (dry-run) started.") + + @patch('vsc.utils.script_tools.logging.info') + @patch('vsc.utils.script_tools.ArgParser.parse_args', return_value=MagicMock(dry_run=False)) + def test_main_with_mixins(self, mock_parse_args, mock_logging_info): + """ + Test the main method with mixins applied. + """ + # Extend TestCLI with mixins + class TestCLIMixins(CLIBase, NagiosStatusMixin, LockMixin): + CLI_OPTIONS = {'test-mixin-option': ('Mixin test description', None, 'store_true', False)} + + def do(self, dryrun=False): + return [] + + def nagios_prologue(self): + self.nagios_prologue_called = True + + def lock_prologue(self): + self.lock_prologue_called = True + + def nagios_epilogue(self): + self.nagios_epilogue_called = True + + def lock_epilogue(self): + self.lock_epilogue_called = True + + cli = TestCLIMixins(name="Test CLI with Mixins") + cli.nagios_prologue_called = False + cli.lock_prologue_called = False + cli.nagios_epilogue_called = False + cli.lock_epilogue_called = False + + cli.main() + + self.assertTrue(cli.nagios_prologue_called) + self.assertTrue(cli.lock_prologue_called) + self.assertTrue(cli.nagios_epilogue_called) + self.assertTrue(cli.lock_epilogue_called) + + @patch('vsc.utils.script_tools.logging.error') + @patch('vsc.utils.script_tools.sys.exit') + def test_main_critical_exception(self, mock_sys_exit, mock_logging_error): + """ + Test the main method when a critical exception is raised. + """ + class FailingCLI(CLIBase): + def do(self, dryrun=False): + raise Exception("Unrecoverable error!") + + cli = FailingCLI("Failing CLI") + + cli.main() + mock_logging_error.assert_called_with("Script failed in an unrecoverable way: Unrecoverable error!") + mock_sys_exit.assert_called_with(1)