Первый DAG: расписание и параметры
Урок показывает, как правильно настроить расписание DAG и не утонуть в случайных запусках.
schedule — как часто запускать DAG (например,
@dailyили cron-строка). logical date — дата периода, за который выполняется запуск, а не момент времени.
Три обязательных параметра
У каждого DAG есть три ключевых настройки расписания: start_date (с какого дня конвейер «существует»), schedule (как часто запускать) и catchup (догонять ли пропущенные периоды). Ошибка в любом из них приводит к лавине лишних запусков или, наоборот, к тишине.
| schedule | Значение |
@daily | раз в сутки в полночь |
@hourly | раз в час |
0 6 * * * | cron: каждый день в 6:00 |
None | только вручную |
Логическая дата — не «сейчас»
Это понятие путает новичков. Когда DAG с @daily запускается ночью 22 июня, его логическая дата — 21 июня: он обрабатывает данные за прошедший завершённый день. Поэтому в задачах используют логическую дату, а не текущее время — иначе при перезапуске вчерашнего дня конвейер возьмёт сегодняшние данные.
Запомните это как мантру: в коде задачи — логическая дата, а не now(). Airflow подставляет её в шаблоны через переменную {{ ds }} (date string) и передаёт в функции. Благодаря этому любой запуск — хоть свежий, хоть перезапуск месячной давности — обрабатывает ровно тот период, который должен. Это фундамент, на котором держатся backfill и идемпотентность, которые мы разберём в следующем разделе.
Как работает под капотом
Вот DAG с расписанием. Код требует airflow, помечен text.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="daily_report",
start_date=datetime(2026, 6, 1),
schedule="@daily",
catchup=False, # не догонять пропущенные дни
) as dag:
# {{ ds }} — логическая дата запуска в формате YYYY-MM-DD
run = BashOperator(
task_id="report",
bash_command="echo Считаю отчёт за {{ ds }}",
)Сымитируем, какие логические даты сгенерирует планировщик между двумя датами при ежедневном расписании.
from datetime import date, timedelta
start = date(2026, 6, 1)
end = date(2026, 6, 5)
d = start
while d < end:
print("запуск за", d.isoformat())
d += timedelta(days=1)Вывод:
запуск за 2026-06-01 запуск за 2026-06-02 запуск за 2026-06-03 запуск за 2026-06-04
Частые ошибки
- Оставить
catchup=Trueсо старым start_date. Airflow попытается «догнать» все пропущенные дни с этой даты — десятки запусков разом. Для нового DAG почти всегда ставятcatchup=False. - Брать в задачах
datetime.now()вместо логической даты. Тогда перезапуск вчерашнего дня обработает сегодняшние данные. - Динамический
start_date(например,datetime.now()). Это запрещено: дата старта должна быть фиксированной.
Итог
- Расписание DAG задают через start_date, schedule и catchup.
- Логическая дата — это период запуска, а не текущий момент; на неё опираются задачи.
- Для нового DAG обычно ставят catchup=False, чтобы не догонять прошлое лавиной.