План выполнения, Catalyst/Tungsten и типичные ошибки

Урок учит читать то, что Spark реально собирается делать — план выполнения — и разбирает самые частые провалы производительности.

План выполнения (execution plan) — пошаговое описание того, как Spark выполнит запрос; его показывает метод explain(). Умение читать план — главный навык диагностики.

От кода к плану: что делает Catalyst

Мы уже знаем компоненты, теперь соберём картину. Ваш DataFrame/SQL-запрос проходит конвейер: логический план (что хотим) → оптимизация Catalyst (predicate pushdown, column pruning, выбор стратегии join) → физический план (как именно выполним: какие join'ы, где shuffle) → генерация кода Tungsten (WholeStageCodegen). Чтобы увидеть результат, есть explain():

df = spark.read.parquet("/data/orders").filter("amount > 100").groupBy("city").count()

df.explain()             # физический план
df.explain(True)         # все стадии: parsed, analyzed, optimized, physical

Как читать физический план

Физический план читается снизу вверх (от источников к результату). На что смотреть в первую очередь:

В плане видноЧто это значит
Exchangeshuffle — данные перемешиваются по сети. Каждый Exchange = граница stage и дорогая операция. Считайте их.
BroadcastHashJoinjoin через broadcast — хорошо, большая таблица не перемешивается
SortMergeJoinjoin обеих больших таблиц с shuffle и сортировкой — дорого
WholeStageCodegenTungsten сгенерировал единую функцию на stage — хорошо
PushedFilters у Scanфильтр протолкнут к источнику — лишнее не читается, отлично
PartitionFiltersсработал partition pruning — читаются только нужные папки

Практический алгоритм диагностики: открываете explain() и считаете Exchange (shuffle) — нет ли лишних? Проверяете, какие join выбраны — нет ли неожиданного SortMergeJoin там, где могла быть broadcast? Смотрите, дошли ли PushedFilters до Scan — отбрасывается ли лишнее у источника? Эти три вопроса покрывают большинство проблем производительности.

Разберём упрощённый физический план для запроса «прочитать заказы из Parquet, оставить дорогие, сгруппировать по городу». Читаем снизу вверх:

*(2) HashAggregate(keys=[city], functions=[count])
+- Exchange hashpartitioning(city, 200)          <- ВОТ ОН SHUFFLE (граница stage)
   +- *(1) HashAggregate(keys=[city], functions=[partial_count])
      +- *(1) Project [city]                       <- осталась только нужная колонка
         +- *(1) Filter (amount > 100)
            +- *(1) FileScan parquet [city, amount]
                  PushedFilters: [GreaterThan(amount,100)]   <- фильтр дошёл до источника
                  PartitionFilters: []

Что отсюда видно опытному глазу: (1) есть ровно один Exchange — один shuffle, два stage, нормально для groupBy; (2) PushedFilters показывает, что фильтр amount > 100 протолкнут в чтение Parquet — лишние строки не читаются, отлично; (3) Project [city] и узкий список колонок в FileScan означают, что прочитаны только нужные колонки (column pruning); (4) partial_count перед Exchange — это частичная агрегация до shuffle (map-side combine), по сети едут уже сжатые частичные суммы. Этот план хорош. Если бы вместо PushedFilters фильтр стоял выше FileScan, или Exchange'ей было несколько без причины — это были бы цели для оптимизации.

Типичная ошибка №1: OutOfMemory (OOM)

Самая частая авария. Причины и лечение:

  • OOM на драйвере. Обычно от collect()/toPandas() на большом наборе. Драйвер не должен держать весь датасет — пишите результат в хранилище или используйте limit.
  • OOM на исполнителе. Слишком крупные партиции (одна задача обрабатывает гигабайты), перекос (горячий ключ в одной партиции), broadcast слишком большой таблицы, или окно/сортировка без партиционирования. Лечение: больше/равномернее партиций, борьба с перекосом, не форсировать broadcast больших таблиц, добавить памяти исполнителю.

Типичная ошибка №2: проблема мелких файлов

Запись с большим числом мелких партиций или чрезмерным partitionBy плодит тысячи крошечных файлов. На чтении это катастрофа: открыть и распарсить метаданные каждого файла дороже, чем прочитать его содержимое. Лечение: укрупнять перед записью (coalesce/repartition до разумного числа), партиционировать по колонке умеренной кардинальности, периодически делать compaction (слияние мелких файлов в крупные).

Типичная ошибка №3: лишние shuffle

Каждый Exchange в плане — это сеть и время. Частые источники лишних shuffle:

  • groupByKey вместо reduceByKey — лишние данные через сеть.
  • Повторное перемешивание уже разложенных данных — если данные уже разбиты по ключу, второй groupBy/join по тому же ключу не должен снова перемешивать; проверьте план.
  • Сортировка раньше времениorderBy в середине пайплайна, результат которого потом всё равно перемешивается, — выброшенная работа. Сортируйте в самом конце.
  • Лишний repartition, вставленный «на всякий случай».

Промоделируем главную мысль: стоимость пайплайна грубо пропорциональна числу shuffle. Посчитаем «бюджет shuffle» для наивного и оптимизированного плана.

# Грубая модель: каждый shuffle стоит N единиц, узкие операции — почти 0.
def cost(plan):
    return sum(10 if step == "shuffle" else 1 for step in plan)

naive = ["read", "shuffle", "map", "shuffle", "filter", "shuffle", "write"]
tuned = ["read", "filter", "map", "shuffle", "write"]   # фильтр до shuffle, лишние убраны

print("Наивный план, shuffle:", naive.count("shuffle"), "стоимость:", cost(naive))
print("Оптимизированный план, shuffle:", tuned.count("shuffle"), "стоимость:", cost(tuned))
print("Ускорение примерно в", round(cost(naive) / cost(tuned), 1), "раза")

Вывод:

Наивный план, shuffle: 3 стоимость: 34
Оптимизированный план, shuffle: 1 стоимость: 14
Ускорение примерно в 2.4 раза

Модель грубая, но мысль верна: убрать лишний shuffle — самый результативный способ ускорить пайплайн, и видно их именно в плане выполнения.

Подводные камни

  • Тюнинг вслепую. Крутить конфиги, не глядя в explain() и Spark UI, — гадание. Сначала смотрите план и метрики, потом меняйте.
  • Считать, что Catalyst исправит всё. Он силён, но не уберёт перекос, неудачный порядок join или забытый broadcast. Читать план всё равно нужно.
  • Игнорировать спиллы (spill). Если в UI видны spill to disk, партиции слишком крупные для памяти — увеличьте число партиций или память.

Best practices

  • Диагностику всегда начинайте с explain() и Spark UI, а не с подбора конфигов.
  • Считайте Exchange в плане — каждый лишний shuffle убирайте.
  • Проверяйте стратегии join и наличие PushedFilters/PartitionFilters.
  • OOM лечите по месту: драйвер (не collect), исполнитель (партиции, перекос, broadcast, память).
  • Боритесь с мелкими файлами укрупнением перед записью и разумным partitionBy.

Итог

  • Catalyst строит и оптимизирует план, Tungsten его эффективно выполняет; смотреть результат — через explain().
  • В физическом плане ищите Exchange (shuffle), тип join, PushedFilters/PartitionFilters, WholeStageCodegen.
  • Типичные ошибки: OOM (драйвер от collect, исполнитель от крупных партиций/перекоса/broadcast), мелкие файлы, лишние shuffle.
  • Самый результативный тюнинг — убрать лишний shuffle, а увидеть его можно только в плане.
Проверьте себя
1. Что в физическом плане Spark (explain) обозначает оператор Exchange?
AЧтение данных с диска
BShuffle — перемешивание данных между партициями по сети; каждый Exchange это граница stage и дорогая операция
CКэширование данных
DПрименение фильтра
2. Где обычно возникает OutOfMemory, если в коде есть df.collect() на большом датасете?
AНа исполнителе из-за перекоса
BНа драйвере, потому что collect стягивает весь набор в его память
CВ cluster manager
DВ источнике данных
3. Что НЕ исправит оптимизатор Catalyst автоматически?
AПерестановку фильтра ближе к источнику
BОтсечение неиспользуемых колонок
CПерекос данных, неудачный порядок join и забытый broadcast большой таблицы
DВычисление константных выражений
Поддержать проект