diff --git a/can/interfaces/ics_neovi/neovi_bus.py b/can/interfaces/ics_neovi/neovi_bus.py index bd4fc5445..4972ed479 100644 --- a/can/interfaces/ics_neovi/neovi_bus.py +++ b/can/interfaces/ics_neovi/neovi_bus.py @@ -171,8 +171,8 @@ def __init__(self, channel, can_filters=None, **kwargs): super().__init__(channel=channel, can_filters=can_filters, **kwargs) - logger.info("CAN Filters: {}".format(can_filters)) - logger.info("Got configuration of: {}".format(kwargs)) + logger.info(f"CAN Filters: {can_filters}") + logger.info(f"Got configuration of: {kwargs}") if "override_library_name" in kwargs: ics.override_library_name(kwargs.get("override_library_name")) @@ -215,12 +215,12 @@ def __init__(self, channel, can_filters=None, **kwargs): self._use_system_timestamp = bool(kwargs.get("use_system_timestamp", False)) self._receive_own_messages = kwargs.get("receive_own_messages", True) - self.channel_info = "%s %s CH:%s" % ( + self.channel_info = "{} {} CH:{}".format( self.dev.Name, self.get_serial_number(self.dev), self.channels, ) - logger.info("Using device: {}".format(self.channel_info)) + logger.info(f"Using device: {self.channel_info}") self.rx_buffer = deque() self.message_receipts = defaultdict(Event) @@ -230,7 +230,7 @@ def channel_to_netid(channel_name_or_id): try: channel = int(channel_name_or_id) except ValueError: - netid = "NETID_{}".format(channel_name_or_id.upper()) + netid = f"NETID_{channel_name_or_id.upper()}" if hasattr(ics, netid): channel = getattr(ics, netid) else: @@ -298,9 +298,9 @@ def _find_device(self, type_filter=None, serial=None): msg = ["No device"] if type_filter is not None: - msg.append("with type {}".format(type_filter)) + msg.append(f"with type {type_filter}") if serial is not None: - msg.append("with serial {}".format(serial)) + msg.append(f"with serial {serial}") msg.append("found.") raise CanInitializationError(" ".join(msg)) diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index fa88e5f90..d74da2539 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -119,7 +119,7 @@ def __check_status(result, function, args): result = ctypes.c_ulong(result).value if result == constants.VCI_E_TIMEOUT: - raise VCITimeout("Function {} timed out".format(function._name)) + raise VCITimeout(f"Function {function._name} timed out") elif result == constants.VCI_E_RXQUEUE_EMPTY: raise VCIRxQueueEmptyError() elif result == constants.VCI_E_NO_MORE_ITEMS: @@ -469,7 +469,7 @@ def __init__( channel = int(channel) if bitrate not in self.CHANNEL_BITRATES[0]: - raise ValueError("Invalid bitrate {}".format(bitrate)) + raise ValueError(f"Invalid bitrate {bitrate}") if rx_fifo_size <= 0: raise ValueError("rx_fifo_size must be > 0") @@ -887,7 +887,7 @@ def _format_can_status(status_flags: int): status_flags &= ~flag if status_flags: - states.append("unknown state 0x{:02x}".format(status_flags)) + states.append(f"unknown state 0x{status_flags:02x}") if states: return "CAN status message: {}".format(", ".join(states)) diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py index 108ad2c02..2e3125e9b 100644 --- a/can/interfaces/ixxat/canlib_vcinpl2.py +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -117,7 +117,7 @@ def __check_status(result, function, args): :class:VCIError """ if result == constants.VCI_E_TIMEOUT: - raise VCITimeout("Function {} timed out".format(function._name)) + raise VCITimeout(f"Function {function._name} timed out") elif result == constants.VCI_E_RXQUEUE_EMPTY: raise VCIRxQueueEmptyError() elif result == constants.VCI_E_NO_MORE_ITEMS: @@ -1011,7 +1011,7 @@ def _format_can_status(status_flags: int): status_flags &= ~flag if status_flags: - states.append("unknown state 0x{:02x}".format(status_flags)) + states.append(f"unknown state 0x{status_flags:02x}") if states: return "CAN status message: {}".format(", ".join(states)) diff --git a/can/interfaces/ixxat/structures.py b/can/interfaces/ixxat/structures.py index b784437e0..419a52973 100644 --- a/can/interfaces/ixxat/structures.py +++ b/can/interfaces/ixxat/structures.py @@ -162,7 +162,7 @@ class CANMSG(ctypes.Structure): ] def __str__(self) -> str: - return """ID: 0x{0:04x}{1} DLC: {2:02d} DATA: {3}""".format( + return """ID: 0x{:04x}{} DLC: {:02d} DATA: {}""".format( self.dwMsgId, "[RTR]" if self.uMsgInfo.Bits.rtr else "", self.uMsgInfo.Bits.dlc, diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index f60a43bc5..a8bb7bac7 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -410,8 +410,8 @@ def __init__(self, channel, can_filters=None, **kwargs): """ - log.info("CAN Filters: {}".format(can_filters)) - log.info("Got configuration of: {}".format(kwargs)) + log.info(f"CAN Filters: {can_filters}") + log.info(f"Got configuration of: {kwargs}") bitrate = kwargs.get("bitrate", 500000) tseg1 = kwargs.get("tseg1", 0) tseg2 = kwargs.get("tseg2", 0) diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index f1def62ad..1eba09b31 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -124,7 +124,7 @@ def __init__( ) from None self._is_filtered = False - super(NiXNETcanBus, self).__init__( + super().__init__( channel=channel, can_filters=can_filters, bitrate=bitrate, diff --git a/can/interfaces/pcan/basic.py b/can/interfaces/pcan/basic.py index 38ded44f6..743fb55ee 100644 --- a/can/interfaces/pcan/basic.py +++ b/can/interfaces/pcan/basic.py @@ -663,7 +663,7 @@ def __init__(self): try: aKey = winreg.OpenKey(aReg, r"SOFTWARE\PEAK-System\PEAK-Drivers") winreg.CloseKey(aKey) - except WindowsError: + except OSError: logger.error("Exception: The PEAK-driver couldn't be found!") finally: winreg.CloseKey(aReg) diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index c60b9e6c9..cd0349c99 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -218,7 +218,7 @@ def __init__( channel = self._find_channel_by_dev_id(device_id) if channel is None: - err_msg = "Cannot find a channel with ID {:08x}".format(device_id) + err_msg = f"Cannot find a channel with ID {device_id:08x}" raise ValueError(err_msg) self.channel_info = str(channel) @@ -249,7 +249,7 @@ def __init__( f_clock = "{}={}".format("f_clock", kwargs.get("f_clock", None)) fd_parameters_values = [f_clock] + [ - "{}={}".format(key, kwargs.get(key, None)) + f"{key}={kwargs.get(key, None)}" for key in PCAN_FD_PARAMETER_LIST if kwargs.get(key, None) is not None ] @@ -350,7 +350,7 @@ def bits(n): for b in bits(error): stsReturn = self.m_objPCANBasic.GetErrorText(b, 0x9) if stsReturn[0] != PCAN_ERROR_OK: - text = "An error occurred. Error-code's text ({0:X}h) couldn't be retrieved".format( + text = "An error occurred. Error-code's text ({:X}h) couldn't be retrieved".format( error ) else: diff --git a/can/interfaces/robotell.py b/can/interfaces/robotell.py index 0d3ad1b77..4d82c1922 100644 --- a/can/interfaces/robotell.py +++ b/can/interfaces/robotell.py @@ -396,7 +396,7 @@ def get_serial_number(self, timeout: Optional[int]) -> Optional[str]: serial = "" for idx in range(0, 8, 2): - serial += "{:02X}{:02X}-".format(sn1[idx], sn1[idx + 1]) + serial += f"{sn1[idx]:02X}{sn1[idx + 1]:02X}-" for idx in range(0, 4, 2): - serial += "{:02X}{:02X}-".format(sn2[idx], sn2[idx + 1]) + serial += f"{sn2[idx]:02X}{sn2[idx + 1]:02X}-" return serial[:-1] diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index 549998dc8..66e2923c4 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -68,7 +68,7 @@ def bcm_header_factory( # requirements of this field, then we must add padding bytes until we # are aligned while curr_stride % field_alignment != 0: - results.append(("pad_{}".format(pad_index), ctypes.c_uint8)) + results.append((f"pad_{pad_index}", ctypes.c_uint8)) pad_index += 1 curr_stride += 1 @@ -84,7 +84,7 @@ def bcm_header_factory( # Add trailing padding to align to a multiple of the largest scalar member # in the structure while curr_stride % alignment != 0: - results.append(("pad_{}".format(pad_index), ctypes.c_uint8)) + results.append((f"pad_{pad_index}", ctypes.c_uint8)) pad_index += 1 curr_stride += 1 diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 55e7eb392..ecc870ca4 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -21,7 +21,7 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes # Pass all messages can_filters = [{"can_id": 0, "can_mask": 0}] - can_filter_fmt = "={}I".format(2 * len(can_filters)) + can_filter_fmt = f"={2 * len(can_filters)}I" filter_data = [] for can_filter in can_filters: can_id = can_filter["can_id"] @@ -50,7 +50,7 @@ def find_available_interfaces() -> Iterable[str]: try: # adding "type vcan" would exclude physical can devices command = ["ip", "-o", "link", "list", "up"] - output = subprocess.check_output(command, universal_newlines=True) + output = subprocess.check_output(command, text=True) except Exception as e: # subprocess.CalledProcessError is too specific log.error("failed to fetch opened can devices: %s", e) diff --git a/can/interfaces/socketcand/socketcand.py b/can/interfaces/socketcand/socketcand.py index 327df9e73..28f0c700f 100644 --- a/can/interfaces/socketcand/socketcand.py +++ b/can/interfaces/socketcand/socketcand.py @@ -43,7 +43,7 @@ def convert_can_message_to_ascii_message(can_message: can.Message) -> str: # Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags data = can_message.data length = can_message.dlc - bytes_string = " ".join("{:x}".format(x) for x in data[0:length]) + bytes_string = " ".join(f"{x:x}" for x in data[0:length]) return f"< send {can_id:X} {length:X} {bytes_string} >" diff --git a/can/interfaces/systec/ucanbus.py b/can/interfaces/systec/ucanbus.py index fee110b08..7d8b6133a 100644 --- a/can/interfaces/systec/ucanbus.py +++ b/can/interfaces/systec/ucanbus.py @@ -134,7 +134,7 @@ def __init__(self, channel, can_filters=None, **kwargs): self._ucan.init_hardware(device_number=device_number) self._ucan.init_can(self.channel, **self._params) hw_info_ex, _, _ = self._ucan.get_hardware_info() - self.channel_info = "%s, S/N %s, CH %s, BTR %s" % ( + self.channel_info = "{}, S/N {}, CH {}, BTR {}".format( self._ucan.get_product_code_message(hw_info_ex.product_code), hw_info_ex.serial, self.channel, diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 8f699f3f7..aed916cdb 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -171,7 +171,7 @@ def __init__( ) self._app_name = app_name.encode() if app_name is not None else b"" - self.channel_info = "Application %s: %s" % ( + self.channel_info = "Application {}: {}".format( app_name, ", ".join(f"CAN {ch + 1}" for ch in self.channels), ) diff --git a/can/io/canutils.py b/can/io/canutils.py index f63df3f6b..e159ecdf4 100644 --- a/can/io/canutils.py +++ b/can/io/canutils.py @@ -170,7 +170,7 @@ def on_message_received(self, msg): if isinstance(channel, int) or isinstance(channel, str) and channel.isdigit(): channel = f"can{channel}" - framestr = "(%f) %s" % (timestamp, channel) + framestr = f"({timestamp:f}) {channel}" if msg.is_error_frame: framestr += " %08X#" % (CAN_ERR_FLAG | CAN_ERR_BUSERROR) diff --git a/can/io/trc.py b/can/io/trc.py index 25ba0f5c6..0a07f01c9 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -1,5 +1,3 @@ -# coding: utf-8 - """ Reader and writer for can logging files in peak trc format @@ -51,7 +49,7 @@ def __init__( If this is a file-like object, is has to opened in text read mode, not binary read mode. """ - super(TRCReader, self).__init__(file, mode="r") + super().__init__(file, mode="r") self.file_version = TRCFileVersion.UNKNOWN if not self.file: @@ -211,7 +209,7 @@ def __init__( :param channel: a default channel to use when the message does not have a channel set """ - super(TRCWriter, self).__init__(file, mode="w") + super().__init__(file, mode="w") self.channel = channel if type(file) is str: self.filepath = os.path.abspath(file) diff --git a/can/typechecking.py b/can/typechecking.py index b3a513a3a..31a298995 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -11,9 +11,14 @@ CanFilter: typing_extensions = typing_extensions.TypedDict( "CanFilter", {"can_id": int, "can_mask": int} ) -CanFilterExtended = typing_extensions.TypedDict( - "CanFilterExtended", {"can_id": int, "can_mask": int, "extended": bool} -) + + +class CanFilterExtended(typing_extensions.TypedDict): + can_id: int + can_mask: int + extended: bool + + CanFilters = typing.Sequence[typing.Union[CanFilter, CanFilterExtended]] # TODO: Once buffer protocol support lands in typing, we should switch to that, @@ -35,8 +40,10 @@ BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any]) -AutoDetectedConfig = typing_extensions.TypedDict( - "AutoDetectedConfig", {"interface": str, "channel": Channel} -) + +class AutoDetectedConfig(typing_extensions.TypedDict): + interface: str + channel: Channel + ReadableBytesLike = typing.Union[bytes, bytearray, memoryview] diff --git a/can/util.py b/can/util.py index e64eb13b5..41467542d 100644 --- a/can/util.py +++ b/can/util.py @@ -61,10 +61,10 @@ def load_file_config( else: config.read(path) - _config = {} + _config: Dict[str, str] = {} if config.has_section(section): - _config.update(dict((key, val) for key, val in config.items(section))) + _config.update(config.items(section)) return _config diff --git a/can/viewer.py b/can/viewer.py index 6773a0acd..3f0376880 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -515,7 +515,7 @@ def parse_args(args): ] = {} if parsed_args.decode: if os.path.isfile(parsed_args.decode[0]): - with open(parsed_args.decode[0], "r", encoding="utf-8") as f: + with open(parsed_args.decode[0], encoding="utf-8") as f: structs = f.readlines() else: structs = parsed_args.decode diff --git a/setup.py b/setup.py index a32471844..b3d99f612 100644 --- a/setup.py +++ b/setup.py @@ -15,12 +15,12 @@ logging.basicConfig(level=logging.WARNING) -with open("can/__init__.py", "r", encoding="utf-8") as fd: +with open("can/__init__.py", encoding="utf-8") as fd: version = re.search( r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE ).group(1) -with open("README.rst", "r", encoding="utf-8") as f: +with open("README.rst", encoding="utf-8") as f: long_description = f.read() # Dependencies diff --git a/test/config.py b/test/config.py index fc1bca6c2..d308a8cc8 100644 --- a/test/config.py +++ b/test/config.py @@ -27,7 +27,7 @@ def env(name: str) -> bool: IS_CI = IS_TRAVIS or IS_GITHUB_ACTIONS or env("CI") or env("CONTINUOUS_INTEGRATION") if IS_TRAVIS and IS_GITHUB_ACTIONS: - raise EnvironmentError( + raise OSError( f"only one of IS_TRAVIS ({IS_TRAVIS}) and IS_GITHUB_ACTIONS ({IS_GITHUB_ACTIONS}) may be True at the " "same time" ) @@ -43,7 +43,7 @@ def env(name: str) -> bool: del _sys if (IS_WINDOWS and IS_LINUX) or (IS_LINUX and IS_OSX) or (IS_WINDOWS and IS_OSX): - raise EnvironmentError( + raise OSError( f"only one of IS_WINDOWS ({IS_WINDOWS}), IS_LINUX ({IS_LINUX}) and IS_OSX ({IS_OSX}) " f'can be True at the same time (platform.system() == "{platform.system()}")' ) diff --git a/test/listener_test.py b/test/listener_test.py index 0e64a266a..9b2e9e93b 100644 --- a/test/listener_test.py +++ b/test/listener_test.py @@ -88,7 +88,7 @@ def testRemoveListenerFromNotifier(self): def testPlayerTypeResolution(self): def test_filetype_to_instance(extension, klass): - print("testing: {}".format(extension)) + print(f"testing: {extension}") try: if extension == ".blf": delete = False @@ -123,7 +123,7 @@ def testPlayerTypeResolutionUnsupportedFileTypes(self): def testLoggerTypeResolution(self): def test_filetype_to_instance(extension, klass): - print("testing: {}".format(extension)) + print(f"testing: {extension}") try: with tempfile.NamedTemporaryFile( suffix=extension, delete=False diff --git a/test/logformats_test.py b/test/logformats_test.py index 435b651b6..05c8b986f 100644 --- a/test/logformats_test.py +++ b/test/logformats_test.py @@ -141,7 +141,7 @@ def _setup_instance_helper( ] assert ( "log_event" in attrs - ), "cannot check comments with this writer: {}".format(writer_constructor) + ), f"cannot check comments with this writer: {writer_constructor}" # get all test comments self.original_comments = TEST_COMMENTS if check_comments else () diff --git a/test/message_helper.py b/test/message_helper.py index 1e7989156..fe193097b 100644 --- a/test/message_helper.py +++ b/test/message_helper.py @@ -31,8 +31,8 @@ def assertMessageEqual(self, message_1, message_2): if message_1.equals(message_2, timestamp_delta=self.allowed_timestamp_delta): return elif self.preserves_channel: - print("Comparing: message 1: {!r}".format(message_1)) - print(" message 2: {!r}".format(message_2)) + print(f"Comparing: message 1: {message_1!r}") + print(f" message 2: {message_2!r}") self.fail( "messages are unequal with allowed timestamp delta {}".format( self.allowed_timestamp_delta @@ -46,8 +46,8 @@ def assertMessageEqual(self, message_1, message_2): ): return else: - print("Comparing: message 1: {!r}".format(message_1)) - print(" message 2: {!r}".format(message_2)) + print(f"Comparing: message 1: {message_1!r}") + print(f" message 2: {message_2!r}") self.fail( "messages are unequal with allowed timestamp delta {} even when ignoring channels".format( self.allowed_timestamp_delta diff --git a/test/test_interface_virtual.py b/test/test_interface_virtual.py index 009722779..94833fdcb 100644 --- a/test/test_interface_virtual.py +++ b/test/test_interface_virtual.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# coding: utf-8 """ This module tests :meth:`can.interface.virtual`. diff --git a/test/test_load_config.py b/test/test_load_config.py index a2969b0a5..1bfba450a 100644 --- a/test/test_load_config.py +++ b/test/test_load_config.py @@ -30,9 +30,9 @@ def _gen_configration_file(self, sections): ) as tmp_config_file: content = [] for section in sections: - content.append("[{}]".format(section)) + content.append(f"[{section}]") for k, v in self.configuration[section].items(): - content.append("{} = {}".format(k, v)) + content.append(f"{k} = {v}") tmp_config_file.write("\n".join(content)) return tmp_config_file.name diff --git a/test/test_load_file_config.py b/test/test_load_file_config.py index 6b1d5a382..afedf58d4 100644 --- a/test/test_load_file_config.py +++ b/test/test_load_file_config.py @@ -30,9 +30,9 @@ def _gen_configration_file(self, sections): ) as tmp_config_file: content = [] for section in sections: - content.append("[{}]".format(section)) + content.append(f"[{section}]") for k, v in self.configuration[section].items(): - content.append("{} = {}".format(k, v)) + content.append(f"{k} = {v}") tmp_config_file.write("\n".join(content)) return tmp_config_file.name diff --git a/test/test_message_class.py b/test/test_message_class.py index 9fae7262e..4840402ff 100644 --- a/test/test_message_class.py +++ b/test/test_message_class.py @@ -87,8 +87,8 @@ def test_methods(self, **kwargs): if is_valid: self.assertEqual(len(message), kwargs["dlc"]) self.assertTrue(bool(message)) - self.assertGreater(len("{}".format(message)), 0) - _ = "{}".format(message) + self.assertGreater(len(f"{message}"), 0) + _ = f"{message}" with self.assertRaises(Exception): _ = "{somespec}".format( message diff --git a/test/test_message_sync.py b/test/test_message_sync.py index 8750dd416..7552915e7 100644 --- a/test/test_message_sync.py +++ b/test/test_message_sync.py @@ -87,7 +87,7 @@ def test_skip(self): # the handling of the messages itself also takes some time: # ~0.001 s/message on a ThinkPad T560 laptop (Ubuntu 18.04, i5-6200U) - assert 0 < took < inc(len(messages) * (0.005 + 0.003)), "took: {}s".format(took) + assert 0 < took < inc(len(messages) * (0.005 + 0.003)), f"took: {took}s" self.assertMessagesEqual(messages, collected) diff --git a/test/test_viewer.py b/test/test_viewer.py index 5633a3bc1..baef10bda 100644 --- a/test/test_viewer.py +++ b/test/test_viewer.py @@ -302,7 +302,7 @@ def pack_data( return struct_t.pack(*data) else: - raise ValueError("Unknown command: 0x{:02X}".format(cmd)) + raise ValueError(f"Unknown command: 0x{cmd:02X}") def test_pack_unpack(self): CANOPEN_TPDO1 = 0x180