JOIN в Spark: broadcast против sort-merge

Урок разбирает самую тяжёлую и самую частую операцию аналитики — join — и две стратегии, между которыми решается её скорость.

Broadcast join рассылает маленькую таблицу целиком на все узлы и соединяет без перемешивания большой; sort-merge join перемешивает и сортирует обе таблицы по ключу. Выбор стратегии решает, будет ли join быстрым.

Join — это потенциально гигантский shuffle

Соединить две таблицы по ключу значит свести вместе строки с одинаковым ключом из обеих таблиц. Если таблицы большие и распределены по кластеру, по умолчанию это требует перемешать обе так, чтобы один ключ оказался на одной машине. Это самый дорогой shuffle в типичном пайплайне: едут по сети не одна, а сразу две большие таблицы. Поэтому понимание стратегий join — ключ к производительности.

Сами по себе типы соединений стандартны (inner, left/right outer, full outer, left semi, left anti) и работают как в обычном SQL. Покажем базовый inner join в живой песочнице:

CREATE TABLE customers (
  id   INTEGER PRIMARY KEY,
  name TEXT,
  city_id INTEGER
);
CREATE TABLE cities (
  id   INTEGER PRIMARY KEY,
  city TEXT
);
INSERT INTO customers (id, name, city_id) VALUES
  (1,'Аня',10), (2,'Борис',20), (3,'Вера',10), (4,'Глеб',30);
INSERT INTO cities (id, city) VALUES
  (10,'Москва'), (20,'Казань'), (30,'Самара');

SELECT c.name, t.city
FROM customers c
JOIN cities t ON c.city_id = t.id
ORDER BY c.name;

Вывод:

name  | city
Аня   | Москва
Борис | Казань
Вера  | Москва
Глеб  | Самара

Здесь cities — крошечный справочник. В Spark это идеальный случай для broadcast join.

Sort-merge join: стратегия по умолчанию для больших таблиц

Когда обе таблицы большие, Spark применяет sort-merge join:

  1. Shuffle обеих таблиц по ключу join: записи с одинаковым ключом отправляются в одну и ту же партицию на одну машину.
  2. Сортировка каждой партиции по ключу.
  3. Слияние: два отсортированных потока проходятся параллельно, сопоставляя совпадающие ключи.

Это надёжно работает на данных любого размера и поддерживает все типы join, но дорого: перемешиваются и сортируются обе таблицы. Это «честная» стратегия для случая «большое к большому».

Broadcast hash join: когда одна таблица маленькая

Если одна из таблиц достаточно мала, чтобы поместиться в память каждого исполнителя, Spark применяет куда более дешёвый broadcast hash join:

  1. Маленькая таблица целиком рассылается (broadcast) на все узлы и строится в хеш-таблицу в памяти.
  2. Каждая партиция большой таблицы соединяется со своей локальной копией маленькой — без перемешивания большой таблицы вообще.

Ключевая выгода: огромную таблицу не нужно гонять по сети. Мы платим за рассылку маленькой (это дёшево), но избегаем самого дорогого — shuffle большой. Для типичной схемы «большой факт + маленький справочник» broadcast join ускоряет соединение в разы.

Spark выбирает broadcast автоматически, если оценивает размер таблицы меньше порога spark.sql.autoBroadcastJoinThreshold (по умолчанию ~10 МБ). Когда оценка неточна (например, после фильтров), можно подсказать явно:

from pyspark.sql.functions import broadcast

# Подсказка: маленькую таблицу cities — broadcast'ом
result = big_facts.join(broadcast(cities), "city_id")

Прикинем выигрыш «на салфетке». Пусть факт — 500 ГБ, справочник — 20 МБ, в кластере 100 исполнителей. Sort-merge join перемешает по сети обе таблицы — порядка 500 ГБ трафика плюс сортировка. Broadcast разошлёт 20 МБ на каждый из 100 узлов — 2 ГБ суммарного трафика — и не тронет 500 ГБ факта вообще. Разница в объёме сетевого обмена — два с лишним порядка, отсюда и кратное ускорение. Именно поэтому связка «большой факт + маленький справочник» почти всегда должна идти через broadcast.

Перекос при join (skew)

Самая коварная проблема join — перекос данных. Если один ключ встречается в большом числе строк (например, 40% заказов привязаны к «складу по умолчанию»), то после shuffle почти все эти строки попадут в одну партицию. Одна задача будет соединять часами, остальной кластер простаивает. Симптом в Spark UI: 199 задач завершились за секунды, а одна висит и висит. Перекос мы детально разберём в разделе 5 (диагностика, salting, AQE skew join), но запомнить нужно сейчас: равномерность ключей join критична для скорости.

Стратегии join в Spark

СтратегияКогда применяетсяСтоимость
Broadcast hash joinодна таблица помещается в память исполнителядёшево: shuffle большой таблицы нет
Sort-merge joinобе таблицы большиедорого: shuffle + сортировка обеих
Shuffle hash joinсредний случай (одна сторона заметно меньше, но не broadcast)shuffle обеих, но без сортировки

Подводные камни

  • Broadcast слишком большой таблицы. Если форсировать broadcast() на таблице, которая не влезает в память исполнителя, получите OOM на каждом узле. Broadcast — только для действительно маленьких.
  • Неверная оценка размера после фильтра. Catalyst оценивает размеры до выполнения и может не заметить, что после фильтра таблица стала крошечной. Тогда поможет явный broadcast().
  • Перекос ключа. Один «горячий» ключ — и sort-merge join зависает на одной задаче. Диагностируйте по Spark UI.
  • Дубликаты ключей раздувают результат. Если ключ не уникален в обеих таблицах, join даёт декартово произведение совпадений — результат может неожиданно вырасти в разы.

Best practices

  • Соединяете большой факт с маленьким справочником — добивайтесь broadcast join (он часто и так выбирается, но проверяйте план и при нужде ставьте broadcast()).
  • Фильтруйте обе таблицы до join, чтобы уменьшить объём shuffle.
  • Следите за равномерностью ключа join; перекос — главный убийца производительности соединений.
  • Проверяйте уникальность ключей, чтобы случайно не получить раздувание результата.
  • Смотрите в explain(), какая стратегия выбрана (BroadcastHashJoin vs SortMergeJoin) — раздел 5.

Итог

  • Join больших таблиц — самый дорогой shuffle: перемешиваются обе таблицы.
  • Sort-merge join (дефолт для «большое к большому») — shuffle + сортировка обеих сторон.
  • Broadcast hash join рассылает маленькую таблицу на все узлы и избегает shuffle большой — выбирайте его для «большой + маленький справочник».
  • Перекос ключа делает одну задачу join бесконечной — следите за равномерностью.
Проверьте себя
1. Почему broadcast hash join дешевле sort-merge join для соединения большой таблицы с маленькой?
AОн не использует ключ соединения
BМаленькая таблица рассылается на все узлы, и большую таблицу не нужно перемешивать (shuffle) по сети
CОн соединяет данные на драйвере
DОн не поддерживает inner join
2. Что произойдёт, если форсировать broadcast() на таблице, которая не помещается в память исполнителя?
ASpark проигнорирует подсказку
BПолучите OutOfMemory на каждом узле, куда таблицу пытаются разослать
CJoin автоматически станет sort-merge
DТаблица запишется на диск
3. Как проявляется перекос данных (skew) при sort-merge join?
AВсе задачи завершаются одновременно
BОдин горячий ключ собирает почти все строки в одну партицию — одна задача висит часами, остальной кластер простаивает
CРезультат join становится пустым
DSpark отказывается выполнять join
Поддержать проект