Первый DAG: расписание и параметры

Урок показывает, как правильно настроить расписание DAG и не утонуть в случайных запусках.

schedule — как часто запускать DAG (например, @daily или cron-строка). logical date — дата периода, за который выполняется запуск, а не момент времени.

Три обязательных параметра

У каждого DAG есть три ключевых настройки расписания: start_date (с какого дня конвейер «существует»), schedule (как часто запускать) и catchup (догонять ли пропущенные периоды). Ошибка в любом из них приводит к лавине лишних запусков или, наоборот, к тишине.

scheduleЗначение
@dailyраз в сутки в полночь
@hourlyраз в час
0 6 * * *cron: каждый день в 6:00
Noneтолько вручную

Логическая дата — не «сейчас»

Это понятие путает новичков. Когда DAG с @daily запускается ночью 22 июня, его логическая дата — 21 июня: он обрабатывает данные за прошедший завершённый день. Поэтому в задачах используют логическую дату, а не текущее время — иначе при перезапуске вчерашнего дня конвейер возьмёт сегодняшние данные.

Запомните это как мантру: в коде задачи — логическая дата, а не now(). Airflow подставляет её в шаблоны через переменную {{ ds }} (date string) и передаёт в функции. Благодаря этому любой запуск — хоть свежий, хоть перезапуск месячной давности — обрабатывает ровно тот период, который должен. Это фундамент, на котором держатся backfill и идемпотентность, которые мы разберём в следующем разделе.

Как работает под капотом

Вот DAG с расписанием. Код требует airflow, помечен text.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="daily_report",
    start_date=datetime(2026, 6, 1),
    schedule="@daily",
    catchup=False,   # не догонять пропущенные дни
) as dag:
    # {{ ds }} — логическая дата запуска в формате YYYY-MM-DD
    run = BashOperator(
        task_id="report",
        bash_command="echo Считаю отчёт за {{ ds }}",
    )

Сымитируем, какие логические даты сгенерирует планировщик между двумя датами при ежедневном расписании.

from datetime import date, timedelta

start = date(2026, 6, 1)
end = date(2026, 6, 5)
d = start
while d < end:
    print("запуск за", d.isoformat())
    d += timedelta(days=1)

Вывод:

запуск за 2026-06-01
запуск за 2026-06-02
запуск за 2026-06-03
запуск за 2026-06-04

Частые ошибки

  • Оставить catchup=True со старым start_date. Airflow попытается «догнать» все пропущенные дни с этой даты — десятки запусков разом. Для нового DAG почти всегда ставят catchup=False.
  • Брать в задачах datetime.now() вместо логической даты. Тогда перезапуск вчерашнего дня обработает сегодняшние данные.
  • Динамический start_date (например, datetime.now()). Это запрещено: дата старта должна быть фиксированной.

Итог

  • Расписание DAG задают через start_date, schedule и catchup.
  • Логическая дата — это период запуска, а не текущий момент; на неё опираются задачи.
  • Для нового DAG обычно ставят catchup=False, чтобы не догонять прошлое лавиной.
Проверьте себя
1. Что такое логическая дата (logical date) запуска DAG?
AМомент, когда задача физически стартовала
BДата периода, за который выполняется запуск (часто вчерашний день)
CДата создания файла DAG
DДата установки Airflow
2. Зачем для нового DAG обычно ставят catchup=False?
AЧтобы ускорить отдельную задачу
BЧтобы Airflow не пытался догнать все пропущенные периоды с start_date лавиной запусков
CЧтобы отключить расписание полностью
DЧтобы запретить ручной запуск