DataFrame против RDD: Catalyst и Tungsten
Урок объясняет, почему в современном Spark пишут на DataFrame, а не на RDD: всё дело в оптимизаторе Catalyst и движке Tungsten.
DataFrame — распределённая таблица с именованными колонками и схемой; в отличие от RDD, Spark «понимает» её структуру и оптимизирует запросы через Catalyst и Tungsten.
RDD ничего не знает о ваших данных
RDD — это коллекция непрозрачных для Spark объектов. Когда вы пишете rdd.map(lambda x: x.amount * 2), Spark видит лишь «какая-то функция Python над какими-то объектами». Он не знает, какие там поля, какие типы, можно ли отбросить лишние колонки или переставить фильтр. Оптимизировать он не может — выполняет ровно то и в том порядке, как написали.
DataFrame устроен иначе: у него есть схема — известные имена и типы колонок. Spark видит не «функцию над объектами», а структурный запрос: «выбрать колонки A и B, отфильтровать по C, сгруппировать по D». А раз он понимает намерение, он может его переписать в эквивалентный, но более быстрый план. Это и есть фундаментальное преимущество DataFrame над RDD.
Catalyst: оптимизатор запросов
Catalyst — оптимизатор Spark SQL. Он берёт ваш высокоуровневый код (DataFrame-операции или SQL) и прогоняет через несколько фаз, превращая в эффективный физический план:
- Разбор и анализ. Из кода строится логический план; проверяются имена колонок и типы по схеме.
- Логическая оптимизация (правила). Здесь происходит магия. Catalyst применяет десятки правил переписывания:
- Predicate pushdown — двигает фильтры как можно ближе к источнику, чтобы отбросить строки до того, как они пройдут дорогие шаги (а для Parquet/баз — даже не читать их).
- Column pruning — отсекает колонки, которые не нужны в результате, и не читает их из источника.
- Constant folding — заранее вычисляет константные выражения.
- Физическое планирование. Для логического плана генерируется несколько физических вариантов (например, разные стратегии join), и по оценке стоимости выбирается лучший.
- Генерация кода. Выбранный план компилируется в байт-код JVM (об этом — Tungsten).
Практический итог: вы можете написать DataFrame-запрос «как удобно» — сначала join, потом фильтр — а Catalyst сам передвинет фильтр до join, если это корректно и быстрее. С RDD так нельзя: что написали, то и выполнится.
Конкретный пример выигрыша. Допустим, вы соединяете таблицу заказов (1 млрд строк) со справочником и в конце фильтруете «только за январь» — это 3% строк. Наивно фильтр стоит после join, а значит, join должен был бы перемолоть весь миллиард. Catalyst видит, что фильтр по дате не зависит от join, и протолкнёт его до join (а если данные в Parquet с партиционированием по дате — то и до чтения, прочитав только январские файлы). В join теперь входит не миллиард, а 30 млн строк. Тот же написанный код, но Catalyst превратил его в план, который дешевле в десятки раз. Именно за такие автоматические переписывания и любят DataFrame.
Tungsten: ближе к железу
Если Catalyst оптимизирует что считать, то Tungsten оптимизирует как это выполняется на процессоре и в памяти. Три его кита:
- Управление памятью вне кучи (off-heap) и бинарный формат. Tungsten хранит данные в компактном бинарном виде, минуя дорогие Java-объекты и сборщик мусора. Меньше накладных расходов на объект, меньше пауз GC.
- Cache-aware вычисления. Данные раскладываются в памяти так, чтобы хорошо ложиться в кэши процессора (L1/L2/L3). Процессор меньше простаивает в ожидании данных из RAM.
- Whole-stage code generation. Вместо того чтобы для каждой строки вызывать цепочку виртуальных функций (filter, потом project, потом ...), Tungsten генерирует единую функцию на весь stage. В плане выполнения это видно как
WholeStageCodegen. Эффект — как будто вы вручную написали плотный цикл по данным.
В сумме Catalyst и Tungsten дают то, ради чего перешли на DataFrame: код, написанный высокоуровнево и читаемо, выполняется почти как вручную оптимизированный низкоуровневый.
А что с языком? PySpark и «налог» на Python
Есть ещё одна причина любить DataFrame именно в PySpark. RDD-операции на Python требуют гонять данные между JVM (где живёт Spark) и процессом Python — это сериализация туда-обратно на каждый элемент, дорого. А DataFrame-операции выражены через встроенные функции Spark и выполняются целиком внутри JVM, без перехода в Python. Поэтому DataFrame-код в PySpark по скорости близок к Scala, тогда как RDD на Python заметно медленнее. Вывод: в PySpark особенно важно оставаться в DataFrame API и избегать Python-UDF и RDD там, где можно.
Когда всё-таки RDD
DataFrame покрывает почти всё, но RDD иногда нужен: для очень низкоуровневого контроля над партициями, для совсем нетипизированных/неструктурированных данных, или когда нужна логика, не выражаемая встроенными функциями. В обычной дата-инженерии это редкость — по умолчанию DataFrame.
Подводные камни
- Python-UDF убивают оптимизацию. Обычная пользовательская функция на Python — «чёрный ящик» для Catalyst: он не видит, что внутри, не может протолкнуть фильтр сквозь неё, и каждая строка едет из JVM в Python и обратно. Предпочитайте встроенные функции из
pyspark.sql.functions(раздел 4). - Возврат к RDD «по привычке». Привыкшие к Scala/функциональщине иногда зовут
.rddи пишут map/filter руками — теряя Catalyst и Tungsten. Почти всегда есть DataFrame-эквивалент. - Вера, что «DataFrame всегда оптимален». Catalyst силён, но не всемогущ: неудачный порядок join, перекос ключей или забытый broadcast он не исправит. Понимать план всё равно нужно.
Best practices
- По умолчанию работайте на уровне DataFrame/Spark SQL — это даёт Catalyst и Tungsten бесплатно.
- В PySpark избегайте RDD и Python-UDF; оставайтесь во встроенных функциях, чтобы не платить «налог» на сериализацию в Python.
- Доверяйте Catalyst переставлять фильтры — пишите запрос читаемо, оптимизацию порядка он возьмёт на себя.
- Но проверяйте план через
explain()(раздел 5): оптимизатор силён, но не заменяет понимания.
Итог
- DataFrame имеет схему, поэтому Spark понимает структуру запроса и может его оптимизировать; RDD для Spark непрозрачен.
- Catalyst переписывает план: predicate pushdown, column pruning, выбор стратегии join.
- Tungsten ускоряет выполнение: off-heap бинарный формат, cache-aware раскладка, whole-stage codegen.
- В PySpark DataFrame ещё и избегает дорогой сериализации между JVM и Python.
- RDD оставляют для редких низкоуровневых случаев; по умолчанию — DataFrame.