Создание DataFrame, схемы и базовые операции

Урок учит создавать DataFrame, правильно задавать схему и выполнять базовые преобразования колонок и строк.

Схема (schema) — описание имён и типов колонок DataFrame. Явная схема надёжнее и быстрее, чем автоопределение типов по данным.

Откуда берутся DataFrame

Три типичных способа создать DataFrame. PySpark-код тут не запускается в браузере (нужна JVM), поэтому он дан как текст для чтения.

# 1) Из локальной коллекции (для примеров и тестов)
data = [("Аня", 25), ("Борис", 30), ("Вера", 28)]
df = spark.createDataFrame(data, ["name", "age"])

# 2) Из файла — основной способ в продакшне
df = spark.read.parquet("/data/users")
df = spark.read.csv("/data/users.csv", header=True, inferSchema=True)
df = spark.read.json("/data/events.json")

# 3) Из SQL-запроса к зарегистрированной таблице/источнику
df = spark.sql("SELECT name, age FROM users WHERE age > 18")

inferSchema против явной схемы

При чтении CSV/JSON Spark должен откуда-то узнать типы колонок. Два пути, и выбор между ними важнее, чем кажется:

  • inferSchema=True — Spark сам угадывает типы, прочитав данные лишний раз (дополнительный проход по файлу). Удобно для разведки, но: (1) лишнее чтение замедляет старт; (2) угадывание ненадёжно — пустой числовой столбец станет строкой, а ID с ведущими нулями потеряет нули, став числом.
  • Явная схема через StructType — вы сами задаёте имена и типы. Один проход по данным, предсказуемые типы, контроль над null. В продакшне почти всегда выбирают этот вариант.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
])

df = spark.read.csv("/data/users.csv", header=True, schema=schema)
df.printSchema()

Параметр nullable в каждом поле говорит, может ли колонка содержать null. Это не пустая формальность: Spark использует эту информацию в оптимизациях, и неверное nullable=False на колонке с null приведёт к трудноуловимым ошибкам.

Базовые операции: проектируем колонки и фильтруем строки

Поскольку Spark SQL — это, по сути, тот же реляционный SQL, идеи операций удобно показать на живой SQL-песочнице. Этот блок исполняемый (SQLite): создаём таблицу, наполняем и выбираем — ровно то, что делают select/where в DataFrame.

CREATE TABLE users (
  id    INTEGER PRIMARY KEY AUTOINCREMENT,
  name  TEXT,
  age   INTEGER,
  city  TEXT
);
INSERT INTO users (name, age, city) VALUES
  ('Аня', 25, 'Москва'),
  ('Борис', 17, 'Казань'),
  ('Вера', 30, 'Москва'),
  ('Глеб', 42, 'Самара');

-- select + where (== DataFrame.select(...).filter(...))
SELECT name, age
FROM users
WHERE age >= 18
ORDER BY age DESC;

А вот как те же операции выглядят на DataFrame API (не запускается в браузере — текст):

from pyspark.sql.functions import col

# select — выбрать колонки (проекция)
df.select("name", "age")

# filter / where — отобрать строки (это синонимы)
df.filter(col("age") >= 18)
df.where("age >= 18")            # можно и SQL-строкой

# withColumn — добавить/заменить колонку
df.withColumn("is_adult", col("age") >= 18)

# drop — убрать колонку
df.drop("city")

# distinct — уникальные строки
df.select("city").distinct()

Чем DataFrame-операции отличаются от pandas

Внешне похоже, но есть принципиальные отличия:

  • Ленивость. Ни один из вызовов выше ничего не считает — строится план. Пока не вызвано действие (show(), count(), write), вычислений нет.
  • Неизменяемость. Как и RDD, DataFrame неизменяем: withColumn не меняет исходный, а возвращает новый DataFrame. Поэтому операции чейнят: df.filter(...).select(...).withColumn(...).
  • Нет упорядоченного индекса. В отличие от pandas, у DataFrame нет индекса и гарантированного порядка строк — данные распределены. Порядок задаётся явно через orderBy и только в нужный момент (это shuffle, поэтому сортируют как можно позже).

Действия над DataFrame: чем смотреть данные

Раз операции ленивы, что-то должно их запускать. Полезно знать главные действия и их цену, чтобы не запустить случайно дорогой полный проход:

ДействиеЧто делаетЦена
show(n)печатает первые n строкдёшево: берёт мало строк
take(n) / head(n)возвращает первые n строк драйверудёшево
count()число строкдорого: полный проход
collect()все строки на драйверопасно: весь набор в память драйвера
writeзапись в хранилищедорого: полный проход + ввод-вывод

Главная мысль: для разведки данных хватает show()/take(), которые трогают лишь несколько строк. count() и collect() сканируют весь набор и в распределённом мире обходятся дорого, а collect() на большом наборе ещё и кладёт драйвер. Привычка из pandas «давай посмотрю весь датафрейм» здесь вредна.

Подводные камни

  • inferSchema в продакшне. Лишний проход по данным и непредсказуемые типы. Для регулярных пайплайнов задавайте схему явно.
  • df.count() ради «посмотреть, что внутри». count — это полный проход по всем данным (действие). Чтобы заглянуть в данные, используйте df.show(5) или df.limit(5).
  • Ожидание порядка строк. Без orderBy порядок не гарантирован и может меняться между запусками — это распределённая система.
  • Цепочка withColumn на десятки колонок. Каждый withColumn — отдельный шаг в плане; для множества колонок одним махом эффективнее select со списком выражений.

Best practices

  • В продакшне задавайте схему явно через StructType — это быстрее и предсказуемее inferSchema.
  • Чейните операции: DataFrame неизменяем, а Catalyst всё равно соберёт из цепочки один план.
  • Сортируйте (orderBy) как можно позже и только когда порядок действительно нужен — это shuffle.
  • Для просмотра данных используйте show()/printSchema(), а не дорогой count() или collect().

Итог

  • DataFrame создают из коллекций, файлов (parquet/csv/json) и SQL.
  • Явная схема (StructType) надёжнее и быстрее, чем inferSchema; nullable важен для оптимизаций.
  • Базовые операции: select (колонки), filter/where (строки), withColumn (новая колонка), drop, distinct.
  • DataFrame ленив, неизменяем и не имеет гарантированного порядка строк без orderBy.
Проверьте себя
1. Почему в продакшн-пайплайне обычно задают схему явно, а не используют inferSchema?
AinferSchema не поддерживает CSV
BinferSchema делает лишний проход по данным и может неверно угадать типы (например, превратить ID с ведущими нулями в число)
CЯвная схема ускоряет shuffle
DinferSchema работает только на драйвере
2. Что вернёт вызов df.withColumn('is_adult', col('age') >= 18)?
AИзменит исходный df на месте
BНовый DataFrame с добавленной колонкой; исходный df не меняется
CСписок значений новой колонки
DНемедленно вычислит и покажет результат
3. Почему df.show(5) предпочтительнее df.count() для того, чтобы просто заглянуть в данные?
Acount не работает с DataFrame
Bcount — это полный проход по всем данным (действие), а show(5) берёт лишь несколько строк
Cshow(5) кэширует данные
Dcount возвращает неверный результат
Поддержать проект