UDF и pandas UDF: почему обычные UDF медленные

Урок объясняет, когда нужны UDF, почему обычные Python-UDF — частая причина тормозов в PySpark и как pandas UDF решают проблему.

UDF (User-Defined Function) — собственная функция, применяемая к колонкам, когда встроенных не хватает. В PySpark обычные UDF дороги, потому что данные сериализуются между JVM и Python построчно.

Зачем вообще UDF и почему их избегают

Иногда нужной логики нет среди встроенных функций pyspark.sql.functions — тогда пишут UDF. Но это должно быть последним средством, и вот почему. Обычная Python-UDF в PySpark платит двойную цену:

  • Сериализация между JVM и Python. Spark работает в JVM, ваша UDF — в Python. Чтобы её применить, Spark на каждую строку сериализует данные, отправляет в Python-процесс, получает результат, десериализует обратно. Этот «мост» (pickle построчно) — огромная накладная нагрузка.
  • Чёрный ящик для Catalyst. Оптимизатор не видит, что внутри UDF. Он не может протолкнуть сквозь неё фильтр (predicate pushdown), не может переставить операции. Любой UDF разрывает оптимизацию и мешает whole-stage codegen.

Поэтому первое правило: прежде чем писать UDF, поищите встроенную функцию. Большинство задач (строки, даты, условия, регулярки, математика) уже покрыты pyspark.sql.functions, и встроенные работают внутри JVM на скорости Tungsten.

Сравним стоимость подходов наглядно

Идею «построчного моста vs векторной обработки» легко прочувствовать на чистом Python, замерив, сколько «дорогих переходов» делает каждый подход. В обычной UDF переход через границу происходит на каждую строку; в pandas UDF — раз на целый батч строк.

rows = list(range(1, 11))   # 10 строк данных

# Обычная UDF: «переход JVM<->Python» на КАЖДУЮ строку
crossings_plain = 0
result_plain = []
for x in rows:
    crossings_plain += 1          # дорогой переход границы
    result_plain.append(x * x)

# pandas UDF: один «переход» на весь батч, обработка векторно
crossings_pandas = 1              # один переход на батч
result_pandas = [x * x for x in rows]   # векторная обработка батча

print("Обычная UDF — переходов границы:", crossings_plain)
print("pandas UDF  — переходов границы:", crossings_pandas)
print("Результат одинаковый:", result_plain == result_pandas)

Вывод:

Обычная UDF — переходов границы: 10
pandas UDF  — переходов границы: 1
Результат одинаковый: True

На 10 строках разница невелика, но на сотнях миллионов строк «переход на каждую» против «переход на батч» — это разница между часами и минутами.

pandas UDF: векторно и через Arrow

pandas UDF (они же vectorized UDF) решают обе проблемы построчного моста. Вместо строки за строкой Spark передаёт в Python батч строк сразу — как pandas.Series — используя Apache Arrow для быстрой бессериализационной передачи колоночных данных между JVM и Python. Ваша функция обрабатывает весь батч векторно (силами pandas/NumPy) и возвращает Series. Так:

  • переход границы происходит раз на батч, а не на строку;
  • данные едут в компактном колоночном формате Arrow, почти без накладных расходов;
  • внутри работает векторизованный pandas/NumPy вместо медленного построчного Python-цикла.
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Обычная UDF (медленная): строка за строкой через мост
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def slow_square(x):
    return float(x * x)

# pandas UDF (быстрая): батч как pandas.Series, векторно, через Arrow
@pandas_udf(DoubleType())
def fast_square(s: pd.Series) -> pd.Series:
    return s * s            # векторная операция над всем батчем

df.withColumn("sq", fast_square("value"))

pandas UDF особенно выигрывают на тяжёлой числовой логике, где можно опереться на векторные операции NumPy/pandas. Для интеграции с ML (например, применить обученную модель к колонке) это де-факто стандарт.

Иерархия предпочтений

ПодходСкоростьКогда
Встроенные функции F.*максимальная (JVM, Catalyst, codegen)всегда, когда возможно
pandas UDF (Arrow, векторно)хорошаянужна логика без встроенного аналога, особенно числовая/ML
Обычная Python-UDFнизкая (построчный мост)в крайнем случае, когда иначе никак

Подводные камни

  • UDF вместо встроенной функции. Частая ошибка — написать UDF для того, что уже есть в F (конкатенация, регулярка, дата). Сначала ищите готовое.
  • UDF ломает pushdown. Фильтр после UDF не протолкнётся к источнику — лишнее чтение. Фильтруйте до UDF, где можно.
  • Забыли указать тип возврата. UDF требует явного типа результата; неверный тип даёт null или ошибку.
  • Тяжёлая инициализация в теле UDF. Если UDF на каждый вызов грузит модель/файл — катастрофа. Для дорогой инициализации подходят итераторные pandas UDF, инициализирующие ресурс раз на партицию.

Best practices

  • Правило №1: ищите встроенную функцию в pyspark.sql.functions прежде, чем писать UDF.
  • Нужна своя логика — предпочитайте pandas UDF (Arrow, векторно) обычным Python-UDF.
  • Включите Arrow (spark.sql.execution.arrow.pyspark.enabled) для ускорения обмена с Python.
  • Всегда указывайте корректный тип возврата UDF.
  • Фильтруйте данные до применения UDF, чтобы не гонять лишнее через мост.

Итог

  • Обычные Python-UDF медленны: построчная сериализация между JVM и Python плюс «чёрный ящик» для Catalyst.
  • Сначала всегда ищите встроенную функцию — она работает в JVM на скорости Tungsten.
  • pandas UDF передают батч строк через Arrow и обрабатывают векторно: переход границы раз на батч, а не на строку.
  • Иерархия: встроенные функции → pandas UDF → обычная UDF в крайнем случае.
Проверьте себя
1. Почему обычная Python-UDF в PySpark работает медленно?
AОна выполняется только на драйвере
BНа каждую строку данные сериализуются и пересылаются между JVM и Python, а сама UDF — чёрный ящик, который рвёт оптимизацию Catalyst
CОна не поддерживает числовые типы
DUDF всегда вызывает shuffle
2. Чем pandas UDF (vectorized UDF) быстрее обычной Python-UDF?
AОна выполняется в JVM без Python
BОна передаёт батч строк через Arrow и обрабатывает его векторно — переход границы происходит раз на батч, а не на каждую строку
CОна не возвращает результат
DОна кэширует данные
3. Что следует сделать в первую очередь, прежде чем писать собственную UDF?
AВключить кэширование
BПоискать готовую встроенную функцию в pyspark.sql.functions — она работает в JVM на скорости Tungsten
CПерейти на RDD
DУвеличить число партиций
Поддержать проект