Сценарий: при регистрации пользователя приложение отправляет
welcome-письмо. Сама отправка вынесена в taskiq-задачу, чтобы не блокировать
HTTP-ответ. Нужно написать интеграционный тест, который убеждается, что
после POST /register письмо реально доходит до получателя.
Типичная первая попытка — поднять три контейнера (rabbitmq, mailhog,
worker taskiq) и натравить на них pytest. И почти всегда тест в таком виде
не взлетает или моргает. Разберём почему и как это правильно собрать.
Подняв контейнер taskiq (worker) рядом с rabbitmq и mailhog, вы получили
три разнесённых процесса + тест-раннер = четыре процесса, и теперь:
- Гонки: тест зовёт
register_user, тот делаетkiq()в RabbitMQ, тест читает MailHog — но worker ещё даже не подхватил сообщение из очереди. Тест либо «спит наугад» (await asyncio.sleep(2)), либо моргает в CI. - Worker не видит ваш код: worker в контейнере должен импортировать тот же
модуль, где определён
@broker.task. Если код смонтирован томом — нужны рестарты при каждом изменении; если копируется — пересборка образа. - Один broker URL для двух миров: ваш тест и worker должны указывать на
rabbitmqпод тем же hostname (внутри docker network), но pytest может запускаться вне сети. - Очистка состояния: MailHog копит письма между тестами; RabbitMQ копит undelivered сообщения, если worker падает. Тесты текут друг в друга.
Хорошая новость: ничего из этого делать не надо. Уберите контейнер
taskiq, и большая часть проблем исчезает.
Сохраняем MailHog (он реальный SMTP — это важно, если хотим убедиться, что
письмо вообще генерируется и отправляется), а taskiq запускаем внутри
тестового процесса через InMemoryBroker. Так таска выполняется в той же
event loop, синхронно с тестом.
# docker-compose.test.yml
services:
mailhog:
image: mailhog/mailhog:latest
ports:
- "1025:1025" # SMTP
- "8025:8025" # HTTP API
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:8025/api/v2/messages"]
interval: 2s
timeout: 2s
retries: 10Никаких rabbitmq, никакого worker-контейнера. Только MailHog.
Чтобы не зашивать broker в код задач (и не делать if ENVIRONMENT == "pytest"
при бутстрапе), объявляем задачи через @async_shared_broker.task.
async_shared_broker — встроенный синглтон, не привязанный к конкретному
транспорту. В рантайме мы просто говорим ему «вот твой настоящий broker», и
все kicker() начинают слать в него:
# your_project/tasks.py
from typing import Annotated
from taskiq import TaskiqDepends
from taskiq.brokers.shared_broker import async_shared_broker
from your_project.mail import Mailer
@async_shared_broker.task
async def send_welcome_email(
user_email: str,
mailer: Annotated[Mailer, TaskiqDepends()],
) -> None:
await mailer.send(
to=user_email,
subject="Welcome!",
body="Thanks for signing up.",
)Альтернативно есть короткий алиас
shared_task = async_shared_broker.taskв том же модуле (from taskiq.brokers.shared_broker import shared_task). Из верхнеуровневогоtaskiqон не реэкспортируется, поэтомуfrom taskiq import shared_taskработать не будет.
Mailer — обычный SMTP-клиент (например, aiosmtplib), который в тестах
указывает на localhost:1025 (MailHog).
# your_project/main.py
import os
from taskiq.brokers.shared_broker import async_shared_broker
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker(os.environ["RABBITMQ_URL"])
async_shared_broker.default_broker(broker)В тестовом conftest мы перевесим default_broker на InMemoryBroker —
никаких ENV-условий, никаких отдельных веток в коде.
⚠️ Важно:async_shared_broker.default_broker(...)обязательно вызвать до первогоkiq(). Без этогоkickerупадёт сSharedBrokerSendTaskError(наследникRuntimeError) — сам shared broker не умеет ни kick'ать, ни listen'ить.
# your_project/services.py
from your_project.tasks import send_welcome_email
async def register_user(email: str, password: str) -> int:
user = await user_repo.create(email=email, password=password)
await send_welcome_email.kiq(user.email) # fire-and-forget
return user.idMailHog отдаёт REST API на порту 8025. Минимальный клиент для тестов:
# tests/mailhog.py
from dataclasses import dataclass
import httpx
@dataclass(frozen=True)
class MailHog:
base_url: str = "http://localhost:8025"
async def reset(self) -> None:
async with httpx.AsyncClient() as c:
await c.delete(f"{self.base_url}/api/v1/messages")
async def messages(self) -> list[dict]:
async with httpx.AsyncClient() as c:
r = await c.get(f"{self.base_url}/api/v2/messages")
r.raise_for_status()
return r.json()["items"]
async def find_by_recipient(self, email: str) -> dict | None:
for msg in await self.messages():
recipients = [
f"{a['Mailbox']}@{a['Domain']}" for a in msg["To"]
]
if email in recipients:
return msg
return NoneЭндпоинты MailHog (стандартные, открыты на :8025):
| Метод | URL | Что делает |
|---|---|---|
| GET | /api/v2/messages |
Список писем (с пагинацией) |
| GET | /api/v2/search?kind=to&query=user@x.com |
Поиск по полю |
| DELETE | /api/v1/messages |
Удалить все письма |
Тело сообщения лежит в msg["Content"]["Body"], заголовки — в
msg["Content"]["Headers"] (это dict[str, list[str]]).
# tests/conftest.py
import pytest
from taskiq import InMemoryBroker
from taskiq.brokers.shared_broker import async_shared_broker
from tests.mailhog import MailHog
from your_project.mail import Mailer
@pytest.fixture(scope="session")
def anyio_backend() -> str:
return "asyncio"
@pytest.fixture
def mailhog() -> MailHog:
return MailHog(base_url="http://localhost:8025")
@pytest.fixture(autouse=True)
async def broker():
"""Подменяем default broker для shared-задач на InMemoryBroker.
await_inplace=True: kiq() выполняется до return — нет гонок,
тест читает MailHog уже после того, как письмо ушло на SMTP.
"""
test_broker = InMemoryBroker(await_inplace=True)
async_shared_broker.default_broker(test_broker)
await test_broker.startup()
yield test_broker
await test_broker.shutdown()
@pytest.fixture(autouse=True)
async def _clean_mailhog(mailhog: MailHog):
"""Чистим MailHog ДО теста — чтобы не зависеть от порядка прогонов."""
await mailhog.reset()
yield
@pytest.fixture(autouse=True)
async def _wire_taskiq_deps(broker, mailer):
"""Если у задачи TaskiqDepends — подкладываем зависимости здесь."""
broker.add_dependency_context({Mailer: mailer})
yield
broker.custom_dependency_context = {}Если в задаче нет
TaskiqDepends— фикстура_wire_taskiq_depsне нужна.
async_shared_broker.default_broker(test_broker) — единственная точка
переключения. Production main привязывает AioPikaBroker, тестовый conftest
— InMemoryBroker. Код задач и сервисов одинаковый в обеих средах.
# tests/test_registration.py
import pytest
from your_project.services import register_user
pytestmark = pytest.mark.anyio
async def test_registration_sends_welcome_email(mailhog):
await register_user(email="alice@test.dev", password="hunter2")
msg = await mailhog.find_by_recipient("alice@test.dev")
assert msg is not None
assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]
assert "Thanks for signing up." in msg["Content"]["Body"]Никаких sleep, никаких retry-loop. Поскольку InMemoryBroker(await_inplace=True)
гарантирует, что register_user вернётся только когда таска отработала,
к моменту опроса MailHog письмо уже там.
Если по каким-то причинам async_shared_broker не подходит (например, у вас
уже тонна @broker.task и переписывать жалко), можно остаться на привычном
@broker.task и подменять сам broker через pytest-env:
# your_project/tkq.py
import os
from taskiq import AsyncBroker, InMemoryBroker
if os.environ.get("ENVIRONMENT") == "pytest":
broker: AsyncBroker = InMemoryBroker(await_inplace=True)
else:
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker(os.environ["RABBITMQ_URL"])# pyproject.toml
[tool.pytest.ini_options]
env = ["ENVIRONMENT=pytest"]pytest-env ставит переменную до импорта your_project.tkq, так что
broker подменяется один раз на весь прогон. Conftest становится проще
(broker уже правильный, фикстура только зовёт startup/shutdown).
Минусы по сравнению с shared_broker:
- Развилка
if env == "pytest"живёт в production-коде. Каждая новая тестовая среда (pytest_unitподFakeBroker, например) — ещё одна ветка. - Порядок импортов имеет значение: если что-то импортирует
tkqдо того, какpytest-envпоставил переменную, broker зафиксируется неправильным. - ENV-переменные легко забыть установить локально (запустил
python -m pytestнапрямую безpytest-env— поедет в прод-RabbitMQ).
shared_broker от этого свободен: подмена живёт в conftest, явная и
программная, импорт-агностичная.
Если регистрация делает await register_user.kiq() и сразу возвращает (как
обычно бывает в HTTP-эндпоинте), а await_inplace=True вам не подходит
(скрывает реальные гонки), используйте wait_all():
async def test_registration_sends_welcome_email(mailhog):
await register_user(email="alice@test.dev", password="hunter2")
await broker.wait_all() # дренируем все pending kicks
msg = await mailhog.find_by_recipient("alice@test.dev")
assert msg is not NoneМежду этими двумя:
await_inplace=True— проще, тест читается линейно, скрывает race conditions.wait_all()— поведение ближе к проду, видит race conditions, требует дисциплины (вызывать перед каждой ассертой).
Один «толстый» тест проверяет всю цепочку API → kiq → задача → SMTP. Это удобно как регрессионная страховка, но плохо локализует баги: упал — иди читай весь стек. Часто чище разнести проверку на два независимых теста:
- API-тест:
POST /registerдействительно делаетkiq()нужной задачи с нужными аргументами. Никакого SMTP, никакого MailHog. - Тест задачи:
send_welcome_email(...)отправляет правильное письмо правильному адресату. Никакого HTTP-слоя.
Стык между ними держится на типах аргументов задачи (Pydantic-модель payload
или dataclass). Если хочется ещё страховки — оставить один тонкий
end-to-end smoke-тест с InMemoryBroker + MailHog.
Вместо моков и monkeypatch — отдельный broker, который реализует только
kick (копит сообщения) и явно отказывается слушать. Никакой магии, чистый
контракт AsyncBroker:
# tests/fake_broker.py
from collections.abc import AsyncGenerator
from typing import Any
from taskiq import AckableMessage, AsyncBroker, BrokerMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.message import TaskiqMessage
class FakeBroker(AsyncBroker):
"""Broker для unit-тестов: накапливает kicked-сообщения, ничего не выполняет."""
def __init__(self) -> None:
super().__init__()
self.messages: list[BrokerMessage] = []
async def startup(self) -> None:
self.messages.clear()
await super().startup()
async def shutdown(self) -> None:
self.messages.clear()
await super().shutdown()
async def kick(self, message: BrokerMessage) -> None:
self.messages.append(message)
async def listen(self) -> AsyncGenerator[bytes | AckableMessage, None]: # type: ignore[override]
raise RuntimeError("FakeBroker cannot listen")
yield # для типизации как AsyncGenerator
# --- удобные ассерты ---
def messages_for(self, task: AsyncTaskiqDecoratedTask[Any, Any]) -> list[TaskiqMessage]:
"""Все накопленные TaskiqMessage для конкретной задачи (уже распарсенные formatter'ом)."""
return [
self.formatter.loads(m.message)
for m in self.messages
if m.task_name == task.task_name
]
def assert_kicked_once(
self,
task: AsyncTaskiqDecoratedTask[Any, Any],
) -> TaskiqMessage:
msgs = self.messages_for(task)
assert len(msgs) == 1, (
f"expected 1 kick of {task.task_name}, got {len(msgs)}: {msgs}"
)
return msgs[0]Что здесь важно:
broker.formatter.loads(...)вместоorjson.loadsнапрямую. Формат сериализации (JSON / msgpack / pickle) — деталь конкретного broker'а; если его сменят, тесты не должны падать.task.task_nameкак источник правды, а неendswith("...")— переименование функции/модуля автоматически синхронизирует тесты с кодом.startup/shutdownчистятmessages— фикстура lifecycle и так обнуляет состояние, забытьclear()руками невозможно.listenбросаетRuntimeError— никто случайно не запустит worker на этом broker'е и не получит мистические зависания.
Подмена через тот же shared_broker — на этот раз подсовываем FakeBroker:
# tests/conftest.py (для unit-тестов API)
import pytest
from taskiq.brokers.shared_broker import async_shared_broker
from tests.fake_broker import FakeBroker
@pytest.fixture
async def fake_broker() -> FakeBroker:
broker = FakeBroker()
async_shared_broker.default_broker(broker)
await broker.startup()
yield broker
await broker.shutdown()Никаких отдельных ENV-веток — для разных тестовых файлов используются разные
фикстуры (broker с InMemoryBroker для интеграционных, fake_broker с
FakeBroker для юнитовых API).
# tests/test_registration_api.py
import pytest
from your_project.tasks import send_welcome_email
pytestmark = pytest.mark.anyio
async def test_register_kicks_welcome_email(client, fake_broker):
await client.post("/register", json={"email": "alice@test.dev", "password": "x"})
msg = fake_broker.assert_kicked_once(send_welcome_email)
assert msg.args == ["alice@test.dev"]Тут нет ни MailHog, ни SMTP, ни broker.startup-времени — всё работает in-process за миллисекунды. Можно гонять в каждом push.
Задача — это обычная async def. Зовём напрямую, без всякого taskiq:
# tests/test_send_welcome_email.py
import pytest
from your_project.tasks import send_welcome_email
pytestmark = pytest.mark.anyio
async def test_send_welcome_email(mailhog, mailer):
await send_welcome_email("alice@test.dev", mailer=mailer)
msg = await mailhog.find_by_recipient("alice@test.dev")
assert msg is not None
assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]mailer — реальный SMTP-клиент, направленный на MailHog. Брокер вообще не
участвует. Если у задачи TaskiqDepends, передаём зависимость явно
аргументом — TaskiqDepends() это default value, который перекрывается
обычным позиционным/именованным параметром.
Сцепку. Если кто-то поменяет имя параметра задачи (user_email → email),
но не обновит API-вызов, оба тонких теста зелёные, прод — нет. Защита:
- Pydantic-модель payload, которую и API, и задача принимают/возвращают. Тогда mypy ловит расхождение на этапе сборки.
- Либо один end-to-end smoke-тест с
InMemoryBroker(await_inplace=True)и MailHog поверх двух тонких — он медленный, но один такой не страшно.
| Ситуация | Стратегия |
|---|---|
| Простые API + простая задача, мало логики | Один толстый тест с InMemoryBroker + MailHog |
| API со своей логикой (валидация, rate limit, conditional kick) | Тонкий API-тест с FakeBroker |
| Задача со своей логикой (шаблоны, ветвления, retry) | Тонкий тест задачи как async def |
| Проект большой, разделение unit/integration уже есть | Пирамида: много FakeBroker-тестов + точечные интеграционные с MailHog |
Только если сам предмет теста — это RabbitMQ-специфика: ack/nack, deadletter, очерёдность, потеря сообщений при рестарте worker'а. Для «пришло ли письмо после регистрации» это оверкилл.
Если всё-таки нужен полный E2E:
# docker-compose.e2e.yml
services:
rabbitmq:
image: rabbitmq:3-management
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 2s
timeout: 2s
retries: 20
mailhog:
image: mailhog/mailhog
ports: ["1025:1025", "8025:8025"]
worker:
build: .
command: taskiq worker your_project.tkq:broker
environment:
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672/
SMTP_HOST: mailhog
SMTP_PORT: 1025
depends_on:
rabbitmq:
condition: service_healthy
mailhog:
condition: service_startedИ тест с polling — потому что worker асинхронен и в другом процессе:
import asyncio
async def wait_for_email(mailhog, recipient: str, timeout: float = 10.0) -> dict:
deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
msg = await mailhog.find_by_recipient(recipient)
if msg is not None:
return msg
await asyncio.sleep(0.2)
raise AssertionError(f"No email for {recipient} after {timeout}s")
async def test_registration_e2e(mailhog):
# Уникальный email на каждый прогон, чтобы тесты не интерферировали
email = f"alice+{uuid.uuid4().hex[:8]}@test.dev"
await register_user(email=email, password="hunter2")
msg = await wait_for_email(mailhog, email, timeout=10.0)
assert msg["Content"]["Headers"]["Subject"] == ["Welcome!"]Здесь критичны три вещи:
- healthcheck для rabbitmq +
condition: service_healthyу worker'а, иначе worker падает с "connection refused" на старте. - Polling с таймаутом, не
sleep(N). CI бывает медленный. - Уникальный recipient на тест или агрессивный
mailhog.reset()— иначе тесты увидят письма друг друга.
| Вопрос | Ответ |
|---|---|
Стоит ли держать taskiq-контейнер для теста на email? |
Нет. |
| Нужен ли RabbitMQ в тесте? | Нет — InMemoryBroker с тем же интерфейсом. |
| Нужен ли MailHog? | Да, если хотите проверить реальный SMTP-флоу. Иначе можно мокнуть Mailer целиком. |
Как избежать sleep в тесте? |
InMemoryBroker(await_inplace=True) или await broker.wait_all() перед ассертами. |
| Как изолировать тесты друг от друга? | mailhog.reset() в autouse фикстуре + broker.custom_dependency_context = {} в teardown. |