Партиционирование: сколько партиций, repartition и coalesce

Урок про главный рычаг параллелизма — число партиций — и про два инструмента его менять: repartition и coalesce.

Партиция — кусок данных, обрабатываемый одной задачей на одном ядре. Число партиций задаёт параллелизм и размер каждой задачи; и слишком мало, и слишком много партиций одинаково вредно.

Партиции — это и параллелизм, и баланс нагрузки

Мы уже знаем: одна партиция = одна задача = одно ядро. Отсюда два следствия, которые надо держать в голове постоянно:

  • Мало партиций — простой кластера. Если партиций меньше, чем ядер, часть ядер простаивает. В крайнем случае (одна партиция) весь кластер работает как одна машина.
  • Много мелких партиций — накладные расходы. На каждую задачу есть фиксированная цена планирования и запуска. Десятки тысяч крошечных задач — и кластер занят управлением, а не счётом.
  • Перекошенные партиции — отстающие. Если данные распределены неравномерно, одна гигантская партиция-«отстающий» (straggler) тянет всё время вниз, пока остальные ядра ждут.

Эвристика-ориентир: партиций должно быть в несколько раз больше, чем всего ядер в кластере (чтобы планировщику было чем балансировать и все ядра были заняты), а размер одной партиции — порядка сотни-двух мегабайт (часто называют диапазон ~128–200 МБ как разумную цель). Это не догма, но хорошая отправная точка.

Покажем сайзинг на конкретике. Пусть данных 100 ГБ, в кластере 20 исполнителей по 5 ядер — итого 100 ядер. Целимся в партиции по ~200 МБ: 100 ГБ / 200 МБ ≈ 500 партиций. Проверяем по ядрам: 500 партиций на 100 ядер — это 5 «волн» задач, и планировщику есть чем балансировать перекос, при этом задачи не настолько мелкие, чтобы накладные расходы съели выигрыш. Если бы мы оставили дефолтные 200 партиций, каждая была бы по 500 МБ — крупновато, риск спиллов и OOM; а если бы нарезали 50000 партиций, каждая задача обрабатывала бы по 2 МБ, и планировщик утонул бы в управлении мелочью. Вот так из объёма данных и числа ядер выводят разумное число партиций.

spark.sql.shuffle.partitions: загадочное число 200

После любого shuffle (groupBy, join, оконная функция) число выходных партиций определяется настройкой spark.sql.shuffle.partitions, и по умолчанию она равна 200. Это значение зашито «на все случаи жизни» и почти всегда неоптимально:

  • На маленьких данных 200 партиций — это 200 крошечных задач, львиная доля которых пустая. Чистые накладные расходы.
  • На очень больших данных 200 партиций могут оказаться слишком крупными — каждая задача обрабатывает гигабайты, рискуя OOM и спиллами.

Поэтому одна из первых вещей при тюнинге — настроить это число под объём данных и размер кластера. К счастью, в современном Spark с включённым AQE (раздел 6) число shuffle-партиций подбирается автоматически по реальным размерам — но понимать, откуда берётся 200, обязательно.

repartition против coalesce

Иногда нужно изменить число партиций вручную: укрупнить перед записью (чтобы не плодить мелкие файлы), раздробить, чтобы повысить параллелизм, или перераспределить по ключу. Для этого есть два метода с принципиально разной механикой:

repartition(n)coalesce(n)
Может увеличить число партицийданет (только уменьшить)
Shuffleполный shuffle всех данныхобычно без полного shuffle
Равномерность результатаравномерные партициивозможен перекос (просто склеивает существующие)
Ценадорого (сеть)дёшево

repartition(n) делает полный shuffle: данные заново перемешиваются и раскладываются равномерно на n партиций. Дорого, но результат равномерный; умеет и увеличивать, и уменьшать число партиций; может перераспределить по колонке (repartition("city")).

coalesce(n) только уменьшает число партиций и делает это, склеивая существующие партиции локально, по возможности без полного перемешивания по сети. Дёшево, но партиции могут получиться неравномерными. Идеально применять после сильной фильтрации, когда осталось мало данных, размазанных по многим партициям, и их надо собрать.

Покажем логику «склейки против полного передела» на Python: coalesce объединяет соседние группы (минимум движения), repartition раскидывает всё заново поровну.

partitions = [[1, 2, 3], [4], [], [5, 6], [], [7]]  # 6 партиций, неравномерно

# coalesce(2): просто склеиваем существующие в 2 группы — минимум движения
half = len(partitions) // 2
coalesced = [
    sum(partitions[:half], []),
    sum(partitions[half:], []),
]
print("coalesce(2) — склейка:", coalesced)

# repartition(2): собираем всё и раскладываем РАВНОМЕРНО (полный shuffle)
flat = sum(partitions, [])
repartitioned = [flat[0::2], flat[1::2]]   # раскидываем поэлементно
print("repartition(2) — равномерно:", repartitioned)

Вывод:

coalesce(2) — склейка: [[1, 2, 3, 4], [5, 6, 7]]
repartition(2) — равномерно: [[1, 3, 5, 7], [2, 4, 6]]

Видно суть: coalesce двигает минимум данных (дёшево, но может остаться перекос), repartition перетасовывает всё ради равномерности (дорого, зато ровно).

Когда что применять

  • Уменьшить число партиций перед записью (борьба с мелкими файлами) — coalesce, он дешевле.
  • Увеличить параллелизм или исправить перекосrepartition: только он умеет наращивать число партиций и раскладывать равномерно.
  • Перераспределить по ключу перед серией операций по этому ключу — repartition("key"), чтобы последующие группировки шли без лишних перемешиваний.

Подводные камни

  • Оставить дефолтные 200 на больших данных. Часто это либо слишком мало (гигантские задачи, OOM), либо слишком много (накладные расходы). Настраивайте или полагайтесь на AQE.
  • coalesce(1) ради одного файла на выходе. Сведение всего в одну партицию убивает параллелизм всего предшествующего этапа — вся работа схлопнется в одно ядро. Лучше писать несколько файлов или укрупнять умеренно.
  • Ожидание, что coalesce балансирует. coalesce только склеивает; если исходные партиции были неравномерны, результат тоже будет неравномерным. Для равномерности нужен repartition.
  • Лишний repartition. repartition — это полный shuffle; не вставляйте его «на всякий случай», только при реальной нужде.

Best practices

  • Держите размер партиции в районе сотни-двух мегабайт, а число партиций — кратно больше числа ядер.
  • Уменьшаете число партиций — берите coalesce (дёшево); увеличиваете или нужна равномерность — repartition.
  • Не сводите всё в одну партицию ради одного выходного файла — это убивает параллелизм.
  • Настройте spark.sql.shuffle.partitions под задачу или включите AQE, чтобы Spark подбирал его сам.

Итог

  • Число партиций задаёт параллелизм; мало — простой, много мелких — накладные расходы, перекос — отстающие задачи.
  • После shuffle число партиций берётся из spark.sql.shuffle.partitions (дефолт 200, почти всегда требует настройки).
  • repartition — полный shuffle, равномерно, умеет увеличивать; coalesce — дешёвая склейка, только уменьшает, без гарантии равномерности.
  • coalesce — перед записью; repartition — для параллелизма, равномерности и перераспределения по ключу.
Проверьте себя
1. В чём ключевая разница между repartition(n) и coalesce(n)?
Arepartition только уменьшает число партиций, coalesce — только увеличивает
Brepartition делает полный shuffle и даёт равномерные партиции (умеет увеличивать число), coalesce дёшево склеивает существующие и только уменьшает
CОни полностью идентичны
Dcoalesce всегда дороже repartition
2. Чему по умолчанию равно spark.sql.shuffle.partitions и почему это значение часто неоптимально?
A10 — слишком мало для любых данных
B200 — оно зашито на все случаи: для малых данных это лишние пустые задачи, для очень больших — слишком крупные партиции и риск OOM
CРавно числу ядер кластера
D1000 — слишком много накладных расходов
3. Почему coalesce(1) перед записью часто вредит производительности?
Acoalesce(1) вызывает ошибку
BСведение всего в одну партицию убивает параллелизм предшествующего этапа — вся работа схлопывается в одно ядро
CОн создаёт слишком много мелких файлов
DОн всегда делает полный shuffle
Поддержать проект