Skip to content

Commit 01365a1

Browse files
authored
bring loggers in sync and add multiproc capabilities (#1254)
1 parent 652a771 commit 01365a1

File tree

1 file changed

+172
-18
lines changed

1 file changed

+172
-18
lines changed

src/common/logger.py

Lines changed: 172 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,50 @@
1+
"""Structured logger utility for creating JSON logs."""
2+
3+
# the Delphi group uses two ~identical versions of this file.
4+
# try to keep them in sync with edits, for sanity.
5+
# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long
6+
# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
7+
8+
import contextlib
19
import logging
10+
import multiprocessing
211
import os
312
import sys
413
import threading
14+
from traceback import format_exception
15+
516
import structlog
617

718

819
def handle_exceptions(logger):
    """Handle uncaught exceptions in the main thread and in worker threads.

    Installs `sys.excepthook` and `threading.excepthook` so that any
    exception that escapes to the top level is logged through the
    provided structlog-style logger, tagged with a `scope` field saying
    where it escaped from.

    Args:
        logger: a logger exposing `.exception(...)` and `.debug(...)`
            that accept structured keyword arguments (e.g. a structlog
            logger from `get_structured_logger`).
    """

    def exception_handler(scope, etype, value, traceback):
        # `scope` identifies whether the exception escaped the main
        # interpreter ("sys") or a particular thread.
        logger.exception("Top-level exception occurred",
                         scope=scope, exc_info=(etype, value, traceback))

    def sys_exception_handler(etype, value, traceback):
        exception_handler("sys", etype, value, traceback)

    def threading_exception_handler(args):
        # `sys.exit(0)`, `sys.exit()`, and `sys.exit(None)` are all
        # "successful termination" (an omitted/None code means zero):
        # https://docs.python.org/3/library/sys.html#sys.exit
        if args.exc_type is SystemExit and args.exc_value.code in (None, 0):
            logger.debug("normal thread exit", thread=args.thread,
                         stack="".join(
                             format_exception(
                                 args.exc_type, args.exc_value, args.exc_traceback)))
        else:
            exception_handler(f"thread: {args.thread}",
                              args.exc_type, args.exc_value, args.exc_traceback)

    sys.excepthook = sys_exception_handler
    threading.excepthook = threading_exception_handler
2043

21-
def get_structured_logger(name=__name__, filename=None, log_exceptions=True):
44+
45+
def get_structured_logger(name=__name__,
46+
filename=None,
47+
log_exceptions=True):
2248
"""Create a new structlog logger.
2349
2450
Use the logger returned from this in indicator code using the standard
@@ -38,22 +64,19 @@ def get_structured_logger(name=__name__, filename=None, log_exceptions=True):
3864
is a good choice.
3965
filename: An (optional) file to write log output.
4066
"""
41-
# Configure the underlying logging configuration
42-
handlers = [logging.StreamHandler()]
43-
if filename:
44-
handlers.append(logging.FileHandler(filename))
45-
67+
# Set the underlying logging configuration
4668
if "LOG_DEBUG" in os.environ:
4769
log_level = logging.DEBUG
4870
else:
4971
log_level = logging.INFO
5072

51-
logging.basicConfig(format="%(message)s", level=log_level, handlers=handlers)
73+
logging.basicConfig(
74+
format="%(message)s",
75+
level=log_level,
76+
handlers=[logging.StreamHandler()])
5277

53-
def add_pid(logger, method_name, event_dict):
54-
"""
55-
Add current PID to the event dict.
56-
"""
78+
def add_pid(_logger, _method_name, event_dict):
79+
"""Add current PID to the event dict."""
5780
event_dict["pid"] = os.getpid()
5881
return event_dict
5982

@@ -92,9 +115,140 @@ def add_pid(logger, method_name, event_dict):
92115
cache_logger_on_first_use=True,
93116
)
94117

95-
logger = structlog.get_logger(name)
118+
# Create the underlying python logger and wrap it with structlog
119+
system_logger = logging.getLogger(name)
120+
if filename and not system_logger.handlers:
121+
system_logger.addHandler(logging.FileHandler(filename))
122+
system_logger.setLevel(log_level)
123+
logger = structlog.wrap_logger(system_logger)
96124

97125
if log_exceptions:
98126
handle_exceptions(logger)
99127

100128
return logger
129+
130+
131+
class LoggerThread():
    """Funnel log messages from multiprocessing workers through one logger.

    Bare structlog loggers are thread-safe but not multiprocessing-safe.
    A `LoggerThread` starts a listener thread that drains a
    multiprocessing queue and re-emits each queued message on the
    supplied logger; other processes push messages onto that queue via
    the logger-like `SubLogger` proxy, which also records the caller's
    pid on every message.

    Best suited to a batch of jobs running in a `multiprocessing.Pool`;
    not recommended for general use, since threading plus
    multiprocessing add overhead and queued messages may arrive with
    some lag.

    somewhat inspired by:
    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
    """

    class SubLogger():
        """MP-safe logger-like interface to convey log messages to a listening LoggerThread."""

        def __init__(self, queue):
            """Bind this proxy to the queue a LoggerThread is draining."""
            self.queue = queue

        def _log(self, level, *args, **kwargs):
            # Tag every message with the sending process's pid; explicit
            # caller kwargs win on a key collision.
            payload = {'sub_pid': multiprocessing.current_process().pid, **kwargs}
            self.queue.put([level, args, payload])

        def debug(self, *args, **kwargs):
            """Log a DEBUG level message."""
            self._log(logging.DEBUG, *args, **kwargs)

        def info(self, *args, **kwargs):
            """Log an INFO level message."""
            self._log(logging.INFO, *args, **kwargs)

        def warning(self, *args, **kwargs):
            """Log a WARNING level message."""
            self._log(logging.WARNING, *args, **kwargs)

        def error(self, *args, **kwargs):
            """Log an ERROR level message."""
            self._log(logging.ERROR, *args, **kwargs)

        def critical(self, *args, **kwargs):
            """Log a CRITICAL level message."""
            self._log(logging.CRITICAL, *args, **kwargs)


    def get_sublogger(self):
        """Retrieve SubLogger for this LoggerThread."""
        return self.sublogger

    def __init__(self, logger, q=None):
        """Create and start LoggerThread with supplied logger, creating a queue if not provided."""
        self.logger = logger
        self.msg_queue = q or multiprocessing.Queue()

        known_levels = (logging.DEBUG, logging.INFO, logging.WARNING,
                        logging.ERROR, logging.CRITICAL)

        def drain_queue():
            # Listener loop: re-emit queued messages until told to stop.
            logger.info('thread started')
            while True:
                item = self.msg_queue.get()
                if item == 'STOP':
                    logger.debug('received stop signal')
                    break
                level, args, kwargs = item
                if level not in known_levels:
                    logger.error('received unknown logging level! exiting...',
                                 level=level, args_kwargs=(args, kwargs))
                    break
                logger.log(level, *args, **kwargs)
            logger.debug('stopping thread')

        self.thread = threading.Thread(target=drain_queue,
                                       name="LoggerThread__"+logger.name)
        logger.debug('starting thread')
        self.thread.start()

        self.sublogger = LoggerThread.SubLogger(self.msg_queue)
        self.running = True

    def stop(self):
        """Terminate this LoggerThread."""
        if not self.running:
            self.logger.warning('thread already stopped')
            return
        self.logger.debug('sending stop signal')
        self.msg_queue.put('STOP')
        self.thread.join()
        self.running = False
        self.logger.info('thread stopped')
231+
232+
233+
@contextlib.contextmanager
def pool_and_threadedlogger(logger, *poolargs):
    """
    Context manager yielding a multiprocessing Pool plus an MP-safe logger proxy.

    Works like the ``multiprocessing.Pool()`` context manager, but also
    hands the context (via a LoggerThread) a SubLogger proxy to logger
    that pool workers can use safely. The proxy supports these methods:
    debug, info, warning, error, and critical.
    On exit, waits for outstanding workers to finish before tearing the
    pool down.
    """
    with multiprocessing.Manager() as manager:
        # A manager-backed queue can be shared with pool worker processes.
        threaded_logger = LoggerThread(logger, manager.Queue())
        try:
            with multiprocessing.Pool(*poolargs) as pool:
                yield pool, threaded_logger.get_sublogger()
                # Let in-flight jobs complete before the pool is destroyed.
                pool.close()
                pool.join()
        finally:
            threaded_logger.stop()

0 commit comments

Comments (0)