diff --git a/.github/workflows/twister_tests.yml b/.github/workflows/twister_tests.yml index ed4bee2a18cfb..05ea87dfd4258 100644 --- a/.github/workflows/twister_tests.yml +++ b/.github/workflows/twister_tests.yml @@ -49,10 +49,18 @@ jobs: - name: install-packages run: | pip3 install -r scripts/requirements-base.txt -r scripts/requirements-build-test.txt - - name: Run pytest + - name: Run pytest for twisterlib env: ZEPHYR_BASE: ./ ZEPHYR_TOOLCHAIN_VARIANT: zephyr run: | echo "Run twister tests" PYTHONPATH=./scripts/tests pytest ./scripts/tests/twister + - name: Run pytest for pytest-twister-harness + env: + ZEPHYR_BASE: ./ + ZEPHYR_TOOLCHAIN_VARIANT: zephyr + PYTHONPATH: ./scripts/pylib/pytest-twister-harness/src:${PYTHONPATH} + run: | + echo "Run twister tests" + pytest ./scripts/pylib/pytest-twister-harness/tests diff --git a/doc/develop/test/index.rst b/doc/develop/test/index.rst index 8d4b7aacde194..c8c67d20ef64e 100644 --- a/doc/develop/test/index.rst +++ b/doc/develop/test/index.rst @@ -8,6 +8,7 @@ Testing ztest twister + pytest coverage BabbleSim ztest_deprecated diff --git a/doc/develop/test/pytest.rst b/doc/develop/test/pytest.rst new file mode 100644 index 0000000000000..453616b9e9149 --- /dev/null +++ b/doc/develop/test/pytest.rst @@ -0,0 +1,84 @@ +.. integration-with-pytest: + +Integration with pytest test framework +###################################### + +*Please mind that integration of twister with pytest is still work in progress. Not every platform +type is supported in pytest (yet). If you find any issue with the integration or have an idea for +an improvement, please, let us know about it and open a GitHub issue/enhancement.* + +Introduction +************ + +Pytest is a python framework that *“makes it easy to write small, readable tests, and can scale to +support complex functional testing for applications and libraries”* (``_). +Python is known for its free libraries and ease of using it for scripting. In addition, pytest +utilizes the concept of plugins and fixtures, increasing its expendability and reusability. +A pytest plugin `pytest-twister-harness` was introduced to provide an integration between pytest +and twister, allowing Zephyr’s community to utilize pytest functionality with keeping twister as +the main framework. + +Integration with twister +************************ + +By default, there is nothing to be done to enable pytest support in twister. The plugin is +developed as a part of Zephyr’s tree. To enable install-less operation, twister first extends +``PYTHONPATH`` with path to this plugin, and then during pytest call, it appends the command with +``-p twister_harness.plugin`` argument. If one prefers to use the installed version of the plugin, +they must add ``--allow-installed-plugin`` flag to twister’s call. + +Pytest-based test suites are discovered the same way as other twister tests, i.e., by a presence +of testcase/sample.yaml. Inside, a keyword ``harness`` tells twister how to handle a given test. +In the case of ``harness: pytest``, most of twister workflow (test suites discovery, +parallelization, building and reporting) remains the same as for other harnesses. The change +happens during the execution step. The below picture presents a simplified overview of the +integration. + +.. figure:: twister_and_pytest.svg + :figclass: align-center + + +If ``harness: pytest`` is used, twister delegates the test execution to pytest, by calling it as +a subprocess. Required parameters (such as build directory, device to be used, etc.) are passed +through a CLI command. When pytest is done, twister looks for a pytest report (results.xml) and +sets the test result accordingly. + +How to create a pytest test +*************************** + +An example of a pytest test is given at :zephyr_file:`samples/subsys/testsuite/pytest/shell/pytest/test_shell.py`. +Twister calls pytest for each configuration from the .yaml file which uses ``harness: pytest``. +By default, it points to ``pytest`` directory, located next to a directory with binary sources. +A keyword ``pytest_root`` placed under ``harness_config`` section can be used to point to another +location. + +Pytest scans the given folder looking for tests, following its default +`discovery rules `_ +One can also pass some extra arguments to the pytest from yaml file using ``pytest_args`` keyword +under ``harness_config``, e.g.: ``pytest_args: [‘-k=test_method’, ‘--log-level=DEBUG’]``. + +Two imports are important to include in .py sources: + +.. code-block:: python + + import pytest # noqa # pylint: disable=unused-import + from pytest_twister_harness.device.device_abstract import DeviceAbstract + +The first enables pytest-twister-harness plugin indirectly, as it is added with pytest. +It also gives access to ``dut`` fixture. The second is important for type checking and enabling +IDE hints for duts. The ``dut`` fixture is the core of pytest harness plugin. When used as an +argument of a test function it gives access to a DeviceAbstract type object. The fixture yields a +device prepared according to the requested type (native posix, qemu, hardware, etc.). All types of +devices share the same API. This allows for writing tests which are device-type-agnostic. + + +Limitations +*********** + +* Twister options like ``--pre-script``, ``--post-flash-script`` and ``--post-script`` are not handled by + pytest plugin during flashing/running application. +* The whole pytest call is reported as one test in the final twister report (xml or json). +* Device adapters in pytest plugin provide `iter_stdout` method to read from devices. In some + cases, it is not the most convenient way, and it will be considered how to improve this + (for example replace it with a simple read function with a given byte size and timeout arguments). +* Not every platform type is supported in the plugin (yet). diff --git a/doc/develop/test/twister_and_pytest.svg b/doc/develop/test/twister_and_pytest.svg new file mode 100644 index 0000000000000..0bf0c2895f5ef --- /dev/null +++ b/doc/develop/test/twister_and_pytest.svg @@ -0,0 +1,4 @@ + + + +
Test execution
Test execution
Twister
Twister
Collecting tests (basing on yaml files)
Collecting tests (ba...
Generation test configuration (test scenario + platform)
Generation test conf...
Applying filtration
Applying filtration
Spawn workers (parallelization)
Spawn workers (paral...
Building
Building
Is pytest test?
Is pytest test?
Run pytest with pytest-twister-harness plugin

Execute following actions in any order as many as needed:
1. Flash device(s)
2. Send something to device(s)
3. Parse and verify device(s) outputs
4. Perform any additional actions available from the Python level (like run MCUmgr, run external program/server/tool) 
Run pytest with pytest-twister-harness plug...
yes
yes
Execute test directly in Twister with fixed ("classic") order:

1. Flash device
2. Verify output
Execute test directly in Twister with fi...
no
no
Twister
Twister
Collect test results
Collect test results
Generate reports
Generate reports
Text is not SVG - cannot display
\ No newline at end of file diff --git a/samples/subsys/testsuite/pytest/CMakeLists.txt b/samples/subsys/testsuite/pytest/basic/CMakeLists.txt similarity index 100% rename from samples/subsys/testsuite/pytest/CMakeLists.txt rename to samples/subsys/testsuite/pytest/basic/CMakeLists.txt diff --git a/samples/subsys/testsuite/pytest/prj.conf b/samples/subsys/testsuite/pytest/basic/prj.conf similarity index 100% rename from samples/subsys/testsuite/pytest/prj.conf rename to samples/subsys/testsuite/pytest/basic/prj.conf diff --git a/samples/subsys/testsuite/pytest/pytest/conftest.py b/samples/subsys/testsuite/pytest/basic/pytest/conftest.py similarity index 100% rename from samples/subsys/testsuite/pytest/pytest/conftest.py rename to samples/subsys/testsuite/pytest/basic/pytest/conftest.py diff --git a/samples/subsys/testsuite/pytest/pytest/test_sample.py b/samples/subsys/testsuite/pytest/basic/pytest/test_sample.py similarity index 100% rename from samples/subsys/testsuite/pytest/pytest/test_sample.py rename to samples/subsys/testsuite/pytest/basic/pytest/test_sample.py diff --git a/samples/subsys/testsuite/pytest/src/main.c b/samples/subsys/testsuite/pytest/basic/src/main.c similarity index 100% rename from samples/subsys/testsuite/pytest/src/main.c rename to samples/subsys/testsuite/pytest/basic/src/main.c diff --git a/samples/subsys/testsuite/pytest/testcase.yaml b/samples/subsys/testsuite/pytest/basic/testcase.yaml similarity index 74% rename from samples/subsys/testsuite/pytest/testcase.yaml rename to samples/subsys/testsuite/pytest/basic/testcase.yaml index 95159d7d2ff46..55f2f11923be2 100644 --- a/samples/subsys/testsuite/pytest/testcase.yaml +++ b/samples/subsys/testsuite/pytest/basic/testcase.yaml @@ -3,7 +3,7 @@ tests: platform_allow: native_posix harness: pytest harness_config: - pytest_args: ["--custom-pytest-arg", "foo"] + pytest_args: ["--custom-pytest-arg", "foo", "--cmdopt", "."] tags: - testing - pytest diff --git a/samples/subsys/testsuite/pytest/shell/CMakeLists.txt b/samples/subsys/testsuite/pytest/shell/CMakeLists.txt new file mode 100644 index 0000000000000..a8fae2b99315a --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/CMakeLists.txt @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(shell) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/samples/subsys/testsuite/pytest/shell/prj.conf b/samples/subsys/testsuite/pytest/shell/prj.conf new file mode 100644 index 0000000000000..377883810656c --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/prj.conf @@ -0,0 +1,5 @@ +CONFIG_PRINTK=y +CONFIG_SHELL=y +CONFIG_LOG=y +CONFIG_SHELL_BACKEND_SERIAL=y +CONFIG_KERNEL_SHELL=y diff --git a/samples/subsys/testsuite/pytest/shell/pytest/test_shell.py b/samples/subsys/testsuite/pytest/shell/pytest/test_shell.py new file mode 100755 index 0000000000000..bb0a6dce349bd --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/pytest/test_shell.py @@ -0,0 +1,36 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +import time +import logging +import pytest # noqa # pylint: disable=unused-import + +from twister_harness.device.device_abstract import DeviceAbstract + +logger = logging.getLogger(__name__) + + +def wait_for_message(iter_stdout, message, timeout=60): + time_started = time.time() + for line in iter_stdout: + if line: + logger.debug("#: " + line) + if message in line: + return True + if time.time() > time_started + timeout: + return False + + +def test_shell_print_help(dut: DeviceAbstract): + time.sleep(1) # wait for application initialization on DUT + + dut.write(b'help\n') + assert wait_for_message(dut.iter_stdout, "see all available commands") + + +def test_shell_print_version(dut: DeviceAbstract): + time.sleep(1) # wait for application initialization on DUT + + dut.write(b'kernel version\n') + assert wait_for_message(dut.iter_stdout, "Zephyr version") diff --git a/samples/subsys/testsuite/pytest/shell/src/main.c b/samples/subsys/testsuite/pytest/shell/src/main.c new file mode 100644 index 0000000000000..983948975822b --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/src/main.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2023 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +int main(void) +{ + /* Shell application source code is injected by applied Kconfg SHELL + * options, no more "extra" functionalities are required for exemplary + * pytest test. + */ + return 0; +} diff --git a/samples/subsys/testsuite/pytest/shell/testcase.yaml b/samples/subsys/testsuite/pytest/shell/testcase.yaml new file mode 100644 index 0000000000000..bd9c57bed02a5 --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/testcase.yaml @@ -0,0 +1,14 @@ +tests: + sample.pytest.shell: + filter: CONFIG_SERIAL and dt_chosen_enabled("zephyr,shell-uart") + min_ram: 40 + harness: pytest + extra_configs: + - CONFIG_NATIVE_UART_0_ON_STDINOUT=y + integration_platforms: + - native_posix + - qemu_cortex_m3 + tags: + - testing + - pytest + - shell diff --git a/scripts/pylib/pytest-twister-harness/.gitignore b/scripts/pylib/pytest-twister-harness/.gitignore new file mode 100644 index 0000000000000..3372e28fffa89 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/.gitignore @@ -0,0 +1,62 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Pycharm +.idea/ + +# VSCode +.vscode/ diff --git a/scripts/pylib/pytest-twister-harness/README.rst b/scripts/pylib/pytest-twister-harness/README.rst new file mode 100644 index 0000000000000..814282fc57b21 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/README.rst @@ -0,0 +1,47 @@ +============== +Pytest Twister harness +============== + +Installation +------------ + +If you plan to use this plugin with Twister, then you don't need to install it +separately by pip. When Twister uses this plugin for pytest tests, it updates +`PYTHONPATH` variable, and then extends pytest command by +`-p twister_harness.plugin` argument. + + +Usage +----- + +Run exemplary test shell application by Twister: + +.. code-block:: sh + + cd ${ZEPHYR_BASE} + + # native_posix & QEMU + ./scripts/twister -p native_posix -p qemu_x86 -T samples/subsys/testsuite/pytest/shell + + # hardware + ./scripts/twister -p nrf52840dk_nrf52840 --device-testing --device-serial /dev/ttyACM0 -T samples/subsys/testsuite/pytest/shell + +or build shell application by west and call pytest directly: + +.. code-block:: sh + + export PYTHONPATH=${ZEPHYR_BASE}/scripts/pylib/pytest-twister-harness/src:${PYTHONPATH} + + cd ${ZEPHYR_BASE}/samples/subsys/testsuite/pytest/shell + + # native_posix + west build -p -b native_posix -- -DCONFIG_NATIVE_UART_0_ON_STDINOUT=y + pytest --twister-harness --device-type=native --build-dir=build -p twister_harness.plugin + + # QEMU + west build -p -b qemu_x86 -- -DQEMU_PIPE=qemu-fifo + pytest --twister-harness --device-type=qemu --build-dir=build -p twister_harness.plugin + + # hardware + west build -p -b nrf52840dk_nrf52840 + pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin diff --git a/scripts/pylib/pytest-twister-harness/pyproject.toml b/scripts/pylib/pytest-twister-harness/pyproject.toml new file mode 100644 index 0000000000000..70119a11d0965 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = [ + "setuptools >= 48.0.0", + "wheel", +] diff --git a/scripts/pylib/pytest-twister-harness/setup.cfg b/scripts/pylib/pytest-twister-harness/setup.cfg new file mode 100644 index 0000000000000..35a8d937e23dd --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/setup.cfg @@ -0,0 +1,33 @@ +[metadata] +name = pytest-twister-harness +version = attr: twister_harness.__version__ +description = Plugin for pytest to run tests which require interaction with real and simulated devices +long_description = file: README.rst +python_requires = ~=3.8 +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + Topic :: Software Development :: Embedded Systems + Topic :: Software Development :: Quality Assurance + Operating System :: Posix :: Linux + Operating System :: Microsoft :: Windows + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 + +[options] +packages = find: +package_dir = + =src +install_requires = + psutil + pyserial + pytest>=7.0.0 + +[options.packages.find] +where = src + +[options.entry_points] +pytest11 = + twister_harness = twister_harness.plugin diff --git a/scripts/pylib/pytest-twister-harness/setup.py b/scripts/pylib/pytest-twister-harness/setup.py new file mode 100644 index 0000000000000..fa79863f48f3c --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/setup.py @@ -0,0 +1,7 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +import setuptools + +setuptools.setup() diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py new file mode 100644 index 0000000000000..356fb95b53ecc --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +__version__ = '0.0.1' diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py new file mode 100644 index 0000000000000..e1e44b389f099 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py @@ -0,0 +1,8 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +QEMU_FIFO_FILE_NAME: str = 'qemu-fifo' +END_OF_DATA = object() #: used for indicating that there will be no more data in queue diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py new file mode 100644 index 0000000000000..235c666e965ce --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py new file mode 100644 index 0000000000000..13db01d48c684 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py @@ -0,0 +1,94 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import abc +import logging +import os +from typing import Generator + +from twister_harness.log_files.log_file import LogFile, NullLogFile +from twister_harness.twister_harness_config import DeviceConfig + +logger = logging.getLogger(__name__) + + +class DeviceAbstract(abc.ABC): + """Class defines an interface for all devices.""" + + def __init__(self, device_config: DeviceConfig, **kwargs) -> None: + """ + :param device_config: device configuration + """ + self.device_config: DeviceConfig = device_config + self.handler_log_file: LogFile = NullLogFile.create() + self.device_log_file: LogFile = NullLogFile.create() + + def __repr__(self) -> str: + return f'{self.__class__.__name__}()' + + @property + def env(self) -> dict[str, str]: + env = os.environ.copy() + return env + + @abc.abstractmethod + def connect(self, timeout: float = 1) -> None: + """Connect with the device (e.g. via UART)""" + + @abc.abstractmethod + def disconnect(self) -> None: + """Close a connection with the device""" + + @abc.abstractmethod + def generate_command(self) -> None: + """ + Generate command which will be used during flashing or running device. + """ + + def flash_and_run(self, timeout: float = 60.0) -> None: + """ + Flash and run application on a device. + + :param timeout: time out in seconds + """ + + @property + @abc.abstractmethod + def iter_stdout(self) -> Generator[str, None, None]: + """Iterate stdout from a device.""" + + @abc.abstractmethod + def write(self, data: bytes) -> None: + """Write data bytes to device""" + + @abc.abstractmethod + def initialize_log_files(self): + """ + Initialize file to store logs. + """ + + def stop(self) -> None: + """Stop device.""" + + # @abc.abstractmethod + # def read(self, size=1) -> None: + # """Read size bytes from device""" + + # def read_until(self, expected, size=None): + # """Read until an expected bytes sequence is found""" + # lenterm = len(expected) + # line = bytearray() + # while True: + # c = self.read(1) + # if c: + # line += c + # if line[-lenterm:] == expected: + # break + # if size is not None and len(line) >= size: + # break + # else: + # break + # return bytes(line) diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py new file mode 100644 index 0000000000000..542b1615a12f8 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py @@ -0,0 +1,49 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +from typing import Type + +from twister_harness.device.device_abstract import DeviceAbstract +from twister_harness.device.hardware_adapter import HardwareAdapter +from twister_harness.device.qemu_adapter import QemuAdapter +from twister_harness.device.simulator_adapter import ( + CustomSimulatorAdapter, + NativeSimulatorAdapter, + UnitSimulatorAdapter, +) +from twister_harness.exceptions import TwisterHarnessException + +logger = logging.getLogger(__name__) + + +class DeviceFactory: + _devices: dict[str, Type[DeviceAbstract]] = {} + + @classmethod + def discover(cls): + """Return available devices.""" + + @classmethod + def register_device_class(cls, name: str, klass: Type[DeviceAbstract]): + if name not in cls._devices: + cls._devices[name] = klass + + @classmethod + def get_device(cls, name: str) -> Type[DeviceAbstract]: + logger.debug('Get device type "%s"', name) + try: + return cls._devices[name] + except KeyError as e: + logger.error('There is no device with name "%s"', name) + raise TwisterHarnessException(f'There is no device with name "{name}"') from e + + +DeviceFactory.register_device_class('custom', CustomSimulatorAdapter) +DeviceFactory.register_device_class('native', NativeSimulatorAdapter) +DeviceFactory.register_device_class('unit', UnitSimulatorAdapter) +DeviceFactory.register_device_class('hardware', HardwareAdapter) +DeviceFactory.register_device_class('qemu', QemuAdapter) diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py new file mode 100755 index 0000000000000..c6bda2f0a4ea3 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py @@ -0,0 +1,91 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import io +import logging +import os +import threading +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class FifoHandler: + """Creates FIFO file for reading and writing.""" + + def __init__(self, fifo: str | Path): + """ + :param fifo: path to fifo file + """ + self._fifo_in = str(fifo) + '.in' + self._fifo_out = str(fifo) + '.out' + self.file_in: io.BytesIO | None = None + self.file_out: io.BytesIO | None = None + self._threads: list[threading.Thread] = [] + + @staticmethod + def _make_fifo_file(filename: str) -> None: + if os.path.exists(filename): + os.unlink(filename) + os.mkfifo(filename) + logger.debug('Created new fifo file: %s', filename) + + @property + def is_open(self) -> bool: + try: + return bool( + self.file_in is not None and self.file_out is not None + and self.file_in.fileno() and self.file_out.fileno() + ) + except ValueError: + return False + + def connect(self): + self._make_fifo_file(self._fifo_in) + self._make_fifo_file(self._fifo_out) + self._threads = [ + threading.Thread(target=self._open_fifo_in, daemon=True), + threading.Thread(target=self._open_fifo_out, daemon=True) + ] + for t in self._threads: + t.start() + + def _open_fifo_in(self): + self.file_in = open(self._fifo_in, 'wb', buffering=0) + + def _open_fifo_out(self): + self.file_out = open(self._fifo_out, 'rb', buffering=0) + + def disconnect(self): + if self.file_in is not None: + self.file_in.close() + if self.file_out is not None: + self.file_out.close() + for t in self._threads: + t.join(timeout=1) + logger.debug(f'Unlink {self._fifo_in}') + os.unlink(self._fifo_in) + logger.debug(f'Unlink {self._fifo_out}') + os.unlink(self._fifo_out) + + def read(self, __size: int | None = None) -> bytes: + return self.file_out.read(__size) # type: ignore[union-attr] + + def readline(self, __size: int | None = None) -> bytes: + line = self.file_out.readline(__size) # type: ignore[union-attr] + return line + + def write(self, __buffer: bytes) -> int: + return self.file_in.write(__buffer) # type: ignore[union-attr] + + def flush(self): + if self.file_in: + self.file_in.flush() + if self.file_out: + self.file_out.flush() + + def fileno(self) -> int: + return self.file_out.fileno() # type: ignore[union-attr] diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py new file mode 100644 index 0000000000000..8f7b99d67fb7c --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py @@ -0,0 +1,202 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import os +import pty +import re +import shutil +import subprocess +from datetime import datetime +from typing import Generator + +import serial + +from twister_harness.device.device_abstract import DeviceAbstract +from twister_harness.exceptions import TwisterHarnessException +from twister_harness.helper import log_command +from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile +from twister_harness.twister_harness_config import DeviceConfig + +logger = logging.getLogger(__name__) + + +class HardwareAdapter(DeviceAbstract): + """Adapter class for real device.""" + + def __init__(self, device_config: DeviceConfig, **kwargs) -> None: + super().__init__(device_config, **kwargs) + self.connection: serial.Serial | None = None + self.command: list[str] = [] + self.process_kwargs: dict = { + 'stdout': subprocess.PIPE, + 'stderr': subprocess.STDOUT, + 'env': self.env, + } + self.serial_pty_proc: subprocess.Popen | None = None + + def connect(self, timeout: float = 1) -> None: + """ + Open serial connection. + + :param timeout: Read timeout value in seconds + """ + if self.connection: + # already opened + return + + serial_name = self._open_serial_pty() or self.device_config.serial + logger.info('Opening serial connection for %s', serial_name) + try: + self.connection = serial.Serial( + serial_name, + baudrate=self.device_config.baud, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS, + timeout=timeout + ) + except serial.SerialException as e: + logger.exception('Cannot open connection: %s', e) + self._close_serial_pty() + raise + + self.connection.flush() + + def disconnect(self) -> None: + """Close serial connection.""" + if self.connection: + serial_name = self.connection.port + self.connection.close() + self.connection = None + logger.info('Closed serial connection for %s', serial_name) + self._close_serial_pty() + + def _open_serial_pty(self) -> str | None: + """Open a pty pair, run process and return tty name""" + if not self.device_config.serial_pty: + return None + master, slave = pty.openpty() + try: + self.serial_pty_proc = subprocess.Popen( + re.split(',| ', self.device_config.serial_pty), + stdout=master, + stdin=master, + stderr=master + ) + except subprocess.CalledProcessError as e: + logger.exception('Failed to run subprocess %s, error %s', + self.device_config.serial_pty, str(e)) + raise + return os.ttyname(slave) + + def _close_serial_pty(self) -> None: + """Terminate the process opened for serial pty script""" + if self.serial_pty_proc: + self.serial_pty_proc.terminate() + self.serial_pty_proc.communicate() + logger.info('Process %s terminated', self.device_config.serial_pty) + self.serial_pty_proc = None + + def generate_command(self) -> None: + """Return command to flash.""" + west = shutil.which('west') + if west is None: + raise TwisterHarnessException('west not found') + + command = [ + west, + 'flash', + '--skip-rebuild', + '--build-dir', str(self.device_config.build_dir), + ] + + command_extra_args = [] + if self.device_config.west_flash_extra_args: + command_extra_args.extend(self.device_config.west_flash_extra_args) + + if runner := self.device_config.runner: + command.extend(['--runner', runner]) + + if board_id := self.device_config.id: + if runner == 'pyocd': + command_extra_args.append('--board-id') + command_extra_args.append(board_id) + elif runner == 'nrfjprog': + command_extra_args.append('--dev-id') + command_extra_args.append(board_id) + elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']: + command_extra_args.append('--cmd-pre-init') + command_extra_args.append(f'hla_serial {board_id}') + elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP': + command_extra_args.append('--cmd-pre-init') + command_extra_args.append(f'cmsis_dap_serial {board_id}') + elif runner == 'jlink': + command.append(f'--tool-opt=-SelectEmuBySN {board_id}') + elif runner == 'stm32cubeprogrammer': + command.append(f'--tool-opt=sn={board_id}') + + if command_extra_args: + command.append('--') + command.extend(command_extra_args) + self.command = command + + def flash_and_run(self, timeout: float = 60.0) -> None: + if not self.command: + msg = 'Flash command is empty, please verify if it was generated properly.' + logger.error(msg) + raise TwisterHarnessException(msg) + if self.device_config.id: + logger.info('Flashing device %s', self.device_config.id) + log_command(logger, 'Flashing command', self.command, level=logging.INFO) + try: + process = subprocess.Popen( + self.command, + **self.process_kwargs + ) + except subprocess.CalledProcessError: + logger.error('Error while flashing device') + raise TwisterHarnessException('Could not flash device') + else: + stdout = stderr = None + try: + stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout) + except subprocess.TimeoutExpired: + process.kill() + finally: + if stdout: + self.device_log_file.handle(data=stdout) + logger.debug(stdout.decode(errors='ignore')) + if stderr: + self.device_log_file.handle(data=stderr) + if process.returncode == 0: + logger.info('Flashing finished') + else: + raise TwisterHarnessException(f'Could not flash device {self.device_config.id}') + + @property + def iter_stdout(self) -> Generator[str, None, None]: + """Return output from serial.""" + if not self.connection: + return + self.connection.flush() + self.connection.reset_input_buffer() + while self.connection and self.connection.is_open: + stream = self.connection.readline() + self.handler_log_file.handle(data=stream) + yield stream.decode(errors='ignore').strip() + + def write(self, data: bytes) -> None: + """Write data to serial""" + if self.connection: + self.connection.write(data) + + def initialize_log_files(self) -> None: + self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir) + self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir) + start_msg = f'\n==== Logging started at {datetime.now()} ====\n' + self.handler_log_file.handle(start_msg) + self.device_log_file.handle(start_msg) diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py new file mode 100755 index 0000000000000..4c649b9e2e88c --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py @@ -0,0 +1,196 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import os +import shutil +import signal +import subprocess +import threading +import time +from datetime import datetime +from pathlib import Path +from queue import Empty, Queue +from typing import Generator + +import psutil + +from twister_harness.constants import QEMU_FIFO_FILE_NAME +from twister_harness.device.device_abstract import DeviceAbstract +from twister_harness.device.fifo_handler import FifoHandler +from twister_harness.exceptions import TwisterHarnessException +from twister_harness.helper import log_command +from twister_harness.log_files.log_file import HandlerLogFile +from twister_harness.twister_harness_config import DeviceConfig + +logger = logging.getLogger(__name__) + + +class QemuAdapter(DeviceAbstract): + """Adapter for Qemu simulator""" + + def __init__(self, device_config: DeviceConfig, **kwargs) -> None: + super().__init__(device_config, **kwargs) + self._process: subprocess.Popen | None = None + self._process_ended_with_timeout: bool = False + self._exc: Exception | None = None #: store any exception which appeared running this thread + self._thread: threading.Thread | None = None + self._emulation_was_finished: bool = False + self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME)) + self.command: list[str] = [] + self.timeout: float = 60 # running timeout in seconds + self.booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds + + def generate_command(self) -> None: + """Return command to flash.""" + if (west := shutil.which('west')) is None: + logger.error('west not found') + self.command = [] + else: + self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run'] + + def connect(self, timeout: float = 1) -> None: + logger.debug('Opening connection') + self.connection.connect() + + def flash_and_run(self, timeout: float = 60.0) -> None: + self.timeout = timeout + if not self.command: + msg = 'Run simulation command is empty, please verify if it was generated properly.' + logger.error(msg) + raise TwisterHarnessException(msg) + + self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True) + self._thread.start() + # Give a time to start subprocess before test is executed + time.sleep(0.1) + # Check if subprocess (simulation) has started without errors + if self._exc is not None: + logger.error('Simulation failed due to an exception: %s', self._exc) + raise self._exc + + def _run_command(self, timeout: float) -> None: + log_command(logger, 'Running command', self.command, level=logging.INFO) + try: + self._process = subprocess.Popen( + self.command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=self.env + ) + stdout, _ = self._process.communicate(timeout=timeout) + return_code: int = self._process.returncode + except subprocess.TimeoutExpired: + logger.error('Running simulation finished after timeout: %s seconds', timeout) + self._process_ended_with_timeout = True + # we don't want to raise Timeout exception, but allowed a test to parse the output + # and set proper status + except subprocess.SubprocessError as e: + logger.error('Running simulation failed due to subprocess error %s', e) + self._exc = TwisterHarnessException(e.args) + except FileNotFoundError as e: + logger.error(f'Running simulation failed due to file not found: {e.filename}') + self._exc = TwisterHarnessException(f'File not found: {e.filename}') + except Exception as e: + logger.error('Running simulation failed: %s', e) + self._exc = TwisterHarnessException(e.args) + else: + if return_code == 0: + logger.info('Running simulation finished with return code %s', return_code) + elif return_code == -15: + logger.info('Running simulation terminated') + else: + logger.warning('Running simulation finished with return code %s', return_code) + for line in stdout.decode('utf-8').split('\n'): + logger.info(line) + finally: + self._emulation_was_finished = True + + def disconnect(self): + logger.debug('Closing connection') + self.connection.disconnect() + + def stop(self) -> None: + """Stop device.""" + time.sleep(0.1) # give a time to end while loop in running simulation + if self._process is not None and self._process.returncode is None: + logger.debug('Stopping all running processes for PID %s', self._process.pid) + # kill all child subprocesses + for child in psutil.Process(self._process.pid).children(recursive=True): + try: + os.kill(child.pid, signal.SIGTERM) + except ProcessLookupError: + pass + # kill subprocess if it is still running + os.kill(self._process.pid, signal.SIGTERM) + if self._thread is not None: + self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec + if self._exc: + raise self._exc + + def _wait_for_fifo(self): + for _ in range(int(self.booting_timeout_in_ms / 10) or 1): + if self.connection.is_open: + break + elif self._emulation_was_finished: + msg = 'Problem with starting QEMU' + logger.error(msg) + raise TwisterHarnessException(msg) + time.sleep(0.1) + else: + msg = 'Problem with starting QEMU - fifo file was not created yet' + logger.error(msg) + raise TwisterHarnessException(msg) + + @property + def iter_stdout(self) -> Generator[str, None, None]: + if not self.connection: + return + # fifo file can be not create yet, so we need to wait for a while + self._wait_for_fifo() + + # create unblocking reading from fifo file + q: Queue = Queue() + + def read_lines(): + while self.connection and self.connection.is_open: + try: + line = self.connection.readline().decode('UTF-8').strip() + except (OSError, ValueError): + # file could be closed already so we should stop reading + break + if len(line) != 0: + q.put(line) + + t = threading.Thread(target=read_lines, daemon=True) + t.start() + + end_time = time.time() + self.timeout + try: + while True: + try: + stream = q.get(timeout=0.1) + self.handler_log_file.handle(data=stream + '\n') + yield stream + except Empty: # timeout appeared + pass + if time.time() > end_time: + break + except KeyboardInterrupt: + # let thread to finish smoothly + pass + finally: + t.join(1) + + def write(self, data: bytes) -> None: + """Write data to serial""" + if self.connection: + self.connection.write(data) + + def initialize_log_files(self): + self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir) + start_msg = f'\n==== Logging started at {datetime.now()} ====\n' + self.handler_log_file.handle(start_msg) diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py new file mode 100755 index 0000000000000..558e206adf731 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py @@ -0,0 +1,220 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import abc +import asyncio +import asyncio.subprocess +import logging +import os +import shutil +import signal +import subprocess +import threading +import time +from asyncio.base_subprocess import BaseSubprocessTransport +from datetime import datetime +from functools import wraps +from pathlib import Path +from queue import Queue +from typing import Generator + +import psutil + +from twister_harness.constants import END_OF_DATA +from twister_harness.device.device_abstract import DeviceAbstract +from twister_harness.exceptions import TwisterHarnessException +from twister_harness.helper import log_command +from twister_harness.log_files.log_file import HandlerLogFile +from twister_harness.twister_harness_config import DeviceConfig + + +# Workaround for RuntimeError: Event loop is closed +# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/ +def silence_event_loop_closed(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except RuntimeError as e: + if str(e) != 'Event loop is closed': + raise + + return wrapper + + +BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore + +logger = logging.getLogger(__name__) + + +class SimulatorAdapterBase(DeviceAbstract, abc.ABC): + + def __init__(self, device_config: DeviceConfig, **kwargs) -> None: + """ + :param twister_config: twister configuration + """ + super().__init__(device_config, **kwargs) + self._process: asyncio.subprocess.Process | None = None + self._process_ended_with_timeout: bool = False + self.queue: Queue = Queue() + self._stop_job: bool = False + self._exc: Exception | None = None #: store any exception which appeared running this thread + self._thread: threading.Thread | None = None + self.command: list[str] = [] + self.process_kwargs: dict = { + 'stdout': asyncio.subprocess.PIPE, + 'stderr': asyncio.subprocess.STDOUT, + 'stdin': asyncio.subprocess.PIPE, + 'env': self.env, + } + self._data_to_send: bytes | None = None + + def connect(self, timeout: float = 1) -> None: + pass # pragma: no cover + + def flash_and_run(self, timeout: float = 60.0) -> None: + if not self.command: + msg = 'Run simulation command is empty, please verify if it was generated properly.' + logger.error(msg) + raise TwisterHarnessException(msg) + self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True) + self._thread.start() + # Give a time to start subprocess before test is executed + time.sleep(0.1) + # Check if subprocess (simulation) has started without errors + if self._exc is not None: + logger.error('Simulation failed due to an exception: %s', self._exc) + raise self._exc + + def _run_simulation(self, timeout: float) -> None: + log_command(logger, 'Running command', self.command, level=logging.INFO) + try: + return_code: int = asyncio.run(self._run_command(timeout=timeout)) + except subprocess.SubprocessError as e: + logger.error('Running simulation failed due to subprocess error %s', e) + self._exc = TwisterHarnessException(e.args) + except FileNotFoundError as e: + logger.error(f'Running simulation failed due to file not found: {e.filename}') + self._exc = TwisterHarnessException(f'File not found: {e.filename}') + except Exception as e: + logger.error('Running simulation failed: %s', e) + self._exc = TwisterHarnessException(e.args) + else: + if return_code == 0: + logger.info('Running simulation finished with return code %s', return_code) + elif return_code == -15: + logger.info('Running simulation stopped interrupted by user') + else: + logger.warning('Running simulation finished with return code %s', return_code) + finally: + self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue + + async def _run_command(self, timeout: float = 60.): + assert isinstance(self.command, (list, tuple, set)) + # to avoid stupid and difficult to debug mistakes + # we are using asyncio to run subprocess to be able to read from stdout + # without blocking while loop (readline with timeout) + self._process = await asyncio.create_subprocess_exec( + *self.command, + **self.process_kwargs + ) + logger.debug('Started subprocess with PID %s', self._process.pid) + end_time = time.time() + timeout + while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr] + if line := await self._read_line(timeout=0.1): + self.queue.put(line.decode('utf-8').strip()) + if time.time() > end_time: + self._process_ended_with_timeout = True + logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout') + break + if self._data_to_send: + self._process.stdin.write(self._data_to_send) # type: ignore[union-attr] + await self._process.stdin.drain() # type: ignore[union-attr] + self._data_to_send = None + + self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue + return await self._process.wait() + + async def _read_line(self, timeout=0.1) -> bytes | None: + try: + return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr] + except asyncio.TimeoutError: + return None + + def disconnect(self): + pass # pragma: no cover + + def stop(self) -> None: + """Stop device.""" + self._stop_job = True + time.sleep(0.1) # give a time to end while loop in running simulation + if self._process is not None and self._process.returncode is None: + logger.debug('Stopping all running processes for PID %s', self._process.pid) + # kill subprocess if it is still running + for child in psutil.Process(self._process.pid).children(recursive=True): + try: + os.kill(child.pid, signal.SIGTERM) + except ProcessLookupError: + pass + # kill subprocess if it is still running + os.kill(self._process.pid, signal.SIGTERM) + if self._thread is not None: + self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec + if self._exc: + raise self._exc + + @property + def iter_stdout(self) -> Generator[str, None, None]: + """Return output from serial.""" + while True: + stream = self.queue.get() + if stream == END_OF_DATA: + logger.debug('No more data from running process') + break + self.handler_log_file.handle(data=stream + '\n') + yield stream + self.queue.task_done() + + def write(self, data: bytes) -> None: + """Write data to serial""" + while self._data_to_send: + # wait data will be write to self._process.stdin.write + time.sleep(0.1) + self._data_to_send = data + + def initialize_log_files(self): + self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir) + start_msg = f'\n==== Logging started at {datetime.now()} ====\n' + self.handler_log_file.handle(start_msg) + + +class NativeSimulatorAdapter(SimulatorAdapterBase): + """Simulator adapter to run `zephyr.exe` simulation""" + + def generate_command(self) -> None: + """Return command to run.""" + self.command = [ + str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve()) + ] + + +class UnitSimulatorAdapter(SimulatorAdapterBase): + """Simulator adapter to run unit tests""" + + def generate_command(self) -> None: + """Return command to run.""" + self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())] + + +class CustomSimulatorAdapter(SimulatorAdapterBase): + + def generate_command(self) -> None: + """Return command to run.""" + if (west := shutil.which('west')) is None: + logger.error('west not found') + self.command = [] + else: + self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run'] diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py new file mode 100644 index 0000000000000..4fbd3b58c35e6 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py @@ -0,0 +1,6 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +class TwisterHarnessException(Exception): + """General Twister harness exception.""" diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py new file mode 100644 index 0000000000000..235c666e965ce --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py new file mode 100644 index 0000000000000..90e82cf65cceb --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py @@ -0,0 +1,39 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +from typing import Generator, Type + +import pytest + +from twister_harness.device.device_abstract import DeviceAbstract +from twister_harness.device.factory import DeviceFactory +from twister_harness.twister_harness_config import DeviceConfig, TwisterHarnessConfig + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope='function') +def dut(request: pytest.FixtureRequest) -> Generator[DeviceAbstract, None, None]: + """Return device instance.""" + twister_harness_config: TwisterHarnessConfig = request.config.twister_harness_config # type: ignore + device_config: DeviceConfig = twister_harness_config.devices[0] + device_type = device_config.type + + device_class: Type[DeviceAbstract] = DeviceFactory.get_device(device_type) + + device = device_class(device_config) + + try: + device.connect() + device.generate_command() + device.initialize_log_files() + device.flash_and_run() + device.connect() + yield device + except KeyboardInterrupt: + pass + finally: # to make sure we close all running processes after user broke execution + device.disconnect() + device.stop() diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py new file mode 100644 index 0000000000000..d60717d09f07f --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py @@ -0,0 +1,40 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import os.path +import platform +import shlex + +_WINDOWS = platform.system() == 'Windows' + +logger = logging.getLogger(__name__) + + +def log_command(logger: logging.Logger, msg: str, args: list, level: int = logging.DEBUG): + """ + Platform-independent helper for logging subprocess invocations. + + Will log a command string that can be copy/pasted into a POSIX + shell on POSIX platforms. This is not available on Windows, so + the entire args array is logged instead. + + :param logger: logging.Logger to use + :param msg: message to associate with the command + :param args: argument list as passed to subprocess module + :param level: log level + """ + msg = f'{msg}: %s' + if _WINDOWS: + logger.log(level, msg, str(args)) + else: + logger.log(level, msg, shlex.join(args)) + + +def normalize_filename(filename: str) -> str: + filename = os.path.expanduser(os.path.expandvars(filename)) + filename = os.path.normpath(os.path.abspath(filename)) + return filename diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py new file mode 100644 index 0000000000000..84fa9ca5571df --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py @@ -0,0 +1,71 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging.config +import os + +import pytest + + +def configure_logging(config: pytest.Config) -> None: + """Configure logging.""" + output_dir = config.option.output_dir + os.makedirs(output_dir, exist_ok=True) + log_file = os.path.join(output_dir, 'twister_harness.log') + + if hasattr(config, 'workerinput'): + worker_id = config.workerinput['workerid'] + log_file = os.path.join(output_dir, f'twister_harness_{worker_id}.log') + + log_format = '%(asctime)s:%(levelname)s:%(name)s: %(message)s' + log_level = config.getoption('--log-level') or config.getini('log_level') or logging.INFO + log_file = config.getoption('--log-file') or config.getini('log_file') or log_file + log_format = config.getini('log_cli_format') or log_format + + default_config = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'standard': { + 'format': log_format, + }, + 'simply': { + 'format': '%(asctime)s.%(msecs)d:%(levelname)s: %(message)s', + 'datefmt': '%H:%M:%S' + } + }, + 'handlers': { + 'file': { + 'class': 'logging.FileHandler', + 'level': 'DEBUG', + 'formatter': 'standard', + 'filters': [], + 'filename': log_file, + 'encoding': 'utf8', + 'mode': 'w' + }, + 'console': { + 'class': 'logging.StreamHandler', + 'level': 'DEBUG', + 'formatter': 'simply', + 'filters': [], + } + }, + 'loggers': { + '': { + 'handlers': ['console', 'file'], + 'level': 'WARNING', + 'propagate': False + }, + 'twister_harness': { + 'handlers': ['console', 'file'], + 'level': log_level, + 'propagate': False, + } + } + } + + logging.config.dictConfig(default_config) diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py new file mode 100755 index 0000000000000..235c666e965ce --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py new file mode 100755 index 0000000000000..7c4b3339b93f4 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py @@ -0,0 +1,71 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import os +import sys +from pathlib import Path + +from twister_harness.helper import normalize_filename + +logger = logging.getLogger(__name__) + + +class LogFile: + """Base class for logging files.""" + name = 'uninitialized' + + def __init__(self, filename: str | Path) -> None: + self.default_encoding = sys.getdefaultencoding() + self.filename = filename + + @staticmethod + def get_log_filename(build_dir: Path | str, name: str) -> str: + """ + :param build_dir: path to building directory. + :param name: name of the logging file. + :return: path to logging file + """ + if not build_dir: + filename = os.devnull + else: + name = name + '.log' + filename = os.path.join(build_dir, name) + filename = normalize_filename(filename=filename) + return filename + + def handle(self, data: str | bytes) -> None: + """Save information to logging file.""" + if data: + data = data.decode(encoding=self.default_encoding) if isinstance(data, bytes) else data + with open(file=self.filename, mode='a+', encoding=self.default_encoding) as log_file: + log_file.write(data) # type: ignore[arg-type] + + @classmethod + def create(cls, build_dir: Path | str = '') -> LogFile: + filename = cls.get_log_filename(build_dir=build_dir, name=cls.name) + return cls(filename) + + +class BuildLogFile(LogFile): + """Save logs from the building.""" + name = 'build' + + +class HandlerLogFile(LogFile): + """Save output from a device.""" + name = 'handler' + + +class DeviceLogFile(LogFile): + """Save errors during flashing onto device.""" + name = 'device' + + +class NullLogFile(LogFile): + """Placeholder for no initialized log file""" + def handle(self, data: str | bytes) -> None: + """This method does nothing.""" diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py new file mode 100644 index 0000000000000..cc5f10054e2b1 --- /dev/null +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py @@ -0,0 +1,136 @@ +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import os +from pathlib import Path + +import pytest + +from twister_harness.log import configure_logging +from twister_harness.twister_harness_config import TwisterHarnessConfig + +logger = logging.getLogger(__name__) + +pytest_plugins = ( + 'twister_harness.fixtures.dut' +) + + +def pytest_addoption(parser: pytest.Parser): + twister_harness_group = parser.getgroup('Twister harness') + twister_harness_group.addoption( + '--twister-harness', + action='store_true', + default=False, + help='Activate Twister harness plugin' + ) + parser.addini( + 'twister_harness', + 'Activate Twister harness plugin', + type='bool' + ) + twister_harness_group.addoption( + '-O', + '--outdir', + metavar='PATH', + dest='output_dir', + help='Output directory for logs. If not provided then use ' + '--build-dir path as default.' + ) + twister_harness_group.addoption( + '--platform', + help='Choose specific platform' + ) + twister_harness_group.addoption( + '--device-type', + choices=('native', 'qemu', 'hardware', 'unit', 'custom'), + help='Choose type of device (hardware, qemu, etc.)' + ) + twister_harness_group.addoption( + '--device-serial', + help='Serial device for accessing the board ' + '(e.g., /dev/ttyACM0)' + ) + twister_harness_group.addoption( + '--device-serial-baud', + type=int, + default=115200, + help='Serial device baud rate (default 115200)' + ) + twister_harness_group.addoption( + '--runner', + help='use the specified west runner (pyocd, nrfjprog, etc)' + ) + twister_harness_group.addoption( + '--device-id', + help='ID of connected hardware device (for example 000682459367)' + ) + twister_harness_group.addoption( + '--device-product', + help='Product name of connected hardware device (for example "STM32 STLink")' + ) + twister_harness_group.addoption( + '--device-serial-pty', + metavar='PATH', + help='Script for controlling pseudoterminal. ' + 'E.g --device-testing --device-serial-pty=