UDF и pandas UDF: почему обычные UDF медленные
Урок объясняет, когда нужны UDF, почему обычные Python-UDF — частая причина тормозов в PySpark и как pandas UDF решают проблему.
UDF (User-Defined Function) — собственная функция, применяемая к колонкам, когда встроенных не хватает. В PySpark обычные UDF дороги, потому что данные сериализуются между JVM и Python построчно.
Зачем вообще UDF и почему их избегают
Иногда нужной логики нет среди встроенных функций pyspark.sql.functions — тогда пишут UDF. Но это должно быть последним средством, и вот почему. Обычная Python-UDF в PySpark платит двойную цену:
- Сериализация между JVM и Python. Spark работает в JVM, ваша UDF — в Python. Чтобы её применить, Spark на каждую строку сериализует данные, отправляет в Python-процесс, получает результат, десериализует обратно. Этот «мост» (pickle построчно) — огромная накладная нагрузка.
- Чёрный ящик для Catalyst. Оптимизатор не видит, что внутри UDF. Он не может протолкнуть сквозь неё фильтр (predicate pushdown), не может переставить операции. Любой UDF разрывает оптимизацию и мешает whole-stage codegen.
Поэтому первое правило: прежде чем писать UDF, поищите встроенную функцию. Большинство задач (строки, даты, условия, регулярки, математика) уже покрыты pyspark.sql.functions, и встроенные работают внутри JVM на скорости Tungsten.
Сравним стоимость подходов наглядно
Идею «построчного моста vs векторной обработки» легко прочувствовать на чистом Python, замерив, сколько «дорогих переходов» делает каждый подход. В обычной UDF переход через границу происходит на каждую строку; в pandas UDF — раз на целый батч строк.
rows = list(range(1, 11)) # 10 строк данных
# Обычная UDF: «переход JVM<->Python» на КАЖДУЮ строку
crossings_plain = 0
result_plain = []
for x in rows:
crossings_plain += 1 # дорогой переход границы
result_plain.append(x * x)
# pandas UDF: один «переход» на весь батч, обработка векторно
crossings_pandas = 1 # один переход на батч
result_pandas = [x * x for x in rows] # векторная обработка батча
print("Обычная UDF — переходов границы:", crossings_plain)
print("pandas UDF — переходов границы:", crossings_pandas)
print("Результат одинаковый:", result_plain == result_pandas)
Вывод:
Обычная UDF — переходов границы: 10 pandas UDF — переходов границы: 1 Результат одинаковый: True
На 10 строках разница невелика, но на сотнях миллионов строк «переход на каждую» против «переход на батч» — это разница между часами и минутами.
pandas UDF: векторно и через Arrow
pandas UDF (они же vectorized UDF) решают обе проблемы построчного моста. Вместо строки за строкой Spark передаёт в Python батч строк сразу — как pandas.Series — используя Apache Arrow для быстрой бессериализационной передачи колоночных данных между JVM и Python. Ваша функция обрабатывает весь батч векторно (силами pandas/NumPy) и возвращает Series. Так:
- переход границы происходит раз на батч, а не на строку;
- данные едут в компактном колоночном формате Arrow, почти без накладных расходов;
- внутри работает векторизованный pandas/NumPy вместо медленного построчного Python-цикла.
import pandas as pd
from pyspark.sql.functions import pandas_udf
# Обычная UDF (медленная): строка за строкой через мост
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
@udf(DoubleType())
def slow_square(x):
return float(x * x)
# pandas UDF (быстрая): батч как pandas.Series, векторно, через Arrow
@pandas_udf(DoubleType())
def fast_square(s: pd.Series) -> pd.Series:
return s * s # векторная операция над всем батчем
df.withColumn("sq", fast_square("value"))
pandas UDF особенно выигрывают на тяжёлой числовой логике, где можно опереться на векторные операции NumPy/pandas. Для интеграции с ML (например, применить обученную модель к колонке) это де-факто стандарт.
Иерархия предпочтений
| Подход | Скорость | Когда |
Встроенные функции F.* | максимальная (JVM, Catalyst, codegen) | всегда, когда возможно |
| pandas UDF (Arrow, векторно) | хорошая | нужна логика без встроенного аналога, особенно числовая/ML |
| Обычная Python-UDF | низкая (построчный мост) | в крайнем случае, когда иначе никак |
Подводные камни
- UDF вместо встроенной функции. Частая ошибка — написать UDF для того, что уже есть в
F(конкатенация, регулярка, дата). Сначала ищите готовое. - UDF ломает pushdown. Фильтр после UDF не протолкнётся к источнику — лишнее чтение. Фильтруйте до UDF, где можно.
- Забыли указать тип возврата. UDF требует явного типа результата; неверный тип даёт null или ошибку.
- Тяжёлая инициализация в теле UDF. Если UDF на каждый вызов грузит модель/файл — катастрофа. Для дорогой инициализации подходят итераторные pandas UDF, инициализирующие ресурс раз на партицию.
Best practices
- Правило №1: ищите встроенную функцию в
pyspark.sql.functionsпрежде, чем писать UDF. - Нужна своя логика — предпочитайте pandas UDF (Arrow, векторно) обычным Python-UDF.
- Включите Arrow (
spark.sql.execution.arrow.pyspark.enabled) для ускорения обмена с Python. - Всегда указывайте корректный тип возврата UDF.
- Фильтруйте данные до применения UDF, чтобы не гонять лишнее через мост.
Итог
- Обычные Python-UDF медленны: построчная сериализация между JVM и Python плюс «чёрный ящик» для Catalyst.
- Сначала всегда ищите встроенную функцию — она работает в JVM на скорости Tungsten.
- pandas UDF передают батч строк через Arrow и обрабатывают векторно: переход границы раз на батч, а не на строку.
- Иерархия: встроенные функции → pandas UDF → обычная UDF в крайнем случае.