Идемпотентность и повторные запуски

Урок объясняет идемпотентность — свойство, без которого конвейеры нельзя безопасно перезапускать.

Идемпотентность — свойство операции, при котором повторное выполнение даёт тот же результат, что и однократное.

Почему это главное свойство пайплайна

Задачи падают: упал источник, кончилось место, обновился сервер. После починки задачу перезапускают — и она не должна испортить уже частично загруженные данные. Если конвейер идемпотентен, перезапуск всегда безопасен: он приводит данные к правильному состоянию независимо от того, сколько раз выполнялся.

Слово пугающее, но идея бытовая. Выключатель света идемпотентен по состоянию: сколько раз ни жми «выключить», свет останется выключенным. А вот кнопка «добавить товар в корзину» — нет: каждое нажатие добавляет ещё одну штуку. В дата-инженерии мы всегда хотим «выключатель», а не «корзину»: повтор загрузки должен оставлять данные в одном правильном состоянии, а не накапливать дубли с каждым запуском.

НЕидемпотентно:  INSERT строк → запуск ×2 = дубли ✗
идемпотентно:    заменить партицию за день → запуск ×N = один результат ✔

Три приёма идемпотентности

ПриёмСуть
Перезапись партицииудалить и записать данные за период заново
Upsert по ключуобновить запись или вставить, если нет
Delete-insertудалить строки периода, затем вставить новые

Как работает под капотом

Покажем delete-insert на SQL: загрузка за конкретный день сначала чистит этот день, потом вставляет — повтор даёт один и тот же результат.

CREATE TABLE daily_sales (day TEXT, region TEXT, amount INTEGER);
INSERT INTO daily_sales VALUES ('2026-06-21','Север',300);

-- идемпотентная загрузка за 21 июня: сначала удаляем, потом вставляем
DELETE FROM daily_sales WHERE day = '2026-06-21';
INSERT INTO daily_sales VALUES ('2026-06-21','Север',300), ('2026-06-21','Юг',500);

SELECT day, SUM(amount) AS total FROM daily_sales GROUP BY day;

Сравним два счётчика на Python: неидемпотентный накапливает, идемпотентный присваивает.

store = {}

def bad_load(day, value):
    store[day] = store.get(day, 0) + value   # накапливает

def good_load(day, value):
    store[day] = value                        # присваивает

good_load("2026-06-21", 800)
good_load("2026-06-21", 800)  # повтор после сбоя
print("идемпотентно:", store["2026-06-21"])

Вывод:

идемпотентно: 800

Разница в одной строке: += против =. Но именно она отделяет конвейер, который ломается при каждом сбое, от того, который спокойно переживает любые перезапуски. В реальном проде задачи падают регулярно — это норма, а не исключение. Поэтому идемпотентность закладывают с самого начала, а не «когда-нибудь потом»: переделывать неидемпотентный пайплайн на проде, где уже накопились дубли, гораздо больнее, чем сразу написать загрузку через delete-insert или upsert.

Частые ошибки

  • Чистый INSERT в загрузке. Любой повтор добавляет строки — данные раздуваются дублями.
  • Запоминать состояние «между запусками» в памяти задачи. Задачи Airflow могут выполняться на разных воркерах; состояние нужно хранить во внешнем хранилище.
  • Использовать текущее время вместо логической даты. Тогда повтор обрабатывает другой период, и результат непредсказуем.

Итог

  • Идемпотентность — повторный запуск даёт тот же результат, что и однократный.
  • Без неё перезапуск упавшей задачи создаёт дубли и порчу данных.
  • Приёмы: перезапись партиции, upsert по ключу, delete-insert за период.
Проверьте себя
1. Что значит, что загрузка данных идемпотентна?
AОна выполняется очень быстро
BПовторный запуск за тот же период даёт тот же результат без дублей
CОна работает только один раз и блокируется
DОна не требует ключа
2. Какой из приёмов НЕ обеспечивает идемпотентность?
AПерезапись партиции за период
BUpsert по уникальному ключу
CЧистый INSERT строк при каждом запуске
DDelete-insert за период
3. Почему нельзя хранить состояние задачи «между запусками» в её оперативной памяти?
AПамять слишком дорогая
BЗадачи Airflow могут выполняться на разных воркерах, и память не сохраняется
CЭто запрещено Python
DСостояние и так теряется специально