Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eds bugfixing #830

Merged
merged 22 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
# os: [ubuntu-latest]
experimental: [false]
# python-version: ["2.7","3.4","3.5","3.6","3.7","3.8","3.9","3.10","3.11","3.12"]
python-version: ["3.7","3.8","3.9","3.10","3.11","3.12"]
python-version: ["3.8","3.9","3.10","3.11","3.12", "3.13"]
fail-fast: false
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 0 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ skip_branch_with_pr: true

environment:
matrix:
- TOXENV: py37
- TOXENV: py38
- TOXENV: py39
- TOXENV: py310
Expand Down
4 changes: 3 additions & 1 deletion src/canmatrix/formats/dbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ def dump(in_db, f, **options):
# remove "-" from frame names
if compatibility:
frame.name = re.sub("[^A-Za-z0-9]", whitespace_replacement, frame.name)
if frame.name[0].isdigit():
frame.name = "_" + frame.name

duplicate_signal_totals = collections.Counter(normalized_names.values())
duplicate_signal_counter = collections.Counter() # type: typing.Counter[str]
Expand Down Expand Up @@ -459,7 +461,7 @@ def dump(in_db, f, **options):
if frame.is_complex_multiplexed:
for signal in frame.signals:
if signal.muxer_for_signal is not None:
f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), signal.name, signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors))
f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), output_names[frame][signal], signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors))
f.write((", ".join(["%d-%d" % (a, b) for a, b in signal.mux_val_grp])).encode(dbc_export_encoding, ignore_encoding_errors))

f.write(";\n".encode(dbc_export_encoding, ignore_encoding_errors))
Expand Down
295 changes: 196 additions & 99 deletions src/canmatrix/formats/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import canopen.objectdictionary.eds
import canopen.objectdictionary.datatypes
import codecs

import copy
import re
import math

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,129 +35,224 @@
0x82: 'Reset Communication'}


def get_signals(parent_object, signal_receiver):
signals = []
position = 0
for sub in range(1, len(parent_object)):
name = parent_object[sub].name
size = datatype_mapping[parent_object[sub].data_type][1]
unsigned = "UNSIGNED" in datatype_mapping[parent_object[sub].data_type][0]
signal = canmatrix.Signal(name = name, receivers=[signal_receiver], size=size, start_bit = position, is_signed = not unsigned)
position += size
signals.append(signal)
return signals
def name_cleanup(in_str):
rets_str = re.sub("[^A-Za-z0-9]", '_', in_str)
return rets_str

def get_bit_length(data_type_code):
return datatype_mapping[data_type_code][1]
def get_data_type_name(data_type_code):
return datatype_mapping[data_type_code][0]

def format_index(index, subindex):
return f"Index: 0x{index:04X}{subindex:02X}"

def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix
eds_import_encoding = options.get("edsImportEncoding", 'iso-8859-1')
node_id = options.get("eds_node_id", 1)
node_id = options.get("eds_node_id", 0)
generic = options.get("generic", False)
fp = codecs.getreader(eds_import_encoding)(f)
od = canopen.objectdictionary.eds.import_eds(fp, node_id)
db = canmatrix.CanMatrix()
signal_group_counter = 1

node_name = od.device_information.product_name
if len(node_name) == 0:
node_name = "DUMMY"
plc_name = "PLC"
if generic is True:
nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name])
for val, val_name in cmd_values.items():
sig_cmd.add_values(val, val_name)
nm_out.add_signal(sig_cmd)
nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name]))
db.add_frame(nm_out)

nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name])
response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name])
nm_responde.add_signal(response_sig1)
response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name])
nm_responde.add_signal(response_sig2)
db.add_frame(nm_responde)

sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name])
db.add_frame(sync)

emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name])
emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name]))
db.add_frame(emcy)

nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name])
for val, val_name in cmd_values.items():
sig_cmd.add_values(val, val_name)
nm_out.add_signal(sig_cmd)
nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name]))
db.add_frame(nm_out)

nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name])
response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name])
nm_responde.add_signal(response_sig1)
response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name])
nm_responde.add_signal(response_sig2)
db.add_frame(nm_responde)

sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name])
db.add_frame(sync)

emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name])
emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name]))
db.add_frame(emcy)

sdo_down = canmatrix.canmatrix.Frame(name="SDO_download", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_down_CMD", size=8, start_bit=0, receivers=[plc_name])
sdo_down = canmatrix.canmatrix.Frame(name="SDO_receive", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="CCS", size=3, start_bit=5, receivers=[plc_name], is_signed=False)
sig_cmd.is_multiplexer = True
sdo_down.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
sdo_down.add_signal(sig_cmd)
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_IDX", size=16, start_bit=8, receivers=[plc_name]))
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_SUBIDX", size=8, start_bit=24, receivers=[plc_name]))
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x2f))
sig_cmd.add_values(0x2f, "8_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x2b))
sig_cmd.add_values(0x2b, "16_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x27))
sig_cmd.add_values(0x27, "3_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x23))
sig_cmd.add_values(0x23, "4_bytes")
sig_cmd.add_values(0x40, "upload_request")
index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8, receivers=[plc_name])
index.multiplex = "Multiplexor"
index.is_multiplexer = True
index.mux_val = 1
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "CCS"
sdo_down.add_signal(index)
db.add_frame(sdo_down)

sdo_up = canmatrix.canmatrix.Frame(name="SDO_upload", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_state", size=8, start_bit=0, receivers=[node_name])
sdo_up = canmatrix.canmatrix.Frame(name="SDO_transmit", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="SCS", size=3, start_bit=5, is_signed=False)
sig_cmd.is_multiplexer = True
sdo_up.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
sdo_up.add_signal(sig_cmd)
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_uo_IDX", size=16, start_bit=8, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_up_SUBIDX", size=8, start_bit=24, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="error_code", size=32, start_bit=32, receivers=[node_name], multiplex=0x80))
sig_cmd.add_values(0x80, "upload_error")

sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x4f))
sig_cmd.add_values(0x2f, "8_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x4b))
sig_cmd.add_values(0x2b, "16_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x47))
sig_cmd.add_values(0x27, "3_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x43))
sig_cmd.add_values(0x23, "4_bytes")

index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8)
index.multiplex = "Multiplexor"
index.is_multiplexer = True
index.mux_val = 2
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "SCS"
sdo_up.add_signal(index)
db.add_frame(sdo_up)


# RX Can-Ids ...
for index in range(0x1400, 0x1408):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default
for obj in od.values():
if isinstance(obj, canopen.objectdictionary.ODVariable):
subindex = 0
combined_value = int(f"{subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(obj.name)
size = get_bit_length(obj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue
new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(obj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"

##RX PDOs
for index in range(0x1600, 0x1608):
if index in od:
pdo_name = od[index].name.replace(" ", "_")
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[plc_name])
db.add_frame(frame)
frame_id = od[index].canid
up_sig.receivers = []
sdo_up.add_signal(up_sig)
elif isinstance(obj, canopen.objectdictionary.ODRecord):
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue

new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"

up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_R_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1

elif isinstance(obj, canopen.objectdictionary.ODArray):
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue

new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
members.append(signal_name)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"
up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_A_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1


for start_index, rx_tx_config in {0x1400 : {"transmitter": [], "receiver": [node_name]}, 0x1800: {"transmitter": [node_name], "receiver": []}}.items():
for comm_index in range(start_index, start_index + 0x8):
map_index = comm_index + 0x200
if comm_index not in od or map_index not in od:
continue

# Retrieve the COB-ID
comm_param = od[comm_index] #od.get(comm_index)
cob_id_entry = comm_param.get(1) if comm_param else None
if not cob_id_entry or cob_id_entry.default is None:
# print(f" Warning: No valid COB-ID found for {pdo_type} PDO at index 0x{comm_index:04X}. Skipping.")
continue
cob_id = cob_id_entry.default & 0x7FF
pdo_name = name_cleanup(od[comm_index].name)
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=rx_tx_config["transmitter"])
frame_id = cob_id
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], node_name)
for sig in signals:
frame.add_signal(sig)

# RT Can-Ids ...
for index in range(0x1800, 0x1808):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default - 0x40000000

#TX
for index in range(0x1A00, 0x1A08):
if index in od:
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name])
db.add_frame(frame)
pdo_name = od[index].name.replace(" ", "_")
frame_id = od[index].canid
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], plc_name)
for sig in signals:
frame.add_signal(sig)
mapping_param = od.get(map_index)
if not mapping_param:
# print(f" Warning: No mapping parameter found for {pdo_type} PDO at index 0x{map_index:04X}.")
continue
num_entries = mapping_param[0].default if 0 in mapping_param else 0
current_bit_start = 0
for subindex in range(1, num_entries + 1):
mapping_entry = mapping_param.get(subindex)
if not mapping_entry or mapping_entry.default is None:
#print(f" Warning: Subindex {subindex} missing for mapping parameter at 0x{map_index:04X}.")
continue

# Decode the mapping entry
mapping_value = mapping_entry.default
obj_index = (mapping_value >> 16) & 0xFFFF
obj_subindex = (mapping_value >> 8) & 0xFF
bit_length = mapping_value & 0xFF

# Fetch the mapped object
mapped_obj = od.get_variable(obj_index, obj_subindex)
if not mapped_obj:
#print(f" Warning: Could not find object at Index: 0x{obj_index:04X}, Subindex: {obj_subindex}.")
current_bit_start += bit_length
continue
signal_name = name_cleanup(mapped_obj.name)
new_sig = canmatrix.Signal(signal_name, size=bit_length, start_bit=current_bit_start)
datatype_name = get_data_type_name(mapping_entry.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.factor = mapped_obj.factor
if mapped_obj.min is not None:
new_sig.min = mapped_obj.min
new_sig.offset = mapped_obj.min
if mapped_obj.max is not None:
new_sig.max = mapped_obj.max
new_sig.receivers = rx_tx_config["receiver"]
frame.add_signal(new_sig)
current_bit_start += bit_length
frame.size = math.ceil(current_bit_start/8)

db.update_ecu_list()
for ecu in db.ecus:
db.rename_ecu(ecu.name, name_cleanup(ecu.name))
return db
Loading