Shuffle и перекос данных: диагностика и приёмы

Урок углубляется в две главные боли распределённых вычислений — дорогой shuffle и перекос данных — и в практические приёмы их укрощения.

Перекос данных (data skew) — ситуация, когда несколько «горячих» ключей собирают непропорционально много записей, из-за чего одна задача работает в разы дольше остальных.

Что вызывает shuffle и как его уменьшить

Напомним механику: shuffle перемещает данные между партициями по сети, задействуя диск (запись/чтение корзин), сеть (передача) и процессор (сериализация). Вызывают shuffle все широкие операции: groupBy, join, distinct, repartition, оконные функции, reduceByKey. Полностью убрать shuffle нельзя (иначе агрегации и соединения были бы невозможны), но его объём можно резко сократить:

  • Фильтруйте до shuffle. Чем меньше данных входит в groupBy/join, тем дешевле перемешивание. Catalyst старается двигать фильтры сам, но помогайте ему, фильтруя рано.
  • Отбрасывайте лишние колонки до shuffle. Перемешивать 3 нужные колонки дешевле, чем 50.
  • Заменяйте groupByKey на reduceByKey/aggregateByKey — частичный свод до сети.
  • Делайте join маленькой таблицы через broadcast — он вообще избегает shuffle большой таблицы.
  • Не перемешивайте лишний раз. Если данные уже разложены по нужному ключу, повторный shuffle не нужен; следите за планом.

Broadcast-переменные

Помимо broadcast join, есть broadcast-переменные для другого случая: когда вашим задачам нужен общий справочник или конфиг (словарь, список, небольшая таблица соответствий), который иначе пришлось бы пересылать в каждую задачу заново. Broadcast-переменная рассылает такие данные на каждый исполнитель один раз и кэширует их там; все задачи на узле читают одну общую копию вместо того, чтобы тащить её с каждой задачей. Это экономит сеть, когда один и тот же небольшой объект нужен множеству задач. Правило то же, что и для broadcast join: объект должен быть маленьким (помещаться в память исполнителя).

Перекос данных: симптомы и причина

Перекос — самая частая причина того, что «job на 99% завершён за минуту, а последний 1% тянется час». Причина в том, что shuffle раскладывает записи по партициям по хешу ключа: если один ключ встречается в десятки раз чаще остальных, почти все его записи попадут в одну партицию, и одна задача будет обрабатывать их в одиночку, пока остальные ядра простаивают.

Диагностика по Spark UI: на вкладке Stages смотрите распределение времени и размера задач. Симптом перекоса — почти все задачи быстрые, а одна-две имеют огромные input/shuffle read и время выполнения. Это и есть «горячий» ключ.

Промоделируем перекос: посчитаем, сколько работы достанется самой нагруженной партиции при неравномерных ключах.

# Ключи сильно перекошены: 'default' встречается чаще всех
keys = ["default"] * 70 + ["a"] * 10 + ["b"] * 10 + ["c"] * 10

num_partitions = 4

def which_partition(key):
    # Простой устойчивый хеш ключа -> номер партиции (как делает shuffle)
    return sum(ord(ch) for ch in key) % num_partitions

loads = {p: 0 for p in range(num_partitions)}
for k in keys:
    loads[which_partition(k)] += 1

print("Записей в каждой партиции:", loads)
print("Самая нагруженная партиция:", max(loads.values()), "из", len(keys))
print("Доля у отстающего:", round(max(loads.values()) / len(keys) * 100), "%")

Вывод:

Записей в каждой партиции: {0: 0, 1: 80, 2: 10, 3: 10}
Самая нагруженная партиция: 80 из 100
Доля у отстающего: 80 %

80% всех записей в одной партиции (горячий ключ default попал в неё с остальными), а одна партиция и вовсе пустая — вот этот один «отстающий» и определит время всего этапа, сколько бы ядер ни простаивало рядом.

Приёмы против перекоса

  • AQE skew join (первая линия обороны). Adaptive Query Execution (раздел 6) умеет на лету замечать перекошенную партицию в sort-merge join и автоматически разбивать её на несколько меньших. Часто этого достаточно — поэтому AQE включают в первую очередь.
  • Salting (подсаливание). Если AQE не справляется, к «горячему» ключу добавляют случайный суффикс (соль): вместо ключа defaultdefault_0, default_1, ..., default_N. Это искусственно разбивает одну гигантскую партицию на N меньших. При join маленькую сторону «размножают» на все N вариантов соли, чтобы совпадения сохранились. Приём ручной и трудоёмкий, зато действенный там, где автоматика не помогает.
  • Broadcast вместо обычного join. Если перекошенный join можно сделать broadcast'ом (маленькая сторона), перекос исчезает: shuffle большой таблицы не происходит вовсе.
  • Изолировать горячий ключ. Иногда «горячее» значение (например, NULL или служебный плейсхолдер) выгодно обработать отдельным запросом, а остальное — обычным join.

Подводные камни

  • Чрезмерный или преждевременный salting. Salting усложняет код и сам добавляет работы. Сначала включите AQE и убедитесь, что перекос реально есть, прежде чем солить вручную.
  • «Просто добавлю партиций». Увеличение числа партиций не лечит перекос: горячий ключ всё равно уедет в одну партицию. Нужно разбивать сам ключ (salting) или менять стратегию.
  • Игнорировать NULL-ключи. Огромное число строк с NULL в ключе join — частая причина перекоса; их часто стоит отфильтровать или обработать отдельно.
  • Broadcast «справочника», который не мал. Broadcast-переменная или broadcast join на большом объекте кладёт узлы OOM.

Best practices

  • Сначала уменьшайте сам shuffle: фильтр и отсечение колонок до широких операций, reduceByKey вместо groupByKey, broadcast маленьких таблиц.
  • Диагностируйте перекос по Spark UI: ищите единичные задачи с аномально большим input/временем.
  • Против перекоса: первым делом AQE; не помогло — salting или изоляция горячего ключа.
  • Используйте broadcast-переменные для общих маленьких справочников, нужных множеству задач.

Итог

  • Shuffle вызывают все широкие операции; уменьшают его фильтрацией и отсечением колонок до shuffle, reduceByKey и broadcast.
  • Broadcast-переменные рассылают маленький общий справочник на узлы один раз вместо пересылки в каждую задачу.
  • Перекос — горячий ключ собирает почти все записи в одну партицию; симптом — одна вечная задача в Spark UI.
  • Лечение: AQE skew join (авто), затем salting или broadcast; просто добавить партиций не помогает.
Проверьте себя
1. Почему простое увеличение числа партиций НЕ лечит перекос данных (data skew)?
AБольше партиций всегда замедляет работу
BГорячий ключ по хешу всё равно уедет в одну партицию — её нужно разбивать (salting) или менять стратегию join
CПерекос лечится только кэшированием
DУвеличение партиций отключает shuffle
2. В чём идея salting (подсаливания) при борьбе с перекосом?
AЗашифровать ключи join
BДобавить к горячему ключу случайный суффикс, разбив одну гигантскую партицию на несколько меньших; маленькую сторону join при этом размножают на все варианты соли
CУдалить строки с горячим ключом
DУвеличить spark.sql.shuffle.partitions
3. Зачем нужны broadcast-переменные (помимо broadcast join)?
AЧтобы реплицировать большие таблицы
BЧтобы разослать маленький общий справочник/конфиг на каждый исполнитель один раз, вместо пересылки его в каждую задачу
CЧтобы отключить shuffle
DЧтобы увеличить число партиций
Поддержать проект