Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 194 additions & 116 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ def __init__(
If the bus could not be set up.
This may or may not be a :class:`~can.interfaces.vector.VectorInitializationError`.
"""
if os.name != "nt" and not kwargs.get("_testing", False):
self.__testing = kwargs.get("_testing", False)
if os.name != "nt" and not self.__testing:
raise CanInterfaceNotImplementedError(
f"The Vector interface is only supported on Windows, "
f'but you are running "{os.name}"'
Expand Down Expand Up @@ -232,66 +233,20 @@ def __init__(

# set CAN settings
for channel in self.channels:
if self._has_init_access(channel):
if fd:
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
data_bitrate=data_bitrate,
sjw_abr=sjw_abr,
tseg1_abr=tseg1_abr,
tseg2_abr=tseg2_abr,
sjw_dbr=sjw_dbr,
tseg1_dbr=tseg1_dbr,
tseg2_dbr=tseg2_dbr,
)
elif bitrate:
self._set_bitrate_can(channel=channel, bitrate=bitrate)

# Check CAN settings
for channel in self.channels:
if kwargs.get("_testing", False):
# avoid check if xldriver is mocked for testing
break

bus_params = self._read_bus_params(channel)
if fd:
_canfd = bus_params.canfd
if not all(
[
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
_canfd.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD,
_canfd.bitrate == bitrate if bitrate else True,
_canfd.sjw_abr == sjw_abr if bitrate else True,
_canfd.tseg1_abr == tseg1_abr if bitrate else True,
_canfd.tseg2_abr == tseg2_abr if bitrate else True,
_canfd.data_bitrate == data_bitrate if data_bitrate else True,
_canfd.sjw_dbr == sjw_dbr if data_bitrate else True,
_canfd.tseg1_dbr == tseg1_dbr if data_bitrate else True,
_canfd.tseg2_dbr == tseg2_dbr if data_bitrate else True,
]
):
raise CanInitializationError(
f"The requested CAN FD settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {_canfd._asdict()}"
)
else:
_can = bus_params.can
if not all(
[
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
_can.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20,
_can.bitrate == bitrate if bitrate else True,
]
):
raise CanInitializationError(
f"The requested CAN settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {_can._asdict()}"
)
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
data_bitrate=data_bitrate,
sjw_abr=sjw_abr,
tseg1_abr=tseg1_abr,
tseg2_abr=tseg2_abr,
sjw_dbr=sjw_dbr,
tseg1_dbr=tseg1_dbr,
tseg2_dbr=tseg2_dbr,
)
elif bitrate:
self._set_bitrate_can(channel=channel, bitrate=bitrate)

# Enable/disable TX receipts
tx_receipts = 1 if receive_own_messages else 0
Expand Down Expand Up @@ -422,32 +377,85 @@ def _set_bitrate_can(
)

# set parameters if channel has init access
if any(kwargs):
chip_params = xlclass.XLchipParams()
chip_params.bitRate = bitrate
chip_params.sjw = sjw
chip_params.tseg1 = tseg1
chip_params.tseg2 = tseg2
chip_params.sam = sam
self.xldriver.xlCanSetChannelParams(
self.port_handle,
self.channel_masks[channel],
chip_params,
if self._has_init_access(channel):
if any(kwargs):
chip_params = xlclass.XLchipParams()
chip_params.bitRate = bitrate
chip_params.sjw = sjw
chip_params.tseg1 = tseg1
chip_params.tseg2 = tseg2
chip_params.sam = sam
self.xldriver.xlCanSetChannelParams(
self.port_handle,
self.channel_masks[channel],
chip_params,
)
LOG.info(
"xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
chip_params.bitRate,
chip_params.sjw,
chip_params.tseg1,
chip_params.tseg2,
)
else:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle,
self.channel_masks[channel],
bitrate,
)
LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate)

if self.__testing:
return

# Compare requested CAN settings to active settings
bus_params = self._read_bus_params(channel)
settings_acceptable = True

# check bus type
settings_acceptable &= (
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN
)

# check CAN operation mode. For CANcaseXL can_op_mode remains 0
if bus_params.can.can_op_mode != 0:
settings_acceptable &= bool(
bus_params.can.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20
)
LOG.info(
"xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
chip_params.bitRate,
chip_params.sjw,
chip_params.tseg1,
chip_params.tseg2,

# check bitrate
settings_acceptable &= abs(bus_params.can.bitrate - bitrate) < bitrate / 256

# check sample point
if all(kwargs):
requested_sample_point = (
100
* (1 + tseg1) # type: ignore[operator]
/ (1 + tseg1 + tseg2) # type: ignore[operator]
)
else:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle,
self.channel_masks[channel],
bitrate,
actual_sample_point = (
100
* (1 + bus_params.can.tseg1)
/ (1 + bus_params.can.tseg1 + bus_params.can.tseg2)
)
settings_acceptable &= (
abs(actual_sample_point - requested_sample_point)
< 1.0 # 1 percent threshold
)

if not settings_acceptable:
active_settings = ", ".join(
[
f"{key}: {getattr(val, 'name', val)}" # print int or Enum/Flag name
for key, val in bus_params.can._asdict().items()
]
)
raise CanInitializationError(
f"The requested CAN settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {active_settings}"
)
LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate)

def _set_bitrate_canfd(
self,
Expand All @@ -462,42 +470,112 @@ def _set_bitrate_canfd(
tseg2_dbr: int = 3,
) -> None:
# set parameters if channel has init access
canfd_conf = xlclass.XLcanFdConf()
if bitrate:
canfd_conf.arbitrationBitRate = int(bitrate)
else:
canfd_conf.arbitrationBitRate = 500_000
canfd_conf.sjwAbr = int(sjw_abr)
canfd_conf.tseg1Abr = int(tseg1_abr)
canfd_conf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
canfd_conf.dataBitRate = int(data_bitrate)
else:
canfd_conf.dataBitRate = int(canfd_conf.arbitrationBitRate)
canfd_conf.sjwDbr = int(sjw_dbr)
canfd_conf.tseg1Dbr = int(tseg1_dbr)
canfd_conf.tseg2Dbr = int(tseg2_dbr)
self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.channel_masks[channel], canfd_conf
)
LOG.info(
"xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u",
canfd_conf.arbitrationBitRate,
canfd_conf.dataBitRate,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
canfd_conf.sjwAbr,
canfd_conf.tseg1Abr,
canfd_conf.tseg2Abr,
if self._has_init_access(channel):
canfd_conf = xlclass.XLcanFdConf()
if bitrate:
canfd_conf.arbitrationBitRate = int(bitrate)
else:
canfd_conf.arbitrationBitRate = 500_000
canfd_conf.sjwAbr = int(sjw_abr)
canfd_conf.tseg1Abr = int(tseg1_abr)
canfd_conf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
canfd_conf.dataBitRate = int(data_bitrate)
else:
canfd_conf.dataBitRate = int(canfd_conf.arbitrationBitRate)
canfd_conf.sjwDbr = int(sjw_dbr)
canfd_conf.tseg1Dbr = int(tseg1_dbr)
canfd_conf.tseg2Dbr = int(tseg2_dbr)
self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.channel_masks[channel], canfd_conf
)
LOG.info(
"xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u",
canfd_conf.arbitrationBitRate,
canfd_conf.dataBitRate,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
canfd_conf.sjwAbr,
canfd_conf.tseg1Abr,
canfd_conf.tseg2Abr,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
canfd_conf.sjwDbr,
canfd_conf.tseg1Dbr,
canfd_conf.tseg2Dbr,
)

if self.__testing:
return

# Compare requested CAN settings to active settings
bus_params = self._read_bus_params(channel)
settings_acceptable = True

# check bus type
settings_acceptable &= (
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN
)
LOG.info(
"xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
canfd_conf.sjwDbr,
canfd_conf.tseg1Dbr,
canfd_conf.tseg2Dbr,

# check CAN operation mode
settings_acceptable &= bool(
bus_params.canfd.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD
)

# check bitrates
if bitrate:
settings_acceptable &= (
abs(bus_params.canfd.bitrate - bitrate) < bitrate / 256
)
if data_bitrate:
settings_acceptable &= (
abs(bus_params.canfd.data_bitrate - data_bitrate) < data_bitrate / 256
)

# check sample points
if bitrate:
requested_nom_sample_point = (
100 * (1 + tseg1_abr) / (1 + tseg1_abr + tseg2_abr)
)
actual_nom_sample_point = (
100
* (1 + bus_params.canfd.tseg1_abr)
/ (1 + bus_params.canfd.tseg1_abr + bus_params.canfd.tseg2_abr)
)
settings_acceptable &= (
abs(actual_nom_sample_point - requested_nom_sample_point)
< 1.0 # 1 percent threshold
)
if data_bitrate:
requested_data_sample_point = (
100 * (1 + tseg1_dbr) / (1 + tseg1_dbr + tseg2_dbr)
)
actual_data_sample_point = (
100
* (1 + bus_params.canfd.tseg1_dbr)
/ (1 + bus_params.canfd.tseg1_dbr + bus_params.canfd.tseg2_dbr)
)
settings_acceptable &= (
abs(actual_data_sample_point - requested_data_sample_point)
< 1.0 # 1 percent threshold
)

if not settings_acceptable:
active_settings = ", ".join(
[
f"{key}: {getattr(val, 'name', val)}" # print int or Enum/Flag name
for key, val in bus_params.canfd._asdict().items()
]
)
raise CanInitializationError(
f"The requested CAN FD settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {active_settings}."
)

def _apply_filters(self, filters: Optional[CanFilters]) -> None:
if filters:
# Only up to one filter per ID type allowed
Expand Down
4 changes: 2 additions & 2 deletions test/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def test_reset_mocked(mock_xldriver) -> None:


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
def test_reset_mocked() -> None:
def test_reset() -> None:
bus = canlib.VectorBus(
channel=0, serial=_find_virtual_can_serial(), interface="vector"
)
Expand Down Expand Up @@ -696,7 +696,7 @@ def _find_virtual_can_serial() -> int:
for i in range(xl_driver_config.channelCount):
xl_channel_config: xlclass.XLchannelConfig = xl_driver_config.channel[i]

if xl_channel_config.transceiverName.decode() == "Virtual CAN":
if "Virtual CAN" in xl_channel_config.transceiverName.decode():
return xl_channel_config.serialNumber

raise LookupError("Vector virtual CAN not found")
Expand Down