Фоновые задачи с Celery

Когда внутри HTTP-запроса делается что-то долгое — отправка письма, генерация отчёта, обращение к внешнему API, — пользователь ждёт, а сервер занят впустую; Celery уносит такую работу в фон, и запрос отвечает мгновенно.

Celery — система очередей задач для Python: ваше приложение кладёт «задание» в брокер сообщений, а отдельные процессы-воркеры разбирают очередь и выполняют задания асинхронно, вне цикла обработки запроса.

Веб-запрос должен быть быстрым: пользователь нажал кнопку — через сотни миллисекунд получил ответ. Но часть работы по своей природе медленная: отправить письмо (секунды на SMTP), сжать загруженное видео (минуты), дёрнуть платёжный шлюз (зависит от чужого сервера). Если делать это прямо во вьюхе, пользователь смотрит на крутящийся индикатор, а воркер веб-сервера всё это время занят одним клиентом и не обслуживает других. Решение — не делать долгое в запросе, а поставить задачу в очередь и ответить сразу, а саму работу выполнить фоном.

Зачем это на практике

Классический пример — регистрация. После создания пользователя нужно отправить письмо с подтверждением. Если ждать SMTP прямо в обработчике, форма «думает» 2-3 секунды, а при недоступном почтовом сервере — падает по таймауту, и регистрация срывается из-за второстепенного шага. Вынеся отправку в Celery, вы сохраняете пользователя, ставите задачу «отправить письмо» в очередь и мгновенно показываете «готово». Письмо уйдёт через секунду фоном, а если SMTP временно лежит — задача переотправится позже, не уронив регистрацию. Так фоновые задачи делают приложение и быстрым, и устойчивым: медленные и ненадёжные шаги изолированы от пользовательского пути.

Из чего состоит Celery

В системе три участника. Producer (ваше Django-приложение) формирует задачу и отправляет её. Брокер — посредник-очередь, где задачи ждут исполнителя; чаще всего это Redis или RabbitMQ. Worker — отдельный процесс Celery, который постоянно слушает брокер, забирает задачи и выполняет их код. Опционально есть ещё result backend — хранилище результатов, если они нужны.

КомпонентРоль
Django (producer)создаёт задачу: task.delay(args) — кладёт в брокер и сразу возвращает управление
брокер (Redis/RabbitMQ)хранит очередь задач, пока их не заберёт воркер
воркер Celeryотдельный процесс; забирает задачи из брокера и исполняет их функции
result backendхранит результат/статус задачи (если нужно его потом узнать)

Важно понять: воркер — это не ваш веб-сервер. Это отдельный процесс, который вы запускаете рядом командой celery -A myproject worker. Он импортирует тот же код Django, имеет доступ к тем же моделям и БД, но живёт своей жизнью и обрабатывает очередь независимо от gunicorn/uwsgi.

Минимальная настройка и первая задача

Задача — это обычная Python-функция, помеченная декоратором @shared_task. Вызывают её не напрямую, а через .delay() — это и есть «поставить в очередь».

# app/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_welcome_email(user_id):
    user = User.objects.get(pk=user_id)
    send_mail(
        "Добро пожаловать",
        "Спасибо за регистрацию!",
        "[email protected]",
        [user.email],
    )
    return user.email  # вернётся в result backend, если он настроен
# во вьюхе: НЕ вызываем функцию напрямую, а ставим в очередь
def register(request):
    user = create_user(request.POST)
    send_welcome_email.delay(user.id)   # вернётся мгновенно, письмо уйдёт фоном
    return redirect("welcome")

Обратите внимание: в задачу передаётся user.id, а не сам объект user. Это намеренно — к моменту, когда воркер возьмёт задачу, переданный объект уже устарел бы; правильнее передавать идентификатор и заново загружать свежую запись из БД внутри задачи. Аргументы задачи проходят через брокер сериализованными (обычно JSON), поэтому они и должны быть простыми — числа, строки, id, а не сложные объекты Django.

Брокер: где живут задачи

Брокер настраивается одной строкой. Redis — самый частый выбор, тем более если он уже стоит под кэш.

# settings.py
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"  # если нужны результаты
CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]

RabbitMQ — более «серьёзный» брокер с гарантиями доставки и маршрутизацией; его берут в крупных системах. Для большинства Django-проектов Redis достаточно. Запускают воркер так:

celery -A myproject worker --loglevel=info
# -A указывает на модуль с экземпляром Celery (обычно myproject/celery.py)

Очереди и приоритеты

По умолчанию все задачи валятся в одну очередь celery, и быстрые задачи могут застрять за медленными. Решение — разные очереди под разные классы задач, и отдельные воркеры на каждую. Например: лёгкие письма в очередь emails, тяжёлую обработку видео в media.

# направить задачу в конкретную очередь
send_welcome_email.apply_async(args=[user.id], queue="emails")
transcode_video.apply_async(args=[video.id], queue="media")
# один воркер обслуживает быстрые письма, другой — тяжёлое видео
celery -A myproject worker -Q emails --concurrency=8
celery -A myproject worker -Q media  --concurrency=2

Так минутная перекодировка видео не блокирует отправку писем: у них разные воркеры и разные очереди. Параметр --concurrency задаёт, сколько задач воркер тянет параллельно — для лёгких задач больше, для тяжёлых CPU-задач обычно по числу ядер.

Периодические задачи: Celery Beat

Часть работы нужна не «по событию», а «по расписанию»: ночью пересчитать статистику, раз в час чистить просроченные сессии, каждое утро слать дайджест. За это отвечает Celery Beat — планировщик, который по таймеру сам ставит задачи в очередь, а выполняют их обычные воркеры.

# settings.py — расписание
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "nightly-stats": {
        "task": "app.tasks.recalc_stats",
        "schedule": crontab(hour=3, minute=0),   # каждый день в 03:00
    },
    "cleanup-sessions": {
        "task": "app.tasks.clear_expired",
        "schedule": crontab(minute=0),            # каждый час в начале часа
    },
}
# Beat — ОТДЕЛЬНЫЙ процесс, запускается рядом с воркером
celery -A myproject beat --loglevel=info

Beat сам ничего не выполняет — он только в нужный момент кладёт задачу в брокер, а исполняет её воркер. Поэтому Beat запускают в единственном экземпляре (иначе расписание сработает несколько раз), а воркеров под нагрузку может быть много.

Идемпотентность: задача может выполниться дважды

Ключевое свойство очередей: задача может быть выполнена больше одного раза. Воркер упал посреди работы, не успев подтвердить брокеру завершение, — брокер отдаст задачу другому воркеру. Сеть моргнула при подтверждении — то же самое. Поэтому задачи проектируют идемпотентными: повторный запуск с теми же аргументами не должен ломать данные.

# НЕ идемпотентно: повтор начислит бонус дважды
@shared_task
def give_bonus(user_id):
    user = User.objects.get(pk=user_id)
    user.balance += 100   # повтор задачи: +100 ещё раз, баг
    user.save()

# идемпотентно: помечаем факт начисления и проверяем его
@shared_task
def give_bonus(user_id):
    user = User.objects.get(pk=user_id)
    if user.bonus_granted:        # уже начисляли — выходим
        return
    user.balance += 100
    user.bonus_granted = True
    user.save()

Идея — сделать так, чтобы результат зависел от конечного состояния, а не от числа выполнений. Способы: проверять флаг/статус перед действием (как выше), использовать уникальные ключи операций, опираться на update_or_create вместо слепого create. Это не паранойя, а норма для распределённых систем: «доставлено хотя бы один раз» (at-least-once) — стандартная гарантия брокеров, и код обязан её переживать.

Как это работает под капотом

Когда вы вызываете send_welcome_email.delay(7), Celery не запускает функцию — он сериализует имя задачи и аргументы в сообщение (по умолчанию JSON) и кладёт его в брокер, в список Redis или в обменник RabbitMQ. Вызов возвращает объект AsyncResult с id задачи и тут же отдаёт управление вьюхе — отсюда мгновенный ответ. Параллельно процесс-воркер крутит цикл: «забери сообщение из очереди → найди по имени зарегистрированную функцию → исполни с переданными аргументами → подтверди брокеру (ack)». Подтверждение (ack) и есть механизм надёжности: пока воркер не сказал «сделано», брокер считает задачу невыполненной и при падении воркера вернёт её в очередь — поэтому возможен повторный запуск. Результат функции, если настроен result backend, складывается туда под id задачи, и приложение может позже спросить статус через AsyncResult(task_id).get(). Никакой магии параллелизма внутри Django нет — вся асинхронность вынесена в отдельные процессы, общающиеся через брокер.

Частые ошибки

  • Передавать в задачу объект модели вместо id. Объект сериализуется и устаревает; к моменту выполнения данные могли измениться. Передавайте obj.id и загружайте свежую запись внутри задачи.
  • Считать задачи идемпотентными по умолчанию. Брокер гарантирует «хотя бы один раз», и задача может выполниться дважды. Начисления, списания, отправки — защищайте флагом или уникальным ключом.
  • Вызывать задачу напрямую вместо .delay(). send_welcome_email(user.id) выполнит её синхронно в запросе — весь смысл выноса в фон теряется. Нужен .delay() или .apply_async().
  • Запускать несколько процессов Beat. Каждый поставит задачу по расписанию, и ночная рассылка уйдёт трижды. Beat — строго в одном экземпляре, воркеров — сколько угодно.
  • Складывать всё в одну очередь. Тяжёлые задачи забивают воркеров, и быстрые письма ждут часами. Разделяйте очереди и выделяйте им отдельных воркеров.

Итоги

  • Celery выносит долгую и ненадёжную работу (письма, отчёты, внешние API) из HTTP-запроса в фон, и запрос отвечает мгновенно.
  • Участники: Django-producer кладёт задачу через .delay(), брокер (Redis/RabbitMQ) хранит очередь, отдельный процесс-воркер её исполняет.
  • В задачу передают простые сериализуемые аргументы (id), а не объекты, и загружают свежие данные внутри.
  • Разные очереди и воркеры разводят лёгкие и тяжёлые задачи; Celery Beat (в одном экземпляре) ставит задачи по расписанию.
  • Брокер гарантирует доставку «хотя бы один раз», поэтому задачи делают идемпотентными — повтор не должен портить данные.
Проверьте себя
1. Зачем выносить отправку письма при регистрации в Celery вместо выполнения прямо во вьюхе?
AЧтобы письмо точно не отправилось
BЧтобы запрос отвечал мгновенно, а медленный и ненадёжный SMTP не задерживал пользователя и не ронял регистрацию по таймауту
CПотому что Django не умеет отправлять письма синхронно
DЧтобы письмо отправлялось дважды для надёжности
2. Почему в задачу Celery лучше передавать user.id, а не сам объект user?
AОбъекты вообще нельзя передавать в функции
BАргументы сериализуются и проходят через брокер; к моменту выполнения объект устарел бы, поэтому передают id и загружают свежую запись внутри задачи
Cid занимает меньше байт в базе данных
DПередача объекта удаляет его из базы
3. Что означает идемпотентность задачи и зачем она нужна в Celery?
AЗадача выполняется максимально быстро
BПовторный запуск задачи с теми же аргументами не портит данные — это важно, потому что брокер гарантирует доставку «хотя бы один раз» и задача может выполниться дважды
CЗадача никогда не возвращает результат
DЗадача выполняется только по расписанию