Spark SQL, группировки и оконные функции
Урок показывает SQL-лицо Spark: запросы через spark.sql, группировки и мощные оконные функции.
Spark SQL — возможность писать обычные SQL-запросы к DataFrame; внутри они проходят тот же оптимизатор Catalyst, что и DataFrame API.
Два диалекта одного движка
Одну и ту же логику в Spark можно выразить двумя способами — DataFrame API и SQL — и они полностью эквивалентны: оба компилируются в один логический план и оптимизируются Catalyst одинаково. Выбор — дело вкуса и удобства. Чтобы писать SQL, DataFrame регистрируют как временное представление:
# Регистрируем DataFrame как временную таблицу
df.createOrReplaceTempView("orders")
# Теперь можно писать обычный SQL
result = spark.sql("""
SELECT city, SUM(amount) AS total
FROM orders
GROUP BY city
ORDER BY total DESC
""")
Поскольку Spark SQL — это стандартный реляционный SQL, все примеры этого раздела можно прогнать в живой SQL-песочнице: синтаксис SELECT/GROUP BY/JOIN/оконных функций совпадает.
groupBy и агрегации
Группировка — базовая аналитическая операция: схлопнуть строки в группы и посчитать агрегат на каждую. На DataFrame это df.groupBy("city").agg(...), на SQL — GROUP BY. Важно помнить из раздела 2: groupBy — широкая операция, она вызывает shuffle (записи одного города собираются вместе). Живой пример:
CREATE TABLE sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
city TEXT,
product TEXT,
amount INTEGER
);
INSERT INTO sales (city, product, amount) VALUES
('Москва','A',1500), ('Москва','B',300), ('Москва','A',700),
('Казань','A',900), ('Казань','B',50), ('Самара','A',2000);
-- Агрегации по группам: сумма, среднее, количество
SELECT
city,
COUNT(*) AS orders,
SUM(amount) AS revenue,
ROUND(AVG(amount), 1) AS avg_check
FROM sales
GROUP BY city
ORDER BY revenue DESC;
Вывод:
city | orders | revenue | avg_check Москва | 3 | 2500 | 833.3 Самара | 1 | 2000 | 2000.0 Казань | 2 | 950 | 475.0
Оконные функции: расчёт внутри группы без схлопывания
Часто нужно посчитать что-то по группе, но сохранить все строки: добавить к каждому заказу долю в выручке города, ранг по сумме, нарастающий итог. GROUP BY тут не годится — он схлопывает строки. Это работа для оконных функций: они вычисляют агрегат по «окну» строк (например, по всем строкам того же города), но возвращают значение для каждой строки.
CREATE TABLE sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
city TEXT,
amount INTEGER
);
INSERT INTO sales (city, amount) VALUES
('Москва',1500), ('Москва',300), ('Москва',700),
('Казань',900), ('Казань',50);
-- Ранг заказа внутри города и доля заказа в выручке города
SELECT
city,
amount,
RANK() OVER (PARTITION BY city ORDER BY amount DESC) AS rank_in_city,
SUM(amount) OVER (PARTITION BY city) AS city_total
FROM sales
ORDER BY city, rank_in_city;
Вывод:
city | amount | rank_in_city | city_total Казань | 900 | 1 | 950 Казань | 50 | 2 | 950 Москва | 1500 | 1 | 2500 Москва | 700 | 2 | 2500 Москва | 300 | 3 | 2500
Ключевые части окна: PARTITION BY делит данные на группы-окна (здесь — по городу), ORDER BY задаёт порядок внутри окна (для ранга и нарастающих итогов). Типичные оконные функции: ROW_NUMBER, RANK, DENSE_RANK (нумерация/ранжирование), LAG/LEAD (предыдущее/следующее значение — для разниц «день к дню»), агрегаты SUM/AVG OVER (...) (нарастающие итоги, доли).
На DataFrame то же делается через Window:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as ssum, col
w = Window.partitionBy("city").orderBy(col("amount").desc())
df.withColumn("rank_in_city", rank().over(w))
Стоимость окон
Оконные функции тоже требуют, чтобы строки одной партиции окна оказались вместе и отсортированы — то есть это shuffle плюс сортировка. На больших данных и широких окнах это дорого. Особенно осторожно с окнами без PARTITION BY: тогда всё окно — это весь датасет, который придётся свести в одну партицию. Это классический способ положить задачу OOM на больших данных.
Подводные камни
- Окно без PARTITION BY на большом наборе. Всё едет в одну партицию → перекос и OOM. Всегда партиционируйте окно осмысленной колонкой, если данные большие.
- GROUP BY там, где нужно окно. Если нужны все исходные строки плюс агрегат — это окно, а не group by с последующим join (последнее — лишний shuffle).
- Несколько окон с разной спецификацией. Каждое уникальное определение окна — потенциально отдельный shuffle. Старайтесь переиспользовать одно определение
Window. - Временные представления живут в сессии.
createTempViewвиден только в текущей SparkSession; для общего доступа естьcreateGlobalTempView.
Best practices
- Выбирайте SQL или DataFrame API по удобству — производительность одинакова (один Catalyst).
- Для расчётов «по группе, но с сохранением строк» используйте оконные функции, а не group by + join.
- Всегда задавайте осмысленный
PARTITION BYв окне на больших данных, чтобы избежать схлопывания в одну партицию. - Помните: и groupBy, и окна — это shuffle; фильтруйте данные до них.
Итог
- Spark SQL и DataFrame API эквивалентны: один план, один Catalyst; SQL включают через createTempView.
- groupBy/agg схлопывают строки в группы (широкая операция, shuffle).
- Оконные функции считают агрегат по окну, не схлопывая строки: PARTITION BY делит, ORDER BY упорядочивает.
- Окна — это shuffle + сортировка; окно без PARTITION BY на больших данных грозит OOM.