MLlib, развёртывание и конфигурация
Урок про машинное обучение на масштабе (MLlib) и про то, как запускать и настраивать Spark-приложения в продакшне.
MLlib Pipeline — последовательность этапов подготовки данных и обучения модели как единый объект; spark-submit — стандартный способ отправить приложение Spark на кластер.
MLlib: ML, который не влезает в одну машину
Когда обучающая выборка не помещается в память одной машины, обычные scikit-learn/pandas не справятся — здесь работает MLlib, библиотека машинного обучения Spark, выполняющая обучение распределённо. Её центральная абстракция — Pipeline (по аналогии со scikit-learn): цепочка этапов, где каждый этап — это либо Transformer (преобразует данные: кодирование категорий, сборка признаков в вектор), либо Estimator (обучается и даёт модель). Pipeline связывает подготовку признаков и обучение в один воспроизводимый объект, который можно обучить и применить целиком.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
indexer = StringIndexer(inputCol="city", outputCol="city_idx")
assembler = VectorAssembler(inputCols=["city_idx", "age"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[indexer, assembler, lr])
model = pipeline.fit(train_df) # обучает весь конвейер
preds = model.transform(test_df) # применяет его целиком
Важная практическая ремарка: MLlib силён именно на больших данных и линейных/древесных моделях. Для глубокого обучения и средних объёмов чаще берут специализированные фреймворки; Spark в ML-пайплайне обычно отвечает за подготовку признаков на масштабе.
spark-submit: запуск приложения
В продакшне приложение не запускают из ноутбука — его отправляют на кластер командой spark-submit, где задают менеджер кластера, режим и ресурсы. Это bash-команда:
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 8g \
--conf spark.sql.shuffle.partitions=400 \
my_job.py
Здесь и решается, сколько ресурсов получит приложение. Разберём ключевые параметры.
cluster mode против client mode
Мы касались этого в разделе 1; теперь — практический смысл для развёртывания:
--deploy-mode cluster— драйвер запускается внутри кластера. Приложение не зависит от машины, с которой его отправили; стандарт для продакшн-задач и расписаний (Airflow и т.п.).--deploy-mode client— драйвер живёт на машине запуска. Удобно для интерактива и отладки (видно логи локально), но обрыв связи убивает приложение.
Ресурсы исполнителей: память, ядра, число
Три параметра определяют форму кластера под вашу задачу:
| Параметр | Что задаёт | Ориентир |
executor-memory | память одного исполнителя | не делать «толстых» исполнителей с десятками ГБ — растут паузы GC; обычно единицы–десяток ГБ |
executor-cores | ядер (параллельных задач) на исполнителе | обычно ~3–5; слишком много ядер на исполнителе бьются за ввод-вывод |
num-executors | сколько исполнителей всего | под объём данных и доступные ресурсы кластера |
Память исполнителя делится между вычислениями (shuffle, сортировки), кэшем и накладными расходами. Если данные не влезают — будут спиллы на диск (это видно в Spark UI) или OOM. Баланс «средние исполнители, их побольше» обычно лучше, чем «несколько гигантских».
Динамическое распределение ресурсов
Dynamic allocation позволяет Spark наращивать и сокращать число исполнителей на лету по нагрузке: больше задач в очереди — запросить ещё исполнителей у менеджера кластера; исполнители простаивают — вернуть их. Это экономит ресурсы на общих кластерах, где приложений много, и особенно полезно для нагрузок с переменной интенсивностью.
AQE: Adaptive Query Execution
Венец современного тюнинга — AQE (в Spark 3 включён по умолчанию). Обычный Catalyst строит план до выполнения по оценкам, которые бывают неточны. AQE же пересматривает план во время выполнения, опираясь на реальные размеры данных после уже выполненных stage. Три его главных приёма:
- Динамическое сжатие shuffle-партиций. Видя реальные размеры, AQE объединяет слишком мелкие партиции — снимает проблему «дефолтных 200» автоматически.
- Превращение sort-merge join в broadcast. Если после фильтров одна сторона оказалась маленькой (а оценка до выполнения этого не знала), AQE на лету переключит на broadcast join.
- Skew join optimization. AQE замечает перекошенные партиции и разбивает их на части — автоматическая первая линия обороны против перекоса (раздел 5).
Практический совет: держите AQE включённым — он закрывает большую часть ручного тюнинга числа партиций, выбора broadcast и борьбы с перекосом.
Подводные камни
- client mode в продакшне. Привязывает жизнь приложения к машине запуска; для расписаний используйте cluster mode.
- «Толстые» исполнители. Огромная память на исполнителя ведёт к длинным паузам GC; лучше больше исполнителей среднего размера.
- Слишком много ядер на исполнителе. Десятки ядер на одном исполнителе дерутся за диск и сеть; разумный диапазон — несколько.
- Выключенный AQE. Без AQE придётся вручную делать то, что он умеет сам; в Spark 3 его обычно держат включённым.
- MLlib для маленьких данных. На данных, влезающих в память, scikit-learn проще и быстрее; MLlib окупается на масштабе.
Best practices
- Оформляйте ML как Pipeline — это воспроизводимо и применяется целиком.
- В продакшне запускайте через
spark-submitв cluster mode. - Предпочитайте несколько исполнителей среднего размера (умеренные память и ~3–5 ядер) «толстым» гигантам.
- Включайте dynamic allocation на общих кластерах и держите AQE включённым.
Итог
- MLlib обучает модели распределённо; Pipeline связывает подготовку признаков и обучение в один объект.
- Приложения отправляют через
spark-submit; cluster mode — для продакшна, client — для отладки. - Ресурсы задают executor-memory/-cores/num-executors; лучше средние исполнители, чем «толстые».
- Dynamic allocation масштабирует исполнителей по нагрузке; AQE на лету правит план (партиции, broadcast, перекос).