Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
0200f7a
Add constructor to OdeMessageFrameMetadata for ASN.1 hex string; refa…
Michael7371 Apr 2, 2026
eea011b
Add ASN.1 validation to receiver tests by asserting that produced ASN…
Michael7371 May 8, 2026
36c2fea
update ODE TMC diagram
Michael7371 May 8, 2026
af4381d
resolve checkstyle formatting comments
Michael7371 May 8, 2026
3711e95
Update Kafka consumer configuration in application.yaml to adjust aut…
Michael7371 May 8, 2026
2f734b6
Enhance error logging in GenericReceiver to include detected message …
Michael7371 May 11, 2026
e1598af
Update Kafka configuration in docker-compose.yml and application.yaml…
Michael7371 May 13, 2026
0d564c6
Enhance UdpHexDecoder to support stripping of 1609.2 headers and impr…
Michael7371 May 13, 2026
eea0b91
Refactor UdpHexDecoder to ensure consistent casing for ASN.1 hex stri…
Michael7371 May 15, 2026
ff80952
Reverting changes to unintended files
Michael7371 May 15, 2026
550717a
resolve checkstyle comments
Michael7371 May 15, 2026
479e91f
Refactor OdeLogMetadata and OdeMsgMetadata constructors to use generi…
Michael7371 May 18, 2026
66755d5
Update receiver tests to replace "1609.3 WSMP header" with "1609.2 WS…
Michael7371 May 18, 2026
7592f0b
Update BSM and PSM receiver test files to correct expected output val…
Michael7371 May 18, 2026
aa29d7d
Refactor receiver tests to replace parameterized tests with individua…
Michael7371 May 19, 2026
bf9acd5
Remove commented-out local UDP_IP assignment in udpsender_generic.py …
Michael7371 May 19, 2026
7c10a92
updated BSM and PSM receiver tests to use IEE1609.2 signatures instea…
Michael7371 May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The ITS ODE is a real-time virtual data router that ingests and processes operat

_Figure 1: ODE Dataflows_

For a module-by-module description of the components in this diagram, see [Architecture Section 5.2 – Architecture Module Reference](docs/Architecture.md#52---architecture-module-reference).

**Documentation:**

1. [ODE Architecture](docs/Architecture.md)
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ services:
KAFKA_KEY_SERIALIZER: ${KAFKA_KEY_SERIALIZER}
KAFKA_VALUE_SERIALIZER: ${KAFKA_VALUE_SERIALIZER}
KAFKA_PARTITIONER_CLASS: ${KAFKA_PARTITIONER_CLASS}
KAFKA_AUTO_COMMIT_INTERVAL: ${KAFKA_AUTO_COMMIT_INTERVAL}
ODE_TIM_INGEST_MONITORING_ENABLED: ${ODE_TIM_INGEST_MONITORING_ENABLED}
ODE_TIM_INGEST_MONITORING_INTERVAL: ${ODE_TIM_INGEST_MONITORING_INTERVAL}
ODE_STOMP_EXPORTER_ENABLED: ${ODE_STOMP_EXPORTER_ENABLED}
Expand Down
108 changes: 93 additions & 15 deletions docs/Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,28 @@ _Last updated April 26th, 2024_
# Contents

- [Version History](#version-history)
- [1 - Introduction](#introduction)
- [2 - Project Overview](#project-overview)
- [3 - System Overview](#system-overview)
- [3.1 - ODE Technology Stack](#ode-technology-stack)
- [3.2 - Producer Mechanisms](#producer-mechanisms)
- [3.3 - Consumer Mechanisms](#consumer-mechanisms)
- [3.4 - ODE Management Console](#ode-management-console)
- [4 - Architecture Pattern](#architecture-pattern)
- [4.1 - Pattern Description](#pattern-description)
- [4.2 - Pattern Topology](#pattern-topology)
- [5 - JPO ODE Micro-services Topology](#jpo-ode-micro-services-topology)
- [5.1 - Deployments](#deployments)
- [6 - Appendix](#appendix)
- [6.1 - Glossary](#glossary)
- [1 - Introduction](#1---introduction)
- [2 - Project Overview](#2---project-overview)
- [3 - System Overview](#3---system-overview)
- [3.1 - ODE Technology Stack](#31---ode-technology-stack)
- [3.2 - Producer Mechanisms](#32---producer-mechanisms)
- [3.3 - Consumer Mechanisms](#33---consumer-mechanisms)
- [3.4 - ODE Management Console](#34---ode-management-console)
- [4 - Architecture Pattern](#4---architecture-pattern)
- [4.1 - Pattern Description](#41---pattern-description)
- [4.2 - Pattern Topology](#42---pattern-topology)
- [5 - JPO ODE Micro-services Topology](#5---jpo-ode-micro-services-topology)
- [5.1 - Deployments](#51---deployments)
- [5.2 - Architecture Module Reference](#52---architecture-module-reference)
- [Core ODE Services](#core-ode-services)
- [Data Import Services](#data-import-services)
- [Data Processing Services](#data-processing-services)
- [Data Export Services](#data-export-services)
- [System-Wide Services](#system-wide-services)
- [Supporting Libraries](#supporting-libraries)
- [Notes](#notes)
- [6 - Appendix](#6---appendix)
- [6.1 - Glossary](#61---glossary)

<a name="version-history"></a>

Expand Down Expand Up @@ -352,7 +360,7 @@ _Figure 7 - JPO ODE Micro-services Topology_

<a name="deployments"></a>

### 5.1 - Deployments
## 5.1 - Deployments

Docker is utilized as the primary deployment mechanism to
compartmentalize each of the designed micro-services into separate
Expand All @@ -361,6 +369,76 @@ containers each running a distinct service. The ODE application runs in
one container, its submodules run in separate containers and other major
frameworks such as Kafka run in their own separate containers.

<a name="architecture-module-reference"></a>

## 5.2 - Architecture Module Reference

This section provides implementation details for the modules shown in the
ODE TMC architecture diagrams, including feature descriptions, interfaces,
deployment roles, and source code references.

### Core ODE Services

| Module | Purpose | Interface(s) | Implementation / Codebase |
|--------|---------|--------------|----------------------------|
| REST API | Accepts Traveler Information Messages (TIM), Probe Data Messages (PDM), and management requests into the ODE processing pipeline. | HTTP/REST | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| Kafka Data Streams | Primary internal messaging bus used for inter-service communication and publish/subscribe routing of JSON and POJO message streams. | Kafka Producer/Consumer APIs | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode), [jpo-utils](https://github.com/usdot-jpo-ode/jpo-utils) |
| UDP (ASN.1) Input | Receives ASN.1 UPER encoded J2735 messages for direct ingestion and decoding. | UDP | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| Log Upload Folder | File-based ingestion mechanism for uploaded OBU and infrastructure log files. | File System / SCP | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| SNMP to RSU | Supports TIM query, deposit, and deletion operations with roadside units (RSUs). | SNMP / NTCIP1218 / RSU MIB 4.1 | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |

### Data Import Services

| Module | Purpose | Interface(s) | Implementation / Codebase |
|--------|---------|--------------|----------------------------|
| BSM Importer | Ingests and routes Basic Safety Messages (BSM). | Kafka, UDP, File Upload | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| TIM Importer | Ingests and routes Traveler Information Messages (TIM). | REST, Kafka, SNMP | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| Distress Notification Importer | Imports distress notification events for downstream processing. | Kafka / REST | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |

### Data Processing Services

| Module | Purpose | Interface(s) | Implementation / Codebase |
|--------|---------|--------------|----------------------------|
| Privacy Protection Module | Filters and sanitizes BSM data to reduce privacy risk and support geofencing/privacy policies. | Kafka | [jpo-cvdp](https://github.com/usdot-jpo-ode/jpo-cvdp) |
| ASN.1 Message Deduplication Module | Removes duplicate ASN.1 messages before downstream processing/archiving. | Kafka | [jpo-deduplicator](https://github.com/usdot-jpo-ode/jpo-deduplicator) |
| ASN.1 Decoder Module | Decodes ASN.1 UPER/COER encoded J2735 messages into XER. | Kafka | [asn1_codec](https://github.com/usdot-jpo-ode/asn1_codec) |
| ASN.1 Encoder Module | Encodes XER messages into ASN.1 formats for outbound transmission. | Kafka | [asn1_codec](https://github.com/usdot-jpo-ode/asn1_codec) |
| HAAS Alert J2735 Module | Processes HAAS alerts messaging and generates J2735 messages for V2X transmission. | Kafka | [jpo-haas-asn1-bridge](https://github.com/usdot-jpo-ode/jpo-haas-asn1-bridge) |
| RTK Ingest J2735 Module | Imports and processes RTK correction messages. | Kafka / UDP | [jpo-rtk-asn1-bridge](https://github.com/usdot-jpo-ode/jpo-rtk-asn1-bridge) |
| GeoJSON Converter | Converts location-based data into GeoJSON format. | Internal service | [jpo-geojsonconverter](https://github.com/usdot-jpo-ode/jpo-geojsonconverter) |
| Conflict Monitor | Supports conflict detection and monitoring workflows. | Kafka | [jpo-conflictmonitor](https://github.com/usdot-jpo-ode/jpo-conflictmonitor) |

### Data Export Services

| Module | Purpose | Interface(s) | Implementation / Codebase |
|--------|---------|--------------|----------------------------|
| S3 Depositor Module | Stores exported ODE data into AWS S3-compatible storage. | AWS S3 API | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| JPO MEC MQTT Depositor | Publishes processed data to external MQTT brokers or MEC environments. | MQTT | [jpo-mec-deposit](https://github.com/usdot-jpo-ode/jpo-mec-deposit) |
| SNMP Depositor | Sends TIM and related messages to RSUs via SNMP. | SNMP | [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) |
| SDX Depositor | Publishes data to Situational Data Exchange (SDX). | External API | [jpo-sdw-depositor](https://github.com/usdot-jpo-ode/jpo-sdw-depositor) |

### System-Wide Services

| Module | Purpose | Interface(s) | Implementation / Codebase |
|--------|---------|--------------|----------------------------|
| Kafka Message Bus | Distributed event backbone for ODE services. | Kafka | [jpo-utils](https://github.com/usdot-jpo-ode/jpo-utils) |
| Security Module | Supports IEEE 1609.2 signing, encryption, and certificate management. | REST / Internal service | [jpo-security-svcs](https://github.com/usdot-jpo-ode/jpo-security-svcs) |
| Message Database (MongoDB) | Optional persistence layer for ODE output topics. | MongoDB / Kafka Connect | [jpo-utils](https://github.com/usdot-jpo-ode/jpo-utils) |

### Supporting Libraries

| Library | Purpose | Codebase |
|--------|---------|----------|
| ASN.1 POJOs | Java POJOs representing J2735 ASN.1 message schemas. | [jpo-asn-pojos](https://github.com/usdot-jpo-ode/jpo-asn-pojos) |
| ODE Output Validator Library | Schema validation and output verification utilities. | [ode-output-validator-library](https://github.com/usdot-jpo-ode/ode-output-validator-library) |

### Notes

- Optional modules may be enabled through Docker Compose profiles or
deployment-specific configuration.
- Repository links reflect the current public source locations for each
major service or library.

<a name="appendix"></a>

# 6 - Appendix
Expand Down
Binary file modified docs/images/readme/figure1.png

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update readme or other document that describe (including feature, purpose, interface etc) the modules in the architecture diagram. If the module codebase exist, please add link to the codebase.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ public OdeLogMetadata(OdeMsgPayload payload) {
super(payload);
}

/**
* Same as {@link #OdeLogMetadata(OdeMsgPayload)} but sets {@code asn1} to {@code asn1Hex}
* directly instead of from the payload's stripped bytes.
*/
public OdeLogMetadata(OdeMsgPayload<?> payload, String asn1Hex) {
super(payload, asn1Hex);
}

public OdeLogMetadata() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public OdeMessageFrameMetadata(OdeMsgPayload<?> payload) {
super(payload);
}

/**
* Same as {@link #OdeMessageFrameMetadata(OdeMsgPayload)} but sets {@code asn1} to the given hex
* string (for example the full UDP datagram payload before 1609.3 header stripping).
*/
public OdeMessageFrameMetadata(OdeMsgPayload<?> payload, String asn1Hex) {
super(payload, asn1Hex);
}
Comment thread
iyourshaw marked this conversation as resolved.

public OdeMessageFrameMetadata(Source source) {
this.source = source;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
/**
* Base ODE Metadata class.
*/
@JsonPropertyOrder({ "logFileName", "recordType", "receivedMessageDetails", "payloadType", "serialId",
"odeReceivedAt", "schemaVersion", "maxDurationTime", "recordGeneratedAt", "recordGeneratedBy", "sanitized",
"asn1" })
@JsonPropertyOrder({"logFileName", "recordType", "receivedMessageDetails", "payloadType",
"serialId", "odeReceivedAt", "schemaVersion", "maxDurationTime", "recordGeneratedAt",
"recordGeneratedBy", "sanitized", "asn1"})
public class OdeMsgMetadata extends OdeObject {

public static final String METADATA_STRING = "metadata";
Expand Down Expand Up @@ -71,11 +71,24 @@ public OdeMsgMetadata(OdeMsgPayload payload) {
}

/**
* Constructs an OdeMsgMetadata object with the specified payload, serial ID,
* and received time.
* Same as {@link #OdeMsgMetadata(OdeMsgPayload)} but sets {@code asn1} to {@code asn1Hex}
* directly (for example the full received UDP hex) instead of from the payload's stripped
* {@code bytes} field.
*
* @param payload the payload to be set
* @param serialId the serial ID to be set
* @param payload the payload (type, serial id, and receive time are initialized like the
* single-argument constructor)
* @param asn1Hex hex string for {@link #setAsn1(String)}; may be null
*/
public OdeMsgMetadata(OdeMsgPayload<?> payload, String asn1Hex) {
this(payload.getClass().getName(), new SerialId(), DateTimeUtils.now());
setAsn1(asn1Hex);
}

/**
* Constructs an OdeMsgMetadata object with the specified payload, serial ID, and received time.
*
* @param payload the payload to be set
* @param serialId the serial ID to be set
* @param receivedAt the time the message was received
*/
private OdeMsgMetadata(OdeMsgPayload payload, SerialId serialId, String receivedAt) {
Expand All @@ -84,12 +97,12 @@ private OdeMsgMetadata(OdeMsgPayload payload, SerialId serialId, String received
}

/**
* Constructs an OdeMsgMetadata object with the specified payload type, serial
* ID, and received time.
* Constructs an OdeMsgMetadata object with the specified payload type, serial ID, and received
* time.
*
* @param payloadType the type of the payload
* @param serialId the serial ID to be set
* @param receivedAt the time the message was received
* @param serialId the serial ID to be set
* @param receivedAt the time the message was received
*/
public OdeMsgMetadata(String payloadType, SerialId serialId, String receivedAt) {
super();
Expand Down Expand Up @@ -198,9 +211,11 @@ public void setAsn1(String asn1) {
}

/**
* Sets the ASN1 value for the metadata object.
* Sets {@code asn1} from the serialized {@code bytes} field of the payload (typically the
* header-stripped ASN.1 used for decoding). For full received hex (e.g. UDP), prefer
* {@link #OdeMsgMetadata(OdeMsgPayload, String)} or {@link #setAsn1(String)}.
*
* @param payload the ASN1 payload hex string
* @param payload the payload whose bytes are copied into {@code asn1}
*/
public void setAsn1(OdeMsgPayload payload) {
if (payload != null && payload.getData() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
import us.dot.its.jpo.ode.model.ReceivedMessageDetails;
import us.dot.its.jpo.ode.model.RxSource;
import us.dot.its.jpo.ode.uper.StartFlagNotFoundException;
import us.dot.its.jpo.ode.uper.SupportedMessageType;
import us.dot.its.jpo.ode.uper.UperUtil;
import us.dot.its.jpo.ode.util.CodecUtils;
import us.dot.its.jpo.ode.util.DateTimeUtils;
import us.dot.its.jpo.ode.util.JsonUtils;

Expand All @@ -37,6 +39,13 @@ private UdpHexDecoder() {
throw new UnsupportedOperationException();
}

/**
* Result of extracting an ASN.1 payload from a UDP packet: the stripped payload used for decode,
* plus the full received hex (before 1609.3 / 1609.2 header stripping) for metadata.
*/
private record Asn1PayloadExtraction(OdeAsn1Payload payload, String untrimmedPayloadHex) {
}

/**
* Extracts the payload from the given {@link DatagramPacket} and converts it into an
* {@link OdeAsn1Payload} object. The method validates that the payload contains the necessary
Expand All @@ -50,6 +59,11 @@ private UdpHexDecoder() {
*/
public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet,
SupportedMessageType msgType) throws InvalidPayloadException {
return extractAsn1PayloadFromPacket(packet, msgType).payload();
}

private static Asn1PayloadExtraction extractAsn1PayloadFromPacket(DatagramPacket packet,
SupportedMessageType msgType) throws InvalidPayloadException {
// retrieve the buffer from the packet
byte[] buffer = packet.getData();
if (buffer == null) {
Expand All @@ -61,19 +75,30 @@ public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet,
int offsetOfReceivedPacket = packet.getOffset();
byte[] payload = retrieveRelevantBytes(lengthOfReceivedPacket, buffer, offsetOfReceivedPacket);

// convert bytes to hex string and verify identity
String payloadHexString = HexUtils.toHexString(payload).toLowerCase();
if (!payloadHexString.contains(msgType.getStartFlag())) {
// Lowercase hex for start-flag matching (flags use a-f, e.g. TIM "001f"); uppercase for
// metadata.asn1 to match OdeHexByteArray / CodecUtils serialization.
String untrimmedPayloadHexLower = HexUtils.toHexString(payload).toLowerCase();
if (!untrimmedPayloadHexLower.contains(msgType.getStartFlag())) {
throw new InvalidPayloadException("Payload does not contain start flag");
}

log.debug("Full {} packet: {}", msgType, payloadHexString);
log.debug("Full {} packet: {}", msgType, untrimmedPayloadHexLower);

String strippedHex =
UperUtil.stripDot3Header(untrimmedPayloadHexLower, msgType.getStartFlag()).toLowerCase();

payloadHexString =
UperUtil.stripDot3Header(payloadHexString, msgType.getStartFlag()).toLowerCase();
log.debug("Stripped {} packet: {}", msgType, payloadHexString);
// Adding the dot2 header stripping here to handle the case where the signed 1609.2 header is
// present.
try {
strippedHex = UperUtil.stripDot2Header(strippedHex, msgType.getStartFlag());
} catch (StartFlagNotFoundException e) {
log.debug("Error stripping dot2 header: {}", e.getMessage());
}

return new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString));
log.debug("Stripped {} packet: {}", msgType, strippedHex);

return new Asn1PayloadExtraction(new OdeAsn1Payload(HexUtils.fromHexString(strippedHex)),
CodecUtils.toHex(payload));
}

/**
Expand Down Expand Up @@ -143,7 +168,7 @@ public static String buildJsonBsmFromPacket(DatagramPacket packet)
public static String buildJsonSsmFromPacket(DatagramPacket packet)
throws InvalidPayloadException {
return JsonUtils.toJson(buildAsn1DataFromPacket(packet, SupportedMessageType.SSM,
RecordType.ssmTx, Source.RSU, GeneratedBy.RSU, false), false);
RecordType.ssmTx, Source.RSU, GeneratedBy.RSU, false), false);
}

/**
Expand Down Expand Up @@ -244,15 +269,18 @@ private static byte[] retrieveRelevantBytes(int length, byte[] buffer, int offse
*/
public static OdeAsn1Data buildAsn1DataFromPacket(DatagramPacket packet,
SupportedMessageType messageType, RecordType recordType, Source source,
GeneratedBy generatedBy, boolean includeReceivedMessageDetails) throws InvalidPayloadException {
GeneratedBy generatedBy, boolean includeReceivedMessageDetails)
throws InvalidPayloadException {

String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
log.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload payload = getPayloadHexString(packet, messageType);
OdeMessageFrameMetadata metadata = new OdeMessageFrameMetadata(payload);
Asn1PayloadExtraction extracted = extractAsn1PayloadFromPacket(packet, messageType);
OdeAsn1Payload payload = extracted.payload();
OdeMessageFrameMetadata metadata =
new OdeMessageFrameMetadata(payload, extracted.untrimmedPayloadHex());

// Add header data for the decoding process
metadata.setOdeReceivedAt(DateTimeUtils.now());
Expand Down
Loading
Loading