Spark SQL, группировки и оконные функции

Урок показывает SQL-лицо Spark: запросы через spark.sql, группировки и мощные оконные функции.

Spark SQL — возможность писать обычные SQL-запросы к DataFrame; внутри они проходят тот же оптимизатор Catalyst, что и DataFrame API.

Два диалекта одного движка

Одну и ту же логику в Spark можно выразить двумя способами — DataFrame API и SQL — и они полностью эквивалентны: оба компилируются в один логический план и оптимизируются Catalyst одинаково. Выбор — дело вкуса и удобства. Чтобы писать SQL, DataFrame регистрируют как временное представление:

# Регистрируем DataFrame как временную таблицу
df.createOrReplaceTempView("orders")

# Теперь можно писать обычный SQL
result = spark.sql("""
    SELECT city, SUM(amount) AS total
    FROM orders
    GROUP BY city
    ORDER BY total DESC
""")

Поскольку Spark SQL — это стандартный реляционный SQL, все примеры этого раздела можно прогнать в живой SQL-песочнице: синтаксис SELECT/GROUP BY/JOIN/оконных функций совпадает.

groupBy и агрегации

Группировка — базовая аналитическая операция: схлопнуть строки в группы и посчитать агрегат на каждую. На DataFrame это df.groupBy("city").agg(...), на SQL — GROUP BY. Важно помнить из раздела 2: groupBy — широкая операция, она вызывает shuffle (записи одного города собираются вместе). Живой пример:

CREATE TABLE sales (
  id     INTEGER PRIMARY KEY AUTOINCREMENT,
  city   TEXT,
  product TEXT,
  amount INTEGER
);
INSERT INTO sales (city, product, amount) VALUES
  ('Москва','A',1500), ('Москва','B',300), ('Москва','A',700),
  ('Казань','A',900),  ('Казань','B',50),  ('Самара','A',2000);

-- Агрегации по группам: сумма, среднее, количество
SELECT
  city,
  COUNT(*)        AS orders,
  SUM(amount)     AS revenue,
  ROUND(AVG(amount), 1) AS avg_check
FROM sales
GROUP BY city
ORDER BY revenue DESC;

Вывод:

city   | orders | revenue | avg_check
Москва | 3      | 2500    | 833.3
Самара | 1      | 2000    | 2000.0
Казань | 2      | 950     | 475.0

Оконные функции: расчёт внутри группы без схлопывания

Часто нужно посчитать что-то по группе, но сохранить все строки: добавить к каждому заказу долю в выручке города, ранг по сумме, нарастающий итог. GROUP BY тут не годится — он схлопывает строки. Это работа для оконных функций: они вычисляют агрегат по «окну» строк (например, по всем строкам того же города), но возвращают значение для каждой строки.

CREATE TABLE sales (
  id     INTEGER PRIMARY KEY AUTOINCREMENT,
  city   TEXT,
  amount INTEGER
);
INSERT INTO sales (city, amount) VALUES
  ('Москва',1500), ('Москва',300), ('Москва',700),
  ('Казань',900),  ('Казань',50);

-- Ранг заказа внутри города и доля заказа в выручке города
SELECT
  city,
  amount,
  RANK() OVER (PARTITION BY city ORDER BY amount DESC) AS rank_in_city,
  SUM(amount) OVER (PARTITION BY city) AS city_total
FROM sales
ORDER BY city, rank_in_city;

Вывод:

city   | amount | rank_in_city | city_total
Казань | 900    | 1            | 950
Казань | 50     | 2            | 950
Москва | 1500   | 1            | 2500
Москва | 700    | 2            | 2500
Москва | 300    | 3            | 2500

Ключевые части окна: PARTITION BY делит данные на группы-окна (здесь — по городу), ORDER BY задаёт порядок внутри окна (для ранга и нарастающих итогов). Типичные оконные функции: ROW_NUMBER, RANK, DENSE_RANK (нумерация/ранжирование), LAG/LEAD (предыдущее/следующее значение — для разниц «день к дню»), агрегаты SUM/AVG OVER (...) (нарастающие итоги, доли).

На DataFrame то же делается через Window:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as ssum, col

w = Window.partitionBy("city").orderBy(col("amount").desc())
df.withColumn("rank_in_city", rank().over(w))

Стоимость окон

Оконные функции тоже требуют, чтобы строки одной партиции окна оказались вместе и отсортированы — то есть это shuffle плюс сортировка. На больших данных и широких окнах это дорого. Особенно осторожно с окнами без PARTITION BY: тогда всё окно — это весь датасет, который придётся свести в одну партицию. Это классический способ положить задачу OOM на больших данных.

Подводные камни

  • Окно без PARTITION BY на большом наборе. Всё едет в одну партицию → перекос и OOM. Всегда партиционируйте окно осмысленной колонкой, если данные большие.
  • GROUP BY там, где нужно окно. Если нужны все исходные строки плюс агрегат — это окно, а не group by с последующим join (последнее — лишний shuffle).
  • Несколько окон с разной спецификацией. Каждое уникальное определение окна — потенциально отдельный shuffle. Старайтесь переиспользовать одно определение Window.
  • Временные представления живут в сессии. createTempView виден только в текущей SparkSession; для общего доступа есть createGlobalTempView.

Best practices

  • Выбирайте SQL или DataFrame API по удобству — производительность одинакова (один Catalyst).
  • Для расчётов «по группе, но с сохранением строк» используйте оконные функции, а не group by + join.
  • Всегда задавайте осмысленный PARTITION BY в окне на больших данных, чтобы избежать схлопывания в одну партицию.
  • Помните: и groupBy, и окна — это shuffle; фильтруйте данные до них.

Итог

  • Spark SQL и DataFrame API эквивалентны: один план, один Catalyst; SQL включают через createTempView.
  • groupBy/agg схлопывают строки в группы (широкая операция, shuffle).
  • Оконные функции считают агрегат по окну, не схлопывая строки: PARTITION BY делит, ORDER BY упорядочивает.
  • Окна — это shuffle + сортировка; окно без PARTITION BY на больших данных грозит OOM.
Проверьте себя
1. Чем оконная функция (например, SUM(amount) OVER (PARTITION BY city)) отличается от GROUP BY?
AОконная функция работает быстрее, потому что не вызывает shuffle
BGROUP BY схлопывает строки в группы, а оконная функция считает агрегат по окну, сохраняя все исходные строки
CОконные функции не поддерживаются в Spark SQL
DРазницы нет
2. Почему оконная функция без PARTITION BY опасна на больших данных?
AОна не компилируется
BВсё окно становится одной партицией — весь датасет сводится в одно место, грозит перекос и OOM
CОна игнорирует ORDER BY
DОна удаляет дубликаты
3. В каком отношении находятся Spark SQL и DataFrame API по производительности?
ASQL всегда медленнее, потому что парсится
BDataFrame API всегда быстрее
CОни эквивалентны: оба компилируются в один логический план и оптимизируются одним Catalyst
DЗависит от языка (Python/Scala)
Поддержать проект