SparkSession и экосистема Spark

Урок показывает точку входа в PySpark — SparkSession — и даёт карту всей экосистемы Spark.

SparkSession — единая точка входа в Spark: объект, через который вы читаете данные, создаёте DataFrame'ы и запускаете SQL. Из него доступны все возможности движка.

Точка входа: SparkSession

Любая программа PySpark начинается с создания сессии. Это объект, который держит соединение с кластером и через который проходят все операции. Этот код не запускается в браузере (PySpark требует JVM и кластера), поэтому он дан как текст — для чтения и понимания структуры.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("MyFirstApp")        # имя приложения (видно в Spark UI)
        .master("local[*]")           # где запускать: local[*] — все ядра локально
        .config("spark.sql.shuffle.partitions", "200")
        .getOrCreate()                # создать новую или взять существующую
)

# Теперь spark — наша точка входа ко всему.
df = spark.read.parquet("/data/events")
df.printSchema()
spark.stop()

Разберём важное:

  • .master("local[*]") — режим запуска. local[*] означает «локально, использовать все ядра машины» (удобно для разработки). В кластере здесь будет yarn, k8s://... или адрес standalone-мастера. Обычно master не зашивают в код, а передают через spark-submit.
  • .getOrCreate() — вернёт уже существующую сессию, если она есть (в кластере одна сессия на приложение).
  • Исторически были отдельные объекты SparkContext, SQLContext, HiveContext. С версии 2.0 SparkSession объединил их в одну точку входа; SparkContext (низкоуровневый, для RDD) доступен как spark.sparkContext.

Ленивость на практике: трансформации копятся, действие запускает

Главная особенность работы со Spark, к которой нужно привыкнуть: трансформации ленивы. Когда вы пишете df.filter(...) или df.select(...), ничего не считается — Spark лишь дописывает шаг в план. Реальное вычисление запускает только действие (action): count(), show(), collect(), write. Идею ленивого плана легко промоделировать на чистом Python: будем не считать сразу, а накапливать список операций, и выполним их только по команде «действие».

class LazyPlan:
    def __init__(self, data, ops=None):
        self.data = data
        self.ops = ops or []           # накопленный план — пока ничего не считаем

    def filter(self, fn):
        print("  + добавили filter в план (но НЕ выполнили)")
        return LazyPlan(self.data, self.ops + [("filter", fn)])

    def map(self, fn):
        print("  + добавили map в план (но НЕ выполнили)")
        return LazyPlan(self.data, self.ops + [("map", fn)])

    def collect(self):                 # ДЕЙСТВИЕ — вот теперь считаем
        print("  >>> действие collect(): запускаем весь план")
        result = self.data
        for kind, fn in self.ops:
            if kind == "filter":
                result = [x for x in result if fn(x)]
            else:
                result = [fn(x) for x in result]
        return result

plan = LazyPlan([1, 2, 3, 4, 5, 6])
print("Строим план:")
plan = plan.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print("План построен, но вычислений ещё НЕ было.")
print("Результат:", plan.collect())

Вывод:

Строим план:
  + добавили filter в план (но НЕ выполнили)
  + добавили map в план (но НЕ выполнили)
План построен, но вычислений ещё НЕ было.
  >>> действие collect(): запускаем весь план
Результат: [20, 40, 60]

Зачем такая ленивость? Накопив весь план до запуска, Spark может его оптимизировать целиком: переставить фильтры ближе к чтению (чтобы сразу отбросить лишние строки), объединить операции, выкинуть ненужные колонки. Если бы каждый шаг исполнялся немедленно, такой глобальной оптимизации не было бы. Этой ленивости и оптимизатору посвящены разделы 2 и 3.

Как отличить трансформацию от действия

Практический навык — с ходу понимать, что перед вами. Правило простое и почти безотказное: смотрите на тип результата.

  • Операция вернула новый DataFrame/RDD (можно продолжить чейнить .filter(...).select(...)) — это трансформация, она ленива.
  • Операция вернула обычное значение — число (count()), список строк (collect(), take()), вывела на экран (show()) или записала на диск (write) — это действие, оно запускает вычисление.

Из этого вытекает важное следствие для отладки: пока вы строите цепочку из трансформаций, ошибок выполнения вы не увидите — план только копится. Реальная ошибка (например, обращение к несуществующей колонке) всплывёт лишь на действии, когда план побежит. Новичков это путает: «код до этой строки же отработал». На самом деле он ничего не выполнял — просто строил план до первого действия.

Ещё одно следствие — про Spark UI, веб-интерфейс мониторинга (обычно на порту 4040). Поскольку реальная работа стартует только на действии, именно действия порождают видимые в UI jobs. Если в коде три действия, в UI будет три job'а, и каждый по умолчанию пересчитывает свою цепочку с нуля. Это первое место, куда смотрит инженер, диагностируя, что и сколько раз реально считалось.

Экосистема: один движок, четыре библиотеки

Слово «Spark» обычно означает целый стек. В основе — Spark Core (движок, RDD, планировщик, управление памятью). Поверх него построены специализированные библиотеки, которые делят один движок и могут комбинироваться в одном приложении:

КомпонентДля чего
Spark SQL / DataFrameструктурированные данные: таблицы, SQL-запросы, DataFrame API. Здесь работает оптимизатор Catalyst. Это то, чем пользуются 90% времени.
Structured Streamingпотоковая обработка: тот же DataFrame API, но над «бесконечной» таблицей данных, прибывающих в реальном времени.
MLlibмашинное обучение на масштабе: подготовка признаков, обучение моделей, Pipeline по аналогии со scikit-learn.
GraphX / GraphFramesобработка графов: социальные связи, маршруты, PageRank. В PySpark чаще используют GraphFrames.

Сила этого единства в том, что в одном приложении можно прочитать поток событий (Streaming), обогатить его SQL-джойном со справочником (Spark SQL) и тут же прогнать через ML-модель (MLlib) — всё на одном движке, без перекладывания данных между разными системами.

Подводные камни

  • «Почему мой filter ничего не вывел?» Потому что это трансформация — она ленива. Пока нет действия (show, count, write), ничего не выполняется. Это нормально, а не баг.
  • Несколько действий = несколько пересчётов. Если вызвать count(), а потом show() над одной цепочкой, Spark по умолчанию пересчитает всё дважды с нуля. Чтобы переиспользовать результат, его кэшируют (раздел 2).
  • master в коде. Зашивать .master("local") в продакшн-код — частая ошибка: приложение не запустится в кластере. Master передают снаружи через spark-submit.

Best practices

  • Создавайте одну SparkSession на приложение через getOrCreate().
  • Помните: пока нет действия, ничего не считается — это даёт Spark пространство для оптимизации, пользуйтесь этим, выстраивая длинные цепочки трансформаций.
  • Конфигурацию (master, память, число партиций) выносите наружу, а не в код.
  • Начинайте знакомство и реальную работу с DataFrame/Spark SQL — это самый оптимизированный и удобный слой; к низкоуровневым RDD спускайтесь только при необходимости.

Итог

  • SparkSession — единая точка входа; создаётся через builder ... getOrCreate().
  • Трансформации ленивы и лишь строят план; вычисление запускает действие.
  • Ленивость нужна, чтобы Spark оптимизировал весь план целиком до запуска.
  • Экосистема: Spark Core + Spark SQL, Structured Streaming, MLlib, GraphX поверх одного движка.
  • Для большинства задач работают на уровне DataFrame/Spark SQL.
Проверьте себя
1. Что произойдёт, когда вы вызовете df.filter(df.age > 18) в PySpark?
ASpark немедленно отфильтрует данные и вернёт результат
BSpark добавит шаг в план, но ничего не вычислит, пока не будет вызвано действие
CSpark выгрузит данные на драйвер
DSpark запустит отдельный job
2. Зачем Spark делает трансформации ленивыми, а не выполняет их сразу?
AЧтобы экономить место на диске
BЧтобы накопить весь план и оптимизировать его целиком до запуска
CЧтобы не нагружать драйвер
DЭто требование JVM
3. Какой компонент экосистемы Spark отвечает за структурированные данные и SQL-запросы и содержит оптимизатор Catalyst?
AMLlib
BGraphX
CSpark SQL / DataFrame
DStructured Streaming
Поддержать проект