JOIN в Spark: broadcast против sort-merge
Урок разбирает самую тяжёлую и самую частую операцию аналитики — join — и две стратегии, между которыми решается её скорость.
Broadcast join рассылает маленькую таблицу целиком на все узлы и соединяет без перемешивания большой; sort-merge join перемешивает и сортирует обе таблицы по ключу. Выбор стратегии решает, будет ли join быстрым.
Join — это потенциально гигантский shuffle
Соединить две таблицы по ключу значит свести вместе строки с одинаковым ключом из обеих таблиц. Если таблицы большие и распределены по кластеру, по умолчанию это требует перемешать обе так, чтобы один ключ оказался на одной машине. Это самый дорогой shuffle в типичном пайплайне: едут по сети не одна, а сразу две большие таблицы. Поэтому понимание стратегий join — ключ к производительности.
Сами по себе типы соединений стандартны (inner, left/right outer, full outer, left semi, left anti) и работают как в обычном SQL. Покажем базовый inner join в живой песочнице:
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
name TEXT,
city_id INTEGER
);
CREATE TABLE cities (
id INTEGER PRIMARY KEY,
city TEXT
);
INSERT INTO customers (id, name, city_id) VALUES
(1,'Аня',10), (2,'Борис',20), (3,'Вера',10), (4,'Глеб',30);
INSERT INTO cities (id, city) VALUES
(10,'Москва'), (20,'Казань'), (30,'Самара');
SELECT c.name, t.city
FROM customers c
JOIN cities t ON c.city_id = t.id
ORDER BY c.name;
Вывод:
name | city Аня | Москва Борис | Казань Вера | Москва Глеб | Самара
Здесь cities — крошечный справочник. В Spark это идеальный случай для broadcast join.
Sort-merge join: стратегия по умолчанию для больших таблиц
Когда обе таблицы большие, Spark применяет sort-merge join:
- Shuffle обеих таблиц по ключу join: записи с одинаковым ключом отправляются в одну и ту же партицию на одну машину.
- Сортировка каждой партиции по ключу.
- Слияние: два отсортированных потока проходятся параллельно, сопоставляя совпадающие ключи.
Это надёжно работает на данных любого размера и поддерживает все типы join, но дорого: перемешиваются и сортируются обе таблицы. Это «честная» стратегия для случая «большое к большому».
Broadcast hash join: когда одна таблица маленькая
Если одна из таблиц достаточно мала, чтобы поместиться в память каждого исполнителя, Spark применяет куда более дешёвый broadcast hash join:
- Маленькая таблица целиком рассылается (broadcast) на все узлы и строится в хеш-таблицу в памяти.
- Каждая партиция большой таблицы соединяется со своей локальной копией маленькой — без перемешивания большой таблицы вообще.
Ключевая выгода: огромную таблицу не нужно гонять по сети. Мы платим за рассылку маленькой (это дёшево), но избегаем самого дорогого — shuffle большой. Для типичной схемы «большой факт + маленький справочник» broadcast join ускоряет соединение в разы.
Spark выбирает broadcast автоматически, если оценивает размер таблицы меньше порога spark.sql.autoBroadcastJoinThreshold (по умолчанию ~10 МБ). Когда оценка неточна (например, после фильтров), можно подсказать явно:
from pyspark.sql.functions import broadcast
# Подсказка: маленькую таблицу cities — broadcast'ом
result = big_facts.join(broadcast(cities), "city_id")
Прикинем выигрыш «на салфетке». Пусть факт — 500 ГБ, справочник — 20 МБ, в кластере 100 исполнителей. Sort-merge join перемешает по сети обе таблицы — порядка 500 ГБ трафика плюс сортировка. Broadcast разошлёт 20 МБ на каждый из 100 узлов — 2 ГБ суммарного трафика — и не тронет 500 ГБ факта вообще. Разница в объёме сетевого обмена — два с лишним порядка, отсюда и кратное ускорение. Именно поэтому связка «большой факт + маленький справочник» почти всегда должна идти через broadcast.
Перекос при join (skew)
Самая коварная проблема join — перекос данных. Если один ключ встречается в большом числе строк (например, 40% заказов привязаны к «складу по умолчанию»), то после shuffle почти все эти строки попадут в одну партицию. Одна задача будет соединять часами, остальной кластер простаивает. Симптом в Spark UI: 199 задач завершились за секунды, а одна висит и висит. Перекос мы детально разберём в разделе 5 (диагностика, salting, AQE skew join), но запомнить нужно сейчас: равномерность ключей join критична для скорости.
Стратегии join в Spark
| Стратегия | Когда применяется | Стоимость |
| Broadcast hash join | одна таблица помещается в память исполнителя | дёшево: shuffle большой таблицы нет |
| Sort-merge join | обе таблицы большие | дорого: shuffle + сортировка обеих |
| Shuffle hash join | средний случай (одна сторона заметно меньше, но не broadcast) | shuffle обеих, но без сортировки |
Подводные камни
- Broadcast слишком большой таблицы. Если форсировать
broadcast()на таблице, которая не влезает в память исполнителя, получите OOM на каждом узле. Broadcast — только для действительно маленьких. - Неверная оценка размера после фильтра. Catalyst оценивает размеры до выполнения и может не заметить, что после фильтра таблица стала крошечной. Тогда поможет явный
broadcast(). - Перекос ключа. Один «горячий» ключ — и sort-merge join зависает на одной задаче. Диагностируйте по Spark UI.
- Дубликаты ключей раздувают результат. Если ключ не уникален в обеих таблицах, join даёт декартово произведение совпадений — результат может неожиданно вырасти в разы.
Best practices
- Соединяете большой факт с маленьким справочником — добивайтесь broadcast join (он часто и так выбирается, но проверяйте план и при нужде ставьте
broadcast()). - Фильтруйте обе таблицы до join, чтобы уменьшить объём shuffle.
- Следите за равномерностью ключа join; перекос — главный убийца производительности соединений.
- Проверяйте уникальность ключей, чтобы случайно не получить раздувание результата.
- Смотрите в
explain(), какая стратегия выбрана (BroadcastHashJoin vs SortMergeJoin) — раздел 5.
Итог
- Join больших таблиц — самый дорогой shuffle: перемешиваются обе таблицы.
- Sort-merge join (дефолт для «большое к большому») — shuffle + сортировка обеих сторон.
- Broadcast hash join рассылает маленькую таблицу на все узлы и избегает shuffle большой — выбирайте его для «большой + маленький справочник».
- Перекос ключа делает одну задачу join бесконечной — следите за равномерностью.