Structured Streaming: бесконечная таблица и watermarks
Урок вводит потоковую обработку в Spark через её центральную идею: поток — это бесконечно растущая таблица.
Structured Streaming — модель потоковой обработки, в которой входящий поток данных трактуется как бесконечная таблица, а результат пересчитывается инкрементально по мере прибытия новых строк.
Главная идея: поток как бесконечная таблица
Потоковая обработка обычно кажется чем-то принципиально иным, чем пакетная. Гениальность Structured Streaming в том, что он стирает эту границу: поток — это таблица, которая бесконечно растёт. Каждая новая порция данных — это новые строки, дописываемые в «входную таблицу». Запрос (тот же DataFrame/SQL, что и в батче) применяется к этой растущей таблице, и Spark инкрементально обновляет «таблицу результата».
Практическое следствие огромно: вы пишете потоковый запрос почти так же, как пакетный — те же select, groupBy, join. Не нужно осваивать отдельный API. Различие лишь в источнике (поток вместо файла) и приёмнике, плюс несколько специфичных для времени понятий.
# Чтение потока (например, из Kafka) — почти как batch
stream = (spark.readStream
.format("kafka")
.option("subscribe", "events")
.load())
# Та же агрегация, что в batch
counts = stream.groupBy("user_id").count()
# Запись результата в приёмник с триггером и чекпойнтом
query = (counts.writeStream
.outputMode("update")
.option("checkpointLocation", "/chk/events")
.start())
Micro-batch: как Spark на самом деле считает поток
Под капотом классический Structured Streaming работает микро-батчами: он не обрабатывает каждое событие по отдельности, а через короткие интервалы (триггер — например, раз в секунду) забирает накопившуюся порцию новых строк и обрабатывает её как маленький батч обычным движком Spark. Так достигается компромисс: задержка в районе секунд (а не миллисекунд, как у чисто событийных систем), но зато весь мощный оптимизированный движок Spark и высокая пропускная способность. Промоделируем эту инкрементальность на Python — состояние счётчиков переносится из батча в батч:
# Поток приходит порциями (микро-батчами). Состояние копится между ними.
micro_batches = [
["a", "b", "a"], # батч 1
["b", "c"], # батч 2
["a", "a", "c", "c"], # батч 3
]
state = {} # «таблица результата» — живёт между батчами
for i, batch in enumerate(micro_batches, 1):
for key in batch:
state[key] = state.get(key, 0) + 1
print(f"после батча {i}: {dict(sorted(state.items()))}")
Вывод:
после батча 1: {'a': 2, 'b': 1}
после батча 2: {'a': 2, 'b': 2, 'c': 1}
после батча 3: {'a': 4, 'b': 2, 'c': 3}
Ключевое отличие от батча: состояние сохраняется между микро-батчами. Счётчики не считаются заново — каждый новый батч дополняет прошлый результат. Это и есть инкрементальность, а место хранения этого состояния — чекпойнт.
Время события против времени обработки и watermarks
В потоках критично различать два времени: event time (когда событие реально случилось, обычно поле в данных) и processing time (когда Spark его получил). Из-за сетевых задержек и буферизации события приходят не по порядку и с опозданием: событие, случившееся в 12:00, может прийти в 12:05.
Это рождает проблему для оконных агрегаций по времени события («сколько заказов в каждую минуту»). Чтобы посчитать минуту 12:00, нужно дождаться всех её событий — но как понять, что больше опоздавших не будет? Бесконечно ждать нельзя: состояние всех минут копилось бы вечно и съело бы память. Решение — watermark (водяной знак):
Watermark — порог «насколько поздними» событиями мы готовы заниматься:
watermark = максимальное виденное время события − допуск. События старше watermark считаются безнадёжно опоздавшими и отбрасываются, а их окна — закрываются и очищаются.
Например, watermark с допуском 10 минут означает: «принимаем опоздания до 10 минут; что опоздало сильнее — игнорируем». Это даёт Spark право закрыть старые окна и очистить их состояние, не давая ему расти бесконечно. Watermark — это компромисс между полнотой (поймать опоздавших) и ограниченностью состояния (не хранить всё вечно).
Режимы вывода и гарантии
Поток пишет результат в одном из режимов: append (только новые финальные строки — для окон, закрытых watermark), update (изменившиеся строки), complete (вся таблица результата заново — для небольших агрегаций). Важная гарантия: при правильно настроенном чекпойнте и идемпотентном приёмнике Structured Streaming даёт exactly-once семантику — каждое событие учтено ровно один раз даже после сбоя и перезапуска. Достигается это тем, что в чекпойнт сохраняются и смещения источника, и состояние агрегаций.
Подводные камни
- Забыли checkpointLocation. Без чекпойнта поток не переживёт перезапуск: потеряется и состояние, и позиция в источнике, сломается exactly-once. Чекпойнт обязателен.
- Агрегация по времени без watermark. Состояние окон будет расти бесконечно → неизбежный OOM. Для оконных агрегаций по event time watermark обязателен.
- Путаница event time и processing time. Окна по времени обработки дают неверную аналитику при опозданиях; для корректности группируйте по event time.
- Слишком маленький триггер. Чрезмерно частые микро-батчи увеличивают накладные расходы; подбирайте интервал под требования к задержке.
Best practices
- Пишите потоковые запросы как пакетные — API почти тот же; различия только в источнике/приёмнике и времени.
- Всегда задавайте
checkpointLocation— это основа отказоустойчивости и exactly-once. - Для оконных агрегаций по времени события обязательно ставьте watermark, чтобы ограничить состояние.
- Группируйте аналитику по event time, а не по времени обработки.
Итог
- Structured Streaming трактует поток как бесконечно растущую таблицу; запрос пишется почти как батч.
- Под капотом — микро-батчи; состояние сохраняется между ними (инкрементальность).
- Watermark задаёт допустимое опоздание событий и позволяет закрывать старые окна, ограничивая состояние.
- Чекпойнт обеспечивает отказоустойчивость и exactly-once; для оконных агрегаций watermark обязателен.