Архитектура Spark: driver, executors и кластер
Урок разбирает анатомию приложения Spark: кто командует, кто работает и как задача превращается в параллельные кусочки.
Driver — мозг приложения: он строит план и раздаёт задания. Executors — рабочие процессы на узлах кластера, которые реально обрабатывают партиции данных.
Три действующих лица
Любое приложение Spark — это связка из трёх ролей. Понимание, кто за что отвечает, объясняет половину ошибок в продакшне.
- Driver (драйвер). Процесс, в котором запущен ваш код (ваш
main/ Python-скрипт). Драйвер создаётSparkSession, строит из ваших трансформаций план выполнения (DAG), разбивает его на этапы и задачи и рассылает задачи исполнителям. Драйвер собирает финальные результаты и хранит метаданные. Если падает драйвер — падает всё приложение. - Executors (исполнители). Рабочие процессы (JVM), запущенные на узлах кластера. Каждый исполнитель получает несколько ядер (cores) и кусок памяти, выполняет назначенные ему задачи над партициями данных и хранит кэшированные данные. Исполнителей много, и именно они делают всю тяжёлую работу.
- Cluster manager (менеджер кластера). Внешняя система, которая выдаёт ресурсы (машины, ядра, память) под исполнителей. Драйвер просит у него ресурсы, он запускает исполнителей. Spark работает поверх нескольких менеджеров: Standalone (встроенный, простой), YARN (из мира Hadoop), Kubernetes (всё популярнее), раньше был и Mesos.
Схема словами: вы пишете код → он исполняется в драйвере → драйвер просит ресурсы у менеджера кластера → менеджер поднимает исполнителей → драйвер шлёт исполнителям задачи → исполнители считают свои партиции и шлют результаты обратно.
Партиции — единица параллелизма
Данные в Spark всегда поделены на партиции. Это не абстракция, а буквально куски, лежащие в памяти разных исполнителей. Число партиций задаёт максимальный параллелизм: если у вас 200 партиций и 50 ядер на исполнителях, Spark обработает 50 партиций одновременно, остальные ждут очереди. Если партиция всего одна — параллелизма нет вообще, работает одно ядро, остальной кластер простаивает.
Отсюда два правила здравого смысла, к которым мы вернёмся в разделе про производительность: партиций должно быть заметно больше, чем ядер (чтобы все были загружены и работа балансировалась), но не слишком много (иначе накладные расходы на планирование каждой мелкой задачи съедают выигрыш).
Job → stages → tasks: как задача дробится
Когда вы вызываете действие (например, count() или write), Spark запускает job (работу). Каждый job планировщик разбивает на stages (этапы), а каждый stage — на tasks (задачи). Иерархия строгая:
| Уровень | Что это | Чем порождается |
| Job | вся работа ради одного результата | одно действие (action) |
| Stage | группа операций без перемешивания данных | граница stage = shuffle (перемешивание) |
| Task | наименьшая единица работы | одна task на одну партицию stage |
Ключевая мысль: границы между stages — это всегда shuffle, то есть момент, когда данные нужно перетасовать между исполнителями по сети. Операции, которые считаются внутри партиции (map, filter), Spark «склеивает» в один stage и гонит по конвейеру без записи промежутка. Как только нужен groupBy или join — данные с одним ключом должны оказаться на одной машине, начинается shuffle, и здесь проходит граница stage. Поэтому, глядя на число stages в плане, опытный инженер сразу видит, сколько раз данные перемешиваются по сети.
Внутри stage число tasks равно числу партиций: 200 партиций → 200 задач, которые исполнители разбирают параллельно по мере освобождения ядер. Одна задача = «обработай вот эту партицию вот этими операциями».
Сквозной пример: как запрос превращается в работу
Сложим всё вместе на конкретном запросе: «прочитать заказы, оставить дорогие, посчитать сумму по городам». Логически это read → filter → groupBy(city).sum(amount) → write. Что происходит под капотом, шаг за шагом:
- Драйвер строит из этих трансформаций план (DAG) и ждёт действия. Действие здесь —
write. - Планировщик видит ровно один shuffle — на
groupBy(записи одного города надо собрать вместе). Значит, план делится на два stage: до перемешивания и после. - Stage 1 (чтение + фильтр + частичная агрегация по городу внутри партиции) разбивается на задачи по числу входных партиций — скажем, файл дал 120 партиций, значит 120 задач. Они выполняются параллельно на всех ядрах исполнителей;
readиfilter— узкие, они конвейеризуются в один проход без промежуточной материализации. - На границе stage происходит shuffle: частичные суммы по городам пишутся в корзины и пересылаются по сети.
- Stage 2 (досуммировать по городам и записать) имеет столько задач, сколько shuffle-партиций (по умолчанию 200). Каждая задача собирает «свои» города и пишет результат.
Итого: один job (его запустил write), два stage (граница — единственный shuffle на groupBy), 120 + 200 задач. Эта картина — «сколько shuffle, столько границ stage» — позволяет, ещё не запуская код, прикинуть, насколько он дорог. Опытный инженер читает DataFrame-цепочку и сразу видит будущие границы stage.
Client mode и cluster mode
Где запущен драйвер — важный практический вопрос:
- Client mode. Драйвер живёт на машине, с которой вы запустили приложение (например, ваш ноутбук или edge-узел). Удобно для интерактивной работы и отладки, но если связь с ноутбуком оборвётся — приложение умрёт.
- Cluster mode. Драйвер запускается внутри кластера как ещё один управляемый процесс. Так делают для продакшн-задач: приложение не зависит от вашей машины и переживает обрыв вашей сессии.
Подводные камни
- OutOfMemory в драйвере от
collect(). Вызовdf.collect()илиtoPandas()стягивает все данные со всех исполнителей в память драйвера. На большом наборе драйвер мгновенно падает с OOM. Драйвер не рассчитан держать весь датасет. - Одна гигантская партиция. Если данные перекошены и почти всё попало в одну партицию, одна task будет работать часами, пока остальной кластер простаивает. Это «эффект отстающего» (straggler).
- Слишком много мелких партиций. Десятки тысяч крошечных задач — это огромные накладные расходы планировщика; кластер занят управлением, а не счётом.
Best practices
- Никогда не стягивайте большой датасет в драйвер. Для просмотра используйте
show()илиlimit(), а неcollect()/toPandas()на полном наборе. - Держите в голове число партиций своих данных — это прямой рычаг параллелизма.
- Для продакшна используйте cluster mode, чтобы приложение не зависело от машины, с которой его запустили.
- Читая план выполнения, считайте границы stages: каждая граница — это shuffle, то есть сетевой обмен и потенциальный тормоз.
Итог
- Приложение Spark — это driver (мозг), executors (рабочие) и cluster manager (выдаёт ресурсы).
- Данные поделены на партиции; число партиций задаёт максимальный параллелизм.
- Действие порождает job → stages → tasks; границы stages — это всегда shuffle.
- Внутри stage одна task обрабатывает одну партицию.
collect()на большом наборе кладёт драйвер; в продакшне предпочтителен cluster mode.