DAG, задачи и операторы
Урок разбирает три кирпичика Airflow: DAG, задачу (task) и оператор (operator).
DAG — Directed Acyclic Graph, направленный ациклический граф задач: вершины — задачи, рёбра — зависимости, циклов нет.
Почему именно граф без циклов
Конвейер — это набор шагов с порядком: extract должен пройти до transform, а тот — до load. Это естественно описывается направленным графом. «Ациклический» значит, что задача не может зависеть сама от себя (прямо или через цепочку) — иначе конвейер зациклился бы навсегда.
Это не каприз разработчиков Airflow, а математическое требование. Чтобы планировщик мог решить, в каком порядке запускать задачи, граф должен допускать топологическую сортировку — линейный порядок, при котором каждая задача идёт после всех своих предков. Такая сортировка существует тогда и только тогда, когда в графе нет циклов. Поэтому Airflow при загрузке DAG проверяет ацикличность и отвергает граф с циклом ещё до запуска.
extract ──> transform ──> load ──> notify
│ ▲
└────── validate ───────┘ (validate тоже должен пройти до load)Task против Operator
Разница тонкая, но важная. Operator — это шаблон того, что делать (запустить Python-функцию, выполнить SQL, дёрнуть bash-команду). Task — это конкретный экземпляр оператора в конкретном DAG с конкретным task_id.
| Оператор | Что делает |
| PythonOperator | вызывает Python-функцию |
| BashOperator | выполняет shell-команду |
| SQLExecuteQueryOperator | выполняет SQL в базе |
| EmptyOperator | пустышка-маркер (узел графа) |
Как работает под капотом
Соберём DAG из трёх задач на разных операторах. Код требует пакет airflow, поэтому он помечен text (в браузере не запускается).
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def transform():
print("преобразуем данные")
with DAG("demo", start_date=datetime(2026,1,1), schedule="@daily", catchup=False) as dag:
extract = BashOperator(task_id="extract", bash_command="echo extract")
process = PythonOperator(task_id="process", python_callable=transform)
load = BashOperator(task_id="load", bash_command="echo load")
extract >> process >> loadСам граф зависимостей можно смоделировать на чистом Python и проверить топологический порядок — это ровно то, что делает планировщик.
deps = {"extract": [], "transform": ["extract"], "load": ["transform"], "notify": ["load"]}
order = []
while len(order) < len(deps):
for task, need in deps.items():
if task not in order and all(n in order for n in need):
order.append(task)
break
print(" -> ".join(order))Вывод:
extract -> transform -> load -> notify
Частые ошибки
- Создать цикл в зависимостях. Если A зависит от B, а B от A, Airflow отвергнет DAG — граф обязан быть ациклическим.
- Дублировать task_id. Внутри одного DAG идентификаторы задач должны быть уникальны.
- Путать оператор и задачу. Оператор — класс-шаблон, задача — его конкретный экземпляр в DAG.
Итог
- DAG — направленный ациклический граф: задачи-вершины и зависимости-рёбра без циклов.
- Operator — шаблон действия, task — его конкретный экземпляр с task_id.
- Из операторов (Python, Bash, SQL) собирается граф конвейера.