Shuffle, узкие/широкие зависимости и кэширование
Урок раскрывает главную тему производительности Spark — shuffle — и инструмент переиспользования данных: кэширование.
Shuffle — перераспределение данных между партициями по сети так, чтобы записи с одинаковым ключом оказались вместе. Это самая дорогая операция в Spark: задействуются диск, сериализация и сеть.
Узкие vs широкие трансформации
Все трансформации делятся на два класса по тому, нужно ли перемещать данные между партициями.
| Узкие (narrow) | Широкие (wide) | |
| Данные между партициями | не двигаются | перемешиваются по сети (shuffle) |
| Примеры | map, filter, flatMap, union | groupByKey, reduceByKey, join, distinct, repartition, sortBy |
| Стоимость | дёшево, считается локально | дорого: диск + сеть + сериализация |
| Границы stage | внутри одного stage | порождают новый stage |
Узкая трансформация работает с данными, которые уже есть в партиции: чтобы отфильтровать или удвоить значения, не нужно ничего ни у кого спрашивать. Такие операции Spark конвейеризует в один stage.
Широкая трансформация требует собрать вместе записи, разбросанные по разным партициям. Чтобы посчитать сумму по ключу «Москва», нужно, чтобы все строки с этим ключом оказались на одной машине — а они изначально раскиданы по всему кластеру. Сбор «своих» записей вместе и есть shuffle.
Почему shuffle такой дорогой
Shuffle — это «всё против всех» по сети. Механика, шаг за шагом:
- Map-сторона. Каждая задача читает свою партицию и раскладывает записи по «корзинам» в зависимости от хеша ключа: эта запись поедет в партицию 5, та — в партицию 12. Корзины записываются на локальный диск (shuffle write).
- Передача по сети. Каждая reduce-задача стягивает по сети свои корзины со всех map-задач (shuffle read). Если было 200 map-задач и 200 reduce-задач, это потенциально 200×200 сетевых пересылок.
- Сериализация. Чтобы отправить объекты по сети и записать на диск, их нужно сериализовать, а на приёме — десериализовать. Это нагрузка на процессор.
Итог: shuffle бьёт сразу по трём дорогим ресурсам — диск, сеть, процессор. Поэтому главный принцип тюнинга Spark звучит так: минимизируй shuffle. Каждый лишний groupBy, join или repartition — это потенциально гигабайты, ползущие по сети.
Промоделируем суть shuffle на Python: данные раскиданы по партициям, а чтобы агрегировать по ключу, их нужно «перетасовать» так, чтобы один ключ собрался в одном месте.
# Данные раскиданы по 3 партициям (как в кластере).
partitions = [
[("ru", 5), ("us", 3)],
[("ru", 2), ("de", 7)],
[("us", 4), ("ru", 1)],
]
# SHUFFLE: тасуем по ключу так, чтобы один ключ собрался в одной «корзине».
buckets = {}
for part in partitions:
for key, val in part:
buckets.setdefault(key, []).append(val)
print("После shuffle (все значения ключа собраны вместе):")
for key, vals in buckets.items():
print(f" {key}: {vals} -> сумма {sum(vals)}")
Вывод:
После shuffle (все значения ключа собраны вместе): ru: [5, 2, 1] -> сумма 8 us: [3, 4] -> сумма 7 de: [7] -> сумма 7
В нашем примере «перетасовка» — это пара строк в цикле. В кластере те же значения ключа ru физически переезжают с трёх разных машин на одну по сети — вот откуда стоимость.
Канонический пример: word count
Подсчёт частоты слов — «hello world» больших данных, и он отлично показывает узкие и широкие шаги. Цепочка: прочитать строки → flatMap в слова (узкая) → пары (слово, 1) (узкая) → reduceByKey сумма (широкая, здесь shuffle). Узкие шаги летят локально; единственное дорогое место — свод по ключу, где слова переезжают, чтобы одинаковые собрались вместе.
text = "spark считает слова spark делит данные spark быстро считает"
# Узкие шаги: разбить на слова и сделать пары (слово, 1)
words = text.split()
pairs = [(w, 1) for w in words]
# Широкий шаг (shuffle): свод по ключу
counts = {}
for w, c in pairs:
counts[w] = counts.get(w, 0) + c
# Топ по частоте
for w, c in sorted(counts.items(), key=lambda kv: -kv[1]):
print(f"{w}: {c}")
Вывод:
spark: 3 считает: 2 слова: 1 делит: 1 данные: 1 быстро: 1
В реальном Spark тот же word count на RDD: sc.textFile(...).flatMap(lambda l: l.split()).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b). Единственная широкая операция — reduceByKey, и именно она формирует границу stage и shuffle.
Кэширование: cache и persist
Помним: без кэша каждое действие пересчитывает всю цепочку с нуля по lineage. Если один и тот же набор используется многократно (итеративный алгоритм, несколько отчётов из одной подготовленной таблицы), это расточительно. Кэширование просит Spark материализовать набор и держать его наготове.
cache()— закэшировать с уровнем по умолчанию. Для DataFrame этоMEMORY_AND_DISK(в памяти, при нехватке — спилл на диск).persist(StorageLevel)— то же, но с явным выбором уровня хранения.
Главные уровни хранения:
| Уровень | Что значит |
MEMORY_ONLY | только в памяти; не влезло — будет пересчитано при обращении |
MEMORY_AND_DISK | в памяти, излишки спиллятся на диск (дефолт для DataFrame) |
DISK_ONLY | только на диске (когда память дороже скорости) |
MEMORY_AND_DISK_2 | как MEMORY_AND_DISK, но с репликацией на 2 узла (надёжность) |
Кэш тоже ленив: данные реально лягут в кэш при первом действии. После работы кэш освобождают через unpersist(), чтобы не держать память зря.
Подводные камни
- Кэшировать всё подряд. Кэш занимает память, которой и так не хватает на вычисления. Кэшируйте только то, что реально читается несколько раз. Лишний кэш вытесняет полезные данные и замедляет работу.
- Кэш использован один раз. Если набор читается ровно одним действием, кэш бесполезен (даже вреден — лишняя материализация). Кэш окупается от двух обращений.
- Забыли
unpersist(). Висящий кэш съедает память до конца приложения. - Перекос при shuffle. Если один ключ встречается в разы чаще остальных, после shuffle почти все его записи попадут в одну партицию — одна задача будет считать часами (data skew). Это отдельная тема раздела 5.
Best practices
- Главный девиз тюнинга: уменьшай shuffle. Считай в плане границы stages — это места перемешивания.
- Заменяй groupByKey на reduceByKey/aggregateByKey; фильтруй до join/groupBy, а не после.
- Кэшируй только многократно используемые наборы; после —
unpersist(). - По умолчанию хватает
MEMORY_AND_DISK; меняй уровень осознанно под задачу.
Итог
- Узкие трансформации (map, filter) считаются локально и склеиваются в stage; широкие (groupBy, join, distinct) вызывают shuffle.
- Shuffle дорог, потому что бьёт по диску, сети и процессору; минимизация shuffle — главный приём тюнинга.
- Word count: узкие flatMap/map плюс единственный широкий reduceByKey.
- cache()/persist() избавляют от повторного пересчёта; уровни хранения управляют, где держать данные.
- Кэшируйте только переиспользуемое и освобождайте через unpersist().