SparkSession и экосистема Spark
Урок показывает точку входа в PySpark — SparkSession — и даёт карту всей экосистемы Spark.
SparkSession — единая точка входа в Spark: объект, через который вы читаете данные, создаёте DataFrame'ы и запускаете SQL. Из него доступны все возможности движка.
Точка входа: SparkSession
Любая программа PySpark начинается с создания сессии. Это объект, который держит соединение с кластером и через который проходят все операции. Этот код не запускается в браузере (PySpark требует JVM и кластера), поэтому он дан как текст — для чтения и понимания структуры.
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("MyFirstApp") # имя приложения (видно в Spark UI)
.master("local[*]") # где запускать: local[*] — все ядра локально
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate() # создать новую или взять существующую
)
# Теперь spark — наша точка входа ко всему.
df = spark.read.parquet("/data/events")
df.printSchema()
spark.stop()
Разберём важное:
.master("local[*]")— режим запуска.local[*]означает «локально, использовать все ядра машины» (удобно для разработки). В кластере здесь будетyarn,k8s://...или адрес standalone-мастера. Обычно master не зашивают в код, а передают черезspark-submit..getOrCreate()— вернёт уже существующую сессию, если она есть (в кластере одна сессия на приложение).- Исторически были отдельные объекты
SparkContext,SQLContext,HiveContext. С версии 2.0SparkSessionобъединил их в одну точку входа;SparkContext(низкоуровневый, для RDD) доступен какspark.sparkContext.
Ленивость на практике: трансформации копятся, действие запускает
Главная особенность работы со Spark, к которой нужно привыкнуть: трансформации ленивы. Когда вы пишете df.filter(...) или df.select(...), ничего не считается — Spark лишь дописывает шаг в план. Реальное вычисление запускает только действие (action): count(), show(), collect(), write. Идею ленивого плана легко промоделировать на чистом Python: будем не считать сразу, а накапливать список операций, и выполним их только по команде «действие».
class LazyPlan:
def __init__(self, data, ops=None):
self.data = data
self.ops = ops or [] # накопленный план — пока ничего не считаем
def filter(self, fn):
print(" + добавили filter в план (но НЕ выполнили)")
return LazyPlan(self.data, self.ops + [("filter", fn)])
def map(self, fn):
print(" + добавили map в план (но НЕ выполнили)")
return LazyPlan(self.data, self.ops + [("map", fn)])
def collect(self): # ДЕЙСТВИЕ — вот теперь считаем
print(" >>> действие collect(): запускаем весь план")
result = self.data
for kind, fn in self.ops:
if kind == "filter":
result = [x for x in result if fn(x)]
else:
result = [fn(x) for x in result]
return result
plan = LazyPlan([1, 2, 3, 4, 5, 6])
print("Строим план:")
plan = plan.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print("План построен, но вычислений ещё НЕ было.")
print("Результат:", plan.collect())
Вывод:
Строим план: + добавили filter в план (но НЕ выполнили) + добавили map в план (но НЕ выполнили) План построен, но вычислений ещё НЕ было. >>> действие collect(): запускаем весь план Результат: [20, 40, 60]
Зачем такая ленивость? Накопив весь план до запуска, Spark может его оптимизировать целиком: переставить фильтры ближе к чтению (чтобы сразу отбросить лишние строки), объединить операции, выкинуть ненужные колонки. Если бы каждый шаг исполнялся немедленно, такой глобальной оптимизации не было бы. Этой ленивости и оптимизатору посвящены разделы 2 и 3.
Как отличить трансформацию от действия
Практический навык — с ходу понимать, что перед вами. Правило простое и почти безотказное: смотрите на тип результата.
- Операция вернула новый DataFrame/RDD (можно продолжить чейнить
.filter(...).select(...)) — это трансформация, она ленива. - Операция вернула обычное значение — число (
count()), список строк (collect(),take()), вывела на экран (show()) или записала на диск (write) — это действие, оно запускает вычисление.
Из этого вытекает важное следствие для отладки: пока вы строите цепочку из трансформаций, ошибок выполнения вы не увидите — план только копится. Реальная ошибка (например, обращение к несуществующей колонке) всплывёт лишь на действии, когда план побежит. Новичков это путает: «код до этой строки же отработал». На самом деле он ничего не выполнял — просто строил план до первого действия.
Ещё одно следствие — про Spark UI, веб-интерфейс мониторинга (обычно на порту 4040). Поскольку реальная работа стартует только на действии, именно действия порождают видимые в UI jobs. Если в коде три действия, в UI будет три job'а, и каждый по умолчанию пересчитывает свою цепочку с нуля. Это первое место, куда смотрит инженер, диагностируя, что и сколько раз реально считалось.
Экосистема: один движок, четыре библиотеки
Слово «Spark» обычно означает целый стек. В основе — Spark Core (движок, RDD, планировщик, управление памятью). Поверх него построены специализированные библиотеки, которые делят один движок и могут комбинироваться в одном приложении:
| Компонент | Для чего |
| Spark SQL / DataFrame | структурированные данные: таблицы, SQL-запросы, DataFrame API. Здесь работает оптимизатор Catalyst. Это то, чем пользуются 90% времени. |
| Structured Streaming | потоковая обработка: тот же DataFrame API, но над «бесконечной» таблицей данных, прибывающих в реальном времени. |
| MLlib | машинное обучение на масштабе: подготовка признаков, обучение моделей, Pipeline по аналогии со scikit-learn. |
| GraphX / GraphFrames | обработка графов: социальные связи, маршруты, PageRank. В PySpark чаще используют GraphFrames. |
Сила этого единства в том, что в одном приложении можно прочитать поток событий (Streaming), обогатить его SQL-джойном со справочником (Spark SQL) и тут же прогнать через ML-модель (MLlib) — всё на одном движке, без перекладывания данных между разными системами.
Подводные камни
- «Почему мой
filterничего не вывел?» Потому что это трансформация — она ленива. Пока нет действия (show,count,write), ничего не выполняется. Это нормально, а не баг. - Несколько действий = несколько пересчётов. Если вызвать
count(), а потомshow()над одной цепочкой, Spark по умолчанию пересчитает всё дважды с нуля. Чтобы переиспользовать результат, его кэшируют (раздел 2). - master в коде. Зашивать
.master("local")в продакшн-код — частая ошибка: приложение не запустится в кластере. Master передают снаружи черезspark-submit.
Best practices
- Создавайте одну
SparkSessionна приложение черезgetOrCreate(). - Помните: пока нет действия, ничего не считается — это даёт Spark пространство для оптимизации, пользуйтесь этим, выстраивая длинные цепочки трансформаций.
- Конфигурацию (master, память, число партиций) выносите наружу, а не в код.
- Начинайте знакомство и реальную работу с DataFrame/Spark SQL — это самый оптимизированный и удобный слой; к низкоуровневым RDD спускайтесь только при необходимости.
Итог
SparkSession— единая точка входа; создаётся черезbuilder ... getOrCreate().- Трансформации ленивы и лишь строят план; вычисление запускает действие.
- Ленивость нужна, чтобы Spark оптимизировал весь план целиком до запуска.
- Экосистема: Spark Core + Spark SQL, Structured Streaming, MLlib, GraphX поверх одного движка.
- Для большинства задач работают на уровне DataFrame/Spark SQL.