Производительность и кэширование
Урок о том, где FastAPI-приложение теряет скорость и как её вернуть: кэширование ответов, пул соединений, профилирование и дисциплина async.
Узкое место (bottleneck) — это самый медленный участок в пути запроса, который определяет общее время ответа; оптимизировать имеет смысл именно его, а не то, что «кажется» медленным.
FastAPI сам по себе очень быстрый, и почти всегда тормозит не он, а то, что вокруг: запрос к базе без индекса, синхронный вызов, заблокировавший event loop, поход во внешний API на каждый запрос. Оптимизация по наитию вредна — вы рискуете усложнить код там, где это ничего не даёт. Правильный порядок: сначала измерить и найти узкое место, потом устранить именно его. Этот урок — про типовые узкие места FastAPI и проверенные способы их расшить.
Зачем это нужно на практике
Медленный API — это не только раздражённые пользователи, но и счёт за инфраструктуру: чтобы держать нагрузку, приходится поднимать больше серверов. Часто одно точечное улучшение — индекс, кэш горячего ответа, перевод блокирующего вызова в пул — даёт кратный выигрыш и экономит железо. Но чтобы попасть в цель, нужно работать по данным профилирования, а не по догадкам.
Где узкие места
В типичном FastAPI-сервисе кандидаты на «тормоз» почти всегда одни и те же:
- База данных. Запрос без индекса, проблема N+1 (запрос в цикле), отсутствие пула соединений — самый частый источник задержек.
- Блокировка event loop. Синхронная CPU- или I/O-операция внутри
async defостанавливает весь цикл и все параллельные запросы. - Внешние вызовы. Поход в сторонний API или сервис на каждый запрос, особенно без таймаута и без кэша.
- Сериализация больших ответов. Огромный JSON формируется и валидируется на каждый запрос.
Заметьте: FastAPI и Pydantic в этом списке почти никогда не главные виновники. Поэтому начинать оптимизацию надо с базы и блокировок, а не с микротюнинга обработчиков.
Кэширование ответов
Самый дешёвый способ ускорить — не делать работу вовсе. Если ответ редко меняется и одинаков для многих запросов (справочник, агрегат, конфигурация), его кэшируют — обычно в Redis или в памяти процесса. Запрос сначала смотрит в кэш и лишь при промахе идёт в базу.
import json
from fastapi import FastAPI
app = FastAPI()
async def get_top_articles():
cache_key = "top_articles"
cached = await redis.get(cache_key)
if cached is not None:
return json.loads(cached) # попадание: без обращения к БД
data = await db.fetch_top_articles() # промах: считаем
await redis.set(cache_key, json.dumps(data), ex=60) # TTL 60 секунд
return data
Ключевой параметр — TTL (время жизни). Он задаёт компромисс между свежестью и нагрузкой: больше TTL — меньше запросов к базе, но данные дольше устаревают. Кэшируйте только то, что переживёт устаревание на эти секунды, и продумайте инвалидацию: при изменении данных кэш нужно сбросить, иначе пользователи увидят старое. Для маленьких и редко меняющихся справочников иногда хватает кэша в памяти процесса (но помните: у каждого воркера он свой).
Пул соединений к БД
Открыть соединение с базой — дорого: TCP-рукопожатие, аутентификация, инициализация. Делать это на каждый запрос — расточительно. Пул соединений держит набор уже открытых соединений и выдаёт их обработчикам, возвращая обратно после использования.
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/app",
pool_size=10, # постоянных соединений в пуле
max_overflow=5, # сколько ещё можно открыть на пике
pool_timeout=30, # сколько ждать свободное, прежде чем ошибка
pool_pre_ping=True, # проверять живость соединения перед выдачей
)
Размер пула согласуйте с лимитом подключений самой СУБД и с числом воркеров: если у вас 4 воркера по 10 соединений, база должна выдерживать минимум 40. Слишком маленький пул создаёт очередь и таймауты, слишком большой — перегружает базу. Опция pool_pre_ping страхует от «мёртвых» соединений, которые база уже закрыла по тайм-ауту.
Профилирование
Прежде чем что-то ускорять, измерьте, на что уходит время. Начните с простого — логируйте длительность запроса через middleware, чтобы видеть медленные эндпоинты в проде.
import time
from fastapi import Request
@app.middleware("http")
async def add_timing(request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration_ms = (time.perf_counter() - start) * 1000
response.headers["X-Process-Time"] = f"{duration_ms:.1f}"
return response
Когда виновник — конкретный участок кода, помогает профайлер вроде cProfile или построчный py-spy (он снимает профиль с работающего процесса, не останавливая его). Отдельно следите за медленными SQL: в Postgres их показывает EXPLAIN ANALYZE и лог медленных запросов. Главное правило профилирования: сначала данные, потом оптимизация — иначе вы ускоряете не то.
async везде
FastAPI держит много одновременных запросов одним процессом за счёт асинхронности — но только если код действительно неблокирующий. Один синхронный вызов внутри async def (тяжёлый расчёт, обращение к синхронному драйверу БД, time.sleep) замораживает event loop и вместе с ним — все параллельные запросы.
# ПЛОХО: блокирующий вызов в async-обработчике замораживает loop
@app.get("/report")
async def report():
data = sync_db.run_heavy_query() # синхронно -> стоп всему event loop
return data
# ЛУЧШЕ 1: используйте async-драйвер
@app.get("/report")
async def report():
data = await async_db.fetch(...) # неблокирующе
# ЛУЧШЕ 2: если функция обязана быть синхронной — определите обработчик как def
@app.get("/report")
def report(): # FastAPI выполнит его в threadpool
return sync_db.run_heavy_query()
Важная деталь FastAPI: если объявить обработчик как обычный def (без async), фреймворк выполнит его в отдельном пуле потоков и не заблокирует основной event loop. Поэтому правило не «всё подряд делать async», а «async-обработчик должен быть честно неблокирующим; если внутри есть синхронная работа без async-аналога — делайте обработчик синхронным def». Тяжёлый CPU-расчёт лучше и вовсе вынести в фоновую задачу или отдельный воркер.
Как это работает под капотом
FastAPI крутит один event loop на воркер. Асинхронный обработчик при await на I/O (ожидание ответа БД или сети) отдаёт управление циклу, и тот в это время обслуживает другие запросы — отсюда высокая конкурентность одним процессом. Но event loop однопоточен по своей природе: пока выполняется синхронный код, он не может переключиться, поэтому блокирующий вызов «застопоривает» сразу всех. Чтобы это обойти, FastAPI запускает обычные def-обработчики и синхронные зависимости в пуле потоков (через anyio), возвращая управление циклу. Пул соединений ускоряет работу, исключая повторное рукопожатие с БД: соединение берётся из набора готовых и возвращается обратно. Кэш экономит сильнее всего, потому что вовсе убирает обращение к узкому месту: попадание в Redis занимает доли миллисекунды против запроса к базе.
Частые ошибки
- Оптимизация без профилирования. Ускоряют то, что «кажется» медленным, а реальное узкое место остаётся.
- Блокирующий вызов в
async def. Замораживает event loop и убивает конкурентность всего воркера. - Нет пула соединений. Новое подключение к БД на каждый запрос — лишние задержки и нагрузка на СУБД.
- Кэш без инвалидации. Данные изменились, а пользователи видят старое, потому что кэш не сброшен.
- Пул больше лимита БД. Воркеры открывают больше соединений, чем выдерживает база, — ошибки на пике.
- N+1 запросов. Запрос к БД в цикле вместо одного с
JOIN/предзагрузкой — десятки лишних обращений.
Итоги
- Сначала профилируйте и находите узкое место, потом оптимизируйте — чаще всего тормозит БД или блокировка, а не FastAPI.
- Кэшируйте редко меняющиеся ответы с разумным TTL и не забывайте про инвалидацию.
- Используйте пул соединений к БД, согласованный с лимитом СУБД и числом воркеров.
- Держите async-обработчики неблокирующими; синхронную работу выносите в
def-обработчик (threadpool) или фоновую задачу. - Измеряйте длительность запросов (middleware,
py-spy,EXPLAIN ANALYZE) и оптимизируйте по данным.