Функции, типы, null и чтение/запись (Parquet)

Урок про арсенал колоночных функций, корректную работу с null и грамотное чтение-запись данных, где главный герой — Parquet.

Parquet — колоночный бинарный формат хранения со схемой и сжатием; для Spark он быстрее и компактнее, чем CSV или JSON.

pyspark.sql.functions: язык выражений

Преобразования колонок выражают встроенными функциями из pyspark.sql.functions. Это критично: они выполняются внутри JVM и видны Catalyst (в отличие от Python-UDF). Ключевые:

  • col("name") — ссылка на колонку.
  • lit(0) — литерал (константа) как колонка; нужен, чтобы смешивать константы с колонками.
  • when(cond, a).otherwise(b) — условное выражение (аналог CASE WHEN в SQL).
  • Агрегаты: count, sum, avg, min, max — для группировок (раздел 4).

Логику when/otherwise и агрегаты удобно показать на живой SQL-песочнице — это ровно тот же стандартный SQL:

CREATE TABLE orders (
  id     INTEGER PRIMARY KEY AUTOINCREMENT,
  city   TEXT,
  amount INTEGER
);
INSERT INTO orders (city, amount) VALUES
  ('Москва', 1500), ('Москва', 300), ('Казань', 900),
  ('Казань', 50),   ('Самара', 2000);

-- CASE WHEN == функция when(...).otherwise(...) в DataFrame
SELECT
  city,
  amount,
  CASE WHEN amount >= 1000 THEN 'крупный' ELSE 'мелкий' END AS size
FROM orders
ORDER BY amount DESC;

То же на DataFrame (текст, не запускается):

from pyspark.sql.functions import col, when, lit

df.withColumn(
    "size",
    when(col("amount") >= 1000, lit("крупный")).otherwise(lit("мелкий"))
)

Null: тихий источник багов

В распределённых данных null встречается постоянно, и Spark обращается с ним по правилам SQL, которые легко забыть:

  • Сравнение с null даёт null, а не True/False. col("x") == None не работает — используйте col("x").isNull() / isNotNull().
  • Арифметика с null даёт null. null + 5 = null. Это «отравляет» вычисления, если не подставить значение.
  • Агрегаты игнорируют null. avg, sum пропускают null, а вот count("*") считает все строки, тогда как count("col") — только не-null в колонке. Разное поведение — частая ловушка.

Инструменты: na.fill(value) — заменить null на значение; na.drop() — выбросить строки с null; coalesce(a, b) — вернуть первый не-null. Покажем стандартное поведение null в агрегатах на SQL-песочнице:

CREATE TABLE scores (
  id    INTEGER PRIMARY KEY AUTOINCREMENT,
  name  TEXT,
  score INTEGER
);
INSERT INTO scores (name, score) VALUES
  ('Аня', 80), ('Борис', NULL), ('Вера', 100), ('Глеб', NULL);

-- COUNT(*) считает все строки, COUNT(score) — только не-NULL; AVG игнорирует NULL
SELECT
  COUNT(*)      AS rows_total,
  COUNT(score)  AS with_score,
  AVG(score)    AS avg_score
FROM scores;

Вывод:

rows_total | with_score | avg_score
4          | 2          | 90.0

Почему Parquet, а не CSV/JSON

Формат хранения сильнее влияет на скорость, чем кажется новичкам. Сравнение:

CSV/JSON (строчные, текст)Parquet (колоночный, бинарный)
Схеманет, типы угадываютсявстроена в файл
Сжатиеслабое/нетсильное (snappy/zstd), однородные колонки жмутся отлично
Чтение части колонокнельзя — читается вся строкачитаются только нужные колонки (column pruning)
Pushdown фильтровнетда — статистика в блоках позволяет пропускать целые куски

Колоночность Parquet идеально дружит с Catalyst: если запросу нужны 2 колонки из 100, Spark прочитает с диска только эти 2. Плюс статистика min/max по блокам позволяет вообще пропустить блоки, не попадающие под фильтр. На практике переход CSV → Parquet часто ускоряет пайплайн в разы и уменьшает объём на диске. Вывод прост: храните промежуточные и витринные данные в Parquet (или родственных Delta/Iceberg); CSV/JSON оставляйте лишь для обмена с внешними системами.

Чтение и запись, partitionBy

# Чтение
df = spark.read.parquet("/data/events")

# Запись с режимом и партиционированием по колонке
(df.write
   .mode("overwrite")          # overwrite | append | ignore | error
   .partitionBy("dt")          # раскладывает по папкам dt=2026-01-01/ ...
   .parquet("/out/events"))

partitionBy("dt") физически раскладывает данные по подпапкам вида dt=2026-01-01/. Это partition pruning: запрос с WHERE dt = '2026-01-01' прочитает только нужную папку, не трогая остальные. Мощный приём для таблиц с фильтрацией по дате. Но партиционировать нужно по колонке с умеренной кардинальностью (дата — хорошо, user_id с миллионами значений — катастрофа: миллионы крошечных файлов).

Подводные камни

  • Проблема мелких файлов. Слишком дробное partitionBy (или много мелких партиций при записи) плодит тысячи крошечных файлов. Это убивает производительность: накладные расходы на открытие каждого файла превышают полезную работу. Партиционируйте по разумной колонке и при необходимости coalesce перед записью.
  • Сравнение с null через ==. Молча даёт null (не ошибку), строки «теряются». Только isNull()/isNotNull().
  • CSV для больших данных. Нет схемы, нет column pruning, слабое сжатие — медленно и громоздко.
  • Python-UDF вместо встроенных функций. Теряется оптимизация и появляется «налог» на сериализацию. Сначала ищите готовую функцию в pyspark.sql.functions.

Best practices

  • Выражайте логику встроенными функциями pyspark.sql.functions, а не Python-UDF.
  • Обрабатывайте null явно: isNull/isNotNull, na.fill, coalesce; помните, что count(*) и count(col) считают по-разному.
  • Храните данные в Parquet (или Delta/Iceberg): схема, сжатие, column pruning, pushdown.
  • Партиционируйте при записи по колонке умеренной кардинальности (часто — дата) и берегитесь мелких файлов.

Итог

  • Логику колонок выражают функциями (col, lit, when/otherwise, агрегаты) — они видны Catalyst и быстры.
  • null ведёт себя по правилам SQL: сравнение даёт null, агрегаты игнорируют null, count(*) ≠ count(col).
  • Parquet (колоночный, со схемой и сжатием) предпочтительнее CSV/JSON: column pruning и pushdown.
  • partitionBy ускоряет фильтрацию (partition pruning), но требует колонки умеренной кардинальности и осторожности с мелкими файлами.
Проверьте себя
1. Почему Parquet предпочтительнее CSV для хранения данных в Spark?
AParquet — текстовый формат, его легко читать глазами
BParquet колоночный, со встроенной схемой и сжатием: можно читать только нужные колонки и пропускать блоки по статистике (pushdown)
CCSV не поддерживается Spark
DParquet всегда занимает больше места, но быстрее
2. Как правильно отобрать строки, где колонка x равна null?
Adf.filter(col('x') == None)
Bdf.filter(col('x').isNull())
Cdf.filter(col('x') = null)
Ddf.filter(col('x') is None)
3. В чём опасность чрезмерно дробного partitionBy (например, по колонке с миллионами уникальных значений)?
ASpark откажется записывать данные
BВозникает проблема мелких файлов: тысячи крошечных файлов, накладные расходы на их открытие убивают производительность
CДанные потеряют схему
DPartition pruning перестанет работать
Поддержать проект