Skip to content

Instantly share code, notes, and snippets.

@a1d4r
Last active April 26, 2026 15:26
Show Gist options
  • Select an option

  • Save a1d4r/a90fd56193a2449ae1e973a3e34c84f2 to your computer and use it in GitHub Desktop.

Select an option

Save a1d4r/a90fd56193a2449ae1e973a3e34c84f2 to your computer and use it in GitHub Desktop.
Кейс: интеграционный тест на отправку email через taskiq + MailHog

Кейс: интеграционный тест на отправку email через taskiq + MailHog

Сценарий: при регистрации пользователя приложение отправляет welcome-письмо. Сама отправка вынесена в taskiq-задачу, чтобы не блокировать HTTP-ответ. Нужно написать интеграционный тест, который убеждается, что после POST /register письмо реально доходит до получателя.

Типичная первая попытка — поднять три контейнера (rabbitmq, mailhog, worker taskiq) и натравить на них pytest. И почти всегда тест в таком виде не взлетает или моргает. Разберём почему и как это правильно собрать.

Где боль на самом деле

Подняв контейнер taskiq (worker) рядом с rabbitmq и mailhog, вы получили три разнесённых процесса + тест-раннер = четыре процесса, и теперь:

  1. Гонки: тест зовёт register_user, тот делает kiq() в RabbitMQ, тест читает MailHog — но worker ещё даже не подхватил сообщение из очереди. Тест либо «спит наугад» (await asyncio.sleep(2)), либо моргает в CI.
  2. Worker не видит ваш код: worker в контейнере должен импортировать тот же модуль, где определён @broker.task. Если код смонтирован томом — нужны рестарты при каждом изменении; если копируется — пересборка образа.
  3. Один broker URL для двух миров: ваш тест и worker должны указывать на rabbitmq под тем же hostname (внутри docker network), но pytest может запускаться вне сети.
  4. Очистка состояния: MailHog копит письма между тестами; RabbitMQ копит undelivered сообщения, если worker падает. Тесты текут друг в друга.

Хорошая новость: ничего из этого делать не надо. Уберите контейнер taskiq, и большая часть проблем исчезает.

Решение по умолчанию: InMemoryBroker + MailHog

Сохраняем MailHog (он реальный SMTP — это важно, если хотим убедиться, что письмо вообще генерируется и отправляется), а taskiq запускаем внутри тестового процесса через InMemoryBroker. Так таска выполняется в той же event loop, синхронно с тестом.

docker-compose для тестов

# docker-compose.test.yml
services:
  mailhog:
    image: mailhog/mailhog:latest
    ports:
      - "1025:1025"   # SMTP
      - "8025:8025"   # HTTP API
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:8025/api/v2/messages"]
      interval: 2s
      timeout: 2s
      retries: 10

Никаких rabbitmq, никакого worker-контейнера. Только MailHog.

Декларация задач через async_shared_broker

Чтобы не зашивать broker в код задач (и не делать if ENVIRONMENT == "pytest" при бутстрапе), объявляем задачи через @async_shared_broker.task. async_shared_broker — встроенный синглтон, не привязанный к конкретному транспорту. В рантайме мы просто говорим ему «вот твой настоящий broker», и все kicker() начинают слать в него:

# your_project/tasks.py
from typing import Annotated
from taskiq import TaskiqDepends
from taskiq.brokers.shared_broker import async_shared_broker

from your_project.mail import Mailer


@async_shared_broker.task
async def send_welcome_email(
    user_email: str,
    mailer: Annotated[Mailer, TaskiqDepends()],
) -> None:
    await mailer.send(
        to=user_email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )

Альтернативно есть короткий алиас shared_task = async_shared_broker.task в том же модуле (from taskiq.brokers.shared_broker import shared_task). Из верхнеуровневого taskiq он не реэкспортируется, поэтому from taskiq import shared_task работать не будет.

Mailer — обычный SMTP-клиент (например, aiosmtplib), который в тестах указывает на localhost:1025 (MailHog).

Бутстрап брокера в production

# your_project/main.py
import os
from taskiq.brokers.shared_broker import async_shared_broker
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(os.environ["RABBITMQ_URL"])
async_shared_broker.default_broker(broker)

В тестовом conftest мы перевесим default_broker на InMemoryBroker — никаких ENV-условий, никаких отдельных веток в коде.

⚠️ Важно: async_shared_broker.default_broker(...) обязательно вызвать до первого kiq(). Без этого kicker упадёт с SharedBrokerSendTaskError (наследник RuntimeError) — сам shared broker не умеет ни kick'ать, ни listen'ить.

Регистрация пользователя

# your_project/services.py
from your_project.tasks import send_welcome_email

async def register_user(email: str, password: str) -> int:
    user = await user_repo.create(email=email, password=password)
    await send_welcome_email.kiq(user.email)   # fire-and-forget
    return user.id

MailHog клиент

MailHog отдаёт REST API на порту 8025. Минимальный клиент для тестов:

# tests/mailhog.py
from dataclasses import dataclass
import httpx


@dataclass(frozen=True)
class MailHog:
    base_url: str = "http://localhost:8025"

    async def reset(self) -> None:
        async with httpx.AsyncClient() as c:
            await c.delete(f"{self.base_url}/api/v1/messages")

    async def messages(self) -> list[dict]:
        async with httpx.AsyncClient() as c:
            r = await c.get(f"{self.base_url}/api/v2/messages")
            r.raise_for_status()
            return r.json()["items"]

    async def find_by_recipient(self, email: str) -> dict | None:
        for msg in await self.messages():
            recipients = [
                f"{a['Mailbox']}@{a['Domain']}" for a in msg["To"]
            ]
            if email in recipients:
                return msg
        return None

Эндпоинты MailHog (стандартные, открыты на :8025):

Метод URL Что делает
GET /api/v2/messages Список писем (с пагинацией)
GET /api/v2/search?kind=to&query=user@x.com Поиск по полю
DELETE /api/v1/messages Удалить все письма

Тело сообщения лежит в msg["Content"]["Body"], заголовки — в msg["Content"]["Headers"] (это dict[str, list[str]]).

Conftest

# tests/conftest.py
import pytest
from taskiq import InMemoryBroker
from taskiq.brokers.shared_broker import async_shared_broker

from tests.mailhog import MailHog
from your_project.mail import Mailer

@pytest.fixture(scope="session")
def anyio_backend() -> str:
    return "asyncio"


@pytest.fixture
def mailhog() -> MailHog:
    return MailHog(base_url="http://localhost:8025")


@pytest.fixture(autouse=True)
async def broker():
    """Подменяем default broker для shared-задач на InMemoryBroker.

    await_inplace=True: kiq() выполняется до return — нет гонок,
    тест читает MailHog уже после того, как письмо ушло на SMTP.
    """
    test_broker = InMemoryBroker(await_inplace=True)
    async_shared_broker.default_broker(test_broker)
    await test_broker.startup()
    yield test_broker
    await test_broker.shutdown()


@pytest.fixture(autouse=True)
async def _clean_mailhog(mailhog: MailHog):
    """Чистим MailHog ДО теста — чтобы не зависеть от порядка прогонов."""
    await mailhog.reset()
    yield


@pytest.fixture(autouse=True)
async def _wire_taskiq_deps(broker, mailer):
    """Если у задачи TaskiqDepends — подкладываем зависимости здесь."""
    broker.add_dependency_context({Mailer: mailer})
    yield
    broker.custom_dependency_context = {}

Если в задаче нет TaskiqDepends — фикстура _wire_taskiq_deps не нужна.

async_shared_broker.default_broker(test_broker) — единственная точка переключения. Production main привязывает AioPikaBroker, тестовый conftest — InMemoryBroker. Код задач и сервисов одинаковый в обеих средах.

Сам тест

# tests/test_registration.py
import pytest
from your_project.services import register_user

pytestmark = pytest.mark.anyio


async def test_registration_sends_welcome_email(mailhog):
    await register_user(email="alice@test.dev", password="hunter2")

    msg = await mailhog.find_by_recipient("alice@test.dev")
    assert msg is not None
    assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]
    assert "Thanks for signing up." in msg["Content"]["Body"]

Никаких sleep, никаких retry-loop. Поскольку InMemoryBroker(await_inplace=True) гарантирует, что register_user вернётся только когда таска отработала, к моменту опроса MailHog письмо уже там.

Альтернатива: подмена broker через ENV-переменную

Если по каким-то причинам async_shared_broker не подходит (например, у вас уже тонна @broker.task и переписывать жалко), можно остаться на привычном @broker.task и подменять сам broker через pytest-env:

# your_project/tkq.py
import os
from taskiq import AsyncBroker, InMemoryBroker

if os.environ.get("ENVIRONMENT") == "pytest":
    broker: AsyncBroker = InMemoryBroker(await_inplace=True)
else:
    from taskiq_aio_pika import AioPikaBroker
    broker = AioPikaBroker(os.environ["RABBITMQ_URL"])
# pyproject.toml
[tool.pytest.ini_options]
env = ["ENVIRONMENT=pytest"]

pytest-env ставит переменную до импорта your_project.tkq, так что broker подменяется один раз на весь прогон. Conftest становится проще (broker уже правильный, фикстура только зовёт startup/shutdown).

Минусы по сравнению с shared_broker:

  • Развилка if env == "pytest" живёт в production-коде. Каждая новая тестовая среда (pytest_unit под FakeBroker, например) — ещё одна ветка.
  • Порядок импортов имеет значение: если что-то импортирует tkq до того, как pytest-env поставил переменную, broker зафиксируется неправильным.
  • ENV-переменные легко забыть установить локально (запустил python -m pytest напрямую без pytest-env — поедет в прод-RabbitMQ).

shared_broker от этого свободен: подмена живёт в conftest, явная и программная, импорт-агностичная.

Альтернатива: оставить await_inplace=False

Если регистрация делает await register_user.kiq() и сразу возвращает (как обычно бывает в HTTP-эндпоинте), а await_inplace=True вам не подходит (скрывает реальные гонки), используйте wait_all():

async def test_registration_sends_welcome_email(mailhog):
    await register_user(email="alice@test.dev", password="hunter2")
    await broker.wait_all()              # дренируем все pending kicks
    msg = await mailhog.find_by_recipient("alice@test.dev")
    assert msg is not None

Между этими двумя:

  • await_inplace=True — проще, тест читается линейно, скрывает race conditions.
  • wait_all() — поведение ближе к проду, видит race conditions, требует дисциплины (вызывать перед каждой ассертой).

Альтернатива: разделить на два теста

Один «толстый» тест проверяет всю цепочку API → kiq → задача → SMTP. Это удобно как регрессионная страховка, но плохо локализует баги: упал — иди читай весь стек. Часто чище разнести проверку на два независимых теста:

  1. API-тест: POST /register действительно делает kiq() нужной задачи с нужными аргументами. Никакого SMTP, никакого MailHog.
  2. Тест задачи: send_welcome_email(...) отправляет правильное письмо правильному адресату. Никакого HTTP-слоя.

Стык между ними держится на типах аргументов задачи (Pydantic-модель payload или dataclass). Если хочется ещё страховки — оставить один тонкий end-to-end smoke-тест с InMemoryBroker + MailHog.

FakeBroker — broker, который копит сообщения и не выполняет задачи

Вместо моков и monkeypatch — отдельный broker, который реализует только kick (копит сообщения) и явно отказывается слушать. Никакой магии, чистый контракт AsyncBroker:

# tests/fake_broker.py
from collections.abc import AsyncGenerator
from typing import Any

from taskiq import AckableMessage, AsyncBroker, BrokerMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.message import TaskiqMessage


class FakeBroker(AsyncBroker):
    """Broker для unit-тестов: накапливает kicked-сообщения, ничего не выполняет."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[BrokerMessage] = []

    async def startup(self) -> None:
        self.messages.clear()
        await super().startup()

    async def shutdown(self) -> None:
        self.messages.clear()
        await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        self.messages.append(message)

    async def listen(self) -> AsyncGenerator[bytes | AckableMessage, None]:  # type: ignore[override]
        raise RuntimeError("FakeBroker cannot listen")
        yield  # для типизации как AsyncGenerator

    # --- удобные ассерты ---

    def messages_for(self, task: AsyncTaskiqDecoratedTask[Any, Any]) -> list[TaskiqMessage]:
        """Все накопленные TaskiqMessage для конкретной задачи (уже распарсенные formatter'ом)."""
        return [
            self.formatter.loads(m.message)
            for m in self.messages
            if m.task_name == task.task_name
        ]

    def assert_kicked_once(
        self,
        task: AsyncTaskiqDecoratedTask[Any, Any],
    ) -> TaskiqMessage:
        msgs = self.messages_for(task)
        assert len(msgs) == 1, (
            f"expected 1 kick of {task.task_name}, got {len(msgs)}: {msgs}"
        )
        return msgs[0]

Что здесь важно:

  • broker.formatter.loads(...) вместо orjson.loads напрямую. Формат сериализации (JSON / msgpack / pickle) — деталь конкретного broker'а; если его сменят, тесты не должны падать.
  • task.task_name как источник правды, а не endswith("...") — переименование функции/модуля автоматически синхронизирует тесты с кодом.
  • startup/shutdown чистят messages — фикстура lifecycle и так обнуляет состояние, забыть clear() руками невозможно.
  • listen бросает RuntimeError — никто случайно не запустит worker на этом broker'е и не получит мистические зависания.

Подмена через тот же shared_broker — на этот раз подсовываем FakeBroker:

# tests/conftest.py (для unit-тестов API)
import pytest
from taskiq.brokers.shared_broker import async_shared_broker

from tests.fake_broker import FakeBroker


@pytest.fixture
async def fake_broker() -> FakeBroker:
    broker = FakeBroker()
    async_shared_broker.default_broker(broker)
    await broker.startup()
    yield broker
    await broker.shutdown()

Никаких отдельных ENV-веток — для разных тестовых файлов используются разные фикстуры (broker с InMemoryBroker для интеграционных, fake_broker с FakeBroker для юнитовых API).

Тест #1: API-эндпоинт делает kiq()

# tests/test_registration_api.py
import pytest
from your_project.tasks import send_welcome_email

pytestmark = pytest.mark.anyio


async def test_register_kicks_welcome_email(client, fake_broker):
    await client.post("/register", json={"email": "alice@test.dev", "password": "x"})

    msg = fake_broker.assert_kicked_once(send_welcome_email)
    assert msg.args == ["alice@test.dev"]

Тут нет ни MailHog, ни SMTP, ни broker.startup-времени — всё работает in-process за миллисекунды. Можно гонять в каждом push.

Тест #2: задача отправляет письмо

Задача — это обычная async def. Зовём напрямую, без всякого taskiq:

# tests/test_send_welcome_email.py
import pytest
from your_project.tasks import send_welcome_email

pytestmark = pytest.mark.anyio


async def test_send_welcome_email(mailhog, mailer):
    await send_welcome_email("alice@test.dev", mailer=mailer)

    msg = await mailhog.find_by_recipient("alice@test.dev")
    assert msg is not None
    assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]

mailer — реальный SMTP-клиент, направленный на MailHog. Брокер вообще не участвует. Если у задачи TaskiqDepends, передаём зависимость явно аргументом — TaskiqDepends() это default value, который перекрывается обычным позиционным/именованным параметром.

Что мы потеряли по сравнению с одним толстым тестом

Сцепку. Если кто-то поменяет имя параметра задачи (user_emailemail), но не обновит API-вызов, оба тонких теста зелёные, прод — нет. Защита:

  • Pydantic-модель payload, которую и API, и задача принимают/возвращают. Тогда mypy ловит расхождение на этапе сборки.
  • Либо один end-to-end smoke-тест с InMemoryBroker(await_inplace=True) и MailHog поверх двух тонких — он медленный, но один такой не страшно.

Когда выбирать что

Ситуация Стратегия
Простые API + простая задача, мало логики Один толстый тест с InMemoryBroker + MailHog
API со своей логикой (валидация, rate limit, conditional kick) Тонкий API-тест с FakeBroker
Задача со своей логикой (шаблоны, ветвления, retry) Тонкий тест задачи как async def
Проект большой, разделение unit/integration уже есть Пирамида: много FakeBroker-тестов + точечные интеграционные с MailHog

Когда настоящий E2E с RabbitMQ + worker оправдан

Только если сам предмет теста — это RabbitMQ-специфика: ack/nack, deadletter, очерёдность, потеря сообщений при рестарте worker'а. Для «пришло ли письмо после регистрации» это оверкилл.

Если всё-таки нужен полный E2E:

# docker-compose.e2e.yml
services:
  rabbitmq:
    image: rabbitmq:3-management
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 2s
      timeout: 2s
      retries: 20

  mailhog:
    image: mailhog/mailhog
    ports: ["1025:1025", "8025:8025"]

  worker:
    build: .
    command: taskiq worker your_project.tkq:broker
    environment:
      RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
      SMTP_HOST: mailhog
      SMTP_PORT: 1025
    depends_on:
      rabbitmq:
        condition: service_healthy
      mailhog:
        condition: service_started

И тест с polling — потому что worker асинхронен и в другом процессе:

import asyncio

async def wait_for_email(mailhog, recipient: str, timeout: float = 10.0) -> dict:
    deadline = asyncio.get_event_loop().time() + timeout
    while asyncio.get_event_loop().time() < deadline:
        msg = await mailhog.find_by_recipient(recipient)
        if msg is not None:
            return msg
        await asyncio.sleep(0.2)
    raise AssertionError(f"No email for {recipient} after {timeout}s")


async def test_registration_e2e(mailhog):
    # Уникальный email на каждый прогон, чтобы тесты не интерферировали
    email = f"alice+{uuid.uuid4().hex[:8]}@test.dev"
    await register_user(email=email, password="hunter2")

    msg = await wait_for_email(mailhog, email, timeout=10.0)
    assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]

Здесь критичны три вещи:

  • healthcheck для rabbitmq + condition: service_healthy у worker'а, иначе worker падает с "connection refused" на старте.
  • Polling с таймаутом, не sleep(N). CI бывает медленный.
  • Уникальный recipient на тест или агрессивный mailhog.reset() — иначе тесты увидят письма друг друга.

Краткое резюме

Вопрос Ответ
Стоит ли держать taskiq-контейнер для теста на email? Нет.
Нужен ли RabbitMQ в тесте? Нет — InMemoryBroker с тем же интерфейсом.
Нужен ли MailHog? Да, если хотите проверить реальный SMTP-флоу. Иначе можно мокнуть Mailer целиком.
Как избежать sleep в тесте? InMemoryBroker(await_inplace=True) или await broker.wait_all() перед ассертами.
Как изолировать тесты друг от друга? mailhog.reset() в autouse фикстуре + broker.custom_dependency_context = {} в teardown.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment