DAG, задачи и операторы

Урок разбирает три кирпичика Airflow: DAG, задачу (task) и оператор (operator).

DAG — Directed Acyclic Graph, направленный ациклический граф задач: вершины — задачи, рёбра — зависимости, циклов нет.

Почему именно граф без циклов

Конвейер — это набор шагов с порядком: extract должен пройти до transform, а тот — до load. Это естественно описывается направленным графом. «Ациклический» значит, что задача не может зависеть сама от себя (прямо или через цепочку) — иначе конвейер зациклился бы навсегда.

Это не каприз разработчиков Airflow, а математическое требование. Чтобы планировщик мог решить, в каком порядке запускать задачи, граф должен допускать топологическую сортировку — линейный порядок, при котором каждая задача идёт после всех своих предков. Такая сортировка существует тогда и только тогда, когда в графе нет циклов. Поэтому Airflow при загрузке DAG проверяет ацикличность и отвергает граф с циклом ещё до запуска.

extract ──> transform ──> load ──> notify
   │                       ▲
   └────── validate ───────┘   (validate тоже должен пройти до load)

Task против Operator

Разница тонкая, но важная. Operator — это шаблон того, что делать (запустить Python-функцию, выполнить SQL, дёрнуть bash-команду). Task — это конкретный экземпляр оператора в конкретном DAG с конкретным task_id.

ОператорЧто делает
PythonOperatorвызывает Python-функцию
BashOperatorвыполняет shell-команду
SQLExecuteQueryOperatorвыполняет SQL в базе
EmptyOperatorпустышка-маркер (узел графа)

Как работает под капотом

Соберём DAG из трёх задач на разных операторах. Код требует пакет airflow, поэтому он помечен text (в браузере не запускается).

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def transform():
    print("преобразуем данные")

with DAG("demo", start_date=datetime(2026,1,1), schedule="@daily", catchup=False) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    process = PythonOperator(task_id="process", python_callable=transform)
    load = BashOperator(task_id="load", bash_command="echo load")
    extract >> process >> load

Сам граф зависимостей можно смоделировать на чистом Python и проверить топологический порядок — это ровно то, что делает планировщик.

deps = {"extract": [], "transform": ["extract"], "load": ["transform"], "notify": ["load"]}
order = []
while len(order) < len(deps):
    for task, need in deps.items():
        if task not in order and all(n in order for n in need):
            order.append(task)
            break
print(" -> ".join(order))

Вывод:

extract -> transform -> load -> notify

Частые ошибки

  • Создать цикл в зависимостях. Если A зависит от B, а B от A, Airflow отвергнет DAG — граф обязан быть ациклическим.
  • Дублировать task_id. Внутри одного DAG идентификаторы задач должны быть уникальны.
  • Путать оператор и задачу. Оператор — класс-шаблон, задача — его конкретный экземпляр в DAG.

Итог

  • DAG — направленный ациклический граф: задачи-вершины и зависимости-рёбра без циклов.
  • Operator — шаблон действия, task — его конкретный экземпляр с task_id.
  • Из операторов (Python, Bash, SQL) собирается граф конвейера.
Проверьте себя
1. Почему граф DAG обязан быть ациклическим?
AТак быстрее работает интерфейс
BИначе задачи зациклятся, ожидая друг друга бесконечно
CЦиклы запрещены языком Python
DЭто требование cron
2. В чём разница между operator и task в Airflow?
AЭто одно и то же
BOperator — шаблон действия (класс), task — его конкретный экземпляр в DAG с task_id
CTask выполняется в браузере, operator на сервере
DOperator хранит данные, task их удаляет
3. Какой оператор нужен, чтобы вызвать обычную Python-функцию?
ABashOperator
BEmptyOperator
CPythonOperator
DSQLExecuteQueryOperator