Structured Streaming: бесконечная таблица и watermarks

Урок вводит потоковую обработку в Spark через её центральную идею: поток — это бесконечно растущая таблица.

Structured Streaming — модель потоковой обработки, в которой входящий поток данных трактуется как бесконечная таблица, а результат пересчитывается инкрементально по мере прибытия новых строк.

Главная идея: поток как бесконечная таблица

Потоковая обработка обычно кажется чем-то принципиально иным, чем пакетная. Гениальность Structured Streaming в том, что он стирает эту границу: поток — это таблица, которая бесконечно растёт. Каждая новая порция данных — это новые строки, дописываемые в «входную таблицу». Запрос (тот же DataFrame/SQL, что и в батче) применяется к этой растущей таблице, и Spark инкрементально обновляет «таблицу результата».

Практическое следствие огромно: вы пишете потоковый запрос почти так же, как пакетный — те же select, groupBy, join. Не нужно осваивать отдельный API. Различие лишь в источнике (поток вместо файла) и приёмнике, плюс несколько специфичных для времени понятий.

# Чтение потока (например, из Kafka) — почти как batch
stream = (spark.readStream
    .format("kafka")
    .option("subscribe", "events")
    .load())

# Та же агрегация, что в batch
counts = stream.groupBy("user_id").count()

# Запись результата в приёмник с триггером и чекпойнтом
query = (counts.writeStream
    .outputMode("update")
    .option("checkpointLocation", "/chk/events")
    .start())

Micro-batch: как Spark на самом деле считает поток

Под капотом классический Structured Streaming работает микро-батчами: он не обрабатывает каждое событие по отдельности, а через короткие интервалы (триггер — например, раз в секунду) забирает накопившуюся порцию новых строк и обрабатывает её как маленький батч обычным движком Spark. Так достигается компромисс: задержка в районе секунд (а не миллисекунд, как у чисто событийных систем), но зато весь мощный оптимизированный движок Spark и высокая пропускная способность. Промоделируем эту инкрементальность на Python — состояние счётчиков переносится из батча в батч:

# Поток приходит порциями (микро-батчами). Состояние копится между ними.
micro_batches = [
    ["a", "b", "a"],        # батч 1
    ["b", "c"],             # батч 2
    ["a", "a", "c", "c"],   # батч 3
]

state = {}   # «таблица результата» — живёт между батчами
for i, batch in enumerate(micro_batches, 1):
    for key in batch:
        state[key] = state.get(key, 0) + 1
    print(f"после батча {i}: {dict(sorted(state.items()))}")

Вывод:

после батча 1: {'a': 2, 'b': 1}
после батча 2: {'a': 2, 'b': 2, 'c': 1}
после батча 3: {'a': 4, 'b': 2, 'c': 3}

Ключевое отличие от батча: состояние сохраняется между микро-батчами. Счётчики не считаются заново — каждый новый батч дополняет прошлый результат. Это и есть инкрементальность, а место хранения этого состояния — чекпойнт.

Время события против времени обработки и watermarks

В потоках критично различать два времени: event time (когда событие реально случилось, обычно поле в данных) и processing time (когда Spark его получил). Из-за сетевых задержек и буферизации события приходят не по порядку и с опозданием: событие, случившееся в 12:00, может прийти в 12:05.

Это рождает проблему для оконных агрегаций по времени события («сколько заказов в каждую минуту»). Чтобы посчитать минуту 12:00, нужно дождаться всех её событий — но как понять, что больше опоздавших не будет? Бесконечно ждать нельзя: состояние всех минут копилось бы вечно и съело бы память. Решение — watermark (водяной знак):

Watermark — порог «насколько поздними» событиями мы готовы заниматься: watermark = максимальное виденное время события − допуск. События старше watermark считаются безнадёжно опоздавшими и отбрасываются, а их окна — закрываются и очищаются.

Например, watermark с допуском 10 минут означает: «принимаем опоздания до 10 минут; что опоздало сильнее — игнорируем». Это даёт Spark право закрыть старые окна и очистить их состояние, не давая ему расти бесконечно. Watermark — это компромисс между полнотой (поймать опоздавших) и ограниченностью состояния (не хранить всё вечно).

Режимы вывода и гарантии

Поток пишет результат в одном из режимов: append (только новые финальные строки — для окон, закрытых watermark), update (изменившиеся строки), complete (вся таблица результата заново — для небольших агрегаций). Важная гарантия: при правильно настроенном чекпойнте и идемпотентном приёмнике Structured Streaming даёт exactly-once семантику — каждое событие учтено ровно один раз даже после сбоя и перезапуска. Достигается это тем, что в чекпойнт сохраняются и смещения источника, и состояние агрегаций.

Подводные камни

  • Забыли checkpointLocation. Без чекпойнта поток не переживёт перезапуск: потеряется и состояние, и позиция в источнике, сломается exactly-once. Чекпойнт обязателен.
  • Агрегация по времени без watermark. Состояние окон будет расти бесконечно → неизбежный OOM. Для оконных агрегаций по event time watermark обязателен.
  • Путаница event time и processing time. Окна по времени обработки дают неверную аналитику при опозданиях; для корректности группируйте по event time.
  • Слишком маленький триггер. Чрезмерно частые микро-батчи увеличивают накладные расходы; подбирайте интервал под требования к задержке.

Best practices

  • Пишите потоковые запросы как пакетные — API почти тот же; различия только в источнике/приёмнике и времени.
  • Всегда задавайте checkpointLocation — это основа отказоустойчивости и exactly-once.
  • Для оконных агрегаций по времени события обязательно ставьте watermark, чтобы ограничить состояние.
  • Группируйте аналитику по event time, а не по времени обработки.

Итог

  • Structured Streaming трактует поток как бесконечно растущую таблицу; запрос пишется почти как батч.
  • Под капотом — микро-батчи; состояние сохраняется между ними (инкрементальность).
  • Watermark задаёт допустимое опоздание событий и позволяет закрывать старые окна, ограничивая состояние.
  • Чекпойнт обеспечивает отказоустойчивость и exactly-once; для оконных агрегаций watermark обязателен.
Проверьте себя
1. В чём центральная идея модели Structured Streaming?
AКаждое событие обрабатывается мгновенно и независимо
BПоток трактуется как бесконечно растущая таблица, а запрос пишется почти так же, как пакетный
CДанные обязательно сортируются по времени обработки
DПоток нельзя агрегировать
2. Зачем в потоковой оконной агрегации по времени события нужен watermark?
AЧтобы ускорить обработку
BЧтобы задать допустимое опоздание событий и позволить Spark закрывать старые окна, не давая состоянию расти бесконечно
CЧтобы шифровать данные
DЧтобы отключить чекпойнт
3. Почему для потокового запроса обязателен checkpointLocation?
AЧтобы ускорить shuffle
BЧтобы поток пережил перезапуск: в чекпойнте хранятся смещения источника и состояние, что и обеспечивает отказоустойчивость и exactly-once
CЧтобы уменьшить число партиций
DЭто нужно только для batch-запросов
Поддержать проект