Пул потоков (worker pool)
Почему держать готовый набор воркеров выгоднее, чем плодить по потоку на каждую задачу.
Пул потоков (worker pool) — фиксированный набор заранее созданных потоков, которые по очереди разбирают задачи из общей очереди; потоки переиспользуются, а их число ограничивает степень параллелизма.
Зачем это нужно на практике
Наивный подход к параллельной работе — «на каждую задачу новый поток». Пришёл запрос — создали поток, отработал — поток умер. На десятке задач это терпимо, но на тысячах превращается в катастрофу. Создание потока стоит дорого: ядро выделяет стек (обычно мегабайты), регистрирует поток у планировщика, а потом всё это сносит. Если задач много и они короткие, вы тратите на рождение и смерть потоков больше, чем на саму работу. Хуже того, при всплеске нагрузки можно случайно поднять тысячи потоков — и система захлебнётся переключением контекста или просто упрётся в лимит памяти.
Пул решает обе беды сразу. Потоки создаются один раз и живут долго, разбирая задачу за задачей — затраты на создание размазываются почти в ноль. А поскольку потоков ровно столько, сколько вы задали, нагрузка на систему предсказуема: десять воркеров — максимум десять задач параллельно, остальные ждут в очереди. Это типовой механизм веб-серверов, обработчиков очередей, пакетной загрузки по сети.
Переиспользование воркеров
Внутри пула живёт уже знакомый паттерн «производитель-потребитель»: вы (производитель) кладёте задачи в очередь, а воркеры (потребители) их разбирают. Смоделируем это детерминированно — посчитаем, сколько задач достанется каждому из трёх воркеров, раздавая работу по кругу. Видно, что задач больше, чем воркеров: значит, каждый поток переиспользуется, беря несколько задач подряд.
import queue
from collections import Counter
tasks = list(range(10)) # 10 задач
WORKERS = 3 # пул из 3 воркеров
q = queue.Queue()
for t in tasks:
q.put(t)
done_by = Counter()
results = {}
# по кругу: воркер берёт следующую задачу, пока очередь не пуста
worker = 0
while not q.empty():
t = q.get()
results[t] = t * t # "работа": возвести в квадрат
done_by[worker] += 1
worker = (worker + 1) % WORKERS
print("Задач всего:", len(tasks), "| воркеров:", WORKERS)
for w in range(WORKERS):
print(f" воркер {w}: выполнил {done_by[w]} задач")
print("Результаты:", [results[t] for t in tasks])
Вывод:
Задач всего: 10 | воркеров: 3 воркер 0: выполнил 4 задач воркер 1: выполнил 3 задач воркер 2: выполнил 3 задач Результаты: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Десять задач на трёх воркеров — каждый берёт по три-четыре. Ни один поток не создавался под отдельную задачу: одни и те же три воркера прокрутили весь объём.
ThreadPoolExecutor: пул из коробки
Руками такое писать незачем — в стандартной библиотеке есть concurrent.futures.ThreadPoolExecutor. Вы задаёте max_workers, отдаёте задачи через submit или map — и пул сам разруливает очередь и переиспользование. Этот код исполняется в настоящих ОС-потоках, поэтому показываем его как пример для чтения:
from concurrent.futures import ThreadPoolExecutor
def fetch(url):
return download(url) # медленная сетевая операция
urls = [...] # сотни адресов
with ThreadPoolExecutor(max_workers=8) as pool:
# map сохраняет порядок входа: результаты придут в том же порядке, что urls
for url, html in zip(urls, pool.map(fetch, urls)):
save(url, html)
# выход из with дожидается завершения всех задач и закрывает пул
Здесь восемь воркеров обслужат хоть тысячу адресов: одновременно качается максимум восемь, остальные ждут. Если бы вы делали поток на адрес, на тысяче URL поднялась бы тысяча потоков. Метод map удобен тем, что сохраняет порядок: результат для i-го входа окажется на i-м месте, как бы ни планировались потоки.
Как это работает под капотом
При создании ThreadPoolExecutor заводит внутреннюю потокобезопасную очередь и (лениво) до max_workers рабочих потоков. Каждый воркер крутит вечный цикл: взять задачу из очереди → выполнить → положить результат в её Future → повторить. Когда задач нет, воркеры спят на пустой очереди, не нагружая процессор. submit кладёт в очередь упаковку «функция + аргументы + Future» и немедленно возвращает этот Future — объект-обещание результата (про него отдельный урок).
Важно понимать ограничение именно потокового пула в CPython: из-за GIL в любой момент Python-байткод исполняет лишь один поток. Поэтому потоковый пул ускоряет I/O-задачи (сеть, диск, ожидание) — пока один поток ждёт ответа, GIL отпущен и работают другие. А вот для тяжёлых вычислений (перемножение матриц, сжатие) потоки не дадут ускорения — там берут ProcessPoolExecutor, который поднимает отдельные процессы со своим интерпретатором.
| Подход | Когда уместен |
| Поток на задачу | задач единицы, они долгие и редкие |
| ThreadPoolExecutor | много I/O-задач (сеть, диск, ожидание ответа) |
| ProcessPoolExecutor | тяжёлые CPU-вычисления, упирающиеся в GIL |
Частые ошибки
- Слишком большой пул. «Поставлю 500 воркеров — будет быстрее» — нет: лишние потоки только грузят планировщик и память. Для I/O ориентируются на десятки, для CPU — на число ядер.
- Потоковый пул для счётных задач. Из-за GIL потоки не ускорят перемалывание чисел. Нужен
ProcessPoolExecutorили векторные библиотеки. - Не закрыли пул. Без
withилиshutdown()программа может не завершиться, удерживая живые потоки. Контекстный менеджер дожидается задач и аккуратно гасит пул. - Проглоченное исключение. Если задача упала, ошибка «лежит» в её
Futureи всплывёт только при чтении результата. Молча потерять её легко — всегда забирайте результат или проверяйтеfuture.exception().
Итоги
- Пул создаёт потоки один раз и переиспользует их, убирая дорогую возню «создал-убил» на каждую задачу.
- Число воркеров ограничивает параллелизм, делая нагрузку предсказуемой под всплесками.
ThreadPoolExecutor(max_workers=N)— готовый пул;mapсохраняет порядок результатов.- Потоковый пул ускоряет I/O; для CPU-задач из-за GIL берут пул процессов.