|
from __future__ import annotations |
|
|
|
from typing import TYPE_CHECKING, Any, ClassVar |
|
|
|
from contextlib import ExitStack |
|
from contextvars import ContextVar |
|
|
|
import sentry_sdk |
|
|
|
from sentry_sdk.consts import SPANDATA, SPANSTATUS |
|
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, TransactionSource |
|
from sentry_sdk.utils import event_from_exception |
|
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult |
|
from taskiq.exceptions import NoResultError |
|
|
|
if TYPE_CHECKING: |
|
from collections.abc import Coroutine |
|
|
|
_task_exit_stack: ContextVar[ExitStack | None] = ContextVar("_task_exit_stack", default=None) |
|
|
|
|
|
class SentryMiddleware(TaskiqMiddleware):
    """Taskiq middleware that reports task executions to Sentry.

    Hook overview:

    * ``pre_send`` copies the producer's Sentry trace headers into the message
      labels (skipped for scheduled tasks and when no span is active).
    * ``pre_execute`` enters an isolation scope and a transaction and parks
      the owning ``ExitStack`` in the ``_task_exit_stack`` context variable.
    * ``post_execute`` closes that stack, finishing the transaction and
      leaving the isolation scope.
    * ``on_error`` sets the transaction status and captures the exception
      (``NoResultError`` only marks the transaction as aborted).

    NOTE(review): this relies on ``pre_execute`` and ``post_execute`` running
    in the same ``contextvars`` context for a given task -- confirm against the
    Taskiq receiver implementation.
    """

    # Message labels used to carry Sentry trace headers from producer to worker.
    _SENTRY_TRACE_LABEL: ClassVar[str] = SENTRY_TRACE_HEADER_NAME
    _SENTRY_BAGGAGE_LABEL: ClassVar[str] = BAGGAGE_HEADER_NAME
    # Canonical values, matching the built-in Sentry integrations for arq/rq/huey.
    _ORIGIN: ClassVar[str] = "auto.queue.taskiq"
    _OP: ClassVar[str] = "queue.task.taskiq"
    _MESSAGING_SYSTEM: ClassVar[str] = "taskiq"
    # Label set by TaskiqScheduler via with_labels(schedule_id=...) — reliable
    # marker that the message originates from a scheduled (periodic) task.
    _SCHEDULE_ID_LABEL: ClassVar[str] = "schedule_id"

    def _is_scheduled(self, message: TaskiqMessage) -> bool:
        """Return True when the message carries the scheduler's label."""
        return self._SCHEDULE_ID_LABEL in message.labels

    def _task_context(self, message: TaskiqMessage) -> dict[str, Any]:
        """Build the ``taskiq`` scope context, omitting the Sentry trace labels."""
        # Built once instead of once per comprehension iteration.
        trace_labels = {self._SENTRY_TRACE_LABEL, self._SENTRY_BAGGAGE_LABEL}
        return {
            "task_id": message.task_id,
            "task_name": message.task_name,
            "labels": {
                key: value
                for key, value in message.labels.items()
                if key not in trace_labels
            },
            "args": message.args,
            "kwargs": message.kwargs,
        }

    def _inbound_trace_headers(self, message: TaskiqMessage) -> dict[str, str]:
        """Collect the trace headers carried by the message labels.

        Scheduled tasks intentionally ignore any inbound trace headers, so that
        each periodic run gets a fresh trace_id even if the producer leaks the
        scheduler's stable propagation context.
        """
        headers: dict[str, str] = {}
        if self._is_scheduled(message):
            return headers
        sentry_trace = message.labels.get(self._SENTRY_TRACE_LABEL)
        if sentry_trace is not None:
            headers[SENTRY_TRACE_HEADER_NAME] = str(sentry_trace)
        baggage = message.labels.get(self._SENTRY_BAGGAGE_LABEL)
        if baggage is not None:
            headers[BAGGAGE_HEADER_NAME] = str(baggage)
        return headers

    def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
        """Attach the current Sentry trace headers to the outgoing message.

        Propagating the trace via message labels lets the worker continue the
        same trace as the producer.

        Args:
            message: The message about to be sent; its ``labels`` mapping is
                updated in place.

        Returns:
            The same ``message`` object.
        """
        # Skip propagation for scheduled tasks: the scheduler's long-lived
        # propagation context would otherwise glue every periodic run into a
        # single ever-growing trace.
        if self._is_scheduled(message):
            return message
        # Safety net for producers outside of a transaction.
        if sentry_sdk.get_current_span() is None:
            return message

        traceparent = sentry_sdk.get_traceparent()
        baggage = sentry_sdk.get_baggage()
        if traceparent is not None:
            message.labels[self._SENTRY_TRACE_LABEL] = traceparent
        if baggage is not None:
            message.labels[self._SENTRY_BAGGAGE_LABEL] = baggage
        return message

    def pre_execute(
        self, message: TaskiqMessage
    ) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
        """Open an isolation scope and a transaction for the task.

        Both context managers are entered on an ``ExitStack`` that is stored in
        ``_task_exit_stack`` so that ``post_execute`` can close it later.

        Args:
            message: The message about to be executed.

        Returns:
            The unmodified ``message``.

        Raises:
            BaseException: Anything raised during setup is propagated after
                the already-entered context managers have been exited.
        """
        # ``with`` + ``pop_all()`` is the contextlib idiom for handing cleanup
        # over to a later call: if anything below raises, the entered context
        # managers are exited *with* the exception details (so a half-started
        # transaction is not finished as if it had succeeded) and nothing
        # leaks. On success ``pop_all()`` moves the callbacks to a fresh stack
        # and leaving the ``with`` block is a no-op.
        with ExitStack() as stack:
            # Isolated scope so tags, breadcrumbs and context don't leak between tasks.
            scope = stack.enter_context(sentry_sdk.isolation_scope())
            scope._name = "taskiq"
            scope.clear_breadcrumbs()
            scope.set_tag("task", message.task_name)
            scope.set_context("taskiq", self._task_context(message))

            # Continue the producer's trace (if any) and open a top-level transaction.
            # A span alone (without a transaction as the root) would not appear in
            # Sentry Performance / Profiling, so we explicitly start a transaction.
            transaction = sentry_sdk.continue_trace(
                self._inbound_trace_headers(message),
                op=self._OP,
                name=message.task_name,
                source=TransactionSource.TASK,
                origin=self._ORIGIN,
            )
            # Default to OK; on_error overrides the status when the task fails.
            transaction.set_status(SPANSTATUS.OK)
            stack.enter_context(sentry_sdk.start_transaction(transaction))
            transaction.set_data(SPANDATA.MESSAGING_MESSAGE_ID, message.task_id)
            transaction.set_data(SPANDATA.MESSAGING_SYSTEM, self._MESSAGING_SYSTEM)

            _task_exit_stack.set(stack.pop_all())
        return message

    def post_execute(
        self,
        message: TaskiqMessage,
        result: TaskiqResult[Any],
    ) -> None:
        """Close the resources opened by ``pre_execute``.

        Args:
            message: The executed message (unused).
            result: The task result (unused).
        """
        stack = _task_exit_stack.get()
        if stack is None:
            return
        # Clear the slot before closing so a stale, already-closed stack is
        # never left behind in the context, even if closing raises.
        _task_exit_stack.set(None)
        stack.close()

    def on_error(
        self,
        message: TaskiqMessage,
        result: TaskiqResult[Any],
        exception: BaseException,
    ) -> Coroutine[Any, Any, None] | None:
        """Record a task failure on the current transaction and report it.

        Args:
            message: The message whose execution failed (unused).
            result: The task result (unused).
            exception: The exception raised by the task.

        Returns:
            Always ``None``.
        """
        transaction = sentry_sdk.get_current_scope().transaction

        # NoResultError is Taskiq's signal for "no result, will retry" — not a crash.
        if isinstance(exception, NoResultError):
            if transaction is not None:
                transaction.set_status(SPANSTATUS.ABORTED)
            return None

        if transaction is not None:
            transaction.set_status(SPANSTATUS.INTERNAL_ERROR)

        client = sentry_sdk.get_client()
        event, hint = event_from_exception(
            exception,
            client_options=client.options if client else None,
            mechanism={"type": self._MESSAGING_SYSTEM, "handled": False},
        )
        sentry_sdk.capture_event(event, hint=hint)
        return None