script: logging: Fix small bugs and add support for live log parsing from a file #93107

Open · wants to merge 3 commits into base: main

9 changes: 9 additions & 0 deletions doc/releases/migration-guide-4.2.rst
@@ -750,6 +750,15 @@ hawkBit
be prepended with :kconfig:option:`CONFIG_BOARD`. It is the user's responsibility to write a
callback that prepends the board name if needed.

Logging
=======

* The UART dictionary log parsing script
:zephyr_file:`scripts/logging/dictionary/log_parser_uart.py` has been deprecated. Instead, the
more generic :zephyr_file:`scripts/logging/dictionary/live_log_parser.py` script should be
used. The new script supports the same functionality (and more), but is invoked with different
command line arguments; see the example invocation below.
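
For example, reading from a serial port with the new script would be invoked roughly as follows
(the subcommands and positional arguments follow the argparse definition added in this PR; the
database file, port and baudrate are placeholders):

    python3 ./scripts/logging/dictionary/live_log_parser.py database.json serial /dev/ttyUSB0 115200

Reading a previously captured binary log from a file (or from stdin when the path is omitted):

    python3 ./scripts/logging/dictionary/live_log_parser.py database.json file capture.bin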

State Machine Framework
=======================

7 changes: 4 additions & 3 deletions scripts/logging/dictionary/dictionary_parser/log_parser.py
@@ -9,6 +9,7 @@
"""

import abc
import re

from colorama import Fore

@@ -36,13 +37,13 @@ def formalize_fmt_string(fmt_str):

for spec in ['d', 'i', 'o', 'u', 'x', 'X']:
# Python doesn't support %ll for integer specifiers, so remove extra 'l'
new_str = new_str.replace("%ll" + spec, "%l" + spec)
new_str = re.sub(r'%(\#?\d*)ll' + spec, r'%\1l' + spec, new_str)

if spec in ['x', 'X']:
new_str = new_str.replace("%#ll" + spec, "%#l" + spec)
new_str = re.sub(r'%\#(\d*)ll' + spec, r'%#\1l' + spec, new_str)

# Python doesn't support %hh for integer specifiers, so remove extra 'h'
new_str = new_str.replace("%hh" + spec, "%h" + spec)
new_str = re.sub(r'%(\#?\d*)hh' + spec, r'%\1h' + spec, new_str)

# No %p for pointer either, so use %x
new_str = new_str.replace("%p", "0x%x")
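
Why the switch from plain str.replace() to re.sub() matters: the old replacement only caught the
bare "%ll<spec>"/"%hh<spec>" forms, while the regex also handles flag- and width-qualified
specifiers. A small standalone sketch (the format string is an invented example):

import re

fmt = "addr: %08llx, count: %lld, byte: %hhu"

# The old str.replace() only matched the bare "%ll<spec>" form, so the
# width-qualified "%08llx" was left untouched.
print(fmt.replace("%llx", "%lx"))   # addr: %08llx, count: %lld, byte: %hhu

# The regex keeps any '#' flag or width digits between '%' and the length
# modifier, so "%08llx" is rewritten as well.
fixed = fmt
for spec in ['d', 'i', 'o', 'u', 'x', 'X']:
    fixed = re.sub(r'%(\#?\d*)ll' + spec, r'%\1l' + spec, fixed)
    fixed = re.sub(r'%(\#?\d*)hh' + spec, r'%\1h' + spec, fixed)
print(fixed)                        # addr: %08lx, count: %ld, byte: %hu
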
70 changes: 51 additions & 19 deletions scripts/logging/dictionary/dictionary_parser/log_parser_v1.py
@@ -255,6 +255,18 @@ def print_hexdump(hex_data, prefix_len, color):
print(f"{color}%s%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
hex_vals, hex_padding, chr_vals))

def get_full_msg_hdr_size(self):
"""Get the size of the full message header"""
return struct.calcsize(self.fmt_msg_type) + \
struct.calcsize(self.fmt_msg_hdr) + \
struct.calcsize(self.fmt_msg_timestamp)

def get_normal_msg_size(self, logdata, offset):
"""Get the needed size of the normal log message at offset"""
log_desc, _ = struct.unpack_from(self.fmt_msg_hdr, logdata, offset)
pkg_len = (log_desc >> 6) & int(math.pow(2, 10) - 1)
data_len = (log_desc >> 16) & int(math.pow(2, 12) - 1)
return self.get_full_msg_hdr_size() + pkg_len + data_len

def parse_one_normal_msg(self, logdata, offset):
"""Parse one normal log message and print the encoded message"""
@@ -341,33 +353,53 @@ def parse_one_normal_msg(self, logdata, offset):
# Point to next message
return next_msg_offset

def parse_one_msg(self, logdata, offset):
if offset + struct.calcsize(self.fmt_msg_type) > len(logdata):
return False, offset

def parse_log_data(self, logdata, debug=False):
"""Parse binary log data and print the encoded log messages"""
offset = 0
# Get message type
msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]

while offset < len(logdata):
# Get message type
msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
if msg_type == MSG_TYPE_DROPPED:

if offset + struct.calcsize(self.fmt_dropped_cnt) > len(logdata):
return False, offset
offset += struct.calcsize(self.fmt_msg_type)

if msg_type == MSG_TYPE_DROPPED:
num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
offset += struct.calcsize(self.fmt_dropped_cnt)
num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
offset += struct.calcsize(self.fmt_dropped_cnt)

print(f"--- {num_dropped} messages dropped ---")
print(f"--- {num_dropped} messages dropped ---")

elif msg_type == MSG_TYPE_NORMAL:
ret = self.parse_one_normal_msg(logdata, offset)
if ret is None:
return False
elif msg_type == MSG_TYPE_NORMAL:

offset = ret
if ((offset + self.get_full_msg_hdr_size() > len(logdata)) or
(offset + self.get_normal_msg_size(logdata, offset) > len(logdata))):
return False, offset

else:
logger.error("------ Unknown message type: %s", msg_type)
return False
offset += struct.calcsize(self.fmt_msg_type)

ret = self.parse_one_normal_msg(logdata, offset)
if ret is None:
raise ValueError("Error parsing normal log message")

offset = ret

else:
logger.error("------ Unknown message type: %s", msg_type)
raise ValueError(f"Unknown message type: {msg_type}")

return True, offset

def parse_log_data(self, logdata, debug=False):
"""Parse binary log data and print the encoded log messages"""
offset = 0
still_parsing = True

while offset < len(logdata) and still_parsing:
still_parsing, offset = self.parse_one_msg(logdata, offset)

return offset

return True

colorama.init()
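
The new get_normal_msg_size() derives the total message length from the packed v1 log descriptor:
the shifts and masks above extract a 10-bit package length and a 12-bit data length. A tiny
illustration of the same masking, using a made-up descriptor value:

# Hypothetical descriptor, for illustration only: data_len = 5, pkg_len = 12.
log_desc = (5 << 16) | (12 << 6)

pkg_len = (log_desc >> 6) & 0x3FF      # 0x3FF == int(math.pow(2, 10) - 1)
data_len = (log_desc >> 16) & 0xFFF    # 0xFFF == int(math.pow(2, 12) - 1)

print(pkg_len, data_len)               # prints: 12 5
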
69 changes: 50 additions & 19 deletions scripts/logging/dictionary/dictionary_parser/log_parser_v3.py
@@ -260,6 +260,16 @@ def print_hexdump(hex_data, prefix_len, color):
print(f"{color}%s%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
hex_vals, hex_padding, chr_vals))

def get_full_msg_hdr_size(self):
"""Get the size of the full message header"""
return struct.calcsize(self.fmt_msg_type) + \
struct.calcsize(self.fmt_msg_hdr) + \
struct.calcsize(self.fmt_msg_timestamp)

def get_normal_msg_size(self, logdata, offset):
"""Get the needed size of the normal log message at offset"""
_, pkg_len, data_len, _ = struct.unpack_from(self.fmt_msg_hdr, logdata, offset + struct.calcsize(self.fmt_msg_type))
return self.get_full_msg_hdr_size() + pkg_len + data_len

def parse_one_normal_msg(self, logdata, offset):
"""Parse one normal log message and print the encoded message"""
@@ -350,33 +360,54 @@ def parse_one_normal_msg(self, logdata, offset):
# Point to next message
return next_msg_offset

def parse_one_msg(self, logdata, offset):
if offset + struct.calcsize(self.fmt_msg_type) > len(logdata):
return False, offset

def parse_log_data(self, logdata, debug=False):
"""Parse binary log data and print the encoded log messages"""
offset = 0
# Get message type
msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]

if msg_type == MSG_TYPE_DROPPED:

if offset + struct.calcsize(self.fmt_dropped_cnt) > len(logdata):
return False, offset

while offset < len(logdata):
# Get message type
msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
offset += struct.calcsize(self.fmt_msg_type)

if msg_type == MSG_TYPE_DROPPED:
num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
offset += struct.calcsize(self.fmt_dropped_cnt)
num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
offset += struct.calcsize(self.fmt_dropped_cnt)

print(f"--- {num_dropped} messages dropped ---")
print(f"--- {num_dropped} messages dropped ---")

elif msg_type == MSG_TYPE_NORMAL:
ret = self.parse_one_normal_msg(logdata, offset)
if ret is None:
return False
elif msg_type == MSG_TYPE_NORMAL:

offset = ret
if ((offset + self.get_full_msg_hdr_size() > len(logdata)) or
(offset + self.get_normal_msg_size(logdata, offset) > len(logdata))):
return False, offset

else:
logger.error("------ Unknown message type: %s", msg_type)
return False
offset += struct.calcsize(self.fmt_msg_type)

ret = self.parse_one_normal_msg(logdata, offset)
if ret is None:
raise ValueError("Error parsing normal log message")

offset = ret

else:
logger.error("------ Unknown message type: %s", msg_type)
raise ValueError(f"Unknown message type: {msg_type}")

return True, offset

def parse_log_data(self, logdata, debug=False):
"""Parse binary log data and print the encoded log messages"""
offset = 0
still_parsing = True

while offset < len(logdata) and still_parsing:
still_parsing, offset = self.parse_one_msg(logdata, offset)

return offset

return True

colorama.init()
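
With parse_one_msg() returning a (still_parsing, offset) pair and parse_log_data() returning the
number of consumed bytes instead of a bare True/False, a caller can keep feeding partial data and
simply retry once more bytes arrive. A minimal sketch of that buffering pattern (parser stands for
any object exposing the new parse_log_data()):

buffer = b''

def feed(parser, chunk):
    """Append newly received bytes and drop whatever was fully parsed."""
    global buffer
    buffer += chunk
    consumed = parser.parse_log_data(buffer)   # offset just past the last complete message
    buffer = buffer[consumed:]                 # keep the unparsed tail for the next call
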
129 changes: 129 additions & 0 deletions scripts/logging/dictionary/live_log_parser.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

"""
Log Parser for Dictionary-based Logging

This uses the JSON database file to decode the binary
log data taken directly from input serialport and print
the log messages.
"""

import argparse
import logging
import sys
import select
import os

import parserlib
import serial

LOGGER_FORMAT = "%(message)s"
logger = logging.getLogger("parser")


class SerialReader:
"""Class to read data from serial port and parse it"""

def __init__(self, serial_port, baudrate):
self.serial = serial.Serial(serial_port, baudrate)

def close(self):
self.serial.close()

def fileno(self):
return self.serial.fileno()

def read_non_blocking(self):
size = self.serial.in_waiting
return self.serial.read(size)


class FileReader:
"""Class to read data from serial port and parse it"""

def __init__(self, filepath):
if filepath is not None:
self.file = open(filepath, 'rb')
else:
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
self.file = sys.stdin

def close(self):
self.file.close()

def fileno(self):
return self.file.fileno()

def read_non_blocking(self):
# Read available data using a fixed buffer size. Without a size argument this read would
# block until EOF; with a size it returns as soon as data is available, even if fewer
# bytes than the buffer size have arrived.
return self.file.read(1024)


def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(allow_abbrev=False)

parser.add_argument("dbfile", help="Dictionary Logging Database file")
parser.add_argument("--debug", action="store_true", help="Print extra debugging information")

# Create subparsers for different input modes
subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode")

# Serial subparser
serial_parser = subparsers.add_parser("serial", help="Read from serial port")
serial_parser.add_argument("port", help="Serial port")
serial_parser.add_argument("baudrate", type=int, help="Baudrate")

# File subparser
file_parser = subparsers.add_parser("file", help="Read from file")
file_parser.add_argument("filepath", nargs="?", default=None,
help="Input file path, leave empty for stdin")

return parser.parse_args()


def main():
"""function of serial parser"""
args = parse_args()

if args.dbfile is None or '.json' not in args.dbfile:
logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile)
sys.exit(1)

logging.basicConfig(format=LOGGER_FORMAT)

if args.debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)

log_parser = parserlib.get_log_parser(args.dbfile, logger)

data = b''

if args.mode == "serial":
reader = SerialReader(args.port, args.baudrate)
elif args.mode == "file":
reader = FileReader(args.filepath)
else:
raise ValueError("Invalid mode selected. Use 'serial' or 'file'.")

try:
while True:
ready, _, _ = select.select([reader], [], [])
if ready:
data += reader.read_non_blocking()
parsed_data_offset = parserlib.parser(data, log_parser, logger)
data = data[parsed_data_offset:]
finally:
reader.close()


if __name__ == "__main__":
main()
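
SerialReader and FileReader expose only fileno() and read_non_blocking(), which is all the
select()-based loop in main() relies on, so further input sources can be added by following the
same shape. A hypothetical TCP reader, shown only to illustrate the interface:

import socket

class SocketReader:
    """Hypothetical reader with the same interface as SerialReader/FileReader."""

    def __init__(self, host, port):
        self.sock = socket.create_connection((host, port))
        self.sock.setblocking(False)

    def close(self):
        self.sock.close()

    def fileno(self):
        return self.sock.fileno()

    def read_non_blocking(self):
        try:
            return self.sock.recv(1024)
        except BlockingIOError:
            return b''
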
7 changes: 6 additions & 1 deletion scripts/logging/dictionary/log_parser.py
@@ -109,12 +109,17 @@ def main():
else:
logger.setLevel(logging.INFO)

log_parser = parserlib.get_log_parser(args.dbfile, logger)

logdata = read_log_file(args)
if logdata is None:
logger.error("ERROR: cannot read log from file: %s, exiting...", args.logfile)
sys.exit(1)

parserlib.parser(logdata, args.dbfile, logger)
parsed_data_offset = parserlib.parser(logdata, log_parser, logger)
if parsed_data_offset != len(logdata):
logger.error("ERROR: Not all data was parsed, %d bytes left unparsed", len(logdata) - parsed_data_offset)
sys.exit(1)

if __name__ == "__main__":
main()