Функции, типы, null и чтение/запись (Parquet)
Урок про арсенал колоночных функций, корректную работу с null и грамотное чтение-запись данных, где главный герой — Parquet.
Parquet — колоночный бинарный формат хранения со схемой и сжатием; для Spark он быстрее и компактнее, чем CSV или JSON.
pyspark.sql.functions: язык выражений
Преобразования колонок выражают встроенными функциями из pyspark.sql.functions. Это критично: они выполняются внутри JVM и видны Catalyst (в отличие от Python-UDF). Ключевые:
col("name")— ссылка на колонку.lit(0)— литерал (константа) как колонка; нужен, чтобы смешивать константы с колонками.when(cond, a).otherwise(b)— условное выражение (аналог CASE WHEN в SQL).- Агрегаты:
count,sum,avg,min,max— для группировок (раздел 4).
Логику when/otherwise и агрегаты удобно показать на живой SQL-песочнице — это ровно тот же стандартный SQL:
CREATE TABLE orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
city TEXT,
amount INTEGER
);
INSERT INTO orders (city, amount) VALUES
('Москва', 1500), ('Москва', 300), ('Казань', 900),
('Казань', 50), ('Самара', 2000);
-- CASE WHEN == функция when(...).otherwise(...) в DataFrame
SELECT
city,
amount,
CASE WHEN amount >= 1000 THEN 'крупный' ELSE 'мелкий' END AS size
FROM orders
ORDER BY amount DESC;
То же на DataFrame (текст, не запускается):
from pyspark.sql.functions import col, when, lit
df.withColumn(
"size",
when(col("amount") >= 1000, lit("крупный")).otherwise(lit("мелкий"))
)
Null: тихий источник багов
В распределённых данных null встречается постоянно, и Spark обращается с ним по правилам SQL, которые легко забыть:
- Сравнение с null даёт null, а не True/False.
col("x") == Noneне работает — используйтеcol("x").isNull()/isNotNull(). - Арифметика с null даёт null.
null + 5= null. Это «отравляет» вычисления, если не подставить значение. - Агрегаты игнорируют null.
avg,sumпропускают null, а вотcount("*")считает все строки, тогда какcount("col")— только не-null в колонке. Разное поведение — частая ловушка.
Инструменты: na.fill(value) — заменить null на значение; na.drop() — выбросить строки с null; coalesce(a, b) — вернуть первый не-null. Покажем стандартное поведение null в агрегатах на SQL-песочнице:
CREATE TABLE scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
score INTEGER
);
INSERT INTO scores (name, score) VALUES
('Аня', 80), ('Борис', NULL), ('Вера', 100), ('Глеб', NULL);
-- COUNT(*) считает все строки, COUNT(score) — только не-NULL; AVG игнорирует NULL
SELECT
COUNT(*) AS rows_total,
COUNT(score) AS with_score,
AVG(score) AS avg_score
FROM scores;
Вывод:
rows_total | with_score | avg_score 4 | 2 | 90.0
Почему Parquet, а не CSV/JSON
Формат хранения сильнее влияет на скорость, чем кажется новичкам. Сравнение:
| CSV/JSON (строчные, текст) | Parquet (колоночный, бинарный) | |
| Схема | нет, типы угадываются | встроена в файл |
| Сжатие | слабое/нет | сильное (snappy/zstd), однородные колонки жмутся отлично |
| Чтение части колонок | нельзя — читается вся строка | читаются только нужные колонки (column pruning) |
| Pushdown фильтров | нет | да — статистика в блоках позволяет пропускать целые куски |
Колоночность Parquet идеально дружит с Catalyst: если запросу нужны 2 колонки из 100, Spark прочитает с диска только эти 2. Плюс статистика min/max по блокам позволяет вообще пропустить блоки, не попадающие под фильтр. На практике переход CSV → Parquet часто ускоряет пайплайн в разы и уменьшает объём на диске. Вывод прост: храните промежуточные и витринные данные в Parquet (или родственных Delta/Iceberg); CSV/JSON оставляйте лишь для обмена с внешними системами.
Чтение и запись, partitionBy
# Чтение
df = spark.read.parquet("/data/events")
# Запись с режимом и партиционированием по колонке
(df.write
.mode("overwrite") # overwrite | append | ignore | error
.partitionBy("dt") # раскладывает по папкам dt=2026-01-01/ ...
.parquet("/out/events"))
partitionBy("dt") физически раскладывает данные по подпапкам вида dt=2026-01-01/. Это partition pruning: запрос с WHERE dt = '2026-01-01' прочитает только нужную папку, не трогая остальные. Мощный приём для таблиц с фильтрацией по дате. Но партиционировать нужно по колонке с умеренной кардинальностью (дата — хорошо, user_id с миллионами значений — катастрофа: миллионы крошечных файлов).
Подводные камни
- Проблема мелких файлов. Слишком дробное
partitionBy(или много мелких партиций при записи) плодит тысячи крошечных файлов. Это убивает производительность: накладные расходы на открытие каждого файла превышают полезную работу. Партиционируйте по разумной колонке и при необходимостиcoalesceперед записью. - Сравнение с null через
==. Молча даёт null (не ошибку), строки «теряются». ТолькоisNull()/isNotNull(). - CSV для больших данных. Нет схемы, нет column pruning, слабое сжатие — медленно и громоздко.
- Python-UDF вместо встроенных функций. Теряется оптимизация и появляется «налог» на сериализацию. Сначала ищите готовую функцию в
pyspark.sql.functions.
Best practices
- Выражайте логику встроенными функциями
pyspark.sql.functions, а не Python-UDF. - Обрабатывайте null явно:
isNull/isNotNull,na.fill,coalesce; помните, что count(*) и count(col) считают по-разному. - Храните данные в Parquet (или Delta/Iceberg): схема, сжатие, column pruning, pushdown.
- Партиционируйте при записи по колонке умеренной кардинальности (часто — дата) и берегитесь мелких файлов.
Итог
- Логику колонок выражают функциями (col, lit, when/otherwise, агрегаты) — они видны Catalyst и быстры.
- null ведёт себя по правилам SQL: сравнение даёт null, агрегаты игнорируют null, count(*) ≠ count(col).
- Parquet (колоночный, со схемой и сжатием) предпочтительнее CSV/JSON: column pruning и pushdown.
- partitionBy ускоряет фильтрацию (partition pruning), но требует колонки умеренной кардинальности и осторожности с мелкими файлами.