Skip to content

Instantly share code, notes, and snippets.

@SF-300
Created July 7, 2025 09:09
Show Gist options
  • Select an option

  • Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.

Select an option

Save SF-300/a490bb13caf87f22781488788cc2e1dd to your computer and use it in GitHub Desktop.

Revisions

  1. SF-300 created this gist Jul 7, 2025.
    140 changes: 140 additions & 0 deletions multiqueue.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,140 @@
    import typing as t
    import asyncio as aio


    class Producer[T](t.Protocol):
    async def put(self, item: T) -> None: ...
    def put_nowait(self, item: T) -> None: ...


    def multi_put_nowait[T](queues: t.Iterable[Producer[T]], item: T) -> None:
    # Snapshot to avoid modification during iteration
    queues = tuple(queues)
    errors: list[aio.QueueFull] = []

    if not queues:
    return

    for q in queues:
    try:
    q.put_nowait(item)
    except aio.QueueFull as e:
    errors.append(e)

    if errors:
    raise ExceptionGroup(
    "QueueFull for one or more subscribers during put_nowait", errors
    )


    async def multi_put[T](queues: t.Iterable[Producer[T]], item: T) -> None:
    # Create a snapshot of queues with their filters to prevent issues with concurrent modification
    queues = tuple(queues)

    if not queues:
    return

    results = await aio.gather(*(q.put(item) for q in queues), return_exceptions=True)

    errors: list[Exception] = []
    for res in results:
    if isinstance(res, Exception):
    errors.append(res)
    elif isinstance(res, BaseException):
    # For non-Exception BaseExceptions (e.g. KeyboardInterrupt, SystemExit, CancelledError from gather itself)
    # re-raise them immediately as they are not typically grouped application errors.
    raise res

    if errors:
    raise ExceptionGroup(
    "Error(s) occurred while putting item into subscriber queues", errors
    )


    class QueueConsumer[T]:
    def __init__(
    self,
    queue: aio.Queue[T],
    unsubscribe: t.Callable[[], t.Any],
    ) -> None:
    self._queue = queue
    self._unsubscribe = unsubscribe

    async def get(self) -> T:
    return await self._queue.get()

    def get_nowait(self) -> T:
    return self._queue.get_nowait()

    def empty(self) -> bool:
    return self._queue.empty()

    def qsize(self) -> int:
    return self._queue.qsize()

    def full(self) -> bool:
    return self._queue.full()

    @property
    def maxsize(self) -> int:
    return self._queue.maxsize

    def task_done(self) -> None:
    self._queue.task_done()

    def __enter__(self) -> t.Self:
    return self

    def __exit__(self, *args, **kwargs) -> None:
    self._unsubscribe()


    _instantly_ready = aio.Event()
    _instantly_ready.set()


    class QueueProducer[T]:
    def __init__(self, maxsize: int = 0) -> None:
    # Dictionary: queue -> filter function
    self._queues: dict[aio.Queue[T], t.Callable[[T], bool]] = {}
    self._maxsize = maxsize

    async def put(self, item: T) -> None:
    queues = (q for q, filter_fn in self._queues.items() if filter_fn(item))
    return await multi_put(queues, item)

    def put_nowait(self, item: T) -> None:
    queues = (q for q, filter_fn in self._queues.items() if filter_fn(item))
    return multi_put_nowait(queues, item)

    @t.overload
    def subscribed(
    self,
    filter_: t.Callable[[T], bool] = ...,
    queue: aio.Queue[T] | None = ...,
    ) -> QueueConsumer[T]: ...

    @t.overload
    def subscribed[F](
    self,
    filter_: type[F],
    queue: aio.Queue[T] | None = ...,
    ) -> QueueConsumer[F]: ...

    def subscribed(self, filter_=lambda _: True, queue=None):
    if queue is None:
    queue = aio.Queue(maxsize=self._maxsize)

    if not callable(filter_):

    def filter_fn(item, /):
    return isinstance(item, filter_)
    else:
    filter_fn = t.cast(t.Callable[[T], bool], filter_)

    self._queues[queue] = filter_fn

    def unsubscribe() -> None:
    self._queues.pop(queue, None)

    return QueueConsumer(queue, unsubscribe)