Skip to content

feat(vucm): add integration with NI-cDAQ #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/vucm/ni-daq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.11-alpine3.22

WORKDIR /app

RUN apk add build-base

RUN python -m venv .venv
COPY requirements.txt requirements.txt
RUN .venv/bin/pip install -r requirements.txt

COPY script.py script.py

CMD [".venv/bin/python", "script.py"]
17 changes: 17 additions & 0 deletions examples/vucm/ni-daq/docker_run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

set -euo pipefail
IFS=$'\n\t'

SCRIPT_DIR="$(realpath "$(dirname "$0")")"
IMAGE_TAG="${IMAGE_TAG:-"enapter-vucm-examples/$(basename "$SCRIPT_DIR"):latest"}"

docker build --tag "$IMAGE_TAG" "$SCRIPT_DIR"

docker run --rm -it \
--name "ni-daq" \
--network host \
-e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \
-e ENAPTER_VUCM_BLOB="$ENAPTER_VUCM_BLOB" \
-e LISTEN_TCP_PORT="$LISTEN_TCP_PORT" \
"$IMAGE_TAG"
104 changes: 104 additions & 0 deletions examples/vucm/ni-daq/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
blueprint_spec: "device/1.0"

display_name: ATS stack

communication_module:
product: ENP-VIRTUAL

properties:
vendor:
display_name: Vendor
type: string
model:
display_name: Model
type: string

alerts:
parse_error:
display_name: Data processing failed
severity: error
telemetry:
status:
display_name: Status
type: string
enum:
- ok
- error
- no_data
T1:
display_name: T1
type: float
T2:
display_name: T2
type: float
T3:
display_name: T2
type: float
Current:
display_name: Current
type: float
PSU:
display_name: Current
type: float
P1:
display_name: P1
type: float
P2:
display_name: P2
type: float
P3:
display_name: P3
type: float
Flow:
display_name: Flow
type: float
Conductivity:
display_name: Conductivity
type: float
MFMH2:
display_name: MFMH2
type: float
Theoretical_h2:
display_name: MFMH2
type: float
MCM02:
display_name: MCM02
type: float
Refilling:
display_name: Refilling
type: float
PC:
display_name: PC
type: float
C1:
display_name: Cell 1
type: float
C2:
display_name: Cell 2
type: float
C3:
display_name: Cell 3
type: float
C4:
display_name: Cell 4
type: float
C5:
display_name: Cell 5
type: float
C6:
display_name: Cell 6
type: float
C7:
display_name: Cell 7
type: float
C8:
display_name: Cell 8
type: float
C9:
display_name: Cell 9
type: float
C10:
display_name: Cell 10
type: float

commands: {}
1 change: 1 addition & 0 deletions examples/vucm/ni-daq/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
enapter==0.9.2
91 changes: 91 additions & 0 deletions examples/vucm/ni-daq/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import functools
import json
import os
import socket
from datetime import datetime

import enapter


async def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.bind(("127.0.0.1", int(os.environ["LISTEN_TCP_PORT"])))
sock.listen()
sock.setblocking(False)
device_factory = functools.partial(NIDAQ, socket=sock)
await enapter.vucm.run(device_factory)


class NIDAQ(enapter.vucm.Device):
def __init__(self, socket, **kwargs):
super().__init__(**kwargs)
self.socket = socket

async def task_accept_conns(self):
sock = self.socket
while True:
(conn, addr) = await asyncio.get_event_loop().sock_accept(sock)
asyncio.create_task(self.handle_conn(conn, addr))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a task fail, the exception will be lost as no one is supervising it. Using a task group solves the issue.


async def handle_conn(self, conn, addr):
print(addr, "accept")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this debugging info available in the Cloud you could use built-in logging capabilities via self.log.

data = bytearray()
try:
while True:
try:
async with asyncio.timeout(5):
chunk = await asyncio.get_event_loop().sock_recv(conn, 1024)
except TimeoutError:
print(addr, "timeout")
return
if chunk:
print(addr, "read chunk: ", chunk)
data.extend(chunk)
continue
print(addr, "read data: ", data)
await self._process_and_send_telemetry(data)
return
finally:
print(addr, "close")
conn.close()

async def task_properties_sender(self):
"""Periodically send device properties."""
while True:
await self.send_properties(
{"vendor": "National Instruments", "model": "cDAQ 9178"}
)
await asyncio.sleep(10)

async def _process_and_send_telemetry(self, data):
"""Parse, enrich, and send telemetry data."""
telemetry = {}
status = "no_data"
try:
if data:
status = "ok"
telemetry = json.loads(data.decode())
self._add_timestamp_if_present(telemetry)
telemetry["status"] = status
await self.send_telemetry(telemetry)
self.alerts.clear()
except Exception as e:
self.alerts.add("parse_error")
await self.log.error(f"Failed to process data: {e}")

def _add_timestamp_if_present(self, telemetry):
"""If 'Date' and 'Time' are present, combine and convert to timestamp."""
date_str = telemetry.get("Date")
time_str = telemetry.get("Time")
if date_str and time_str:
dt_str = f"{date_str} {time_str}"
date = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S")
telemetry.pop("Date")
telemetry.pop("Time")
telemetry["timestamp"] = int(date.timestamp())


if __name__ == "__main__":
asyncio.run(main())
Loading