Shuffle и перекос данных: диагностика и приёмы
Урок углубляется в две главные боли распределённых вычислений — дорогой shuffle и перекос данных — и в практические приёмы их укрощения.
Перекос данных (data skew) — ситуация, когда несколько «горячих» ключей собирают непропорционально много записей, из-за чего одна задача работает в разы дольше остальных.
Что вызывает shuffle и как его уменьшить
Напомним механику: shuffle перемещает данные между партициями по сети, задействуя диск (запись/чтение корзин), сеть (передача) и процессор (сериализация). Вызывают shuffle все широкие операции: groupBy, join, distinct, repartition, оконные функции, reduceByKey. Полностью убрать shuffle нельзя (иначе агрегации и соединения были бы невозможны), но его объём можно резко сократить:
- Фильтруйте до shuffle. Чем меньше данных входит в groupBy/join, тем дешевле перемешивание. Catalyst старается двигать фильтры сам, но помогайте ему, фильтруя рано.
- Отбрасывайте лишние колонки до shuffle. Перемешивать 3 нужные колонки дешевле, чем 50.
- Заменяйте groupByKey на reduceByKey/aggregateByKey — частичный свод до сети.
- Делайте join маленькой таблицы через broadcast — он вообще избегает shuffle большой таблицы.
- Не перемешивайте лишний раз. Если данные уже разложены по нужному ключу, повторный shuffle не нужен; следите за планом.
Broadcast-переменные
Помимо broadcast join, есть broadcast-переменные для другого случая: когда вашим задачам нужен общий справочник или конфиг (словарь, список, небольшая таблица соответствий), который иначе пришлось бы пересылать в каждую задачу заново. Broadcast-переменная рассылает такие данные на каждый исполнитель один раз и кэширует их там; все задачи на узле читают одну общую копию вместо того, чтобы тащить её с каждой задачей. Это экономит сеть, когда один и тот же небольшой объект нужен множеству задач. Правило то же, что и для broadcast join: объект должен быть маленьким (помещаться в память исполнителя).
Перекос данных: симптомы и причина
Перекос — самая частая причина того, что «job на 99% завершён за минуту, а последний 1% тянется час». Причина в том, что shuffle раскладывает записи по партициям по хешу ключа: если один ключ встречается в десятки раз чаще остальных, почти все его записи попадут в одну партицию, и одна задача будет обрабатывать их в одиночку, пока остальные ядра простаивают.
Диагностика по Spark UI: на вкладке Stages смотрите распределение времени и размера задач. Симптом перекоса — почти все задачи быстрые, а одна-две имеют огромные input/shuffle read и время выполнения. Это и есть «горячий» ключ.
Промоделируем перекос: посчитаем, сколько работы достанется самой нагруженной партиции при неравномерных ключах.
# Ключи сильно перекошены: 'default' встречается чаще всех
keys = ["default"] * 70 + ["a"] * 10 + ["b"] * 10 + ["c"] * 10
num_partitions = 4
def which_partition(key):
# Простой устойчивый хеш ключа -> номер партиции (как делает shuffle)
return sum(ord(ch) for ch in key) % num_partitions
loads = {p: 0 for p in range(num_partitions)}
for k in keys:
loads[which_partition(k)] += 1
print("Записей в каждой партиции:", loads)
print("Самая нагруженная партиция:", max(loads.values()), "из", len(keys))
print("Доля у отстающего:", round(max(loads.values()) / len(keys) * 100), "%")
Вывод:
Записей в каждой партиции: {0: 0, 1: 80, 2: 10, 3: 10}
Самая нагруженная партиция: 80 из 100
Доля у отстающего: 80 %
80% всех записей в одной партиции (горячий ключ default попал в неё с остальными), а одна партиция и вовсе пустая — вот этот один «отстающий» и определит время всего этапа, сколько бы ядер ни простаивало рядом.
Приёмы против перекоса
- AQE skew join (первая линия обороны). Adaptive Query Execution (раздел 6) умеет на лету замечать перекошенную партицию в sort-merge join и автоматически разбивать её на несколько меньших. Часто этого достаточно — поэтому AQE включают в первую очередь.
- Salting (подсаливание). Если AQE не справляется, к «горячему» ключу добавляют случайный суффикс (соль): вместо ключа
default—default_0,default_1, ...,default_N. Это искусственно разбивает одну гигантскую партицию на N меньших. При join маленькую сторону «размножают» на все N вариантов соли, чтобы совпадения сохранились. Приём ручной и трудоёмкий, зато действенный там, где автоматика не помогает. - Broadcast вместо обычного join. Если перекошенный join можно сделать broadcast'ом (маленькая сторона), перекос исчезает: shuffle большой таблицы не происходит вовсе.
- Изолировать горячий ключ. Иногда «горячее» значение (например, NULL или служебный плейсхолдер) выгодно обработать отдельным запросом, а остальное — обычным join.
Подводные камни
- Чрезмерный или преждевременный salting. Salting усложняет код и сам добавляет работы. Сначала включите AQE и убедитесь, что перекос реально есть, прежде чем солить вручную.
- «Просто добавлю партиций». Увеличение числа партиций не лечит перекос: горячий ключ всё равно уедет в одну партицию. Нужно разбивать сам ключ (salting) или менять стратегию.
- Игнорировать NULL-ключи. Огромное число строк с NULL в ключе join — частая причина перекоса; их часто стоит отфильтровать или обработать отдельно.
- Broadcast «справочника», который не мал. Broadcast-переменная или broadcast join на большом объекте кладёт узлы OOM.
Best practices
- Сначала уменьшайте сам shuffle: фильтр и отсечение колонок до широких операций, reduceByKey вместо groupByKey, broadcast маленьких таблиц.
- Диагностируйте перекос по Spark UI: ищите единичные задачи с аномально большим input/временем.
- Против перекоса: первым делом AQE; не помогло — salting или изоляция горячего ключа.
- Используйте broadcast-переменные для общих маленьких справочников, нужных множеству задач.
Итог
- Shuffle вызывают все широкие операции; уменьшают его фильтрацией и отсечением колонок до shuffle, reduceByKey и broadcast.
- Broadcast-переменные рассылают маленький общий справочник на узлы один раз вместо пересылки в каждую задачу.
- Перекос — горячий ключ собирает почти все записи в одну партицию; симптом — одна вечная задача в Spark UI.
- Лечение: AQE skew join (авто), затем salting или broadcast; просто добавить партиций не помогает.