Created
July 7, 2025 09:09
-
-
Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.
Revisions
-
SF-300 created this gist
Jul 7, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,140 @@ import typing as t import asyncio as aio class Producer[T](t.Protocol): async def put(self, item: T) -> None: ... def put_nowait(self, item: T) -> None: ... def multi_put_nowait[T](queues: t.Iterable[Producer[T]], item: T) -> None: # Snapshot to avoid modification during iteration queues = tuple(queues) errors: list[aio.QueueFull] = [] if not queues: return for q in queues: try: q.put_nowait(item) except aio.QueueFull as e: errors.append(e) if errors: raise ExceptionGroup( "QueueFull for one or more subscribers during put_nowait", errors ) async def multi_put[T](queues: t.Iterable[Producer[T]], item: T) -> None: # Create a snapshot of queues with their filters to prevent issues with concurrent modification queues = tuple(queues) if not queues: return results = await aio.gather(*(q.put(item) for q in queues), return_exceptions=True) errors: list[Exception] = [] for res in results: if isinstance(res, Exception): errors.append(res) elif isinstance(res, BaseException): # For non-Exception BaseExceptions (e.g. KeyboardInterrupt, SystemExit, CancelledError from gather itself) # re-raise them immediately as they are not typically grouped application errors. raise res if errors: raise ExceptionGroup( "Error(s) occurred while putting item into subscriber queues", errors ) class QueueConsumer[T]: def __init__( self, queue: aio.Queue[T], unsubscribe: t.Callable[[], t.Any], ) -> None: self._queue = queue self._unsubscribe = unsubscribe async def get(self) -> T: return await self._queue.get() def get_nowait(self) -> T: return self._queue.get_nowait() def empty(self) -> bool: return self._queue.empty() def qsize(self) -> int: return self._queue.qsize() def full(self) -> bool: return self._queue.full() @property def maxsize(self) -> int: return self._queue.maxsize def task_done(self) -> None: self._queue.task_done() def __enter__(self) -> t.Self: return self def __exit__(self, *args, **kwargs) -> None: self._unsubscribe() _instantly_ready = aio.Event() _instantly_ready.set() class QueueProducer[T]: def __init__(self, maxsize: int = 0) -> None: # Dictionary: queue -> filter function self._queues: dict[aio.Queue[T], t.Callable[[T], bool]] = {} self._maxsize = maxsize async def put(self, item: T) -> None: queues = (q for q, filter_fn in self._queues.items() if filter_fn(item)) return await multi_put(queues, item) def put_nowait(self, item: T) -> None: queues = (q for q, filter_fn in self._queues.items() if filter_fn(item)) return multi_put_nowait(queues, item) @t.overload def subscribed( self, filter_: t.Callable[[T], bool] = ..., queue: aio.Queue[T] | None = ..., ) -> QueueConsumer[T]: ... @t.overload def subscribed[F]( self, filter_: type[F], queue: aio.Queue[T] | None = ..., ) -> QueueConsumer[F]: ... def subscribed(self, filter_=lambda _: True, queue=None): if queue is None: queue = aio.Queue(maxsize=self._maxsize) if not callable(filter_): def filter_fn(item, /): return isinstance(item, filter_) else: filter_fn = t.cast(t.Callable[[T], bool], filter_) self._queues[queue] = filter_fn def unsubscribe() -> None: self._queues.pop(queue, None) return QueueConsumer(queue, unsubscribe)