Фоновые задачи с Celery
Когда внутри HTTP-запроса делается что-то долгое — отправка письма, генерация отчёта, обращение к внешнему API, — пользователь ждёт, а сервер занят впустую; Celery уносит такую работу в фон, и запрос отвечает мгновенно.
Celery — система очередей задач для Python: ваше приложение кладёт «задание» в брокер сообщений, а отдельные процессы-воркеры разбирают очередь и выполняют задания асинхронно, вне цикла обработки запроса.
Веб-запрос должен быть быстрым: пользователь нажал кнопку — через сотни миллисекунд получил ответ. Но часть работы по своей природе медленная: отправить письмо (секунды на SMTP), сжать загруженное видео (минуты), дёрнуть платёжный шлюз (зависит от чужого сервера). Если делать это прямо во вьюхе, пользователь смотрит на крутящийся индикатор, а воркер веб-сервера всё это время занят одним клиентом и не обслуживает других. Решение — не делать долгое в запросе, а поставить задачу в очередь и ответить сразу, а саму работу выполнить фоном.
Зачем это на практике
Классический пример — регистрация. После создания пользователя нужно отправить письмо с подтверждением. Если ждать SMTP прямо в обработчике, форма «думает» 2-3 секунды, а при недоступном почтовом сервере — падает по таймауту, и регистрация срывается из-за второстепенного шага. Вынеся отправку в Celery, вы сохраняете пользователя, ставите задачу «отправить письмо» в очередь и мгновенно показываете «готово». Письмо уйдёт через секунду фоном, а если SMTP временно лежит — задача переотправится позже, не уронив регистрацию. Так фоновые задачи делают приложение и быстрым, и устойчивым: медленные и ненадёжные шаги изолированы от пользовательского пути.
Из чего состоит Celery
В системе три участника. Producer (ваше Django-приложение) формирует задачу и отправляет её. Брокер — посредник-очередь, где задачи ждут исполнителя; чаще всего это Redis или RabbitMQ. Worker — отдельный процесс Celery, который постоянно слушает брокер, забирает задачи и выполняет их код. Опционально есть ещё result backend — хранилище результатов, если они нужны.
| Компонент | Роль |
| Django (producer) | создаёт задачу: task.delay(args) — кладёт в брокер и сразу возвращает управление |
| брокер (Redis/RabbitMQ) | хранит очередь задач, пока их не заберёт воркер |
| воркер Celery | отдельный процесс; забирает задачи из брокера и исполняет их функции |
| result backend | хранит результат/статус задачи (если нужно его потом узнать) |
Важно понять: воркер — это не ваш веб-сервер. Это отдельный процесс, который вы запускаете рядом командой celery -A myproject worker. Он импортирует тот же код Django, имеет доступ к тем же моделям и БД, но живёт своей жизнью и обрабатывает очередь независимо от gunicorn/uwsgi.
Минимальная настройка и первая задача
Задача — это обычная Python-функция, помеченная декоратором @shared_task. Вызывают её не напрямую, а через .delay() — это и есть «поставить в очередь».
# app/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_welcome_email(user_id):
user = User.objects.get(pk=user_id)
send_mail(
"Добро пожаловать",
"Спасибо за регистрацию!",
"[email protected]",
[user.email],
)
return user.email # вернётся в result backend, если он настроен
# во вьюхе: НЕ вызываем функцию напрямую, а ставим в очередь
def register(request):
user = create_user(request.POST)
send_welcome_email.delay(user.id) # вернётся мгновенно, письмо уйдёт фоном
return redirect("welcome")
Обратите внимание: в задачу передаётся user.id, а не сам объект user. Это намеренно — к моменту, когда воркер возьмёт задачу, переданный объект уже устарел бы; правильнее передавать идентификатор и заново загружать свежую запись из БД внутри задачи. Аргументы задачи проходят через брокер сериализованными (обычно JSON), поэтому они и должны быть простыми — числа, строки, id, а не сложные объекты Django.
Брокер: где живут задачи
Брокер настраивается одной строкой. Redis — самый частый выбор, тем более если он уже стоит под кэш.
# settings.py
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # если нужны результаты
CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
RabbitMQ — более «серьёзный» брокер с гарантиями доставки и маршрутизацией; его берут в крупных системах. Для большинства Django-проектов Redis достаточно. Запускают воркер так:
celery -A myproject worker --loglevel=info
# -A указывает на модуль с экземпляром Celery (обычно myproject/celery.py)
Очереди и приоритеты
По умолчанию все задачи валятся в одну очередь celery, и быстрые задачи могут застрять за медленными. Решение — разные очереди под разные классы задач, и отдельные воркеры на каждую. Например: лёгкие письма в очередь emails, тяжёлую обработку видео в media.
# направить задачу в конкретную очередь
send_welcome_email.apply_async(args=[user.id], queue="emails")
transcode_video.apply_async(args=[video.id], queue="media")
# один воркер обслуживает быстрые письма, другой — тяжёлое видео
celery -A myproject worker -Q emails --concurrency=8
celery -A myproject worker -Q media --concurrency=2
Так минутная перекодировка видео не блокирует отправку писем: у них разные воркеры и разные очереди. Параметр --concurrency задаёт, сколько задач воркер тянет параллельно — для лёгких задач больше, для тяжёлых CPU-задач обычно по числу ядер.
Периодические задачи: Celery Beat
Часть работы нужна не «по событию», а «по расписанию»: ночью пересчитать статистику, раз в час чистить просроченные сессии, каждое утро слать дайджест. За это отвечает Celery Beat — планировщик, который по таймеру сам ставит задачи в очередь, а выполняют их обычные воркеры.
# settings.py — расписание
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"nightly-stats": {
"task": "app.tasks.recalc_stats",
"schedule": crontab(hour=3, minute=0), # каждый день в 03:00
},
"cleanup-sessions": {
"task": "app.tasks.clear_expired",
"schedule": crontab(minute=0), # каждый час в начале часа
},
}
# Beat — ОТДЕЛЬНЫЙ процесс, запускается рядом с воркером
celery -A myproject beat --loglevel=info
Beat сам ничего не выполняет — он только в нужный момент кладёт задачу в брокер, а исполняет её воркер. Поэтому Beat запускают в единственном экземпляре (иначе расписание сработает несколько раз), а воркеров под нагрузку может быть много.
Идемпотентность: задача может выполниться дважды
Ключевое свойство очередей: задача может быть выполнена больше одного раза. Воркер упал посреди работы, не успев подтвердить брокеру завершение, — брокер отдаст задачу другому воркеру. Сеть моргнула при подтверждении — то же самое. Поэтому задачи проектируют идемпотентными: повторный запуск с теми же аргументами не должен ломать данные.
# НЕ идемпотентно: повтор начислит бонус дважды
@shared_task
def give_bonus(user_id):
user = User.objects.get(pk=user_id)
user.balance += 100 # повтор задачи: +100 ещё раз, баг
user.save()
# идемпотентно: помечаем факт начисления и проверяем его
@shared_task
def give_bonus(user_id):
user = User.objects.get(pk=user_id)
if user.bonus_granted: # уже начисляли — выходим
return
user.balance += 100
user.bonus_granted = True
user.save()
Идея — сделать так, чтобы результат зависел от конечного состояния, а не от числа выполнений. Способы: проверять флаг/статус перед действием (как выше), использовать уникальные ключи операций, опираться на update_or_create вместо слепого create. Это не паранойя, а норма для распределённых систем: «доставлено хотя бы один раз» (at-least-once) — стандартная гарантия брокеров, и код обязан её переживать.
Как это работает под капотом
Когда вы вызываете send_welcome_email.delay(7), Celery не запускает функцию — он сериализует имя задачи и аргументы в сообщение (по умолчанию JSON) и кладёт его в брокер, в список Redis или в обменник RabbitMQ. Вызов возвращает объект AsyncResult с id задачи и тут же отдаёт управление вьюхе — отсюда мгновенный ответ. Параллельно процесс-воркер крутит цикл: «забери сообщение из очереди → найди по имени зарегистрированную функцию → исполни с переданными аргументами → подтверди брокеру (ack)». Подтверждение (ack) и есть механизм надёжности: пока воркер не сказал «сделано», брокер считает задачу невыполненной и при падении воркера вернёт её в очередь — поэтому возможен повторный запуск. Результат функции, если настроен result backend, складывается туда под id задачи, и приложение может позже спросить статус через AsyncResult(task_id).get(). Никакой магии параллелизма внутри Django нет — вся асинхронность вынесена в отдельные процессы, общающиеся через брокер.
Частые ошибки
- Передавать в задачу объект модели вместо id. Объект сериализуется и устаревает; к моменту выполнения данные могли измениться. Передавайте
obj.idи загружайте свежую запись внутри задачи. - Считать задачи идемпотентными по умолчанию. Брокер гарантирует «хотя бы один раз», и задача может выполниться дважды. Начисления, списания, отправки — защищайте флагом или уникальным ключом.
- Вызывать задачу напрямую вместо
.delay().send_welcome_email(user.id)выполнит её синхронно в запросе — весь смысл выноса в фон теряется. Нужен.delay()или.apply_async(). - Запускать несколько процессов Beat. Каждый поставит задачу по расписанию, и ночная рассылка уйдёт трижды. Beat — строго в одном экземпляре, воркеров — сколько угодно.
- Складывать всё в одну очередь. Тяжёлые задачи забивают воркеров, и быстрые письма ждут часами. Разделяйте очереди и выделяйте им отдельных воркеров.
Итоги
- Celery выносит долгую и ненадёжную работу (письма, отчёты, внешние API) из HTTP-запроса в фон, и запрос отвечает мгновенно.
- Участники: Django-producer кладёт задачу через
.delay(), брокер (Redis/RabbitMQ) хранит очередь, отдельный процесс-воркер её исполняет. - В задачу передают простые сериализуемые аргументы (id), а не объекты, и загружают свежие данные внутри.
- Разные очереди и воркеры разводят лёгкие и тяжёлые задачи; Celery Beat (в одном экземпляре) ставит задачи по расписанию.
- Брокер гарантирует доставку «хотя бы один раз», поэтому задачи делают идемпотентными — повтор не должен портить данные.