9
9
"""
10
10
11
11
import logging
12
- import sys
13
- import time
14
- import struct
12
+ import os
13
+ from types import ModuleType
14
+ from typing import Optional
15
15
16
16
from can import BusABC , Message
17
- from ..exceptions import CanInitializationError , CanOperationError
18
-
17
+ from ..exceptions import (
18
+ CanInitializationError ,
19
+ CanOperationError ,
20
+ CanInterfaceNotImplementedError ,
21
+ )
19
22
20
23
logger = logging .getLogger (__name__ )
21
24
22
- if sys .platform == "win32" :
23
- try :
24
- from nixnet import (
25
- session ,
26
- types ,
27
- constants ,
28
- errors ,
29
- system ,
30
- database ,
31
- XnetError ,
32
- )
33
- except ImportError as error :
34
- raise ImportError ("NIXNET python module cannot be loaded" ) from error
35
- else :
36
- raise NotImplementedError ("NiXNET only supported on Win32 platforms" )
25
+ nixnet : Optional [ModuleType ] = None
26
+ try :
27
+ import nixnet # type: ignore
28
+ except Exception as exc :
29
+ logger .warning ("Could not import nixnet: %s" , exc )
37
30
38
31
39
32
class NiXNETcanBus (BusABC ):
@@ -68,9 +61,18 @@ def __init__(
68
61
``is_error_frame`` set to True and ``arbitration_id`` will identify
69
62
the error (default True)
70
63
71
- :raises can.exceptions.CanInitializationError:
64
+ :raises ~ can.exceptions.CanInitializationError:
72
65
If starting communication fails
73
66
"""
67
+ if os .name != "nt" and not kwargs .get ("_testing" , False ):
68
+ raise CanInterfaceNotImplementedError (
69
+ f"The NI-XNET interface is only supported on Windows, "
70
+ f'but you are running "{ os .name } "'
71
+ )
72
+
73
+ if nixnet is None :
74
+ raise CanInterfaceNotImplementedError ("The NI-XNET API has not been loaded" )
75
+
74
76
self ._rx_queue = []
75
77
self .channel = channel
76
78
self .channel_info = "NI-XNET: " + channel
@@ -88,10 +90,10 @@ def __init__(
88
90
89
91
# We need two sessions for this application, one to send frames and another to receive them
90
92
91
- self .__session_send = session .FrameOutStreamSession (
93
+ self .__session_send = nixnet . session .FrameOutStreamSession (
92
94
channel , database_name = database_name
93
95
)
94
- self .__session_receive = session .FrameInStreamSession (
96
+ self .__session_receive = nixnet . session .FrameInStreamSession (
95
97
channel , database_name = database_name
96
98
)
97
99
@@ -110,15 +112,15 @@ def __init__(
110
112
self .__session_receive .intf .can_fd_baud_rate = fd_bitrate
111
113
112
114
if can_termination :
113
- self .__session_send .intf .can_term = constants .CanTerm .ON
114
- self .__session_receive .intf .can_term = constants .CanTerm .ON
115
+ self .__session_send .intf .can_term = nixnet . constants .CanTerm .ON
116
+ self .__session_receive .intf .can_term = nixnet . constants .CanTerm .ON
115
117
116
118
self .__session_receive .queue_size = 512
117
119
# Once that all the parameters have been restarted, we start the sessions
118
120
self .__session_send .start ()
119
121
self .__session_receive .start ()
120
122
121
- except errors .XnetError as error :
123
+ except nixnet . errors .XnetError as error :
122
124
raise CanInitializationError (
123
125
f"{ error .args [0 ]} ({ error .error_type } )" , error .error_code
124
126
) from None
@@ -145,13 +147,15 @@ def _recv_internal(self, timeout):
145
147
msg = Message (
146
148
timestamp = can_frame .timestamp / 10000000.0 - 11644473600 ,
147
149
channel = self .channel ,
148
- is_remote_frame = can_frame .type == constants .FrameType .CAN_REMOTE ,
149
- is_error_frame = can_frame .type == constants .FrameType .CAN_BUS_ERROR ,
150
+ is_remote_frame = can_frame .type == nixnet .constants .FrameType .CAN_REMOTE ,
151
+ is_error_frame = can_frame .type
152
+ == nixnet .constants .FrameType .CAN_BUS_ERROR ,
150
153
is_fd = (
151
- can_frame .type == constants .FrameType .CANFD_DATA
152
- or can_frame .type == constants .FrameType .CANFDBRS_DATA
154
+ can_frame .type == nixnet . constants .FrameType .CANFD_DATA
155
+ or can_frame .type == nixnet . constants .FrameType .CANFDBRS_DATA
153
156
),
154
- bitrate_switch = can_frame .type == constants .FrameType .CANFDBRS_DATA ,
157
+ bitrate_switch = can_frame .type
158
+ == nixnet .constants .FrameType .CANFDBRS_DATA ,
155
159
is_extended_id = can_frame .identifier .extended ,
156
160
# Get identifier from CanIdentifier structure
157
161
arbitration_id = can_frame .identifier .identifier ,
@@ -178,29 +182,29 @@ def send(self, msg, timeout=None):
178
182
It does not wait for message to be ACKed currently.
179
183
"""
180
184
if timeout is None :
181
- timeout = constants .TIMEOUT_INFINITE
185
+ timeout = nixnet . constants .TIMEOUT_INFINITE
182
186
183
187
if msg .is_remote_frame :
184
- type_message = constants .FrameType .CAN_REMOTE
188
+ type_message = nixnet . constants .FrameType .CAN_REMOTE
185
189
elif msg .is_error_frame :
186
- type_message = constants .FrameType .CAN_BUS_ERROR
190
+ type_message = nixnet . constants .FrameType .CAN_BUS_ERROR
187
191
elif msg .is_fd :
188
192
if msg .bitrate_switch :
189
- type_message = constants .FrameType .CANFDBRS_DATA
193
+ type_message = nixnet . constants .FrameType .CANFDBRS_DATA
190
194
else :
191
- type_message = constants .FrameType .CANFD_DATA
195
+ type_message = nixnet . constants .FrameType .CANFD_DATA
192
196
else :
193
- type_message = constants .FrameType .CAN_DATA
197
+ type_message = nixnet . constants .FrameType .CAN_DATA
194
198
195
- can_frame = types .CanFrame (
196
- types .CanIdentifier (msg .arbitration_id , msg .is_extended_id ),
199
+ can_frame = nixnet . types .CanFrame (
200
+ nixnet . types .CanIdentifier (msg .arbitration_id , msg .is_extended_id ),
197
201
type = type_message ,
198
202
payload = msg .data ,
199
203
)
200
204
201
205
try :
202
206
self .__session_send .frames .write ([can_frame ], timeout )
203
- except errors .XnetError as error :
207
+ except nixnet . errors .XnetError as error :
204
208
raise CanOperationError (
205
209
f"{ error .args [0 ]} ({ error .error_type } )" , error .error_code
206
210
) from None
@@ -237,7 +241,7 @@ def _detect_available_configs():
237
241
configs = []
238
242
239
243
try :
240
- with system .System () as nixnet_system :
244
+ with nixnet . system .System () as nixnet_system :
241
245
for interface in nixnet_system .intf_refs_can :
242
246
cahnnel = str (interface )
243
247
logger .debug (
@@ -248,10 +252,10 @@ def _detect_available_configs():
248
252
"interface" : "nixnet" ,
249
253
"channel" : cahnnel ,
250
254
"can_term_available" : interface .can_term_cap
251
- == constants .CanTermCap .YES ,
255
+ == nixnet . constants .CanTermCap .YES ,
252
256
}
253
257
)
254
- except XnetError as error :
258
+ except Exception as error :
255
259
logger .debug ("An error occured while searching for configs: %s" , str (error ))
256
260
257
261
return configs
0 commit comments