Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions can/interfaces/socketcan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
"""

import errno
import json
import logging
import os
import re
import struct
import subprocess
from typing import cast, Iterable, Optional
from typing import cast, Optional, List

from can.interfaces.socketcan.constants import CAN_EFF_FLAG
from can import typechecking
from can.interfaces.socketcan.constants import CAN_EFF_FLAG

log = logging.getLogger(__name__)

Expand All @@ -38,35 +38,36 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes
return struct.pack(can_filter_fmt, *filter_data)


_PATTERN_CAN_INTERFACE = re.compile(r"(sl|v|vx)?can\d+")

def find_available_interfaces() -> List[str]:
"""Returns the names of all open can/vcan interfaces

def find_available_interfaces() -> Iterable[str]:
"""Returns the names of all open can/vcan interfaces using
the ``ip link list`` command. If the lookup fails, an error
The function calls the ``ip link list`` command. If the lookup fails, an error
is logged to the console and an empty list is returned.

:return: The list of available and active CAN interfaces or an empty list of the command failed
"""

try:
# adding "type vcan" would exclude physical can devices
command = ["ip", "-o", "link", "list", "up"]
output = subprocess.check_output(command, text=True)

except Exception as e: # pylint: disable=broad-except
command = ["ip", "-json", "link", "list", "up"]
output_str = subprocess.check_output(command, text=True)
except Exception: # pylint: disable=broad-except
# subprocess.CalledProcessError is too specific
log.error("failed to fetch opened can devices: %s", e)
log.exception("failed to fetch opened can devices from ip link")
return []

else:
# log.debug("find_available_interfaces(): output=\n%s", output)
# output contains some lines like "1: vcan42: <NOARP,UP,LOWER_UP> ..."
# extract the "vcan42" of each line
interfaces = [line.split(": ", 3)[1] for line in output.splitlines()]
log.debug(
"find_available_interfaces(): detected these interfaces (before filtering): %s",
interfaces,
)
return filter(_PATTERN_CAN_INTERFACE.match, interfaces)
try:
output_json = json.loads(output_str)
except json.JSONDecodeError:
log.exception("Failed to parse ip link JSON output: %s", output_str)
return []

log.debug(
"find_available_interfaces(): detected these interfaces (before filtering): %s",
output_json,
)

interfaces = [i["ifname"] for i in output_json if i["link_type"] == "can"]
return interfaces


def error_code_to_str(code: Optional[int]) -> str:
Expand Down
54 changes: 44 additions & 10 deletions test/test_socketcan_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
Tests helpers in `can.interfaces.socketcan.socketcan_common`.
"""

import gzip
from base64 import b64decode
import unittest
from unittest import mock

from subprocess import CalledProcessError

from can.interfaces.socketcan.utils import find_available_interfaces, error_code_to_str

Expand All @@ -26,17 +31,46 @@ def test_error_code_to_str(self):
string = error_code_to_str(error_code)
self.assertTrue(string) # not None or empty

@unittest.skipUnless(IS_LINUX, "socketcan is only available on Linux")
@unittest.skipUnless(
TEST_INTERFACE_SOCKETCAN, "socketcan is only available on Linux"
)
def test_find_available_interfaces(self):
result = list(find_available_interfaces())
self.assertGreaterEqual(len(result), 0)
for entry in result:
self.assertRegex(entry, r"(sl|v|vx)?can\d+")
if TEST_INTERFACE_SOCKETCAN:
self.assertGreaterEqual(len(result), 3)
self.assertIn("vcan0", result)
self.assertIn("vxcan0", result)
self.assertIn("slcan0", result)
result = find_available_interfaces()

self.assertGreaterEqual(len(result), 3)
self.assertIn("vcan0", result)
self.assertIn("vxcan0", result)
self.assertIn("slcan0", result)

def test_find_available_interfaces_w_patch(self):
# Contains lo, eth0, wlan0, vcan0, mycustomCan123
ip_output_gz_b64 = (
"H4sIAAAAAAAAA+2UzW+CMBjG7/wVhrNL+BC29IboEqNSwzQejDEViiMC5aNsmmX/+wpZTGUwDAcP"
"y5qmh+d5++bN80u7EXpsfZRnsUTf8yMXn0TQk/u8GqEQM1EMiMjpXoAOGZM3F6mUZxAuhoY55UpL"
"fbWoKjO4Hts7pl/kLdc+pDlrrmuaqnNq4vqZU8wSkSTHOeYHIjFOM4poOevKmlpwbfF+4EfHkLil"
"PRo/G6vZkrcPKcnjwnOxh/KA8h49JQGOimAkSaq03NFz/B0PiffIOfIXkeumOCtiEiUJXG++bp8S"
"5Dooo/WVZeFnvxmYUgsM01fpBmQWfDAN256M7SqioQ2NkWm8LKvGnIU3qTN+xylrV/FdaHrJzmFk"
"gkacozuzZMnhtAGkLANFAaoKBgOgaUDXG0F6Hrje7SDVWpDvAYpuIdmJV4dn2cSx9VUuGiFCe25Y"
"fwTi4KmW4ptzG0ULGvYPLN1APSqdMN3/82TRtOeqSbW5hmcnzygJTRTJivofcEvAgrAVvgD8aLkv"
"/AcAAA=="
)
ip_output = gzip.decompress(b64decode(ip_output_gz_b64)).decode("ascii")

with mock.patch("subprocess.check_output") as check_output:
check_output.return_value = ip_output
ifs = find_available_interfaces()

self.assertEqual(["vcan0", "mycustomCan123"], ifs)

def test_find_available_interfaces_exception(self):
with mock.patch("subprocess.check_output") as check_output:
check_output.return_value = "<h1>Not JSON</h1>"
result = find_available_interfaces()
self.assertEqual([], result)

check_output.side_effect = Exception("Something went wrong :-/")
result = find_available_interfaces()
self.assertEqual([], result)


if __name__ == "__main__":
Expand Down