Skip to content

Instantly share code, notes, and snippets.

@B-Ricey763
Created August 20, 2025 15:30
Show Gist options
  • Select an option

  • Save B-Ricey763/3989fdb94e1c0182221bef9bf44bdb70 to your computer and use it in GitHub Desktop.

Select an option

Save B-Ricey763/3989fdb94e1c0182221bef9bf44bdb70 to your computer and use it in GitHub Desktop.
A pair of a listener plus logger handler for use in multiprocessing logging in python. Start the listener, and for all root loggers in other processes, register the logging handler.
import copy
import logging
import threading
import traceback
import zmq
LISTENER_TIMEOUT = 100 # ms
LOG_TOPIC = ""
class ZeroMQRecvLoggingError(Exception):
pass
class ZeroMQSocketListener:
def __init__(self, uri: str, ctx: zmq.Context | None = None):
self.ctx = ctx if ctx else zmq.Context.instance()
self.socket = zmq.Socket(self.ctx, zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, LOG_TOPIC)
self.socket.setsockopt(
zmq.RCVTIMEO, LISTENER_TIMEOUT
) # Check for shutdown every 1/10 second
self.socket.bind(uri)
self._thread: threading.Thread
def start(self):
"""
Start the listener.
This starts up a background thread to monitor the socket for
LogRecords to process.
"""
self._thread = t = threading.Thread(target=self._monitor)
self._stop_event = threading.Event()
t.daemon = True
t.start()
def process(self):
msg = self.socket.recv_pyobj()
if not isinstance(msg, logging.LogRecord):
raise ZeroMQRecvLoggingError(
"Only send logging records to logger listener!"
)
# Pass any new messages to the root logger to process
logging.getLogger().handle(msg)
def _monitor(self):
"""
Monitor the socket for records, and ask the handler
to deal with them.
This method runs on a separate, internal thread.
"""
while True:
try:
self.process()
except zmq.Again:
if self._stop_event.is_set():
break
def stop(self):
# Drain the queu
while True:
try:
self.process()
except zmq.Again:
break
self._stop_event.set()
self._thread.join()
class ZeroMQSocketHandler(logging.Handler):
def __init__(self, uri: str, ctx: zmq.Context | None = None):
logging.Handler.__init__(self)
self.ctx = ctx if ctx else zmq.Context.instance()
self.socket = zmq.Socket(self.ctx, zmq.PUB)
self.socket.connect(uri)
self.listener = None # will be set to listener if configured via dictConfig()
def emit(self, record):
"""
Emit a record.
Sends the LogRecord over zmq
"""
try:
record_copy = copy.copy(record)
# Workaround since the call stack are not picklable
if record_copy.exc_info is not None:
record_copy.getMessage()
record_copy.exc_info = None
record_copy.exc_text = traceback.format_exc()
self.socket.send_pyobj(record_copy)
except Exception:
self.handleError(record)
def close(self):
self.socket.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment