DataFrame против RDD: Catalyst и Tungsten

Урок объясняет, почему в современном Spark пишут на DataFrame, а не на RDD: всё дело в оптимизаторе Catalyst и движке Tungsten.

DataFrame — распределённая таблица с именованными колонками и схемой; в отличие от RDD, Spark «понимает» её структуру и оптимизирует запросы через Catalyst и Tungsten.

RDD ничего не знает о ваших данных

RDD — это коллекция непрозрачных для Spark объектов. Когда вы пишете rdd.map(lambda x: x.amount * 2), Spark видит лишь «какая-то функция Python над какими-то объектами». Он не знает, какие там поля, какие типы, можно ли отбросить лишние колонки или переставить фильтр. Оптимизировать он не может — выполняет ровно то и в том порядке, как написали.

DataFrame устроен иначе: у него есть схема — известные имена и типы колонок. Spark видит не «функцию над объектами», а структурный запрос: «выбрать колонки A и B, отфильтровать по C, сгруппировать по D». А раз он понимает намерение, он может его переписать в эквивалентный, но более быстрый план. Это и есть фундаментальное преимущество DataFrame над RDD.

Catalyst: оптимизатор запросов

Catalyst — оптимизатор Spark SQL. Он берёт ваш высокоуровневый код (DataFrame-операции или SQL) и прогоняет через несколько фаз, превращая в эффективный физический план:

  1. Разбор и анализ. Из кода строится логический план; проверяются имена колонок и типы по схеме.
  2. Логическая оптимизация (правила). Здесь происходит магия. Catalyst применяет десятки правил переписывания:
    • Predicate pushdown — двигает фильтры как можно ближе к источнику, чтобы отбросить строки до того, как они пройдут дорогие шаги (а для Parquet/баз — даже не читать их).
    • Column pruning — отсекает колонки, которые не нужны в результате, и не читает их из источника.
    • Constant folding — заранее вычисляет константные выражения.
  3. Физическое планирование. Для логического плана генерируется несколько физических вариантов (например, разные стратегии join), и по оценке стоимости выбирается лучший.
  4. Генерация кода. Выбранный план компилируется в байт-код JVM (об этом — Tungsten).

Практический итог: вы можете написать DataFrame-запрос «как удобно» — сначала join, потом фильтр — а Catalyst сам передвинет фильтр до join, если это корректно и быстрее. С RDD так нельзя: что написали, то и выполнится.

Конкретный пример выигрыша. Допустим, вы соединяете таблицу заказов (1 млрд строк) со справочником и в конце фильтруете «только за январь» — это 3% строк. Наивно фильтр стоит после join, а значит, join должен был бы перемолоть весь миллиард. Catalyst видит, что фильтр по дате не зависит от join, и протолкнёт его до join (а если данные в Parquet с партиционированием по дате — то и до чтения, прочитав только январские файлы). В join теперь входит не миллиард, а 30 млн строк. Тот же написанный код, но Catalyst превратил его в план, который дешевле в десятки раз. Именно за такие автоматические переписывания и любят DataFrame.

Tungsten: ближе к железу

Если Catalyst оптимизирует что считать, то Tungsten оптимизирует как это выполняется на процессоре и в памяти. Три его кита:

  • Управление памятью вне кучи (off-heap) и бинарный формат. Tungsten хранит данные в компактном бинарном виде, минуя дорогие Java-объекты и сборщик мусора. Меньше накладных расходов на объект, меньше пауз GC.
  • Cache-aware вычисления. Данные раскладываются в памяти так, чтобы хорошо ложиться в кэши процессора (L1/L2/L3). Процессор меньше простаивает в ожидании данных из RAM.
  • Whole-stage code generation. Вместо того чтобы для каждой строки вызывать цепочку виртуальных функций (filter, потом project, потом ...), Tungsten генерирует единую функцию на весь stage. В плане выполнения это видно как WholeStageCodegen. Эффект — как будто вы вручную написали плотный цикл по данным.

В сумме Catalyst и Tungsten дают то, ради чего перешли на DataFrame: код, написанный высокоуровнево и читаемо, выполняется почти как вручную оптимизированный низкоуровневый.

А что с языком? PySpark и «налог» на Python

Есть ещё одна причина любить DataFrame именно в PySpark. RDD-операции на Python требуют гонять данные между JVM (где живёт Spark) и процессом Python — это сериализация туда-обратно на каждый элемент, дорого. А DataFrame-операции выражены через встроенные функции Spark и выполняются целиком внутри JVM, без перехода в Python. Поэтому DataFrame-код в PySpark по скорости близок к Scala, тогда как RDD на Python заметно медленнее. Вывод: в PySpark особенно важно оставаться в DataFrame API и избегать Python-UDF и RDD там, где можно.

Когда всё-таки RDD

DataFrame покрывает почти всё, но RDD иногда нужен: для очень низкоуровневого контроля над партициями, для совсем нетипизированных/неструктурированных данных, или когда нужна логика, не выражаемая встроенными функциями. В обычной дата-инженерии это редкость — по умолчанию DataFrame.

Подводные камни

  • Python-UDF убивают оптимизацию. Обычная пользовательская функция на Python — «чёрный ящик» для Catalyst: он не видит, что внутри, не может протолкнуть фильтр сквозь неё, и каждая строка едет из JVM в Python и обратно. Предпочитайте встроенные функции из pyspark.sql.functions (раздел 4).
  • Возврат к RDD «по привычке». Привыкшие к Scala/функциональщине иногда зовут .rdd и пишут map/filter руками — теряя Catalyst и Tungsten. Почти всегда есть DataFrame-эквивалент.
  • Вера, что «DataFrame всегда оптимален». Catalyst силён, но не всемогущ: неудачный порядок join, перекос ключей или забытый broadcast он не исправит. Понимать план всё равно нужно.

Best practices

  • По умолчанию работайте на уровне DataFrame/Spark SQL — это даёт Catalyst и Tungsten бесплатно.
  • В PySpark избегайте RDD и Python-UDF; оставайтесь во встроенных функциях, чтобы не платить «налог» на сериализацию в Python.
  • Доверяйте Catalyst переставлять фильтры — пишите запрос читаемо, оптимизацию порядка он возьмёт на себя.
  • Но проверяйте план через explain() (раздел 5): оптимизатор силён, но не заменяет понимания.

Итог

  • DataFrame имеет схему, поэтому Spark понимает структуру запроса и может его оптимизировать; RDD для Spark непрозрачен.
  • Catalyst переписывает план: predicate pushdown, column pruning, выбор стратегии join.
  • Tungsten ускоряет выполнение: off-heap бинарный формат, cache-aware раскладка, whole-stage codegen.
  • В PySpark DataFrame ещё и избегает дорогой сериализации между JVM и Python.
  • RDD оставляют для редких низкоуровневых случаев; по умолчанию — DataFrame.
Проверьте себя
1. Почему Spark может оптимизировать DataFrame-запрос, но не операции над RDD?
ADataFrame всегда меньше по размеру
BУ DataFrame есть схема, поэтому Spark понимает структуру запроса; RDD-функции для него непрозрачный чёрный ящик
CRDD не поддерживают параллелизм
DDataFrame не используют lineage
2. Что делает оптимизация predicate pushdown в Catalyst?
AУдаляет дубликаты строк
BДвигает фильтры ближе к источнику данных, чтобы отбросить лишние строки как можно раньше
CРеплицирует данные на узлы
DКэширует промежуточные результаты
3. За что отвечает Tungsten в Spark?
AЗа разбор SQL в логический план
BЗа низкоуровневое выполнение: off-heap бинарный формат, cache-aware раскладка и whole-stage codegen
CЗа хранение резервных копий данных
DЗа распределение задач между исполнителями
Поддержать проект