-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmesh_console.py
355 lines (314 loc) · 13.4 KB
/
mesh_console.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# mesh_console.py
#
# Copyright (C) 2024, 2025 Florian Lengyel WM2D
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import asyncio
from pubsub import pub
from meshtastic.serial_interface import SerialInterface
import serial.tools.list_ports
from src.station.utils.logger import configure_logger, get_available_levels
from src.station.handlers.redis_handler import RedisHandler
from src.station.handlers.data_handler import MeshtasticDataHandler
from src.station.utils.constants import RedisConst, DisplayConst, DeviceConst, LoggingConst
from src.station.config.base_config import BaseStationConfig
# Initialize asyncio queue for Redis updates
redis_update_queue = asyncio.Queue()
def parse_arguments():
"""Parse command-line arguments with enhanced logging options."""
parser = argparse.ArgumentParser(
description="Meshtastic Console",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --device /dev/ttyACM0 # Use default INFO level
%(prog)s --log INFO,PACKET # Show INFO and PACKET messages
%(prog)s --log DEBUG --threshold # Show DEBUG and above
%(prog)s --log PACKET,REDIS --no-file-logging # Show only PACKET and REDIS, console only
"""
)
# Device configuration
parser.add_argument(
"--device",
type=str,
default=DeviceConst.DEFAULT_PORT_LINUX,
help=f"Serial interface device (default: {DeviceConst.DEFAULT_PORT_LINUX})"
)
# Logging configuration
log_group = parser.add_argument_group('Logging Options')
log_group.add_argument(
"--log",
type=str,
default=LoggingConst.DEFAULT_LEVEL,
help=f"Comma-separated list of log levels to include. Available levels: {', '.join(get_available_levels())}"
)
log_group.add_argument(
"--threshold",
action="store_true",
help="Treat log level as threshold (show all messages at or above specified level)"
)
log_group.add_argument(
"--no-file-logging",
action="store_true",
help="Disable logging to file"
)
# Other options
parser.add_argument(
"--display-redis",
action="store_true",
help="Display Redis data and exit without connecting to the serial device"
)
parser.add_argument(
"--debugging",
action="store_true",
help="Print diagnostic debugging statements"
)
# Configuration
parser.add_argument(
"--config",
type=str,
help="Path to configuration file"
)
# Redis configuration
parser.add_argument(
"--redis-host",
type=str,
help="Redis host (overrides config)"
)
parser.add_argument(
"--redis-port",
type=int,
help="Redis port (overrides config)"
)
args = parser.parse_args()
# Convert comma-separated log levels to list
args.log_levels = [level.strip() for level in args.log.split(",")]
return args
async def display_stored_data(data_handler):
"""Display previously stored data."""
# Display nodes
print("\n--- Previously Saved Nodes ---") # Always print headers
logger.debug("Attempting to retrieve formatted nodes...")
nodes = await data_handler.get_formatted_nodes()
logger.debug(f"Retrieved {len(nodes) if nodes else 0} nodes")
if not nodes:
print("[No nodes found]")
else:
for node in sorted(nodes, key=lambda x: x['timestamp']):
print(f"[{node['timestamp']}] Node {node['id']}: {node['name']}")
# Display messages
print("\n--- Previously Saved Messages ---")
logger.debug("Attempting to retrieve formatted messages...")
messages = await data_handler.get_formatted_messages()
logger.debug(f"Retrieved {len(messages) if messages else 0} messages")
if not messages:
print("[No messages found]")
else:
for msg in sorted(messages, key=lambda x: x['timestamp']):
print(f"[{msg['timestamp']}] {msg['from']} -> {msg['to']}: {msg['text']}")
# Display device telemetry
print("\n--- Previously Saved Device Telemetry ---")
logger.debug("Attempting to retrieve device telemetry...")
device_telemetry = await data_handler.get_formatted_device_telemetry()
logger.debug(f"Retrieved {len(device_telemetry) if device_telemetry else 0} device telemetry records")
if not device_telemetry:
print("[No device telemetry found]")
else:
for tel in sorted(device_telemetry, key=lambda x: x['timestamp'])[-DisplayConst.MAX_DEVICE_TELEMETRY:]: # Last 10 entries
print(f"[{tel['timestamp']}] {tel['from_id']}: battery={tel['battery']}%, voltage={tel['voltage']}V")
# Display network telemetry
print("\n--- Previously Saved Network Telemetry ---")
logger.debug("Attempting to retrieve network telemetry...")
network_telemetry = await data_handler.get_formatted_network_telemetry()
logger.debug(f"Retrieved {len(network_telemetry) if network_telemetry else 0} network telemetry records")
if not network_telemetry:
print("[No network telemetry found]")
else:
for tel in sorted(network_telemetry, key=lambda x: x['timestamp'])[-DisplayConst.MAX_NETWORK_TELEMETRY:]: # Last 5 entries
print(f"[{tel['timestamp']}] {tel['from_id']}: {tel['online_nodes']}/{tel['total_nodes']} nodes online")
def on_text_message(packet, interface):
"""Callback for text messages."""
logger.packet(f"on_text_message: {packet}")
try:
redis_update_queue.put_nowait({
"type": "text",
"packet": packet
})
except Exception as e:
logger.error(f"Error in text message callback: {e}", exc_info=True)
def on_node_message(packet, interface):
"""Callback for node messages."""
logger.packet(f"on_node_message: {packet}")
try:
redis_update_queue.put_nowait({
"type": "node",
"packet": packet
})
except Exception as e:
logger.error(f"Error in node message callback: {e}", exc_info=True)
def on_telemetry_message(packet, interface):
"""Callback for telemetry messages."""
logger.packet(f"on_telemetry_message: {packet}")
try:
redis_update_queue.put_nowait({
"type": "telemetry",
"packet": packet
})
except Exception as e:
logger.error(f"Error in telemetry callback: {e}", exc_info=True)
def suggest_available_ports():
"""List available serial ports."""
try:
logger.info("Available ports:")
ports = list(serial.tools.list_ports.comports())
if ports:
for port in ports:
logger.info(f" - {port.device}")
else:
logger.info(" No serial ports detected.")
except Exception as e:
logger.error(f"Cannot list available ports: {e}")
async def redis_dispatcher(data_handler):
"""Process Redis updates from the queue."""
try:
logger.info("Redis dispatcher task started.")
last_size = 0
while True:
try:
current_size = redis_update_queue.qsize()
if current_size != last_size:
logger.debug(f"Queue size changed to: {current_size}")
last_size = current_size
# Use a shorter timeout to prevent hanging
try:
update = await asyncio.wait_for(redis_update_queue.get(), timeout=RedisConst.QUEUE_TIMEOUT)
logger.debug(f"Received update type: {update['type']}")
# Process the packet
await data_handler.process_packet(update["packet"], update["type"])
redis_update_queue.task_done()
except asyncio.TimeoutError:
# Periodic heartbeat
logger.debug("Dispatcher heartbeat - no updates")
await asyncio.sleep(RedisConst.HEARTBEAT_INTERVAL)
continue
except Exception as e:
logger.error(f"Error in dispatcher: {e}", exc_info=True)
redis_update_queue.task_done()
await asyncio.sleep(RedisConst.ERROR_SLEEP) # Prevent tight error loops
except asyncio.CancelledError:
logger.info("Dispatcher received cancellation signal")
# Process remaining updates during shutdown
remaining = redis_update_queue.qsize()
if remaining > 0:
logger.info(f"Processing {remaining} remaining updates during shutdown")
while not redis_update_queue.empty():
update = redis_update_queue.get_nowait()
try:
await data_handler.process_packet(update["packet"], update["type"])
except Exception as e:
logger.error(f"Error processing remaining update: {e}")
finally:
redis_update_queue.task_done()
logger.debug("Redis dispatcher completed final updates")
raise # Re-raise to ensure proper task cleanup
async def main():
"""Main function to set up the Meshtastic listener."""
# Parse arguments and set up logging
args = parse_arguments()
global logger
logger = configure_logger(
name=__name__,
log_levels=args.log_levels,
use_threshold=args.threshold,
log_file=None if args.no_file_logging else LoggingConst.DEFAULT_FILE,
debugging=args.debugging
)
# First step: load config if specified
config = None
if args.config:
config = BaseStationConfig.load(path=args.config, logger=logger)
else:
config = BaseStationConfig.load(logger=logger)
logger.debug(f"Loaded default config from known locations")
logger.debug(f"Config contains: redis.host={config.redis.host}, redis.port={config.redis.port}")
# Initialize handlers
try:
redis_handler = RedisHandler(
host=config.redis.host if config else RedisConst.DEFAULT_HOST,
port=config.redis.port if config else RedisConst.DEFAULT_PORT,
logger=logger
)
if not await redis_handler.verify_connection():
logger.error(f"Could not connect to Redis at {config.redis.host if config else 'localhost'}:"
f"{config.redis.port if config else RedisConst.DEFAULT_PORT}")
logger.error("Please check Redis configuration and ensure Redis server is running")
if args.debugging:
logger.debug("See above for connection error details")
return # Exit gracefully
except Exception as e:
logger.error(f"Unexpected error initializing Redis: {e}")
if args.debugging:
logger.debug("Initialization error details:", exc_info=True)
return # Exit gracefully
data_handler = MeshtasticDataHandler(redis_handler, logger=logger)
# Display Redis data and exit if requested
if args.display_redis:
logger.info("Displaying Redis data ...")
await display_stored_data(data_handler)
await redis_handler.close()
return
# Initialize device connection
try:
interface = SerialInterface(args.device)
logger.debug(f"Connected to serial device: {args.device}")
except FileNotFoundError:
logger.error(f"Cannot connect to serial device {args.device}: Device not found.")
suggest_available_ports()
await redis_handler.close()
return
except Exception as e:
logger.error(f"Cannot connect to serial device {args.device}: {e}")
suggest_available_ports()
await redis_handler.close()
return
# Display stored data
await display_stored_data(data_handler)
# Subscribe to message topics
pub.subscribe(on_text_message, "meshtastic.receive.text")
pub.subscribe(on_node_message, "meshtastic.receive.user")
pub.subscribe(on_telemetry_message, "meshtastic.receive.telemetry")
logger.info("Subscribed to text, user, and telemetry messages.")
# Start Redis dispatcher
dispatcher_task = asyncio.create_task(redis_dispatcher(data_handler))
logger.debug(f"Created redis_dispatcher task: {dispatcher_task}")
try:
logger.info("Listening for messages... Press Ctrl+C to exit.")
await dispatcher_task
except KeyboardInterrupt:
logger.info("Shutdown initiated...")
dispatcher_task.cancel()
try:
await dispatcher_task # Wait for task to finish processing queue
except asyncio.CancelledError:
pass # Expected during shutdown
finally:
interface.close()
await redis_handler.close()
logger.info("Interface closed.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Program terminated.")