Skip to content

Instantly share code, notes, and snippets.

@a1d4r
Last active April 23, 2026 17:15
Show Gist options
  • Select an option

  • Save a1d4r/dd4aa5acab9e148e6a643669eddd36f9 to your computer and use it in GitHub Desktop.

Select an option

Save a1d4r/dd4aa5acab9e148e6a643669eddd36f9 to your computer and use it in GitHub Desktop.
Taskiq sentry middleware

Sentry middleware for Taskiq

A drop-in Taskiq middleware that reports background tasks to Sentry the same way Sentry's own integrations for celery, arq, rq and huey do.

Taskiq has no official Sentry integration. Wrapping task execution in sentry_sdk.start_span(...) looks right but silently breaks: in sentry-sdk >= 2.x, a span opened without an active transaction is an orphan and gets dropped, so tasks never show up in Performance / Profiling. This middleware creates proper transactions, propagates traces across producer and worker, and reports exceptions with the correct mechanism.

Features

  • Top-level transactions per task (op="queue.task.taskiq", source=TransactionSource.TASK) — visible in Performance and Profiling.
  • Trace propagation via sentry-trace / baggage in message.labels, so nested .kiq() calls share a trace with the producer.
  • Scheduler-safe: producer-side propagation is skipped when there's no active span, so periodic runs aren't glued into one ever-growing trace.
  • Unhandled-error classification via event_from_exception(mechanism={"handled": False}).
  • Retry-aware: NoResultError maps to SPANSTATUS.ABORTED — no separate error event is sent.
  • OpenTelemetry attributes: messaging.message.id, messaging.system.
  • Isolated scope per task — tags and breadcrumbs don't leak.

Requirements

  • sentry-sdk >= 2.0
  • taskiq >= 0.11
  • Python 3.11+

Installation

Single-file middleware — copy taskiq_sentry_middleware.py into your project.

Usage

import sentry_sdk
from taskiq_aio_pika import AioPikaBroker

from .taskiq_sentry_middleware import SentryMiddleware

sentry_sdk.init(
    dsn="https://...@sentry.io/...",
    traces_sample_rate=1.0,
    # Optional: continuous profiling attaches profiles to task transactions.
    profile_session_sample_rate=1.0,
    profile_lifecycle="trace",
)

broker = AioPikaBroker("amqp://...").with_middlewares(SentryMiddleware())

What you'll see in Sentry

  • Performance → Transactions lists each task by name alongside HTTP endpoints. Filter with transaction.op:queue.task.taskiq to narrow down.
  • Trace view links request handlers to the tasks they dispatch and those tasks to any nested tasks.
  • Issues for unhandled exceptions inside tasks.
  • Profiling flamegraphs for each task run when continuous profiling is enabled.

Reference implementations

Modelled after Sentry's built-in worker integrations for Celery, ARQ, RQ and Huey.

License

MIT.

from __future__ import annotations
from typing import TYPE_CHECKING, Any, ClassVar
from contextlib import ExitStack
from contextvars import ContextVar
import sentry_sdk
from sentry_sdk.consts import SPANDATA, SPANSTATUS
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, TransactionSource
from sentry_sdk.utils import event_from_exception
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
from taskiq.exceptions import NoResultError
if TYPE_CHECKING:
from collections.abc import Coroutine
_task_exit_stack: ContextVar[ExitStack | None] = ContextVar("_task_exit_stack", default=None)
class SentryMiddleware(TaskiqMiddleware):
"""Taskiq middleware that reports task executions to Sentry."""
# Message labels used to carry Sentry trace headers from producer to worker.
_SENTRY_TRACE_LABEL: ClassVar[str] = SENTRY_TRACE_HEADER_NAME
_SENTRY_BAGGAGE_LABEL: ClassVar[str] = BAGGAGE_HEADER_NAME
# Canonical values, matching the built-in Sentry integrations for arq/rq/huey.
_ORIGIN: ClassVar[str] = "auto.queue.taskiq"
_OP: ClassVar[str] = "queue.task.taskiq"
_MESSAGING_SYSTEM: ClassVar[str] = "taskiq"
# Label set by TaskiqScheduler via with_labels(schedule_id=...) — reliable
# marker that the message originates from a scheduled (periodic) task.
_SCHEDULE_ID_LABEL: ClassVar[str] = "schedule_id"
def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
# Propagate the current trace via message labels so that the worker
# continues the same trace as the producer.
# Skip propagation for scheduled tasks: the scheduler's long-lived
# propagation context would otherwise glue every periodic run into a
# single ever-growing trace. The second check (no active span) is a
# safety net for producers outside of a transaction.
if self._SCHEDULE_ID_LABEL in message.labels:
return message
if sentry_sdk.get_current_span() is None:
return message
traceparent = sentry_sdk.get_traceparent()
baggage = sentry_sdk.get_baggage()
if traceparent is not None:
message.labels[self._SENTRY_TRACE_LABEL] = traceparent
if baggage is not None:
message.labels[self._SENTRY_BAGGAGE_LABEL] = baggage
return message
def pre_execute(
self, message: TaskiqMessage
) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
stack = ExitStack()
try:
# Isolated scope so tags, breadcrumbs and context don't leak between tasks.
scope = stack.enter_context(sentry_sdk.isolation_scope())
scope._name = "taskiq"
scope.clear_breadcrumbs()
scope.set_tag("task", message.task_name)
scope.set_context(
"taskiq",
{
"task_id": message.task_id,
"task_name": message.task_name,
"labels": {
k: v
for k, v in message.labels.items()
if k not in {self._SENTRY_TRACE_LABEL, self._SENTRY_BAGGAGE_LABEL}
},
"args": message.args,
"kwargs": message.kwargs,
},
)
# Continue the producer's trace (if any) and open a top-level transaction.
# A span alone (without a transaction as the root) would not appear in
# Sentry Performance / Profiling, so we explicitly start a transaction.
# Scheduled tasks intentionally ignore any inbound trace headers, so that
# each periodic run gets a fresh trace_id even if the producer leaks the
# scheduler's stable propagation context.
trace_headers: dict[str, str] = {}
if self._SCHEDULE_ID_LABEL not in message.labels:
if (sentry_trace := message.labels.get(self._SENTRY_TRACE_LABEL)) is not None:
trace_headers[SENTRY_TRACE_HEADER_NAME] = str(sentry_trace)
if (baggage := message.labels.get(self._SENTRY_BAGGAGE_LABEL)) is not None:
trace_headers[BAGGAGE_HEADER_NAME] = str(baggage)
transaction = sentry_sdk.continue_trace(
trace_headers,
op=self._OP,
name=message.task_name,
source=TransactionSource.TASK,
origin=self._ORIGIN,
)
transaction.set_status(SPANSTATUS.OK)
stack.enter_context(sentry_sdk.start_transaction(transaction))
transaction.set_data(SPANDATA.MESSAGING_MESSAGE_ID, message.task_id)
transaction.set_data(SPANDATA.MESSAGING_SYSTEM, self._MESSAGING_SYSTEM)
except BaseException:
# Exception before we hand off the stack to the ContextVar — close
# resources manually, otherwise the isolation scope would leak.
stack.close()
raise
_task_exit_stack.set(stack)
return message
def post_execute(
self,
message: TaskiqMessage,
result: TaskiqResult[Any],
) -> None:
stack = _task_exit_stack.get(None)
if stack is not None:
stack.close()
def on_error(
self,
message: TaskiqMessage,
result: TaskiqResult[Any],
exception: BaseException,
) -> Coroutine[Any, Any, None] | None:
transaction = sentry_sdk.get_current_scope().transaction
# NoResultError is Taskiq's signal for "no result, will retry" — not a crash.
if isinstance(exception, NoResultError):
if transaction is not None:
transaction.set_status(SPANSTATUS.ABORTED)
return None
if transaction is not None:
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
client = sentry_sdk.get_client()
event, hint = event_from_exception(
exception,
client_options=client.options if client else None,
mechanism={"type": self._MESSAGING_SYSTEM, "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment