Skip to content

Commit

Permalink
Merge pull request #278 from quartiq/py-common
Browse files Browse the repository at this point in the history
py: introduce common for async/sync commonalities
  • Loading branch information
jordens authored Feb 17, 2025
2 parents 0db7452 + 1464819 commit b70f260
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 123 deletions.
11 changes: 9 additions & 2 deletions miniconf_mqtt/examples/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ struct Settings {
opt: Option<Leaf<i32>>,
#[tree(validate=self.validate_four)]
four: Leaf<f32>,
exit: Leaf<bool>,
#[tree(validate=self.validate_exit, rename=exit)]
_exit: Leaf<()>,
#[tree(skip)]
exit: bool,
}

impl Settings {
Expand All @@ -39,6 +42,10 @@ impl Settings {
Ok(depth)
}
}
fn validate_exit(&mut self, depth: usize) -> Result<usize, &'static str> {
self.exit = true;
Ok(depth)
}
}

#[tokio::main]
Expand All @@ -60,7 +67,7 @@ async fn main() {
client.set_alive("\"hello\"");

let mut settings = Settings::default();
while !*settings.exit {
while !settings.exit {
tokio::time::sleep(Duration::from_millis(10)).await;
if client.update(&mut settings).unwrap() {
println!("Settings updated: {:?}", settings);
Expand Down
5 changes: 2 additions & 3 deletions py/miniconf-mqtt/miniconf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""Miniconf Client"""
"""Miniconf MQTT Client"""

from . import sync, async_
from .async_ import Miniconf, Client, MQTTv5, MiniconfException, discover, one
from .async_ import *
4 changes: 1 addition & 3 deletions py/miniconf-mqtt/miniconf/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Command line interface for Miniconf-over-MQTT (asynchronous)
"""
"""Miniconf default CLI (async)"""

import asyncio
from .async_ import _main
Expand Down
112 changes: 10 additions & 102 deletions py/miniconf-mqtt/miniconf/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,13 @@

import asyncio
import json
import logging
import uuid
from typing import Dict, Any

import paho.mqtt
from paho.mqtt.properties import Properties, PacketTypes
from aiomqtt import Client, Message, MqttError

MQTTv5 = paho.mqtt.enums.MQTTProtocolVersion.MQTTv5

LOGGER = logging.getLogger(__name__)


class MiniconfException(Exception):
"""Miniconf Error"""

def __init__(self, code, message):
self.code = code
self.message = message

def __repr__(self):
return f"{self.code}: {self.message}"
from .common import MiniconfException, LOGGER


class Miniconf:
Expand Down Expand Up @@ -63,7 +48,7 @@ async def close(self):

async def _listen(self):
await self.client.subscribe(self.response_topic)
LOGGER.info(f"Subscribed to {self.response_topic}")
LOGGER.debug(f"Subscribed to {self.response_topic}")
self.subscribed.set()
try:
async for message in self.client.messages:
Expand All @@ -76,7 +61,7 @@ async def _listen(self):
try:
await self.client.unsubscribe(self.response_topic)
self.subscribed.clear()
LOGGER.info(f"Unsubscribed from {self.response_topic}")
LOGGER.debug(f"Unsubscribed from {self.response_topic}")
except MqttError as e:
LOGGER.debug(f"MQTT Error {e}", exc_info=True)

Expand Down Expand Up @@ -136,7 +121,7 @@ async def _do(self, path: str, *, response=1, **kwargs):
self._inflight[cd] = fut, []

topic = f"{self.prefix}/settings{path}"
LOGGER.info(f"Publishing {topic}: {kwargs.get('payload')}, [{props}]")
LOGGER.debug(f"Publishing {topic}: {kwargs.get('payload')}, [{props}]")
await self.client.publish(
topic,
properties=props,
Expand Down Expand Up @@ -244,9 +229,9 @@ async def listen():
try:
payload = json.loads(message.payload)
except json.JSONDecodeError:
logging.info(f"Ignoring {peer} not/invalid alive")
LOGGER.info(f"Ignoring {peer} not/invalid alive")
else:
logging.info(f"Discovered {peer} alive")
LOGGER.debug(f"Discovered {peer} alive")
discovered[peer] = payload

t_start = asyncio.get_running_loop().time()
Expand All @@ -264,87 +249,11 @@ async def listen():
return discovered


def one(devices: Dict[str, Any]) -> (str, Any):
"""Return the prefix for the unique alive Miniconf device.
See `discover()` for arguments.
"""
try:
(device,) = devices.items()
except ValueError:
raise MiniconfException(
"Discover", f"No unique Miniconf device (found `{devices}`)."
)
logging.info("Found device: %s", device)
return device


class _Path:
def __init__(self):
self.current = ""

def normalize(self, path):
"""Return an absolute normalized path and update current absolute reference."""
if path.startswith("/") or not path:
self.current = path[: path.rfind("/")]
else:
path = f"{self.current}/{path}"
assert path.startswith("/") or not path
return path


def _cli():
import argparse

parser = argparse.ArgumentParser(
description="Miniconf command line interface.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples (with a target at prefix 'app/id' and device-discovery):
%(prog)s -d app/+ /path # GET
%(prog)s -d app/+ /path=value # SET
%(prog)s -d app/+ /path= # CLEAR
%(prog)s -d app/+ /path? # LIST-GET
%(prog)s -d app/+ /path! # DUMP
""",
)
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--broker", "-b", default="mqtt", type=str, help="The MQTT broker address"
)
parser.add_argument(
"--retain",
"-r",
default=False,
action="store_true",
help="Retain the settings that are being set on the broker",
)
parser.add_argument(
"--discover", "-d", action="store_true", help="Detect device prefix"
)
parser.add_argument(
"prefix",
type=str,
help="The MQTT topic prefix of the target or a prefix filter for discovery",
)
parser.add_argument(
"commands",
metavar="CMD",
nargs="*",
help="Path to get ('PATH') or path and JSON encoded value to set "
"('PATH=VALUE') or path to clear ('PATH=') or path to list ('PATH?') or "
"path to dump ('PATH!'). "
"Use sufficient shell quoting/escaping. "
"Absolute PATHs are empty or start with a '/'. "
"All other PATHs are relative to the last absolute PATH.",
)
return parser


async def _main():
import sys
import os
import logging
from .common import _cli, MQTTv5, one

if sys.platform.lower() == "win32" or os.name.lower() == "nt":
from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
Expand All @@ -358,9 +267,7 @@ async def _main():
level=logging.WARN - 10 * args.verbose,
)

async with Client(
args.broker, protocol=MQTTv5, logger=logging.getLogger("aiomqtt-client")
) as client:
async with Client(args.broker, protocol=MQTTv5) as client:
if args.discover:
prefix, _alive = one(await discover(client, args.prefix))
else:
Expand All @@ -376,6 +283,7 @@ async def _main():

async def _handle_commands(interface, commands, retain):
import sys
from .common import _Path

current = _Path()
for arg in commands:
Expand Down
101 changes: 101 additions & 0 deletions py/miniconf-mqtt/miniconf/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Common code for miniconf.async_ and miniconf.sync"""

# pylint: disable=R0801,C0415,W1203,R0903,W0707

from typing import Dict, Any
import logging

import paho.mqtt

MQTTv5 = paho.mqtt.enums.MQTTProtocolVersion.MQTTv5

LOGGER = logging.getLogger("miniconf")


class MiniconfException(Exception):
"""Miniconf Error"""

def __init__(self, code, message):
self.code = code
self.message = message

def __repr__(self):
return f"{self.code}: {self.message}"


def one(devices: Dict[str, Any]) -> (str, Any):
"""Return the prefix for the unique alive Miniconf device.
See `discover()` for arguments.
"""
try:
(device,) = devices.items()
except ValueError:
raise MiniconfException(
"Discover", f"No unique Miniconf device (found `{devices}`)."
)
LOGGER.info("Found device: %s", device)
return device


class _Path:
def __init__(self):
self.current = ""

def normalize(self, path):
"""Return an absolute normalized path and update current absolute reference."""
if path.startswith("/") or not path:
self.current = path[: path.rfind("/")]
else:
path = f"{self.current}/{path}"
assert path.startswith("/") or not path
return path


def _cli():
import argparse

parser = argparse.ArgumentParser(
description="Miniconf command line interface.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples (with a target at prefix 'app/id' and id discovery):
%(prog)s -d app/+ /path # GET
%(prog)s -d app/+ /path=value # SET
%(prog)s -d app/+ /path= # CLEAR
%(prog)s -d app/+ /path? # LIST-GET
%(prog)s -d app/+ /path! # DUMP
""",
)
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--broker", "-b", default="mqtt", type=str, help="The MQTT broker address"
)
parser.add_argument(
"--retain",
"-r",
default=False,
action="store_true",
help="Retain the settings that are being set on the broker",
)
parser.add_argument(
"--discover", "-d", action="store_true", help="Detect device prefix"
)
parser.add_argument(
"prefix",
type=str,
help="The MQTT topic prefix of the target or a prefix filter for discovery",
)
parser.add_argument(
"commands",
metavar="CMD",
nargs="*",
help="Path to get ('PATH') or path and JSON encoded value to set "
"('PATH=VALUE') or path to clear ('PATH=') or path to list ('PATH?') or "
"path to dump ('PATH!'). "
"Use sufficient shell quoting/escaping. "
"Absolute PATHs are empty or start with a '/'. "
"All other PATHs are relative to the last absolute PATH.",
)
return parser
Loading

0 comments on commit b70f260

Please sign in to comment.