Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Py-stream #1001

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

### [UNRELEASED](https://github.com/quartiq/stabilizer/compare/v0.11.0...HEAD) - DATE

### Added

* Support for exponentially swept sine signal source

### Changed

* `py`: `StabilizerStream` renamed to `Stream`

## [v0.11.0](https://github.com/quartiq/stabilizer/compare/v0.10.0...v0.11.0) - 2024-12-02

### Added
Expand Down
36 changes: 17 additions & 19 deletions hitl/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os

import miniconf
from stabilizer.stream import measure, StabilizerStream, get_local_ip
from stabilizer.stream import measure, Stream, get_local_ip

logger = logging.getLogger(__name__)

Expand All @@ -22,12 +22,13 @@
async def _main():
parser = argparse.ArgumentParser(description="Stabilizer Stream HITL test")
parser.add_argument(
"prefix", type=str, nargs="?", help="The MQTT topic prefix of the target"
"prefix",
help="The MQTT topic prefix of the target",
)
parser.add_argument(
"--broker", "-b", default="mqtt", type=str, help="The MQTT broker address"
)
parser.add_argument("--ip", default="0.0.0.0", help="The IP address to listen on")
parser.add_argument("--addr", default="0.0.0.0", help="The IP address to listen on")
parser.add_argument(
"--port", type=int, default=9293, help="Local port to listen on"
)
Expand All @@ -36,39 +37,36 @@ async def _main():
"--max-loss", type=float, default=5e-2, help="Maximum loss for success"
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)

async with miniconf.Client(
args.broker,
protocol=miniconf.MQTTv5,
logger=logging.getLogger("aiomqtt-client"),
) as client:
prefix = args.prefix
if not args.prefix:
prefix, _alive = miniconf.one(
await miniconf.discover(client, "dt/sinara/dual-iir/+")
)

logging.basicConfig(level=logging.INFO)

prefix, _alive = miniconf.one(await miniconf.discover(client, args.prefix))
conf = miniconf.Miniconf(client, prefix)

if ipaddress.ip_address(args.ip).is_unspecified:
args.ip = get_local_ip(args.broker)
if ipaddress.ip_address(args.addr).is_unspecified:
args.addr = get_local_ip(args.broker)
if ipaddress.ip_address(args.addr).is_multicast:
local = get_local_ip(args.broker)
else:
local = "0.0.0.0"

logger.info("Starting stream")
await conf.set("/stream", f"{args.ip}:{args.port}", retain=False)
await conf.set("/stream", f"{args.addr}:{args.port}")

try:
logger.info("Testing stream reception")
_transport, stream = await StabilizerStream.open(
args.port, args.ip, args.broker
_transport, stream = await Stream.open(
args.port, addr=args.addr, local=local
)
logger.info("Testing stream reception")
loss = await measure(stream, args.duration)
if loss > args.max_loss:
raise RuntimeError("High frame loss", loss)
finally:
logger.info("Stopping stream")
await conf.set("/stream", "0.0.0.0:0", retain=False)
await conf.set("/stream", "0.0.0.0:0")

logger.info("Draining queue")
await asyncio.sleep(0.1)
Expand Down
105 changes: 70 additions & 35 deletions py/stabilizer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_local_ip(remote):
Returns a list of four octets."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect((remote, 1883))
sock.connect((remote, 9)) # discard
return sock.getsockname()[0]
finally:
sock.close()
Expand Down Expand Up @@ -79,6 +79,26 @@ def to_traces(self):
]


class ThermostatEem:
"""Thermostat-EEM format"""

format_id = 3

def __init__(self, header, body):
self.header = header
self.body = body

def size(self):
"""Return the data size of the frame in bytes"""
return len(self.body)

def to_si(self):
"""Return the parsed data in SI units"""
return np.frombuffer(
self.body, np.dtype([("input", "<f4", (4, 4)), ("output", "<f4", (4,))])
)


class Frame:
"""Stream frame constisting of a header and multiple data batches"""

Expand All @@ -88,6 +108,7 @@ class Frame:
header = namedtuple("Header", "magic format_id batches sequence")
parsers = {
AdcDac.format_id: AdcDac,
ThermostatEem.format_id: ThermostatEem,
}

@classmethod
Expand All @@ -103,38 +124,42 @@ def parse(cls, data):
return parser(header, data[cls.header_fmt.size :])


class StabilizerStream(asyncio.DatagramProtocol):
class Stream(asyncio.DatagramProtocol):
"""Stabilizer streaming receiver protocol"""

@classmethod
async def open(cls, port=9293, addr="0.0.0.0", broker=None, maxsize=1):
async def open(cls, port=9293, addr="0.0.0.0", local="0.0.0.0", maxsize=1):
"""Open a UDP socket and start receiving frames"""
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# Increase the OS UDP receive buffer size to 4 MiB so that latency
# spikes don't impact much. Achieving 4 MiB may require increasing
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except NameError:
pass # Windows
# Increase the OS UDP receive buffer size so that latency
# spikes don't impact much. Achieving this may require increasing
# the max allowed buffer size, e.g. via
# `sudo sysctl net.core.rmem_max=26214400` but nowadays the default
# max appears to be ~ 50 MiB already.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20)

# We need to specify which interface to receive broadcasts from, or Windows may choose the
# wrong one. Thus, use the broker address to figure out our local address for the interface
# of interest.
# max appears to be ~ 50 MiB already, at least on Linux.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8 << 20)
# We need to specify which interface to receive multicasts from, or Windows may choose the
# wrong one. Thus, use a bind address to figure out our local address for the interface
# of interest. There's also an interface index, at least on linux, but apparently windows
# sockets don't do that.
if ipaddress.ip_address(addr).is_multicast:
group = socket.inet_aton(addr)
iface = socket.inet_aton(get_local_ip(broker))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface)
sock.bind(("", port))
else:
sock.bind((addr, port))

transport, protocol = await loop.create_datagram_endpoint(
lambda: cls(maxsize), sock=sock
multiaddr = socket.inet_aton(addr)
local = socket.inet_aton(local)
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
multiaddr + local,
)
sock.bind((addr, port))
return await loop.create_datagram_endpoint(
lambda: cls(maxsize),
sock=sock,
)
return transport, protocol

def __init__(self, maxsize):
self.queue = asyncio.Queue(maxsize)
Expand Down Expand Up @@ -177,8 +202,6 @@ async def _record():
stat.received += frame.header.batches
stat.expect = wrap(frame.header.sequence + frame.header.batches)
stat.bytes += frame.size()
# test conversion
# frame.to_si()

try:
await asyncio.wait_for(_record(), timeout=duration)
Expand All @@ -190,10 +213,7 @@ async def _record():
)

sent = stat.received + stat.lost
if sent:
loss = stat.lost / sent
else:
loss = 1
loss = stat.lost / sent if sent else 1
logger.info("Loss: %s/%s batches (%g %%)", stat.lost, sent, loss * 1e2)
return loss

Expand All @@ -202,17 +222,32 @@ async def main():
"""Test CLI"""
parser = argparse.ArgumentParser(description="Stabilizer streaming demo")
parser.add_argument(
"--port", type=int, default=9293, help="Local port to listen on"
"--port", type=int, default=9293, help="Local port to listen on [%(default)s]"
)
parser.add_argument(
"--host", default="0.0.0.0", help="Local address to listen on [%(default)s]"
)
parser.add_argument(
"--local",
default="0.0.0.0",
help="The local IP address to receive multicast frames on [%(default)s]",
)
parser.add_argument(
"--broker", help="The MQTT broker address for local IP lookup [%(default)s]"
)
parser.add_argument(
"--maxsize", type=int, default=1, help="Frame queue size [%(default)s]"
)
parser.add_argument(
"--duration", type=float, default=1.0, help="Test duration [%(default)s]"
)
parser.add_argument("--host", default="0.0.0.0", help="Local address to listen on")
parser.add_argument("--broker", default="mqtt", help="The MQTT broker address")
parser.add_argument("--maxsize", type=int, default=1, help="Frame queue size")
parser.add_argument("--duration", type=float, default=1.0, help="Test duration")
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
_transport, stream = await StabilizerStream.open(
args.port, args.host, args.broker, args.maxsize
if args.broker is not None:
args.local = get_local_ip(args.broker)
_transport, stream = await Stream.open(
args.port, args.host, args.local, args.maxsize
)
await measure(stream, args.duration)

Expand Down
Loading