Shuffle, узкие/широкие зависимости и кэширование

Урок раскрывает главную тему производительности Spark — shuffle — и инструмент переиспользования данных: кэширование.

Shuffle — перераспределение данных между партициями по сети так, чтобы записи с одинаковым ключом оказались вместе. Это самая дорогая операция в Spark: задействуются диск, сериализация и сеть.

Узкие vs широкие трансформации

Все трансформации делятся на два класса по тому, нужно ли перемещать данные между партициями.

Узкие (narrow)Широкие (wide)
Данные между партициямине двигаютсяперемешиваются по сети (shuffle)
Примерыmap, filter, flatMap, uniongroupByKey, reduceByKey, join, distinct, repartition, sortBy
Стоимостьдёшево, считается локальнодорого: диск + сеть + сериализация
Границы stageвнутри одного stageпорождают новый stage

Узкая трансформация работает с данными, которые уже есть в партиции: чтобы отфильтровать или удвоить значения, не нужно ничего ни у кого спрашивать. Такие операции Spark конвейеризует в один stage.

Широкая трансформация требует собрать вместе записи, разбросанные по разным партициям. Чтобы посчитать сумму по ключу «Москва», нужно, чтобы все строки с этим ключом оказались на одной машине — а они изначально раскиданы по всему кластеру. Сбор «своих» записей вместе и есть shuffle.

Почему shuffle такой дорогой

Shuffle — это «всё против всех» по сети. Механика, шаг за шагом:

  1. Map-сторона. Каждая задача читает свою партицию и раскладывает записи по «корзинам» в зависимости от хеша ключа: эта запись поедет в партицию 5, та — в партицию 12. Корзины записываются на локальный диск (shuffle write).
  2. Передача по сети. Каждая reduce-задача стягивает по сети свои корзины со всех map-задач (shuffle read). Если было 200 map-задач и 200 reduce-задач, это потенциально 200×200 сетевых пересылок.
  3. Сериализация. Чтобы отправить объекты по сети и записать на диск, их нужно сериализовать, а на приёме — десериализовать. Это нагрузка на процессор.

Итог: shuffle бьёт сразу по трём дорогим ресурсам — диск, сеть, процессор. Поэтому главный принцип тюнинга Spark звучит так: минимизируй shuffle. Каждый лишний groupBy, join или repartition — это потенциально гигабайты, ползущие по сети.

Промоделируем суть shuffle на Python: данные раскиданы по партициям, а чтобы агрегировать по ключу, их нужно «перетасовать» так, чтобы один ключ собрался в одном месте.

# Данные раскиданы по 3 партициям (как в кластере).
partitions = [
    [("ru", 5), ("us", 3)],
    [("ru", 2), ("de", 7)],
    [("us", 4), ("ru", 1)],
]

# SHUFFLE: тасуем по ключу так, чтобы один ключ собрался в одной «корзине».
buckets = {}
for part in partitions:
    for key, val in part:
        buckets.setdefault(key, []).append(val)

print("После shuffle (все значения ключа собраны вместе):")
for key, vals in buckets.items():
    print(f"  {key}: {vals}  -> сумма {sum(vals)}")

Вывод:

После shuffle (все значения ключа собраны вместе):
  ru: [5, 2, 1]  -> сумма 8
  us: [3, 4]  -> сумма 7
  de: [7]  -> сумма 7

В нашем примере «перетасовка» — это пара строк в цикле. В кластере те же значения ключа ru физически переезжают с трёх разных машин на одну по сети — вот откуда стоимость.

Канонический пример: word count

Подсчёт частоты слов — «hello world» больших данных, и он отлично показывает узкие и широкие шаги. Цепочка: прочитать строки → flatMap в слова (узкая) → пары (слово, 1) (узкая) → reduceByKey сумма (широкая, здесь shuffle). Узкие шаги летят локально; единственное дорогое место — свод по ключу, где слова переезжают, чтобы одинаковые собрались вместе.

text = "spark считает слова spark делит данные spark быстро считает"

# Узкие шаги: разбить на слова и сделать пары (слово, 1)
words = text.split()
pairs = [(w, 1) for w in words]

# Широкий шаг (shuffle): свод по ключу
counts = {}
for w, c in pairs:
    counts[w] = counts.get(w, 0) + c

# Топ по частоте
for w, c in sorted(counts.items(), key=lambda kv: -kv[1]):
    print(f"{w}: {c}")

Вывод:

spark: 3
считает: 2
слова: 1
делит: 1
данные: 1
быстро: 1

В реальном Spark тот же word count на RDD: sc.textFile(...).flatMap(lambda l: l.split()).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b). Единственная широкая операция — reduceByKey, и именно она формирует границу stage и shuffle.

Кэширование: cache и persist

Помним: без кэша каждое действие пересчитывает всю цепочку с нуля по lineage. Если один и тот же набор используется многократно (итеративный алгоритм, несколько отчётов из одной подготовленной таблицы), это расточительно. Кэширование просит Spark материализовать набор и держать его наготове.

  • cache() — закэшировать с уровнем по умолчанию. Для DataFrame это MEMORY_AND_DISK (в памяти, при нехватке — спилл на диск).
  • persist(StorageLevel) — то же, но с явным выбором уровня хранения.

Главные уровни хранения:

УровеньЧто значит
MEMORY_ONLYтолько в памяти; не влезло — будет пересчитано при обращении
MEMORY_AND_DISKв памяти, излишки спиллятся на диск (дефолт для DataFrame)
DISK_ONLYтолько на диске (когда память дороже скорости)
MEMORY_AND_DISK_2как MEMORY_AND_DISK, но с репликацией на 2 узла (надёжность)

Кэш тоже ленив: данные реально лягут в кэш при первом действии. После работы кэш освобождают через unpersist(), чтобы не держать память зря.

Подводные камни

  • Кэшировать всё подряд. Кэш занимает память, которой и так не хватает на вычисления. Кэшируйте только то, что реально читается несколько раз. Лишний кэш вытесняет полезные данные и замедляет работу.
  • Кэш использован один раз. Если набор читается ровно одним действием, кэш бесполезен (даже вреден — лишняя материализация). Кэш окупается от двух обращений.
  • Забыли unpersist(). Висящий кэш съедает память до конца приложения.
  • Перекос при shuffle. Если один ключ встречается в разы чаще остальных, после shuffle почти все его записи попадут в одну партицию — одна задача будет считать часами (data skew). Это отдельная тема раздела 5.

Best practices

  • Главный девиз тюнинга: уменьшай shuffle. Считай в плане границы stages — это места перемешивания.
  • Заменяй groupByKey на reduceByKey/aggregateByKey; фильтруй до join/groupBy, а не после.
  • Кэшируй только многократно используемые наборы; после — unpersist().
  • По умолчанию хватает MEMORY_AND_DISK; меняй уровень осознанно под задачу.

Итог

  • Узкие трансформации (map, filter) считаются локально и склеиваются в stage; широкие (groupBy, join, distinct) вызывают shuffle.
  • Shuffle дорог, потому что бьёт по диску, сети и процессору; минимизация shuffle — главный приём тюнинга.
  • Word count: узкие flatMap/map плюс единственный широкий reduceByKey.
  • cache()/persist() избавляют от повторного пересчёта; уровни хранения управляют, где держать данные.
  • Кэшируйте только переиспользуемое и освобождайте через unpersist().
Проверьте себя
1. Почему shuffle считается самой дорогой операцией в Spark?
AОн удваивает объём данных в памяти
BОн задействует диск, сеть и процессор: записи перетасовываются между партициями по сети с сериализацией
CОн отключает параллелизм
DОн требует репликации всех данных
2. Какая из трансформаций является узкой (narrow) и НЕ вызывает shuffle?
AgroupByKey
Bjoin
Cfilter
Ddistinct
3. Когда кэширование (cache/persist) даёт выигрыш?
AВсегда — кэшировать нужно каждый DataFrame
BКогда один и тот же набор данных используется несколькими действиями и иначе пересчитывался бы заново
CТолько при записи на диск
DКогда набор читается ровно один раз
Поддержать проект