diff --git a/.vscode/settings.json b/.vscode/settings.json index 5d877a9..d5151fe 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,5 @@ "python.defaultInterpreterPath": "python3", "modulename": "${workspaceFolderBasename}", "distname": "${workspaceFolderBasename}", - "moduleversion": "1.1.8" + "moduleversion": "1.1.9" } \ No newline at end of file diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 49b05b0..9a476b0 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,15 @@ # pygnssutils Release Notes +### RELEASE 1.1.9 + +FIXES: + +1. Fix `IndexError: list index out of range` error in `format_conn` on platforms with no IP6 support. + +CHANGES: + +1. ubx CLI utilities moved to `pyubxutils` - `ubxsave`, `ubxload`, `ubxcompare`, `ubxsetrate`, `ubxsimulator`. For the time being, `pyubxutils` will remain a dependency of `pygnssutils` and will be installed alongside it via pip, and `from pygnssutils import UBXSimulator` will still work as an import statement, but these will be removed altogether in v1.2.0. + ### RELEASE 1.1.8 ENHANCEMENTS: diff --git a/docs/pygnssutils.rst b/docs/pygnssutils.rst index 54657f6..357c82b 100644 --- a/docs/pygnssutils.rst +++ b/docs/pygnssutils.rst @@ -116,54 +116,6 @@ pygnssutils.socketwrapper module :undoc-members: :show-inheritance: -pygnssutils.ubxcompare module ------------------------------ - -.. automodule:: pygnssutils.ubxcompare - :members: - :undoc-members: - :show-inheritance: - -pygnssutils.ubxload module --------------------------- - -.. automodule:: pygnssutils.ubxload - :members: - :undoc-members: - :show-inheritance: - -pygnssutils.ubxsave module --------------------------- - -.. automodule:: pygnssutils.ubxsave - :members: - :undoc-members: - :show-inheritance: - -pygnssutils.ubxsetrate module ------------------------------ - -.. automodule:: pygnssutils.ubxsetrate - :members: - :undoc-members: - :show-inheritance: - -pygnssutils.ubxsimulator module -------------------------------- - -.. automodule:: pygnssutils.ubxsimulator - :members: - :undoc-members: - :show-inheritance: - -pygnssutils.ubxsimulator\_cli module ------------------------------------- - -.. automodule:: pygnssutils.ubxsimulator_cli - :members: - :undoc-members: - :show-inheritance: - Module contents --------------- diff --git a/pyproject.toml b/pyproject.toml index 7714397..03fcb46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pygnssutils" authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] description = "GNSS Command Line Utilities" -version = "1.1.8" +version = "1.1.9" license = { file = "LICENSE" } readme = "README.md" requires-python = ">=3.9" @@ -39,19 +39,14 @@ dependencies = [ "pyserial>=3.5", "pyspartn>=1.0.5", "pyubx2>=1.2.48", + "pyubxutils>=1.0.2", ] [project.scripts] gnssstreamer = "pygnssutils.gnssstreamer_cli:main" -gnssdump = "pygnssutils.gnssstreamer_cli:main" # for backwards compatibility gnssserver = "pygnssutils.gnssserver_cli:main" gnssntripclient = "pygnssutils.gnssntripclient_cli:main" gnssmqttclient = "pygnssutils.gnssmqttclient_cli:main" -ubxsetrate = "pygnssutils.ubxsetrate:main" -ubxsave = "pygnssutils.ubxsave:main" -ubxload = "pygnssutils.ubxload:main" -ubxsimulator = "pygnssutils.ubxsimulator_cli:main" -ubxcompare = "pygnssutils.ubxcompare:main" [project.urls] homepage = "https://github.com/semuconsulting/pygnssutils" diff --git a/src/pygnssutils/__init__.py b/src/pygnssutils/__init__.py index 5886f98..9fe51a1 100644 --- a/src/pygnssutils/__init__.py +++ b/src/pygnssutils/__init__.py @@ -6,6 +6,8 @@ :license: BSD 3-Clause """ +from pyubxutils.ubxsimulator import UBXSimulator + from pygnssutils._version import __version__ from pygnssutils.exceptions import GNSSStreamError, ParameterError from pygnssutils.globals import * @@ -16,9 +18,5 @@ from pygnssutils.helpers import * from pygnssutils.mqttmessage import * from pygnssutils.socketwrapper import SocketWrapper -from pygnssutils.ubxload import UBXLoader -from pygnssutils.ubxsave import UBXSaver -from pygnssutils.ubxsetrate import UBXSetRate -from pygnssutils.ubxsimulator import UBXSimulator version = __version__ # pylint: disable=invalid-name diff --git a/src/pygnssutils/_version.py b/src/pygnssutils/_version.py index 349da3f..969e61f 100644 --- a/src/pygnssutils/_version.py +++ b/src/pygnssutils/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "1.1.8" +__version__ = "1.1.9" diff --git a/src/pygnssutils/gnssserver_cli.py b/src/pygnssutils/gnssserver_cli.py index 0ef5afb..8ba3bac 100644 --- a/src/pygnssutils/gnssserver_cli.py +++ b/src/pygnssutils/gnssserver_cli.py @@ -15,6 +15,7 @@ from socket import create_connection, gethostbyname from time import sleep +from pyubxutils.ubxsimulator import UBXSimulator from serial import Serial from pygnssutils._version import __version__ as VERSION @@ -32,7 +33,6 @@ from pygnssutils.gnssserver import GNSSSocketServer from pygnssutils.helpers import set_common_args from pygnssutils.socketwrapper import SocketWrapper -from pygnssutils.ubxsimulator import UBXSimulator def _run_streamer(stream, **kwargs): diff --git a/src/pygnssutils/gnssstreamer_cli.py b/src/pygnssutils/gnssstreamer_cli.py index 2ac5504..a0bce44 100644 --- a/src/pygnssutils/gnssstreamer_cli.py +++ b/src/pygnssutils/gnssstreamer_cli.py @@ -37,6 +37,7 @@ from types import FunctionType from pyubx2 import ERR_LOG, SETPOLL, UBXReader +from pyubxutils.ubxsimulator import UBXSimulator from serial import Serial, SerialException from pygnssutils._version import __version__ as VERSION @@ -75,7 +76,6 @@ from pygnssutils.helpers import parse_url, set_common_args from pygnssutils.socket_server import runserver from pygnssutils.socketwrapper import SocketWrapper -from pygnssutils.ubxsimulator import UBXSimulator STATUSINTERVAL = 5 diff --git a/src/pygnssutils/helpers.py b/src/pygnssutils/helpers.py index ebb7af1..9ec48f4 100644 --- a/src/pygnssutils/helpers.py +++ b/src/pygnssutils/helpers.py @@ -322,7 +322,10 @@ def format_conn( if flowinfo != 0 or scopeid != 0: return (server, port, flowinfo, scopeid) try: - return getaddrinfo(server, port)[1][4] + gai = getaddrinfo(server, port) + if len(gai) == 1: # No IP6 support (Windows) + return gai[0][4] + return gai[1][4] # IP6 support (Posix) except gaierror as err: raise ValueError(f"Invalid server or port {server} {port}") from err if family == AF_INET: diff --git a/src/pygnssutils/ubxcompare.py b/src/pygnssutils/ubxcompare.py deleted file mode 100644 index c1bea3b..0000000 --- a/src/pygnssutils/ubxcompare.py +++ /dev/null @@ -1,227 +0,0 @@ -""" -ubxcompare.py - -Parse and compare contents of two or more u-blox config files. Can accept either -*.txt (text) or *.ubx (binary) formatted input files - the default is *.txt. - -Usage: - - ubxcompare --infiles "config1.txt, config2.txt" --format 0 --diffsonly 1 - -Outputs dictionary of config keys and their values for each file e.g. - -- CFG_RATE_MEAS (None): {1: '1000', 2: '1000'} signifies both files have same value -- CFG_RATE_MEAS (DIFFS!): {1: '1000', 2: '100'} signifies differences between values -- CFG_RATE_MEAS (DIFFS!): {1: '1000'} signifies the value was missing from the second - or subsequent file(s) - -:author: semuadmin -:copyright: SEMU Consulting © 2024 -:license: BSD 3-Clause -""" - -from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -from logging import getLogger - -from pyubx2 import ( - POLL_LAYER_BBR, - POLL_LAYER_FLASH, - SET, - SET_LAYER_BBR, - SET_LAYER_FLASH, - SET_LAYER_RAM, - TXN_NONE, - U1, - UBXMessage, - UBXReader, - bytes2val, - val2bytes, -) - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.globals import EPILOG, VERBOSITY_HIGH -from pygnssutils.helpers import set_common_args - -CFG = b"\x06" -VALGET = b"\x8b" -VALSET = b"\x8a" -FORMAT_UBX = 1 -FORMAT_TXT = 0 - - -class UBXCompare: - """UBX Compare Configuration Class.""" - - def __init__(self, infiles: str, form: int = FORMAT_TXT, diffsonly: bool = True): - """ - Constructor. - - :param str infiles: comma-separated list of fully-qualified filenames - :param int format: 0 = text, 1 = binary (0) - :param bool diffsonly: True = show diffs only, False = show entire config (True) - """ - - self.logger = getLogger(__name__) - if infiles in ("", None): - raise ValueError("--infiles parameter must not be blank") - - infiles = infiles.split(",") - cfgdict = {} - fcount = 0 - dcount = 0 - kcount = 0 - - for file in infiles: - fcount += 1 - self.parse_file(cfgdict, file.strip(), fcount, form) - - self.logger.info( - f"{fcount} files processed, list of {'differences in' if diffsonly else 'all'}" - " config keys and their values follows: ", - ) - - for key, vals in dict(sorted(cfgdict.items())).items(): - kcount += 1 - totalvals = len(vals.values()) # check if config appears in all files - uniquevals = len(set(vals.values())) # check if all values are the same - diff = totalvals != fcount or uniquevals != 1 - if diff: - dcount += 1 - if (diffsonly and diff) or not diffsonly: - print(f"{key} ({'DIFFS!' if diff else None}); {str(vals).strip('{}')}") - - self.logger.info(f"Total config keys: {kcount}. Total differences: {dcount}.") - - def parse_line(self, line: str) -> UBXMessage: - """ - Parse individual config line from txt file. - - Any messages other than CFG-MSG, CFG-PRT or CFG-VALGET are discarded. - The CFG-VALGET messages are converted into CFG-VALSET. - - :param str line: config line - :returns: parsed config line as UBXMessage - :rtype: UBXMessage - """ - - parts = line.replace(" ", "").split("-") - data = bytes.fromhex(parts[-1]) - cls = data[0:1] - mid = data[1:2] - if cls != CFG: - return None - if mid == VALGET: # config database command - version = data[4:5] - layer = bytes2val(data[5:6], U1) - if layer == POLL_LAYER_BBR: - layers = SET_LAYER_BBR - elif layer == POLL_LAYER_FLASH: - layers = SET_LAYER_FLASH - else: - layers = SET_LAYER_RAM - layers = val2bytes(layers, U1) - transaction = val2bytes(TXN_NONE, U1) - reserved0 = b"\x00" - cfgdata = data[8:] - payload = version + layers + transaction + reserved0 + cfgdata - parsed = UBXMessage(CFG, VALSET, SET, payload=payload) - else: # legacy CFG command - parsed = UBXMessage(CFG, mid, SET, payload=data[4:]) - - return parsed - - def get_attrs(self, cfgdict: dict, parsed: str, fileno: int): - """ - Get individual config keys and values from parsed line. - - :param dict cfgdict: dictionary of all config keys and values - :param UBXMessage parsed: parsed config line - :param int fileno: file number - """ - - attrs = parsed.split(",") - for attr in attrs: - attr = attr.strip('")> ') - if attr[0:3] == "CFG": - key, val = attr.split("=") - diff = cfgdict.get(key, {}) - diff[fileno] = val - cfgdict[key] = diff - - def parse_file(self, cfgdict: dict, filename: str, fileno: int, form: int): - """ - Load u-center format text configuration file. - - :param dict cfgdict: dictionary of all config keys and values - :param str filename: fully qualified input file name - :param int fileno: file number - :param int form: 0 = TXT (text), 1 = UBX (binary) - """ - - # pylint: disable=broad-exception-caught - - i = 0 - try: - if form == FORMAT_UBX: # ubx (binary) format - with open(filename, "rb") as infile: - ubr = UBXReader(infile, msgmode=SET) - for _, parsed in ubr: - if parsed is not None: - self.get_attrs(cfgdict, str(parsed), fileno) - i += 1 - else: # txt (text) format - with open(filename, "r", encoding="utf-8") as infile: - for line in infile: - parsed = self.parse_line(line) - if parsed is not None: - self.get_attrs(cfgdict, str(parsed), fileno) - i += 1 - except Exception as err: - self.logger.error(f"ERROR parsing {filename}! \n{err}") - - self.logger.info(f"\n{i} configuration commands processed in {filename}") - - -def main(): - """ - CLI Entry point. - - :param: as per UBXLoader constructor. - """ - - ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter) - ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) - ap.add_argument( - "-I", - "--infiles", - type=str, - required=True, - help="Comma-separated list of fully-qualified filenames", - ) - ap.add_argument( - "-F", - "--format", - required=False, - help="Format 0 = txt (text), 1 = ubx (binary)", - type=int, - choices=[0, 1], - default=0, - ) - ap.add_argument( - "-D", - "--diffsonly", - required=False, - help="Diffs 0 = Show all config values, 1 = Show differences only", - type=int, - choices=[0, 1], - default=1, - ) - - kwargs = set_common_args("ubxcompare", ap, logdefault=VERBOSITY_HIGH) - - UBXCompare(kwargs.pop("infiles"), kwargs.pop("format"), kwargs.pop("diffsonly")) - - -if __name__ == "__main__": - - main() diff --git a/src/pygnssutils/ubxload.py b/src/pygnssutils/ubxload.py deleted file mode 100644 index 43aa817..0000000 --- a/src/pygnssutils/ubxload.py +++ /dev/null @@ -1,262 +0,0 @@ -""" -ubxload.py - -NB: ONLY FOR GENERATION 9+ UBX DEVICES e.g. NEO-M9N, ZED-F9P - -This command line utility reads UBX configuration data from a -binary file and loads this into a compatible UBX device via its -serial port. It then confirms that the data has been -successfully acknowledged by the device, or reports any errors. - -The binary file is created using the ubxsave utility and contains -a series of CFG-VALSET messages representing the complete -configuration of the source device. - -Created on 06 Jan 2023 - -:author: semuadmin -:copyright: SEMU Consulting © 2023 -:license: BSD 3-Clause -""" - -# pylint: disable=invalid-name - -from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -from datetime import datetime, timedelta -from logging import getLogger -from math import ceil -from queue import Queue -from threading import Event, Lock, Thread -from time import sleep - -from pyubx2 import ( - NMEA_PROTOCOL, - SETPOLL, - UBX_PROTOCOL, - UBXMessageError, - UBXParseError, - UBXReader, -) -from serial import Serial - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.globals import EPILOG, VERBOSITY_HIGH -from pygnssutils.helpers import progbar, set_common_args - -ACK = "ACK-ACK" -NAK = "ACK-NAK" -WAITTIME = 5 # wait time for acknowledgements -WRITETIME = 0.02 # wait time between writes - - -class UBXLoader: - """UBX Configuration Loader Class.""" - - def __init__(self, stream: object, **kwargs): - """ - Constructor. - - :param object file: input file - :param object stream: output serial stream - """ - - self.logger = getLogger(__name__) - self._stream = stream - self._filename = kwargs["infile"] - self._waittime = ceil(kwargs.get("waittime", WAITTIME)) - self._ubxreader = UBXReader( - self._stream, protfilter=NMEA_PROTOCOL | UBX_PROTOCOL - ) - self._serial_lock = Lock() - self._out_queue = Queue() - self._stop_event = Event() - self._last_ack = datetime.now() - self._read_thread = Thread( - target=self._read_data, - daemon=True, - args=( - stream, - self._ubxreader, - self._out_queue, - self._stop_event, - ), - ) - - self._msg_ack = self._msg_nak = self._msg_write = self._msg_load = 0 - - def _load_data(self, filename: str, queue: Queue): - """ - Get CFG-VALSET data from file and place it on output queue. - """ - - with open(filename, "rb") as stream: - ubl = UBXReader(stream, msgmode=SETPOLL) - eof = False - while not eof: - (raw_data, parsed_data) = ubl.read() - if raw_data is None: - eof = True - else: - self._msg_load += 1 - queue.put(parsed_data) - self.logger.debug(f"LOAD {self._msg_load} - {parsed_data.identity}") - - def _read_data( - self, - stream: object, - ubr: UBXReader, - queue: Queue, - stop: Event, - ): - """ - Read incoming acknowledgements from device - """ - # pylint: disable=broad-except - - # read until expected no of acknowledgements has been received - # or waittime has been exceeded. - while not stop.is_set(): - try: - (_, parsed_data) = ubr.read() - if parsed_data is not None: - if ( - parsed_data.identity in (ACK, NAK) - and parsed_data.clsID == 6 # CFG - and parsed_data.msgID == 138 # CFG-VALSET - ): - self._last_ack = datetime.now() - if parsed_data.identity == ACK: - self._msg_ack += 1 - else: - self._msg_nak += 1 - self.logger.debug( - f"ACKNOWLEDGEMENT {self._msg_ack + self._msg_nak} - {parsed_data}" - ) - - # send config message(s) to receiver - if not queue.empty(): - i = 0 - while not queue.empty(): - i += 1 - n = ceil(50 / self._msg_load) - progbar(i * n, self._msg_load * n, 50) - parsed_data = queue.get() - self._msg_write += 1 - self.logger.debug( - f"WRITE {self._msg_write} {parsed_data.identity}" - ) - stream.write(parsed_data.serialize()) - sleep(WRITETIME) - queue.task_done() - - if ( - self._msg_ack + self._msg_nak >= self._msg_load - or datetime.now() - > self._last_ack + timedelta(seconds=self._waittime) - ): - stop.set() - - except (UBXMessageError, UBXParseError): - continue - except Exception as err: - if not stop.is_set(): - self.logger.error(f"Something went wrong {err}") - continue - - def run(self): - """ - Run configuration load routines. - """ - - rc = 1 - self.logger.info( - f"Loading configuration from {self._filename} to {self._stream.port}. " - "Press Ctrl-C to terminate early.", - ) - - self._load_data(self._filename, self._out_queue) - self._read_thread.start() - - # loop until all commands sent or user presses Ctrl-C - while not self._stop_event.is_set(): - try: - sleep(1) - except KeyboardInterrupt: # capture Ctrl-C - self.logger.warning( - "Terminated by user. Configuration may be incomplete." - ) - self._stop_event.set() - - self._read_thread.join() - - if self._msg_ack == self._msg_load: - self.logger.info( - "Configuration successfully loaded. " - f"{self._msg_load} CFG-VALSET messages sent and acknowledged." - ) - else: - null = self._msg_load - self._msg_ack - self._msg_nak - rc = 0 - self.logger.warning( - "Configuration may be incomplete. " - f"{self._msg_load} CFG-VALSET messages sent, " - f"{self._msg_ack} acknowledged, {self._msg_nak} rejected, " - f"{null} null responses." - ) - if null: - self.logger.warning( - f"Consider increasing waittime to >{self._waittime} seconds." - ) - if self._msg_nak: - self.logger.warning( - "Check device is compatible with this saved configuration." - ) - - return rc - - -def main(): - """ - CLI Entry point. - - :param: as per UBXLoader constructor. - """ - - ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter) - ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) - ap.add_argument("-I", "--infile", required=True, help="Input file") - ap.add_argument("-P", "--port", required=True, help="Serial port") - ap.add_argument( - "--baudrate", - required=False, - help="Serial baud rate", - type=int, - choices=[4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800], - default=9600, - ) - ap.add_argument( - "--timeout", - required=False, - help="Serial timeout in seconds", - type=float, - default=3.0, - ) - ap.add_argument( - "--waittime", - required=False, - help="Wait time in seconds", - type=float, - default=WAITTIME, - ) - - kwargs = set_common_args("ubxload", ap, logdefault=VERBOSITY_HIGH) - - with Serial( - kwargs.pop("port"), kwargs.pop("baudrate"), timeout=kwargs.pop("timeout") - ) as serial_stream: - ubl = UBXLoader(serial_stream, **kwargs) - ubl.run() - - -if __name__ == "__main__": - main() diff --git a/src/pygnssutils/ubxsave.py b/src/pygnssutils/ubxsave.py deleted file mode 100644 index 6c8d926..0000000 --- a/src/pygnssutils/ubxsave.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -ubxsave.py - -NB: ONLY FOR GENERATION 9+ UBX DEVICES e.g. NEO-M9N, ZED-F9P - -CLI utility which saves Generation 9+ UBX device configuration data to a file. `ubxsave` polls -configuration data via the device's serial port using a series of CFG-VALGET poll messages. It -parses the responses to these polls, converts them to CFG-VALSET command messages and saves these -to a binary file. This binary file can then be loaded into any compatible UBX device (e.g. via -the `ubxload` utility) to restore the saved configuration. - -The CFG-VALSET commands are stored as a single transaction, so if one or more fails on reload, the -entire set will be rejected. - -*NB*: The utility relies on receiving a complete set of poll responses within a specified -`waittime`. If the device is exceptionally busy or the transmit buffer is full, poll responses -may be delayed or dropped altogether. If the utility reports errors, try increasing the -waittime and/or baudrate or temporarily reducing periodic message rates. - -Usage (all kwargs are optional): - -> ubxsave --port /dev/ttyACM1 --baud 9600 --timeout 0.02 --outfile ubxconfig.ubx --verbosity 1 - -Created on 06 Jan 2023 - -:author: semuadmin -:copyright: SEMU Consulting © 2023 -:license: BSD 3-Clause -""" - -# pylint: disable=invalid-name - -from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -from logging import getLogger -from math import ceil -from queue import Queue -from threading import Event, Lock, Thread -from time import sleep, strftime - -from pyubx2 import ( - POLL_LAYER_RAM, - SET_LAYER_RAM, - TXN_COMMIT, - TXN_ONGOING, - TXN_START, - UBX_CONFIG_DATABASE, - UBX_PROTOCOL, - UBXMessage, - UBXReader, -) -from serial import Serial - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.globals import EPILOG, VERBOSITY_HIGH -from pygnssutils.helpers import progbar, set_common_args - -# try increasing these values if device response is too slow: -DELAY = 0.02 # delay between polls -WRAPUP = 5 # delay for final responses - - -class UBXSaver: - """UBX Configuration Saver Class.""" - - def __init__(self, file: object, stream: object, **kwargs): - """Constructor.""" - - self.logger = getLogger(__name__) - self._file = file - self._stream = stream - self._waittime = ceil(kwargs.get("waittime", WRAPUP)) - - self._ubxreader = UBXReader(stream, protfilter=UBX_PROTOCOL) - - self._serial_lock = Lock() - self._save_queue = Queue() - self._send_queue = Queue() - self._stop_event = Event() - - self._write_thread = Thread( - target=self._write_data, - daemon=True, - args=( - stream, - self._send_queue, - self._serial_lock, - ), - ) - self._read_thread = Thread( - target=self._read_data, - daemon=True, - args=( - self._stream, - self._ubxreader, - self._save_queue, - self._serial_lock, - self._stop_event, - ), - ) - self._save_thread = Thread( - target=self._save_data, - daemon=True, - args=( - self._file, - self._save_queue, - ), - ) - - self._msg_write = self._msg_sent = self._msg_rcvd = self._msg_save = ( - self._cfgkeys - ) = 0 - - def _write_data(self, stream: object, queue: Queue, lock: Lock): - """ - Read send queue containing CFG-VALGET poll requests and - send these to the device - """ - - while True: - message = queue.get() - lock.acquire() - stream.write(message.serialize()) - self._msg_write += 1 - self.logger.debug(f"WRITE {self._msg_write} - {message.identity}") - lock.release() - queue.task_done() - - def _read_data( - self, - stream: object, - ubr: UBXReader, - queue: Queue, - lock: Lock, - stop: Event, - ): - """ - Read incoming CFG-VALGET poll responses from device and - place on save queue. NB: we won't know how many poll - responses to expect. - """ - # pylint: disable=broad-except - - while not stop.is_set(): - try: - if stream.in_waiting: - lock.acquire() - (raw_data, parsed_data) = ubr.read() - lock.release() - if parsed_data is not None: - if parsed_data.identity == "CFG-VALGET": - queue.put((raw_data, parsed_data)) - self._msg_rcvd += 1 - self.logger.debug( - f"RESPONSE {self._msg_rcvd} - {parsed_data.identity}" - ) - except Exception as err: - if not stop.is_set(): - self.logger.error(f"Something went wrong {err}") - continue - - def _save_data(self, file: object, queue: Queue): - """ - Get CFG-VALGET poll responses from save queue, convert to - CFG-VALSET commands and save to binary file. - """ - - i = 0 - while True: - cfgdata = [] - while queue.qsize() > 0: - (_, parsed) = queue.get() - if parsed.identity == "CFG-VALGET": - for keyname in dir(parsed): - if keyname[0:3] == "CFG": - cfgdata.append((keyname, getattr(parsed, keyname))) - queue.task_done() - if len(cfgdata) >= 64: # up to 64 keys in each CFG-VALSET - txn = TXN_ONGOING if i else TXN_START - self._file_write(file, txn, cfgdata) - cfgdata = [] - i += 1 - if len(cfgdata) > 0: - self._file_write(file, TXN_COMMIT, cfgdata) - - def _file_write(self, file: object, txn: int, cfgdata: list): - """ - Write binary CFG-VALSET message data to output file. - """ - - if len(cfgdata) == 0: - return - self._msg_save += 1 - self._cfgkeys += len(cfgdata) - data = UBXMessage.config_set( - layers=SET_LAYER_RAM, transaction=txn, cfgData=cfgdata - ) - self.logger.debug(f"SAVE {self._msg_save} - {data.identity}") - file.write(data.serialize()) - - def run(self): - """ - Main save routine. - """ - - rc = 1 - self.logger.info( - f"Saving configuration from {self._stream.port} to {self._file.name}. " - "Press Ctrl-C to terminate early." - ) - - # loop until all commands sent or user presses Ctrl-C - try: - self._write_thread.start() - self._read_thread.start() - - layer = POLL_LAYER_RAM - position = 0 - keys = [] - for i, key in enumerate(UBX_CONFIG_DATABASE): - progbar(i, len(UBX_CONFIG_DATABASE), 50) - keys.append(key) - msg = UBXMessage.config_poll(layer, position, keys) - self._send_queue.put(msg) - self._msg_sent += 1 - self.logger.debug(f"POLL {i} - {msg.identity}") - keys = [] - sleep(DELAY) - - for i in range(self._waittime): - print( - f"Waiting {self._waittime - i} seconds for final responses..." - + " " * 20, - end="\r", - ) - sleep(1) - # sleep(self._waittime) - - self._stop_event.set() - self._send_queue.join() - self._save_thread.start() - self._save_queue.join() - - except KeyboardInterrupt: # capture Ctrl-C - self._stop_event.set() - self.logger.warning("Terminated by user. Configuration may be incomplete.") - - if self._msg_rcvd == self._cfgkeys: - self.logger.info( - "Configuration successfully saved. " - f"{self._msg_save} CFG-VALSET messages saved to {self._file.name}" - ) - else: - rc = 0 - self.logger.warning( - "Configuration may not be successfully saved. " - f"{self._msg_sent} CFG-VALGET polls sent to {self._stream.port}, " - f"{self._msg_rcvd} CFG-VALGET responses received, " - f"{self._msg_save} CFG-VALSET messages containing {self._cfgkeys} keys " - f"({self._cfgkeys*100/self._msg_rcvd:.1f}%) written to {self._file.name}. " - f"Consider increasing waittime to >{self._waittime}." - ) - - return rc - - -def main(): - """ - CLI Entry Point. - - :param: as per UBXSaver constructor. - """ - - ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter) - ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) - ap.add_argument("-P", "--port", required=True, help="Serial port") - ap.add_argument( - "-O", - "--outfile", - required=False, - help="Output file", - default=f"ubxconfig-{strftime('%Y%m%d%H%M%S')}.ubx", - ) - ap.add_argument( - "--baudrate", - required=False, - help="Serial baud rate", - type=int, - choices=[4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800], - default=9600, - ) - ap.add_argument( - "--timeout", - required=False, - help="Serial timeout in seconds", - type=float, - default=0.2, - ) - ap.add_argument( - "--waittime", - required=False, - help="Wait time in seconds", - type=int, - default=WRAPUP, - ) - - kwargs = set_common_args("ubxsave", ap, logdefault=VERBOSITY_HIGH) - - with open(kwargs.pop("outfile"), "wb") as outfile: - with Serial( - kwargs.pop("port"), kwargs.pop("baudrate"), timeout=kwargs.pop("timeout") - ) as serial_stream: - ubs = UBXSaver(outfile, serial_stream, **kwargs) - ubs.run() - - -if __name__ == "__main__": - main() diff --git a/src/pygnssutils/ubxsetrate.py b/src/pygnssutils/ubxsetrate.py deleted file mode 100644 index 959499a..0000000 --- a/src/pygnssutils/ubxsetrate.py +++ /dev/null @@ -1,230 +0,0 @@ -""" -ubxsetrate.py - -Simple command line utility, installed with PyPi library pygnssutils, -to configure message rates on a UBX GNSS receiver connected -to a local serial port via the UBX CFG-MSG command. - -Usage: - -ubxsetrate --port dev/tty.usbmodem1301 --baudrate 38400 --timeout 5 - --msgClass 0xf1 --msgID 0x00 --rate 1 --verbosity 1 - -or (to turn off all NMEA messages): - -ubxsetrate --port /dev/tty.usbmodem1301 --msgClass allnmea --rate 0 - -Created on 12 Dec 2022 - -:author: semuadmin -:copyright: SEMU Consulting © 2022 -:license: BSD 3-Clause -""" - -# pylint: disable=invalid-name - -from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -from logging import getLogger - -from pyubx2 import SET, UBX_CLASSES, UBX_MSGIDS, UBXMessage -from serial import Serial - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.exceptions import ParameterError -from pygnssutils.globals import ( - ALLNMEA, - ALLNMEA_CLS, - ALLUBX, - ALLUBX_CLS, - EPILOG, - MINMMEA_ID, - MINNMEA, - MINUBX, - MINUBX_ID, - VERBOSITY_HIGH, -) -from pygnssutils.helpers import set_common_args - - -class UBXSetRate: - """ - UBX Set Rate Class. - - Supports basic enable/disable message configuration for UBX GNSS devices - connected to a local serial port - """ - - # pylint: disable=too-many-instance-attributes - - def __init__(self, **kwargs): - """ - Constructor. - - :param str port: (kwarg) serial port name - :param int baudrate: (kwarg) serial baud rate (9600) - :param int timeout: (kwarg) serial timeout in seconds (3) - :param int msgClass: (kwarg) message class from pyubx2.UBX_CLASSES OR - :param str msgClass: (kwarg) special values "allubx", 'minubx", "allnmea" or "minnmea" - :param int msgID: (kwarg) message ID from pyubx2.UBX_MSGIDS[1:] - :param int rate: (kwarg) message rate per navigation solution (1) - :raises: ParameterError - """ - - self.logger = getLogger(__name__) - try: - self._serialOut = None - self._port = kwargs.get("port") - self._baudrate = int(kwargs.get("baudrate", 9600)) - self._timeout = int(kwargs.get("timeout", 3)) - mcls = kwargs.get("msgClass") - mid = kwargs.get("msgID", 0) - - if mcls.lower() in (ALLNMEA, ALLUBX, MINNMEA, MINUBX): - self._msgClass = mcls - else: - self._msgClass = int(mcls, 16) if mcls[0:2] == "0x" else int(mcls) - self._msgID = int(mid, 16) if mid[0:2] == "0x" else int(mid) - # check valid message type - mclsb = int.to_bytes(self._msgClass, 1, byteorder="big") - midb = int.to_bytes( - (self._msgClass << 8) + self._msgID, 2, byteorder="big" - ) - if mclsb not in UBX_CLASSES or midb not in UBX_MSGIDS: - raise ( - ParameterError( - "Unknown message type: class " - f"{self._msgClass} (0x{self._msgClass:02x}), " - f"id {self._msgID} (0x{self._msgID:02x})", - ) - ) - self._rate = int(kwargs.get("rate", 1)) - - except (ParameterError, ValueError, TypeError) as err: - raise ParameterError( - f"Invalid input arguments {kwargs}\n{err}\nType ubxsetrate -h for help." - ) from err - - def apply(self): - """ - Creates UBX CFG-MSG command(s) and sends them to receiver. - """ - - try: - self.logger.info( - f"Opening serial port {self._port} @ {self._baudrate} baud ..." - ) - self._serialOut = Serial(self._port, self._baudrate, timeout=self._timeout) - - if self._msgClass == ALLNMEA: # all available NMEA messages - for msgID in UBX_MSGIDS: - if msgID[:1] in ALLNMEA_CLS: - self._sendmsg( - int.from_bytes(msgID[:1], "little"), - int.from_bytes(msgID[1:], "little"), - ) - elif self._msgClass == MINNMEA: # minimum NMEA messages - for msgID in MINMMEA_ID: - self._sendmsg(0xF0, int.from_bytes(msgID[1:], "little")) - elif self._msgClass == ALLUBX: # all available UBX messages - for msgID in UBX_MSGIDS: - if msgID[:1] in ALLUBX_CLS: - self._sendmsg( - int.from_bytes(msgID[:1], "little"), - int.from_bytes(msgID[1:], "little"), - ) - elif self._msgClass == MINUBX: # minimum UBX messages - for msgID in MINUBX_ID: - self._sendmsg(0x01, int.from_bytes(msgID[1:], "little")) - else: # individual defined message - self._sendmsg(self._msgClass, self._msgID) - - except Exception as err: - raise err from err - finally: - if self._serialOut is not None: - self.logger.info("Configuration message(s) sent.") - self._serialOut.close() - - def _sendmsg(self, msgClass: int, msgID: int): - """ - Send individual CFG-MSG command to receiver - - :param int msgClass: message class - :param int msgID: message ID - """ - - msg = UBXMessage( - "CFG", - "CFG-MSG", - SET, - msgClass=msgClass, - msgID=msgID, - rateDDC=self._rate, - rateUART1=self._rate, - rateUART2=self._rate, - rateUSB=self._rate, - rateSPI=self._rate, - ) - - self.logger.info(f"Sending configuration message {msg}...") - self._serialOut.write(msg.serialize()) - - -def main(): - """ - CLI Entry point. - - :param: as per UBXSetRate constructor. - :raises: ParameterError if parameters are invalid - """ - # pylint: disable=raise-missing-from - - ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter) - ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) - ap.add_argument("-P", "--port", required=True, help="Serial port") - ap.add_argument( - "--baudrate", - required=False, - help="Serial baud rate", - type=int, - choices=[4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800], - default=9600, - ) - ap.add_argument( - "--timeout", - required=False, - help="Serial timeout in seconds", - type=float, - default=3.0, - ) - ap.add_argument( - "--msgClass", - required=True, - help=( - "Message class from pyubx2.UBX_CLASSES or " - "special values 'allubx', 'minubx', 'allnmea' or 'minnmea'" - ), - ) - ap.add_argument( - "--msgID", required=False, help="Message ID from pyubx2.UBX_MSGIDS[1:]" - ) - ap.add_argument( - "--rate", - required=False, - help="Message rate per navigation solution", - type=int, - default=1, - ) - - kwargs = set_common_args("ubxsetrate", ap, logdefault=VERBOSITY_HIGH) - - try: - usr = UBXSetRate(**kwargs) - usr.apply() - - except KeyboardInterrupt: - pass - - -if __name__ == "__main__": - main() diff --git a/src/pygnssutils/ubxsimulator.py b/src/pygnssutils/ubxsimulator.py deleted file mode 100644 index f11da0b..0000000 --- a/src/pygnssutils/ubxsimulator.py +++ /dev/null @@ -1,491 +0,0 @@ -""" -ubxsimulator.py - -*** EXPERIMENTAL *** - -Simple UBX GNSS serial device simulator. - -Simulates a GNSS serial stream by generating synthetic UBX or NMEA messages -based on parameters defined in a json configuration file. Can simulate a -motion vector based on a specified course over ground and speed. - -Example usage: - - from pygnssutils import UBXSimulator - from pyubx2 import UBXReader - - with UBXSimulator(configfile="/home/myuser/ubxsimulator.json", interval=1, timeout=3) as stream: - ubr = UBXReader(stream) - for raw, parsed in ubr: - print(parsed) - -Generates mock acknowledgements (ACK-ACK) for valid incoming UBX commands -and polls. - -See sample ubxsimulator.json configuration file in the \\\\examples folder. - -NB: Principally intended for testing Python GNSS application functionality. -There is currently no attempt to simulate real-world satellite geodetics, -though this could be done using e.g. the Python `skyfield` library and the -relevant satellite TLE (orbital elements) files. I may look into adding -such functionality as and when time permits. Contributions welcome. - -Created on 3 Feb 2024 - -:author: semuadmin -:copyright: SEMU Consulting © 2024 -:license: BSD 3-Clause -""" - -# pylint: disable=too-many-locals, too-many-instance-attributes, unused-import - -from datetime import datetime, timedelta -from json import JSONDecodeError, load -from logging import getLogger -from math import cos, pi, sin -from os import getenv, path -from pathlib import Path -from queue import Queue -from threading import Event, Thread -from time import sleep - -from pynmeagps import NMEAMessage -from pyrtcm import RTCMMessage, RTCMMessageError, RTCMParseError, RTCMReader -from pyubx2 import ( - GET, - RTCM3_PROTOCOL, - UBX_PROTOCOL, - UBXMessage, - UBXMessageError, - UBXParseError, - UBXReader, - escapeall, - getinputmode, - protocol, - utc2itow, -) - -from pygnssutils.globals import EARTH_RADIUS, UBXSIMULATOR - -DEFAULT_INTERVAL = 1000 # milliseconds -DEFAULT_TIMEOUT = 3 # seconds -DEFAULT_PATH = path.join(Path.home(), "ubxsimulator") - - -class UBXSimulator: - """ - Simple dummy GNSS UBX serial stream class. - """ - - def __init__(self, app=None, **kwargs): - """ - Constructor. - - :param float interval: (kwarg) simulated navigation interval in seconds - :param float timeout: (kwarg) simulated serial read timeout in seconds - :param str configfile: (kwarg) fully qualified path to json config file - """ - - # Reference to calling application class (if applicable) - self.__app = app # pylint: disable=unused-private-member - # configure logger with name "pygnssutils" in calling module - self.logger = getLogger(__name__) - self._config = self._readconfig( - kwargs.get( - "configfile", - getenv(f"{UBXSIMULATOR.upper()}_JSON", DEFAULT_PATH + ".json"), - ) - ) - self.logger.debug(f"Configuration loaded:\n{self._config}") - self._interval = kwargs.get( - "interval", (self._config.get("interval", DEFAULT_INTERVAL)) - ) # milliseconds - self._timeout = kwargs.get( - "timeout", (self._config.get("timeout", self._interval * 3)) - ) - self._stopevent = Event() - self._outqueue = Queue() - self._inqueue = Queue() - self._buffer = b"" - self._mainloop_thread = None - self._msgfactory_thread = None - self._lastread = datetime.fromordinal(1) - self._loops = 0 - - def _readconfig(self, cfile: str) -> dict: - """ - Get configuration from json file. - - :param str cfile: fully qualified path to config file - :returns: config as dict - :rtype: dict - """ - - try: - with open(cfile, "r", encoding="utf-8") as jsonfile: - config = load(jsonfile) - except (OSError, JSONDecodeError) as err: - self.logger.error(f"Unable to read configuration file:\n{err}") - return { - "interval": DEFAULT_INTERVAL, - "timeout": DEFAULT_TIMEOUT, - "logfile": DEFAULT_PATH + ".log", - } - - return config - - def __enter__(self): - """ - Context manager enter routine. - """ - - self.start() - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - """ - Context manager exit routine. - """ - - self.stop() - - def start(self): - """ - Start streaming. - """ - - self.logger.info("UBX Simulator started") - self._stopevent.clear() - self._msgfactory_thread = Thread( - target=self._msgfactory, - args=( - self._loops, - self._config, - self._stopevent, - self._outqueue, - ), - ) - self._msgfactory_thread.start() - sleep(self._interval / 1000) - self._mainloop_thread = Thread( - target=self._mainloop, - args=( - self._stopevent, - self._outqueue, - self._inqueue, - ), - ) - self._mainloop_thread.start() - - def stop(self): - """ - Stop streaming. - """ - - self._stopevent.set() - if self._mainloop_thread is not None: - self._mainloop_thread.join() - if self._msgfactory_thread is not None: - self._msgfactory_thread.join() - self.logger.info("UBX Simulator stopped") - - def _mainloop(self, stop: Event, outq: Queue, inq: Queue): - """ - THREADED Main Loop. - - :param Event stop: stop event - :param Queue outq: output queue - :param Queue inq: input queue - """ - - while not stop.is_set(): - while not outq.empty(): - self._buffer += outq.get() - outq.task_done() - while not inq.empty(): - data = inq.get() - self._datahandler(data, outq) - inq.task_done() - sleep(self._interval / 10000) - - def _msgfactory( - self, - loops: int, - config: dict, - stop: Event, - outq: Queue, - ) -> bytes: - """ - THREADED message factory. - - Generate synthetic UBX or NMEA navigation messages using navigation - interval and attribute values specified in configuration file. - - TODO allow dynamic attribute derivation for UBX messages. - - :param int loops: message loop counter - :param dict config: configuration dictionary - :param Event stop: stop event - :param Queue outq: output queue - """ - - while not stop.is_set(): - - nw = datetime.now() - _, itow = utc2itow(nw) - - # time and global position attributes are applied to all msg types - time_attrs = { - "iTOW": itow, - "year": nw.year, - "month": nw.month, - "day": nw.day, - "hour": nw.hour, - "min": nw.minute, - "second": nw.second, - } - global_attrs = config.get("global", {}) - - # simulate motion vector using course over ground and speed - # NB: UBX NAV-PVT outputs speed as mm/s; NMEA RMC uses knots - if config.get("simVector", False): - lat1 = global_attrs.get("lat", 0) - lon1 = global_attrs.get("lon", 0) - spd = global_attrs.get("spd", 0) * 0.5144444 # knots -> m/s - spd = global_attrs.get("gSpeed", spd * 1000) / 1000 # mm/s -> m/s - cog = global_attrs.get("headMot", global_attrs.get("cog", 0)) - lat2, lon2 = self._add_vector(lat1, lon1, spd, cog, self._interval) - global_attrs["lat"] = lat2 - global_attrs["lon"] = lon2 - - ubxm = config.get("ubxmessages", []) - for msg in ubxm: - msgcls = msg["msgCls"] - msgid = msg["msgId"] - rate = msg.get("rate", 1) - if not loops % rate: - attrs = {**time_attrs, **global_attrs, **msg["attrs"]} - ubx = UBXMessage(msgcls, msgid, GET, **attrs) - outq.put(ubx.serialize()) - - nmeam = config.get("nmeamessages", []) - for msg in nmeam: - talker = msg["talker"] - msgid = msg["msgId"] - rate = msg.get("rate", 1) - if not loops % rate: - # pynmeagps automatically defaults time to now() - attrs = {**global_attrs, **msg["attrs"]} - nme = NMEAMessage(talker, msgid, GET, **attrs) - outq.put(nme.serialize()) - - sleep(self._interval / 1000) - loops = (loops + 1) % 1024 - - def _datahandler(self, data: bytes, outq: Queue): - """ - THREADED - Process incoming UBX or RTCM3 data. - - TODO enhance to mimic wider range of command or poll responses. - - :param bytes data: UBXMessage or RTCMMessage - :param Queue outq: output queue - """ - - if data is None: - return - - if isinstance(data, UBXMessage): - self._do_ackack(data, outq) - - if data.identity == "MON-VER": - self._do_monver(outq) - if data.identity == "CFG-RATE": - self._do_cfgrate(data, outq) - - def _do_send(self, msg: UBXMessage, outq: Queue): - """ - Send synthetic command or query response. - - :param UBXMessage msg: UBXMessage - :param Queue outq: output queue - """ - - raw = msg.serialize() - outq.put(raw) - self.logger.info(f"Response Sent by Simulator:\n{raw}\n{msg}") - - def _do_ackack(self, data: UBXMessage, outq: Queue): - """ - Generate synthetic ACK ACK acknowledgement. - - :param UBXMessage msg: incoming UBXMessage - :param Queue outq: output queue - """ - - msg = UBXMessage( - "ACK", - "ACK-ACK", - GET, - clsID=int.from_bytes(data.msg_cls, "little"), - msgID=int.from_bytes(data.msg_id, "little"), - ) - self._do_send(msg, outq) - - def _do_monver(self, outq: Queue): - """ - Generate synthetic MON-VER poll response. - - :param Queue outq: output queue - """ - - parms = { - "swVersion": b"ROM_CORE UBXSIM" + b"\x00" * 15, - "hwVersion": b"UBXSIM" + b"\x00" * 4, - } - msg = UBXMessage("MON", "MON-VER", GET, **parms) - self._do_send(msg, outq) - - def _do_cfgrate(self, data: UBXMessage, outq: Queue): - """ - Update nav interval in response to CFG_RATE command. - - :param Queue outq: output queue - """ - - if hasattr(data, "measRate"): # SET - self._interval = data.measRate - else: # POLL - parms = { - "measRate": int(self._interval), - "navRate": 1, - "timeRef": 0, - } - msg = UBXMessage("CFG", "CFG-RATE", GET, **parms) - self._do_send(msg, outq) - - def _add_vector( - self, - lat: float, - lon: float, - speed: float, - course: float, - interval: float = 1000, - radius: float = EARTH_RADIUS, - ) -> tuple: - """ - Add vector to given lat/lon position. - - :param float lat: starting latitude - :param float lon: starting longitude - :param float speed: speed in m/s - :param float course: course over ground in degrees - :param float interval: navigation interval in milliseconds (1) - :param float radius: earth radius in km (6371) - :returns: ending lat/lon - :rtype: tuple - """ - - # convert speed to km/s - course *= pi / 180 - dn = speed / 1000 * cos(course) * interval / 1000 - de = speed / 1000 * sin(course) * interval / 1000 - dlat = dn / radius - dlon = de / (radius * cos(lat * pi / 180)) - lat2 = lat + dlat * 180 / pi - lon2 = lon + dlon * 180 / pi - return lat2, lon2 - - def read(self, num: int = 1) -> bytes: - """ - Read n bytes from buffer. - - :param int val: num of bytes to read (1) - :returns: bytes - :raises: TimeoutError - """ - - while len(self._buffer) < num: - sleep(self._interval / 20000) - if datetime.now() > self._lastread + timedelta(seconds=self._timeout): - raise TimeoutError - data = self._buffer[0:num] - self._buffer = self._buffer[num:] - self._lastread = datetime.now() - return data - - def readline(self) -> bytes: - """ - Read line from buffer. - - :returns: bytes - """ - - b = data = b"" - while b != b"\x0a": # LF - b = self.read() - if b == b"": - break - data += b - return data - - def write(self, data: bytes): - """ - Simulated write to receiver. - - :param bytes data: UBX data - """ - - prot = protocol(data) - try: - if prot == RTCM3_PROTOCOL: - rtm = RTCMReader.parse(data) - self._inqueue.put(rtm) - val = ("RTCM", rtm) - elif prot == UBX_PROTOCOL: - msgmode = getinputmode(data) # returns SET or POLL - ubx = UBXReader.parse(data, msgmode=msgmode) - self._inqueue.put(ubx) - val = ("UBX", ubx) - else: - val = (f"Other Protocol {prot}", None) - except ( - UBXParseError, - UBXMessageError, - RTCMParseError, - RTCMMessageError, - ) as err: - val = ("Invalid/Unknown Data:", f"{err}") - self.logger.debug( - f"{val[0]} Data Received by Simulator:\n{escapeall(data)}\n{val[1]}" - ) - - def close(self): - """ - Close dummy serial stream. - """ - - self.stop() - - @property - def is_open(self) -> bool: - """ - Return status. - - :returns: true or false - :rtype: bool - """ - - return self._mainloop_thread is not None - - @property - def in_waiting(self) -> int: - """ - Return number of bytes in buffer. - - :returns: buffer length - :rtype: int - """ - - return len(self._buffer) diff --git a/src/pygnssutils/ubxsimulator_cli.py b/src/pygnssutils/ubxsimulator_cli.py deleted file mode 100644 index 85a0d42..0000000 --- a/src/pygnssutils/ubxsimulator_cli.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -ubxsimulator_cli.py - -CLI wrapper for UBXSimulator class. - -Created on 24 Jul 2024 - -:author: semuadmin -:copyright: SEMU Consulting © 2024 -:license: BSD 3-Clause -""" - -from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -from logging import getLogger -from os import getenv - -from pyubx2 import UBXReader - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.globals import CLIAPP, EPILOG, UBXSIMULATOR -from pygnssutils.helpers import set_common_args -from pygnssutils.ubxsimulator import DEFAULT_PATH, UBXSimulator - -SIMCONFIG = f"{UBXSIMULATOR.upper()}_JSON" - - -def main(): - """ - CLI Entry point. - """ - - ap = ArgumentParser( - description="pygnssutils EXPERIMENTAL UBX Serial Device Simulator", - epilog=EPILOG, - formatter_class=ArgumentDefaultsHelpFormatter, - ) - ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) - ap.add_argument( - "-I", - "--interval", - required=False, - type=float, - help="Simulated navigation interval in milliseconds (Hz = 1000/interval)", - default=1000, - ) - ap.add_argument( - "-T", - "--timeout", - required=False, - type=float, - help="Simulated serial read timeout in seconds", - default=3, - ) - ap.add_argument( - "--simconfigfile", - required=False, - type=str, - help=( - "Fully qualified path to simulator json configuration file " - f"(will use environment variable {SIMCONFIG} if set)" - ), - default=getenv(SIMCONFIG, DEFAULT_PATH + ".json"), - ) - kwargs = set_common_args("ubxsimulator", ap) - - logger = getLogger("pygnssutils.ubxsimulator") - - kwargs["configfile"] = kwargs.pop( - "simconfigfile", getenv(SIMCONFIG, DEFAULT_PATH + ".json") - ) - with UBXSimulator(CLIAPP, **kwargs) as stream: - - try: - ubr = UBXReader(stream) - i = 0 - for _, parsed in ubr: - logger.debug(str(parsed)) - i += 1 - except KeyboardInterrupt: - logger.info(f"Terminated by user, {i} messages processed") - - -if __name__ == "__main__": - - main()