Создание DataFrame, схемы и базовые операции
Урок учит создавать DataFrame, правильно задавать схему и выполнять базовые преобразования колонок и строк.
Схема (schema) — описание имён и типов колонок DataFrame. Явная схема надёжнее и быстрее, чем автоопределение типов по данным.
Откуда берутся DataFrame
Три типичных способа создать DataFrame. PySpark-код тут не запускается в браузере (нужна JVM), поэтому он дан как текст для чтения.
# 1) Из локальной коллекции (для примеров и тестов)
data = [("Аня", 25), ("Борис", 30), ("Вера", 28)]
df = spark.createDataFrame(data, ["name", "age"])
# 2) Из файла — основной способ в продакшне
df = spark.read.parquet("/data/users")
df = spark.read.csv("/data/users.csv", header=True, inferSchema=True)
df = spark.read.json("/data/events.json")
# 3) Из SQL-запроса к зарегистрированной таблице/источнику
df = spark.sql("SELECT name, age FROM users WHERE age > 18")
inferSchema против явной схемы
При чтении CSV/JSON Spark должен откуда-то узнать типы колонок. Два пути, и выбор между ними важнее, чем кажется:
inferSchema=True— Spark сам угадывает типы, прочитав данные лишний раз (дополнительный проход по файлу). Удобно для разведки, но: (1) лишнее чтение замедляет старт; (2) угадывание ненадёжно — пустой числовой столбец станет строкой, а ID с ведущими нулями потеряет нули, став числом.- Явная схема через
StructType— вы сами задаёте имена и типы. Один проход по данным, предсказуемые типы, контроль над null. В продакшне почти всегда выбирают этот вариант.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=True),
])
df = spark.read.csv("/data/users.csv", header=True, schema=schema)
df.printSchema()
Параметр nullable в каждом поле говорит, может ли колонка содержать null. Это не пустая формальность: Spark использует эту информацию в оптимизациях, и неверное nullable=False на колонке с null приведёт к трудноуловимым ошибкам.
Базовые операции: проектируем колонки и фильтруем строки
Поскольку Spark SQL — это, по сути, тот же реляционный SQL, идеи операций удобно показать на живой SQL-песочнице. Этот блок исполняемый (SQLite): создаём таблицу, наполняем и выбираем — ровно то, что делают select/where в DataFrame.
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
age INTEGER,
city TEXT
);
INSERT INTO users (name, age, city) VALUES
('Аня', 25, 'Москва'),
('Борис', 17, 'Казань'),
('Вера', 30, 'Москва'),
('Глеб', 42, 'Самара');
-- select + where (== DataFrame.select(...).filter(...))
SELECT name, age
FROM users
WHERE age >= 18
ORDER BY age DESC;
А вот как те же операции выглядят на DataFrame API (не запускается в браузере — текст):
from pyspark.sql.functions import col
# select — выбрать колонки (проекция)
df.select("name", "age")
# filter / where — отобрать строки (это синонимы)
df.filter(col("age") >= 18)
df.where("age >= 18") # можно и SQL-строкой
# withColumn — добавить/заменить колонку
df.withColumn("is_adult", col("age") >= 18)
# drop — убрать колонку
df.drop("city")
# distinct — уникальные строки
df.select("city").distinct()
Чем DataFrame-операции отличаются от pandas
Внешне похоже, но есть принципиальные отличия:
- Ленивость. Ни один из вызовов выше ничего не считает — строится план. Пока не вызвано действие (
show(),count(),write), вычислений нет. - Неизменяемость. Как и RDD, DataFrame неизменяем:
withColumnне меняет исходный, а возвращает новый DataFrame. Поэтому операции чейнят:df.filter(...).select(...).withColumn(...). - Нет упорядоченного индекса. В отличие от pandas, у DataFrame нет индекса и гарантированного порядка строк — данные распределены. Порядок задаётся явно через
orderByи только в нужный момент (это shuffle, поэтому сортируют как можно позже).
Действия над DataFrame: чем смотреть данные
Раз операции ленивы, что-то должно их запускать. Полезно знать главные действия и их цену, чтобы не запустить случайно дорогой полный проход:
| Действие | Что делает | Цена |
show(n) | печатает первые n строк | дёшево: берёт мало строк |
take(n) / head(n) | возвращает первые n строк драйверу | дёшево |
count() | число строк | дорого: полный проход |
collect() | все строки на драйвер | опасно: весь набор в память драйвера |
write | запись в хранилище | дорого: полный проход + ввод-вывод |
Главная мысль: для разведки данных хватает show()/take(), которые трогают лишь несколько строк. count() и collect() сканируют весь набор и в распределённом мире обходятся дорого, а collect() на большом наборе ещё и кладёт драйвер. Привычка из pandas «давай посмотрю весь датафрейм» здесь вредна.
Подводные камни
- inferSchema в продакшне. Лишний проход по данным и непредсказуемые типы. Для регулярных пайплайнов задавайте схему явно.
df.count()ради «посмотреть, что внутри». count — это полный проход по всем данным (действие). Чтобы заглянуть в данные, используйтеdf.show(5)илиdf.limit(5).- Ожидание порядка строк. Без
orderByпорядок не гарантирован и может меняться между запусками — это распределённая система. - Цепочка
withColumnна десятки колонок. КаждыйwithColumn— отдельный шаг в плане; для множества колонок одним махом эффективнееselectсо списком выражений.
Best practices
- В продакшне задавайте схему явно через
StructType— это быстрее и предсказуемее inferSchema. - Чейните операции: DataFrame неизменяем, а Catalyst всё равно соберёт из цепочки один план.
- Сортируйте (
orderBy) как можно позже и только когда порядок действительно нужен — это shuffle. - Для просмотра данных используйте
show()/printSchema(), а не дорогойcount()илиcollect().
Итог
- DataFrame создают из коллекций, файлов (parquet/csv/json) и SQL.
- Явная схема (StructType) надёжнее и быстрее, чем inferSchema; nullable важен для оптимизаций.
- Базовые операции: select (колонки), filter/where (строки), withColumn (новая колонка), drop, distinct.
- DataFrame ленив, неизменяем и не имеет гарантированного порядка строк без orderBy.