План выполнения, Catalyst/Tungsten и типичные ошибки
Урок учит читать то, что Spark реально собирается делать — план выполнения — и разбирает самые частые провалы производительности.
План выполнения (execution plan) — пошаговое описание того, как Spark выполнит запрос; его показывает метод
explain(). Умение читать план — главный навык диагностики.
От кода к плану: что делает Catalyst
Мы уже знаем компоненты, теперь соберём картину. Ваш DataFrame/SQL-запрос проходит конвейер: логический план (что хотим) → оптимизация Catalyst (predicate pushdown, column pruning, выбор стратегии join) → физический план (как именно выполним: какие join'ы, где shuffle) → генерация кода Tungsten (WholeStageCodegen). Чтобы увидеть результат, есть explain():
df = spark.read.parquet("/data/orders").filter("amount > 100").groupBy("city").count()
df.explain() # физический план
df.explain(True) # все стадии: parsed, analyzed, optimized, physical
Как читать физический план
Физический план читается снизу вверх (от источников к результату). На что смотреть в первую очередь:
| В плане видно | Что это значит |
Exchange | shuffle — данные перемешиваются по сети. Каждый Exchange = граница stage и дорогая операция. Считайте их. |
BroadcastHashJoin | join через broadcast — хорошо, большая таблица не перемешивается |
SortMergeJoin | join обеих больших таблиц с shuffle и сортировкой — дорого |
WholeStageCodegen | Tungsten сгенерировал единую функцию на stage — хорошо |
PushedFilters у Scan | фильтр протолкнут к источнику — лишнее не читается, отлично |
PartitionFilters | сработал partition pruning — читаются только нужные папки |
Практический алгоритм диагностики: открываете explain() и считаете Exchange (shuffle) — нет ли лишних? Проверяете, какие join выбраны — нет ли неожиданного SortMergeJoin там, где могла быть broadcast? Смотрите, дошли ли PushedFilters до Scan — отбрасывается ли лишнее у источника? Эти три вопроса покрывают большинство проблем производительности.
Разберём упрощённый физический план для запроса «прочитать заказы из Parquet, оставить дорогие, сгруппировать по городу». Читаем снизу вверх:
*(2) HashAggregate(keys=[city], functions=[count])
+- Exchange hashpartitioning(city, 200) <- ВОТ ОН SHUFFLE (граница stage)
+- *(1) HashAggregate(keys=[city], functions=[partial_count])
+- *(1) Project [city] <- осталась только нужная колонка
+- *(1) Filter (amount > 100)
+- *(1) FileScan parquet [city, amount]
PushedFilters: [GreaterThan(amount,100)] <- фильтр дошёл до источника
PartitionFilters: []
Что отсюда видно опытному глазу: (1) есть ровно один Exchange — один shuffle, два stage, нормально для groupBy; (2) PushedFilters показывает, что фильтр amount > 100 протолкнут в чтение Parquet — лишние строки не читаются, отлично; (3) Project [city] и узкий список колонок в FileScan означают, что прочитаны только нужные колонки (column pruning); (4) partial_count перед Exchange — это частичная агрегация до shuffle (map-side combine), по сети едут уже сжатые частичные суммы. Этот план хорош. Если бы вместо PushedFilters фильтр стоял выше FileScan, или Exchange'ей было несколько без причины — это были бы цели для оптимизации.
Типичная ошибка №1: OutOfMemory (OOM)
Самая частая авария. Причины и лечение:
- OOM на драйвере. Обычно от
collect()/toPandas()на большом наборе. Драйвер не должен держать весь датасет — пишите результат в хранилище или используйтеlimit. - OOM на исполнителе. Слишком крупные партиции (одна задача обрабатывает гигабайты), перекос (горячий ключ в одной партиции), broadcast слишком большой таблицы, или окно/сортировка без партиционирования. Лечение: больше/равномернее партиций, борьба с перекосом, не форсировать broadcast больших таблиц, добавить памяти исполнителю.
Типичная ошибка №2: проблема мелких файлов
Запись с большим числом мелких партиций или чрезмерным partitionBy плодит тысячи крошечных файлов. На чтении это катастрофа: открыть и распарсить метаданные каждого файла дороже, чем прочитать его содержимое. Лечение: укрупнять перед записью (coalesce/repartition до разумного числа), партиционировать по колонке умеренной кардинальности, периодически делать compaction (слияние мелких файлов в крупные).
Типичная ошибка №3: лишние shuffle
Каждый Exchange в плане — это сеть и время. Частые источники лишних shuffle:
- groupByKey вместо reduceByKey — лишние данные через сеть.
- Повторное перемешивание уже разложенных данных — если данные уже разбиты по ключу, второй groupBy/join по тому же ключу не должен снова перемешивать; проверьте план.
- Сортировка раньше времени —
orderByв середине пайплайна, результат которого потом всё равно перемешивается, — выброшенная работа. Сортируйте в самом конце. - Лишний
repartition, вставленный «на всякий случай».
Промоделируем главную мысль: стоимость пайплайна грубо пропорциональна числу shuffle. Посчитаем «бюджет shuffle» для наивного и оптимизированного плана.
# Грубая модель: каждый shuffle стоит N единиц, узкие операции — почти 0.
def cost(plan):
return sum(10 if step == "shuffle" else 1 for step in plan)
naive = ["read", "shuffle", "map", "shuffle", "filter", "shuffle", "write"]
tuned = ["read", "filter", "map", "shuffle", "write"] # фильтр до shuffle, лишние убраны
print("Наивный план, shuffle:", naive.count("shuffle"), "стоимость:", cost(naive))
print("Оптимизированный план, shuffle:", tuned.count("shuffle"), "стоимость:", cost(tuned))
print("Ускорение примерно в", round(cost(naive) / cost(tuned), 1), "раза")
Вывод:
Наивный план, shuffle: 3 стоимость: 34 Оптимизированный план, shuffle: 1 стоимость: 14 Ускорение примерно в 2.4 раза
Модель грубая, но мысль верна: убрать лишний shuffle — самый результативный способ ускорить пайплайн, и видно их именно в плане выполнения.
Подводные камни
- Тюнинг вслепую. Крутить конфиги, не глядя в
explain()и Spark UI, — гадание. Сначала смотрите план и метрики, потом меняйте. - Считать, что Catalyst исправит всё. Он силён, но не уберёт перекос, неудачный порядок join или забытый broadcast. Читать план всё равно нужно.
- Игнорировать спиллы (spill). Если в UI видны spill to disk, партиции слишком крупные для памяти — увеличьте число партиций или память.
Best practices
- Диагностику всегда начинайте с
explain()и Spark UI, а не с подбора конфигов. - Считайте
Exchangeв плане — каждый лишний shuffle убирайте. - Проверяйте стратегии join и наличие PushedFilters/PartitionFilters.
- OOM лечите по месту: драйвер (не collect), исполнитель (партиции, перекос, broadcast, память).
- Боритесь с мелкими файлами укрупнением перед записью и разумным partitionBy.
Итог
- Catalyst строит и оптимизирует план, Tungsten его эффективно выполняет; смотреть результат — через
explain(). - В физическом плане ищите Exchange (shuffle), тип join, PushedFilters/PartitionFilters, WholeStageCodegen.
- Типичные ошибки: OOM (драйвер от collect, исполнитель от крупных партиций/перекоса/broadcast), мелкие файлы, лишние shuffle.
- Самый результативный тюнинг — убрать лишний shuffle, а увидеть его можно только в плане.