Created
August 20, 2025 15:30
-
-
Save B-Ricey763/3989fdb94e1c0182221bef9bf44bdb70 to your computer and use it in GitHub Desktop.
A paired listener and logging handler for multiprocess logging in Python. Start the listener, then register the handler on the root logger of every other process.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import copy | |
| import logging | |
| import threading | |
| import traceback | |
| import zmq | |
# Receive timeout applied to the listener socket, in milliseconds.
LISTENER_TIMEOUT = 100

# Subscription prefix handed to zmq.SUBSCRIBE by the listener (empty string).
LOG_TOPIC = ""
class ZeroMQRecvLoggingError(Exception):
    """Raised when the log listener receives something other than a LogRecord."""
class ZeroMQSocketListener:
    """Receive pickled ``LogRecord`` objects on a ZeroMQ SUB socket.

    The listener binds a SUB socket to ``uri``.  After :meth:`start` is
    called, a background daemon thread polls the socket and hands every
    received record to the root logger of this process.

    NOTE(security): messages are decoded with ``recv_pyobj``, which unpickles
    whatever arrives on the socket.  Only bind ``uri`` to an endpoint that
    untrusted peers cannot reach.
    """

    def __init__(self, uri: str, ctx: zmq.Context | None = None):
        """Create the SUB socket and bind it to ``uri``.

        Args:
            uri: ZeroMQ endpoint to bind the SUB socket to.
            ctx: Context to create the socket in.  When omitted (or falsy)
                ``zmq.Context.instance()`` is used.
        """
        self.ctx = ctx if ctx else zmq.Context.instance()
        self.socket = zmq.Socket(self.ctx, zmq.SUB)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, LOG_TOPIC)
        # A finite receive timeout makes recv raise zmq.Again periodically,
        # which is the monitor thread's only opportunity to see a stop request.
        self.socket.setsockopt(zmq.RCVTIMEO, LISTENER_TIMEOUT)
        self.socket.bind(uri)
        # Both attributes exist from construction so that stop() is safe to
        # call even if start() never ran.
        self._thread: threading.Thread | None = None
        self._stop_event = threading.Event()

    def start(self):
        """Start the background thread that monitors the socket.

        Calling this while the monitor thread is already running is a no-op,
        so two threads never read from the same socket.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()

    def process(self):
        """Receive one message and pass it to the root logger.

        Raises:
            zmq.Again: No message arrived within the receive timeout.
            ZeroMQRecvLoggingError: The message was not a ``LogRecord``.
        """
        # SECURITY: recv_pyobj unpickles the payload; see the class docstring.
        msg = self.socket.recv_pyobj()
        if not isinstance(msg, logging.LogRecord):
            raise ZeroMQRecvLoggingError(
                "Only send logging records to logger listener!"
            )
        # Pass any new messages to the root logger to process
        logging.getLogger().handle(msg)

    def _monitor(self):
        """Poll the socket until a stop is requested and the socket is idle.

        Runs on the internal thread.  The loop only exits after a receive
        has timed out *and* the stop flag is set, so records that are still
        arriving when stop() is called are handled before the thread ends.
        """
        while True:
            try:
                self.process()
            except zmq.Again:
                if self._stop_event.is_set():
                    break
            except ZeroMQRecvLoggingError:
                # One stray message must not end log collection for good.
                logging.getLogger(__name__).warning(
                    "Discarded a non-LogRecord message received on the log socket"
                )

    def stop(self):
        """Request shutdown and wait for the monitor thread to finish.

        The socket is only ever read from the monitor thread: pyzmq sockets
        are not thread-safe, so draining from the caller's thread while the
        monitor is still receiving is not done here.  The monitor drains the
        queue itself before exiting (see ``_monitor``).  Does nothing if the
        listener was never started.
        """
        thread = self._thread
        if thread is None:
            return
        self._stop_event.set()
        thread.join()
        self._thread = None
class ZeroMQSocketHandler(logging.Handler):
    """Logging handler that publishes each record on a ZeroMQ PUB socket.

    Records are pickled with ``send_pyobj`` after being reduced to a
    picklable form (see ``_prepare``).

    NOTE(review): a PUB socket is expected to drop messages while no
    subscriber is connected, so records emitted right after construction may
    be lost - confirm this is acceptable for the deployment.
    """

    def __init__(self, uri: str, ctx: zmq.Context | None = None):
        """Create the PUB socket and connect it to ``uri``.

        Args:
            uri: ZeroMQ endpoint the listener is bound to.
            ctx: Context to create the socket in.  When omitted (or falsy)
                ``zmq.Context.instance()`` is used.
        """
        super().__init__()
        self.ctx = ctx if ctx else zmq.Context.instance()
        self.socket = zmq.Socket(self.ctx, zmq.PUB)
        self.socket.connect(uri)
        self.listener = None  # will be set to listener if configured via dictConfig()

    def _prepare(self, record):
        """Return a picklable copy of ``record``; the original is not modified."""
        prepared = copy.copy(record)
        # Merge msg and args now: args may hold objects that cannot be
        # pickled, and the receiving side only needs the final text.
        prepared.msg = record.getMessage()
        prepared.args = None
        exc_info = prepared.exc_info
        if exc_info:
            # Traceback objects are not picklable, so ship the formatted text.
            # Format from the record's own exc_info rather than from whatever
            # exception happens to be active, and strip the trailing newline
            # so a formatter on the receiving side does not emit a blank line.
            if not prepared.exc_text:
                prepared.exc_text = "".join(
                    traceback.format_exception(*exc_info)
                ).rstrip("\n")
            prepared.exc_info = None
        return prepared

    def emit(self, record):
        """Send ``record`` over the socket.

        Any failure while preparing or sending is routed to ``handleError``,
        as is conventional for logging handlers.
        """
        try:
            self.socket.send_pyobj(self._prepare(record))
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the socket and run the base-class handler cleanup."""
        self.acquire()
        try:
            self.socket.close()
        finally:
            self.release()
        # Without this the handler stays in logging's internal registry.
        logging.Handler.close(self)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment