diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 25f04617f..7a8538135 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -3,15 +3,15 @@ """ import errno +import json import logging import os -import re import struct import subprocess -from typing import cast, Iterable, Optional +from typing import cast, Optional, List -from can.interfaces.socketcan.constants import CAN_EFF_FLAG from can import typechecking +from can.interfaces.socketcan.constants import CAN_EFF_FLAG log = logging.getLogger(__name__) @@ -38,35 +38,36 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes return struct.pack(can_filter_fmt, *filter_data) -_PATTERN_CAN_INTERFACE = re.compile(r"(sl|v|vx)?can\d+") - +def find_available_interfaces() -> List[str]: + """Returns the names of all open can/vcan interfaces -def find_available_interfaces() -> Iterable[str]: - """Returns the names of all open can/vcan interfaces using - the ``ip link list`` command. If the lookup fails, an error + The function calls the ``ip link list`` command. If the lookup fails, an error is logged to the console and an empty list is returned. + + :return: The list of available and active CAN interfaces or an empty list of the command failed """ try: - # adding "type vcan" would exclude physical can devices - command = ["ip", "-o", "link", "list", "up"] - output = subprocess.check_output(command, text=True) - - except Exception as e: # pylint: disable=broad-except + command = ["ip", "-json", "link", "list", "up"] + output_str = subprocess.check_output(command, text=True) + except Exception: # pylint: disable=broad-except # subprocess.CalledProcessError is too specific - log.error("failed to fetch opened can devices: %s", e) + log.exception("failed to fetch opened can devices from ip link") return [] - else: - # log.debug("find_available_interfaces(): output=\n%s", output) - # output contains some lines like "1: vcan42: ..." - # extract the "vcan42" of each line - interfaces = [line.split(": ", 3)[1] for line in output.splitlines()] - log.debug( - "find_available_interfaces(): detected these interfaces (before filtering): %s", - interfaces, - ) - return filter(_PATTERN_CAN_INTERFACE.match, interfaces) + try: + output_json = json.loads(output_str) + except json.JSONDecodeError: + log.exception("Failed to parse ip link JSON output: %s", output_str) + return [] + + log.debug( + "find_available_interfaces(): detected these interfaces (before filtering): %s", + output_json, + ) + + interfaces = [i["ifname"] for i in output_json if i["link_type"] == "can"] + return interfaces def error_code_to_str(code: Optional[int]) -> str: diff --git a/test/test_socketcan_helpers.py b/test/test_socketcan_helpers.py index ad53836f2..29ceb11c0 100644 --- a/test/test_socketcan_helpers.py +++ b/test/test_socketcan_helpers.py @@ -4,7 +4,12 @@ Tests helpers in `can.interfaces.socketcan.socketcan_common`. """ +import gzip +from base64 import b64decode import unittest +from unittest import mock + +from subprocess import CalledProcessError from can.interfaces.socketcan.utils import find_available_interfaces, error_code_to_str @@ -26,17 +31,46 @@ def test_error_code_to_str(self): string = error_code_to_str(error_code) self.assertTrue(string) # not None or empty - @unittest.skipUnless(IS_LINUX, "socketcan is only available on Linux") + @unittest.skipUnless( + TEST_INTERFACE_SOCKETCAN, "socketcan is only available on Linux" + ) def test_find_available_interfaces(self): - result = list(find_available_interfaces()) - self.assertGreaterEqual(len(result), 0) - for entry in result: - self.assertRegex(entry, r"(sl|v|vx)?can\d+") - if TEST_INTERFACE_SOCKETCAN: - self.assertGreaterEqual(len(result), 3) - self.assertIn("vcan0", result) - self.assertIn("vxcan0", result) - self.assertIn("slcan0", result) + result = find_available_interfaces() + + self.assertGreaterEqual(len(result), 3) + self.assertIn("vcan0", result) + self.assertIn("vxcan0", result) + self.assertIn("slcan0", result) + + def test_find_available_interfaces_w_patch(self): + # Contains lo, eth0, wlan0, vcan0, mycustomCan123 + ip_output_gz_b64 = ( + "H4sIAAAAAAAAA+2UzW+CMBjG7/wVhrNL+BC29IboEqNSwzQejDEViiMC5aNsmmX/+wpZTGUwDAcP" + "y5qmh+d5++bN80u7EXpsfZRnsUTf8yMXn0TQk/u8GqEQM1EMiMjpXoAOGZM3F6mUZxAuhoY55UpL" + "fbWoKjO4Hts7pl/kLdc+pDlrrmuaqnNq4vqZU8wSkSTHOeYHIjFOM4poOevKmlpwbfF+4EfHkLil" + "PRo/G6vZkrcPKcnjwnOxh/KA8h49JQGOimAkSaq03NFz/B0PiffIOfIXkeumOCtiEiUJXG++bp8S" + "5Dooo/WVZeFnvxmYUgsM01fpBmQWfDAN256M7SqioQ2NkWm8LKvGnIU3qTN+xylrV/FdaHrJzmFk" + "gkacozuzZMnhtAGkLANFAaoKBgOgaUDXG0F6Hrje7SDVWpDvAYpuIdmJV4dn2cSx9VUuGiFCe25Y" + "fwTi4KmW4ptzG0ULGvYPLN1APSqdMN3/82TRtOeqSbW5hmcnzygJTRTJivofcEvAgrAVvgD8aLkv" + "/AcAAA==" + ) + ip_output = gzip.decompress(b64decode(ip_output_gz_b64)).decode("ascii") + + with mock.patch("subprocess.check_output") as check_output: + check_output.return_value = ip_output + ifs = find_available_interfaces() + + self.assertEqual(["vcan0", "mycustomCan123"], ifs) + + def test_find_available_interfaces_exception(self): + with mock.patch("subprocess.check_output") as check_output: + check_output.return_value = "

Not JSON

" + result = find_available_interfaces() + self.assertEqual([], result) + + check_output.side_effect = Exception("Something went wrong :-/") + result = find_available_interfaces() + self.assertEqual([], result) if __name__ == "__main__":