Архитектура Spark: driver, executors и кластер

Урок разбирает анатомию приложения Spark: кто командует, кто работает и как задача превращается в параллельные кусочки.

Driver — мозг приложения: он строит план и раздаёт задания. Executors — рабочие процессы на узлах кластера, которые реально обрабатывают партиции данных.

Три действующих лица

Любое приложение Spark — это связка из трёх ролей. Понимание, кто за что отвечает, объясняет половину ошибок в продакшне.

  • Driver (драйвер). Процесс, в котором запущен ваш код (ваш main / Python-скрипт). Драйвер создаёт SparkSession, строит из ваших трансформаций план выполнения (DAG), разбивает его на этапы и задачи и рассылает задачи исполнителям. Драйвер собирает финальные результаты и хранит метаданные. Если падает драйвер — падает всё приложение.
  • Executors (исполнители). Рабочие процессы (JVM), запущенные на узлах кластера. Каждый исполнитель получает несколько ядер (cores) и кусок памяти, выполняет назначенные ему задачи над партициями данных и хранит кэшированные данные. Исполнителей много, и именно они делают всю тяжёлую работу.
  • Cluster manager (менеджер кластера). Внешняя система, которая выдаёт ресурсы (машины, ядра, память) под исполнителей. Драйвер просит у него ресурсы, он запускает исполнителей. Spark работает поверх нескольких менеджеров: Standalone (встроенный, простой), YARN (из мира Hadoop), Kubernetes (всё популярнее), раньше был и Mesos.

Схема словами: вы пишете код → он исполняется в драйвере → драйвер просит ресурсы у менеджера кластера → менеджер поднимает исполнителей → драйвер шлёт исполнителям задачи → исполнители считают свои партиции и шлют результаты обратно.

Партиции — единица параллелизма

Данные в Spark всегда поделены на партиции. Это не абстракция, а буквально куски, лежащие в памяти разных исполнителей. Число партиций задаёт максимальный параллелизм: если у вас 200 партиций и 50 ядер на исполнителях, Spark обработает 50 партиций одновременно, остальные ждут очереди. Если партиция всего одна — параллелизма нет вообще, работает одно ядро, остальной кластер простаивает.

Отсюда два правила здравого смысла, к которым мы вернёмся в разделе про производительность: партиций должно быть заметно больше, чем ядер (чтобы все были загружены и работа балансировалась), но не слишком много (иначе накладные расходы на планирование каждой мелкой задачи съедают выигрыш).

Job → stages → tasks: как задача дробится

Когда вы вызываете действие (например, count() или write), Spark запускает job (работу). Каждый job планировщик разбивает на stages (этапы), а каждый stage — на tasks (задачи). Иерархия строгая:

УровеньЧто этоЧем порождается
Jobвся работа ради одного результатаодно действие (action)
Stageгруппа операций без перемешивания данныхграница stage = shuffle (перемешивание)
Taskнаименьшая единица работыодна task на одну партицию stage

Ключевая мысль: границы между stages — это всегда shuffle, то есть момент, когда данные нужно перетасовать между исполнителями по сети. Операции, которые считаются внутри партиции (map, filter), Spark «склеивает» в один stage и гонит по конвейеру без записи промежутка. Как только нужен groupBy или join — данные с одним ключом должны оказаться на одной машине, начинается shuffle, и здесь проходит граница stage. Поэтому, глядя на число stages в плане, опытный инженер сразу видит, сколько раз данные перемешиваются по сети.

Внутри stage число tasks равно числу партиций: 200 партиций → 200 задач, которые исполнители разбирают параллельно по мере освобождения ядер. Одна задача = «обработай вот эту партицию вот этими операциями».

Сквозной пример: как запрос превращается в работу

Сложим всё вместе на конкретном запросе: «прочитать заказы, оставить дорогие, посчитать сумму по городам». Логически это read → filter → groupBy(city).sum(amount) → write. Что происходит под капотом, шаг за шагом:

  1. Драйвер строит из этих трансформаций план (DAG) и ждёт действия. Действие здесь — write.
  2. Планировщик видит ровно один shuffle — на groupBy (записи одного города надо собрать вместе). Значит, план делится на два stage: до перемешивания и после.
  3. Stage 1 (чтение + фильтр + частичная агрегация по городу внутри партиции) разбивается на задачи по числу входных партиций — скажем, файл дал 120 партиций, значит 120 задач. Они выполняются параллельно на всех ядрах исполнителей; read и filter — узкие, они конвейеризуются в один проход без промежуточной материализации.
  4. На границе stage происходит shuffle: частичные суммы по городам пишутся в корзины и пересылаются по сети.
  5. Stage 2 (досуммировать по городам и записать) имеет столько задач, сколько shuffle-партиций (по умолчанию 200). Каждая задача собирает «свои» города и пишет результат.

Итого: один job (его запустил write), два stage (граница — единственный shuffle на groupBy), 120 + 200 задач. Эта картина — «сколько shuffle, столько границ stage» — позволяет, ещё не запуская код, прикинуть, насколько он дорог. Опытный инженер читает DataFrame-цепочку и сразу видит будущие границы stage.

Client mode и cluster mode

Где запущен драйвер — важный практический вопрос:

  • Client mode. Драйвер живёт на машине, с которой вы запустили приложение (например, ваш ноутбук или edge-узел). Удобно для интерактивной работы и отладки, но если связь с ноутбуком оборвётся — приложение умрёт.
  • Cluster mode. Драйвер запускается внутри кластера как ещё один управляемый процесс. Так делают для продакшн-задач: приложение не зависит от вашей машины и переживает обрыв вашей сессии.

Подводные камни

  • OutOfMemory в драйвере от collect(). Вызов df.collect() или toPandas() стягивает все данные со всех исполнителей в память драйвера. На большом наборе драйвер мгновенно падает с OOM. Драйвер не рассчитан держать весь датасет.
  • Одна гигантская партиция. Если данные перекошены и почти всё попало в одну партицию, одна task будет работать часами, пока остальной кластер простаивает. Это «эффект отстающего» (straggler).
  • Слишком много мелких партиций. Десятки тысяч крошечных задач — это огромные накладные расходы планировщика; кластер занят управлением, а не счётом.

Best practices

  • Никогда не стягивайте большой датасет в драйвер. Для просмотра используйте show() или limit(), а не collect()/toPandas() на полном наборе.
  • Держите в голове число партиций своих данных — это прямой рычаг параллелизма.
  • Для продакшна используйте cluster mode, чтобы приложение не зависело от машины, с которой его запустили.
  • Читая план выполнения, считайте границы stages: каждая граница — это shuffle, то есть сетевой обмен и потенциальный тормоз.

Итог

  • Приложение Spark — это driver (мозг), executors (рабочие) и cluster manager (выдаёт ресурсы).
  • Данные поделены на партиции; число партиций задаёт максимальный параллелизм.
  • Действие порождает job → stages → tasks; границы stages — это всегда shuffle.
  • Внутри stage одна task обрабатывает одну партицию.
  • collect() на большом наборе кладёт драйвер; в продакшне предпочтителен cluster mode.
Проверьте себя
1. Что в Spark всегда служит границей между двумя stages?
AВызов любой трансформации
BShuffle — перемешивание данных между исполнителями по сети
CСоздание SparkSession
DКэширование данных
2. Сколько task будет в одном stage, если обрабатываемые данные разбиты на 200 партиций?
A1
B200
CЗависит от числа исполнителей
DЗависит от числа действий
3. Почему вызов df.collect() на большом датасете опасен?
AОн запускает лишний shuffle
BОн стягивает все данные в память драйвера, которого на это не хватает — OOM
CОн удаляет партиции
DОн работает только в cluster mode
Поддержать проект