6
6
import time
7
7
from datetime import datetime
8
8
import platform
9
-
10
- from typing import Optional
9
+ from typing import Optional , Union
11
10
12
11
from packaging import version
13
12
14
- from ...message import Message
15
- from ...bus import BusABC , BusState
16
- from ...util import len2dlc , dlc2len
17
- from ...exceptions import CanError , CanOperationError , CanInitializationError
18
-
19
-
13
+ from can import (
14
+ BusABC ,
15
+ BusState ,
16
+ BitTiming ,
17
+ BitTimingFd ,
18
+ CanInitializationError ,
19
+ Message ,
20
+ CanError ,
21
+ CanOperationError ,
22
+ )
23
+ from can .util import check_or_adjust_timing_clock , dlc2len , len2dlc
20
24
from .basic import (
21
25
PCAN_BITRATES ,
22
26
PCAN_FD_PARAMETER_LIST ,
57
61
FEATURE_FD_CAPABLE ,
58
62
PCAN_DICT_STATUS ,
59
63
PCAN_BUSOFF_AUTORESET ,
64
+ TPCANBaudrate ,
60
65
)
61
66
62
-
63
67
# Set up logging
64
68
log = logging .getLogger ("can.pcan" )
65
69
89
93
90
94
HAS_EVENTS = True
91
95
except ImportError :
92
- try :
93
- # Try pywin32 package
94
- from win32event import CreateEvent
95
- from win32event import WaitForSingleObject , WAIT_OBJECT_0 , INFINITE
96
-
97
- HAS_EVENTS = True
98
- except ImportError :
99
- # Use polling instead
100
- HAS_EVENTS = False
96
+ CreateEvent , WaitForSingleObject , WAIT_OBJECT_0 , INFINITE = [None ] * 4
97
+ HAS_EVENTS = False
101
98
102
99
103
100
class PcanBus (BusABC ):
104
101
def __init__ (
105
102
self ,
106
- channel = "PCAN_USBBUS1" ,
107
- device_id = None ,
108
- state = BusState .ACTIVE ,
109
- bitrate = 500000 ,
110
- * args ,
103
+ channel : str = "PCAN_USBBUS1" ,
104
+ device_id : Optional [ int ] = None ,
105
+ state : BusState = BusState .ACTIVE ,
106
+ timing : Optional [ Union [ BitTiming , BitTimingFd ]] = None ,
107
+ bitrate : int = 500000 ,
111
108
** kwargs ,
112
109
):
113
110
"""A PCAN USB interface to CAN.
@@ -133,6 +130,18 @@ def __init__(
133
130
BusState of the channel.
134
131
Default is ACTIVE
135
132
133
+ :param timing:
134
+ An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
135
+ to specify the bit timing parameters for the PCAN interface. If this parameter
136
+ is provided, it takes precedence over all other timing-related parameters.
137
+ If this parameter is not provided, the bit timing parameters can be specified
138
+ using the `bitrate` parameter for standard CAN or the `fd`, `f_clock`,
139
+ `f_clock_mhz`, `nom_brp`, `nom_tseg1`, `nom_tseg2`, `nom_sjw`, `data_brp`,
140
+ `data_tseg1`, `data_tseg2`, and `data_sjw` parameters for CAN FD.
141
+ Note that the `f_clock` value of the `timing` instance must be 8_000_000
142
+ for standard CAN or any of the following values for CAN FD: 20_000_000,
143
+ 24_000_000, 30_000_000, 40_000_000, 60_000_000, 80_000_000.
144
+
136
145
:param int bitrate:
137
146
Bitrate of channel in bit/s.
138
147
Default is 500 kbit/s.
@@ -222,8 +231,7 @@ def __init__(
222
231
raise ValueError (err_msg )
223
232
224
233
self .channel_info = str (channel )
225
- self .fd = kwargs .get ("fd" , False )
226
- pcan_bitrate = PCAN_BITRATES .get (bitrate , PCAN_BAUD_500K )
234
+ self .fd = isinstance (timing , BitTimingFd ) if timing else kwargs .get ("fd" , False )
227
235
228
236
hwtype = PCAN_TYPE_ISA
229
237
ioport = 0x02A0
@@ -241,7 +249,41 @@ def __init__(
241
249
else :
242
250
raise ValueError ("BusState must be Active or Passive" )
243
251
244
- if self .fd :
252
+ if isinstance (timing , BitTimingFd ):
253
+ valid_fd_f_clocks = [
254
+ 20_000_000 ,
255
+ 24_000_000 ,
256
+ 30_000_000 ,
257
+ 40_000_000 ,
258
+ 60_000_000 ,
259
+ 80_000_000 ,
260
+ ]
261
+ check_or_adjust_timing_clock (
262
+ timing , sorted (valid_fd_f_clocks , reverse = True )
263
+ )
264
+ self .fd_bitrate = (
265
+ f"f_clock={ timing .f_clock } ,"
266
+ f"nom_brp={ timing .nom_brp } ,"
267
+ f"nom_tseg1={ timing .nom_tseg1 } ,"
268
+ f"nom_tseg2={ timing .nom_tseg2 } ,"
269
+ f"nom_sjw={ timing .nom_sjw } ,"
270
+ f"data_brp={ timing .data_brp } ,"
271
+ f"data_tseg1={ timing .data_tseg1 } ,"
272
+ f"data_tseg2={ timing .data_tseg2 } ,"
273
+ f"data_sjw={ timing .data_sjw } "
274
+ ).encode ("ascii" )
275
+ result = self .m_objPCANBasic .InitializeFD (
276
+ self .m_PcanHandle , self .fd_bitrate
277
+ )
278
+
279
+ elif isinstance (timing , BitTiming ):
280
+ check_or_adjust_timing_clock (timing , [8_000_000 ])
281
+ pcan_bitrate = TPCANBaudrate (timing .btr0 << 8 | timing .btr1 )
282
+ result = self .m_objPCANBasic .Initialize (
283
+ self .m_PcanHandle , pcan_bitrate , hwtype , ioport , interrupt
284
+ )
285
+
286
+ elif self .fd :
245
287
f_clock_val = kwargs .get ("f_clock" , None )
246
288
if f_clock_val is None :
247
289
f_clock = "{}={}" .format ("f_clock_mhz" , kwargs .get ("f_clock_mhz" , None ))
@@ -260,6 +302,7 @@ def __init__(
260
302
self .m_PcanHandle , self .fd_bitrate
261
303
)
262
304
else :
305
+ pcan_bitrate = PCAN_BITRATES .get (bitrate , PCAN_BAUD_500K )
263
306
result = self .m_objPCANBasic .Initialize (
264
307
self .m_PcanHandle , pcan_bitrate , hwtype , ioport , interrupt
265
308
)
@@ -297,7 +340,7 @@ def __init__(
297
340
if result != PCAN_ERROR_OK :
298
341
raise PcanCanInitializationError (self ._get_formatted_error (result ))
299
342
300
- super ().__init__ (channel = channel , state = state , bitrate = bitrate , * args , * *kwargs )
343
+ super ().__init__ (channel = channel , state = state , bitrate = bitrate , ** kwargs )
301
344
302
345
def _find_channel_by_dev_id (self , device_id ):
303
346
"""
0 commit comments