From 69f1fbd1952dd83a9587207d1250e23b3aaf55a9 Mon Sep 17 00:00:00 2001 From: Marcel Kanter Date: Fri, 3 Dec 2021 15:27:52 +0100 Subject: [PATCH 1/4] Adapter enumeration Each interface should provide a means for enumerating all matching adapters. The result should be an list of adapter identifiers. An adapter identifier is not limited or equal to serial number, it can be a place in a device tree (like PCI or USB tree). The adapter identifier then can be used with the init of the bus/interface. --- can/bus.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/can/bus.py b/can/bus.py index b9ccfcfad..566e1a5f1 100644 --- a/can/bus.py +++ b/can/bus.py @@ -450,6 +450,13 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: def fileno(self) -> int: raise NotImplementedError("fileno is not implemented using current CAN bus") + def list_adapters(self) -> List[Any]: + """ Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter. + + MAY NOT BE IMPLEMENTED BY ALL INTERFACES + """ + raise NotImplementedError() + class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC): """Removes itself from a bus. From 348001397eed0b9e30d3a942bb14f3e85d35b5d9 Mon Sep 17 00:00:00 2001 From: Marcel Kanter Date: Fri, 3 Dec 2021 15:29:18 +0100 Subject: [PATCH 2/4] Adapter enumeration for ixxat bus Add the enumeration functionality and test it. The enumeration is useful to skip tests which rely on connected hardware. --- can/interfaces/ixxat/__init__.py | 5 ++- can/interfaces/ixxat/canlib.py | 73 +++++++++++++++++--------------- doc/interfaces/ixxat.rst | 25 +++++------ test/test_interface_ixxat.py | 51 +++++++++++++++------- 4 files changed, 90 insertions(+), 64 deletions(-) diff --git a/can/interfaces/ixxat/__init__.py b/can/interfaces/ixxat/__init__.py index fc2cae0f3..be55c0d49 100644 --- a/can/interfaces/ixxat/__init__.py +++ b/can/interfaces/ixxat/__init__.py @@ -1,7 +1,8 @@ """ -Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems +Ctypes wrapper module for IXXAT Virtual CAN Interface on win32 systems Copyright (C) 2016 Giuseppe Corbelli +Copyright (c) 2021 Marcel Kanter """ -from can.interfaces.ixxat.canlib import IXXATBus, get_ixxat_hwids +from can.interfaces.ixxat.canlib import IXXATBus diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index ced840ff3..ef091dfc1 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -408,7 +408,7 @@ class IXXATBus(BusABC): }, } - def __init__(self, channel, can_filters=None, **kwargs): + def __init__(self, channel=0, can_filters=None, **kwargs): """ :param int channel: The Channel id to create this bus with. @@ -416,14 +416,20 @@ def __init__(self, channel, can_filters=None, **kwargs): :param list can_filters: See :meth:`can.BusABC.set_filters`. - :param bool receive_own_messages: - Enable self-reception of sent messages. - - :param int UniqueHardwareId: - UniqueHardwareId to connect (optional, will use the first found if not supplied) - :param int bitrate: Channel bitrate in bit/s + + :param String adapter: + adapter to connect (optional, will use the first found if not supplied) + + :param int rxFifoSize: + Set the receive FIFO size to this value. + + :param int txFifoSize: + Set the transmit FIFO size to this value. + + :param bool receive_own_messages: + Enable self-reception of sent messages. """ if _canlib is None: raise CanInterfaceNotImplementedError( @@ -433,7 +439,7 @@ def __init__(self, channel, can_filters=None, **kwargs): log.info("Got configuration of: %s", kwargs) # Configuration options bitrate = kwargs.get("bitrate", 500000) - UniqueHardwareId = kwargs.get("UniqueHardwareId", None) + adapter = kwargs.get("adapter", None) rxFifoSize = kwargs.get("rxFifoSize", 16) txFifoSize = kwargs.get("txFifoSize", 16) self._receive_own_messages = kwargs.get("receive_own_messages", False) @@ -461,10 +467,10 @@ def __init__(self, channel, can_filters=None, **kwargs): self._payload = (ctypes.c_byte * 8)() # Search for supplied device - if UniqueHardwareId is None: - log.info("Searching for first available device") + if adapter is None: + log.info("Searching for first available adapter") else: - log.info("Searching for unique HW ID %s", UniqueHardwareId) + log.info("Searching for adapter %s", adapter) _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) while True: try: @@ -472,20 +478,20 @@ def __init__(self, channel, can_filters=None, **kwargs): self._device_handle, ctypes.byref(self._device_info) ) except StopIteration: - if UniqueHardwareId is None: + if adapter is None: raise VCIDeviceNotFoundError( "No IXXAT device(s) connected or device(s) in use by other process(es)." ) else: raise VCIDeviceNotFoundError( "Unique HW ID {} not connected or not available.".format( - UniqueHardwareId + adapter ) ) else: - if (UniqueHardwareId is None) or ( + if (adapter is None) or ( self._device_info.UniqueHardwareId.AsChar - == bytes(UniqueHardwareId, "ascii") + == bytes(adapter, "ascii") ): break else: @@ -762,6 +768,24 @@ def shutdown(self): _canlib.canControlClose(self._control_handle) _canlib.vciDeviceClose(self._device_handle) + def list_adapters(): + """Get a list of hardware ids of all available IXXAT adapters.""" + adapters = [] + device_handle = HANDLE() + device_info = structures.VCIDEVICEINFO() + + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) + except StopIteration: + break + else: + adapters.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) + _canlib.vciEnumDeviceClose(device_handle) + + return adapters + class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC): """A message in the cyclic transmit list.""" @@ -827,22 +851,3 @@ def _format_can_status(status_flags: int): return "CAN status message: {}".format(", ".join(states)) else: return "Empty CAN status message" - - -def get_ixxat_hwids(): - """Get a list of hardware ids of all available IXXAT devices.""" - hwids = [] - device_handle = HANDLE() - device_info = structures.VCIDEVICEINFO() - - _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) - while True: - try: - _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) - except StopIteration: - break - else: - hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) - _canlib.vciEnumDeviceClose(device_handle) - - return hwids diff --git a/doc/interfaces/ixxat.rst b/doc/interfaces/ixxat.rst index 929221733..8ff1765e7 100644 --- a/doc/interfaces/ixxat.rst +++ b/doc/interfaces/ixxat.rst @@ -3,7 +3,7 @@ IXXAT Virtual CAN Interface =========================== -Interface to `IXXAT `__ Virtual CAN Interface V3 SDK. Works on Windows. +Interface to `IXXAT `__ Virtual CAN Interface SDK. Works on Windows. The Linux ECI SDK is currently unsupported, however on Linux some devices are supported with :doc:`socketcan`. @@ -26,21 +26,21 @@ Bus Configuration file ------------------ + The simplest configuration file would be:: [default] interface = ixxat channel = 0 -Python-can will search for the first IXXAT device available and open the first channel. +Python-can will search for the first IXXAT adapter available and open the first channel. ``interface`` and ``channel`` parameters are interpreted by frontend ``can.interfaces.interface`` module, while the following parameters are optional and are interpreted by IXXAT implementation. * ``bitrate`` (default 500000) Channel bitrate -* ``UniqueHardwareId`` (default first device) Unique hardware ID of the IXXAT device +* ``adapter`` (default first adapter) Unique hardware ID of the IXXAT device * ``rxFifoSize`` (default 16) Number of RX mailboxes * ``txFifoSize`` (default 16) Number of TX mailboxes -* ``extended`` (default False) Allow usage of extended IDs Filtering @@ -55,14 +55,15 @@ VCI documentation, section "Message filters" for more info. List available devices ---------------------- -In case you have connected multiple IXXAT devices, you have to select them by using their unique hardware id. -To get a list of all connected IXXAT you can use the function ``get_ixxat_hwids()`` as demonstrated below: - - >>> from can.interfaces.ixxat import get_ixxat_hwids - >>> for hwid in get_ixxat_hwids(): - ... print("Found IXXAT with hardware id '%s'." % hwid) - Found IXXAT with hardware id 'HW441489'. - Found IXXAT with hardware id 'HW107422'. + +In case you have connected multiple IXXAT adapters, you have to select them by using their unique hardware id. +To get a list of all connected IXXAT adapters you can use the function ``list_adapters()`` as demonstrated below: + + >>> from can.interfaces.ixxat import IXXATBus + >>> for hwid in IXXATBus.list_adapters(): + ... print("Found IXXAT adapter with hardware id '%s'." % hwid) + Found IXXAT adapter with hardware id 'HW441489'. + Found IXXAT adapter with hardware id 'HW107422'. Internals diff --git a/test/test_interface_ixxat.py b/test/test_interface_ixxat.py index 55618c769..55bb11b51 100644 --- a/test/test_interface_ixxat.py +++ b/test/test_interface_ixxat.py @@ -1,12 +1,14 @@ """ Unittest for ixxat interface. -Run only this test: -python setup.py test --addopts "--verbose -s test/test_interface_ixxat.py" +Copyright (c) 2020, 2021 Marcel Kanter """ import unittest import can +import sys + +from can.interfaces.ixxat import IXXATBus class SoftwareTestCase(unittest.TestCase): @@ -15,10 +17,7 @@ class SoftwareTestCase(unittest.TestCase): """ def setUp(self): - try: - bus = can.Bus(interface="ixxat", channel=0) - bus.shutdown() - except can.CanInterfaceNotImplementedError: + if sys.platform != "win32" and sys.platform != "cygwin": raise unittest.SkipTest("not available on this platform") def test_bus_creation(self): @@ -34,28 +33,48 @@ def test_bus_creation(self): with self.assertRaises(ValueError): can.Bus(interface="ixxat", channel=0, txFifoSize=0) + # non-existent channel (use arbitrary high value) + with self.assertRaises(can.CanInitializationError): + can.Bus(interface="ixxat", channel=0xFFFF) + + def test_adapter_enumeration(self): + # Enumeration of adapters should always work and the result should support len and be iterable + adapters = IXXATBus.list_adapters() + n = 0 + for adapter in adapters: + n += 1 + self.assertEqual(len(adapters), n) + class HardwareTestCase(unittest.TestCase): """ Test cases that rely on an existing/connected hardware. + THEY NEED TO BE EXECUTED WITH AT LEAST ONE CONNECTED ADAPTER! """ def setUp(self): - try: - bus = can.Bus(interface="ixxat", channel=0) - bus.shutdown() - except can.CanInterfaceNotImplementedError: + if sys.platform != "win32" and sys.platform != "cygwin": raise unittest.SkipTest("not available on this platform") def test_bus_creation(self): - # non-existent channel -> use arbitrary high value - with self.assertRaises(can.CanInitializationError): - can.Bus(interface="ixxat", channel=0xFFFF) + # Test the enumeration of all adapters by opening and closing each adapter + adapters = IXXATBus.list_adapters() + for adapter in adapters: + bus = can.Bus(interface="ixxat", adapter=adapter) + bus.shutdown() def test_send_after_shutdown(self): - with can.Bus(interface="ixxat", channel=0) as bus: - with self.assertRaises(can.CanOperationError): - bus.send(can.Message(arbitration_id=0x3FF, dlc=0)) + # At least one adapter is needed, skip the test if none can be found + adapters = IXXATBus.list_adapters() + if len(adapters) == 0: + raise unittest.SkipTest("No adapters found") + + bus = can.Bus(interface="ixxat", channel=0) + msg = can.Message(arbitration_id=0x3FF, dlc=0) + # Intentionally close the bus now and try to send afterwards. This should lead to an Exception + bus.shutdown() + with self.assertRaises(can.CanOperationError): + bus.send(msg) if __name__ == "__main__": From 03565db76e0f0de0331fa3c94159b51b3a4c827a Mon Sep 17 00:00:00 2001 From: Marcel Kanter Date: Fri, 3 Dec 2021 18:25:41 +0100 Subject: [PATCH 3/4] Several fixes to (hopefully) get the tests running under MacOS and Linux Create a HRESULT fallback if platform is not win32 Make the method list_adapters a class method correctly Fix the formatting --- can/bus.py | 3 ++- can/ctypesutil.py | 2 +- can/interfaces/ixxat/canlib.py | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/can/bus.py b/can/bus.py index 566e1a5f1..1abe2c647 100644 --- a/can/bus.py +++ b/can/bus.py @@ -450,7 +450,8 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: def fileno(self) -> int: raise NotImplementedError("fileno is not implemented using current CAN bus") - def list_adapters(self) -> List[Any]: + @classmethod + def list_adapters(cls) -> List[Any]: """ Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter. MAY NOT BE IMPLEMENTED BY ALL INTERFACES diff --git a/can/ctypesutil.py b/can/ctypesutil.py index 7c1e1f573..8d52f8603 100644 --- a/can/ctypesutil.py +++ b/can/ctypesutil.py @@ -79,7 +79,7 @@ def map_symbol( if sys.platform == "win32": HRESULT = ctypes.HRESULT -elif sys.platform == "cygwin": +else: class HRESULT(ctypes.c_long): pass diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index ef091dfc1..d619b0448 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -21,7 +21,7 @@ LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC, ) -from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT +from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT from . import constants, structures from .exceptions import * @@ -136,7 +136,7 @@ def __check_status(result, function, arguments): # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); _canlib.map_symbol( - "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + "vciFormatError", None, (HRESULT, ctypes.c_char_p, ctypes.c_uint32) ) # Hack to have vciFormatError as a free function vciFormatError = functools.partial(__vciFormatError, _canlib) @@ -490,8 +490,7 @@ def __init__(self, channel=0, can_filters=None, **kwargs): ) else: if (adapter is None) or ( - self._device_info.UniqueHardwareId.AsChar - == bytes(adapter, "ascii") + self._device_info.UniqueHardwareId.AsChar == bytes(adapter, "ascii") ): break else: @@ -768,7 +767,8 @@ def shutdown(self): _canlib.canControlClose(self._control_handle) _canlib.vciDeviceClose(self._device_handle) - def list_adapters(): + @classmethod + def list_adapters(cls): """Get a list of hardware ids of all available IXXAT adapters.""" adapters = [] device_handle = HANDLE() From 10a8e8e676208e0d227648f2ce71bfe3c28ddd44 Mon Sep 17 00:00:00 2001 From: Marcel Kanter Date: Fri, 3 Dec 2021 18:45:34 +0100 Subject: [PATCH 4/4] Check if the driver can be loaded, raise an exception or skip the test if it is not loaded. --- can/bus.py | 2 +- can/interfaces/ixxat/canlib.py | 5 +++++ test/test_interface_ixxat.py | 23 ++++++++++++++++++----- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/can/bus.py b/can/bus.py index 1abe2c647..2f8c1d7ee 100644 --- a/can/bus.py +++ b/can/bus.py @@ -452,7 +452,7 @@ def fileno(self) -> int: @classmethod def list_adapters(cls) -> List[Any]: - """ Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter. + """Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter. MAY NOT BE IMPLEMENTED BY ALL INTERFACES """ diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index d619b0448..e33abf5b8 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -774,6 +774,11 @@ def list_adapters(cls): device_handle = HANDLE() device_info = structures.VCIDEVICEINFO() + if _canlib is None: + raise CanInterfaceNotImplementedError( + "The IXXAT VCI library has not been initialized. Check the logs for more details." + ) + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) while True: try: diff --git a/test/test_interface_ixxat.py b/test/test_interface_ixxat.py index 55bb11b51..34f09ff93 100644 --- a/test/test_interface_ixxat.py +++ b/test/test_interface_ixxat.py @@ -9,6 +9,7 @@ import sys from can.interfaces.ixxat import IXXATBus +from can.exceptions import CanInterfaceNotImplementedError class SoftwareTestCase(unittest.TestCase): @@ -38,8 +39,12 @@ def test_bus_creation(self): can.Bus(interface="ixxat", channel=0xFFFF) def test_adapter_enumeration(self): - # Enumeration of adapters should always work and the result should support len and be iterable - adapters = IXXATBus.list_adapters() + # Enumeration of adapters should always work (if the driver is installed) and the result should support len and be iterable + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + n = 0 for adapter in adapters: n += 1 @@ -58,20 +63,28 @@ def setUp(self): def test_bus_creation(self): # Test the enumeration of all adapters by opening and closing each adapter - adapters = IXXATBus.list_adapters() + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + for adapter in adapters: bus = can.Bus(interface="ixxat", adapter=adapter) bus.shutdown() def test_send_after_shutdown(self): # At least one adapter is needed, skip the test if none can be found - adapters = IXXATBus.list_adapters() + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + if len(adapters) == 0: raise unittest.SkipTest("No adapters found") bus = can.Bus(interface="ixxat", channel=0) msg = can.Message(arbitration_id=0x3FF, dlc=0) - # Intentionally close the bus now and try to send afterwards. This should lead to an Exception + # Intentionally close the bus now and try to send afterwards. This should lead to an CanOperationError bus.shutdown() with self.assertRaises(can.CanOperationError): bus.send(msg)