Пул потоков (worker pool)

Почему держать готовый набор воркеров выгоднее, чем плодить по потоку на каждую задачу.

Пул потоков (worker pool) — фиксированный набор заранее созданных потоков, которые по очереди разбирают задачи из общей очереди; потоки переиспользуются, а их число ограничивает степень параллелизма.

Зачем это нужно на практике

Наивный подход к параллельной работе — «на каждую задачу новый поток». Пришёл запрос — создали поток, отработал — поток умер. На десятке задач это терпимо, но на тысячах превращается в катастрофу. Создание потока стоит дорого: ядро выделяет стек (обычно мегабайты), регистрирует поток у планировщика, а потом всё это сносит. Если задач много и они короткие, вы тратите на рождение и смерть потоков больше, чем на саму работу. Хуже того, при всплеске нагрузки можно случайно поднять тысячи потоков — и система захлебнётся переключением контекста или просто упрётся в лимит памяти.

Пул решает обе беды сразу. Потоки создаются один раз и живут долго, разбирая задачу за задачей — затраты на создание размазываются почти в ноль. А поскольку потоков ровно столько, сколько вы задали, нагрузка на систему предсказуема: десять воркеров — максимум десять задач параллельно, остальные ждут в очереди. Это типовой механизм веб-серверов, обработчиков очередей, пакетной загрузки по сети.

Переиспользование воркеров

Внутри пула живёт уже знакомый паттерн «производитель-потребитель»: вы (производитель) кладёте задачи в очередь, а воркеры (потребители) их разбирают. Смоделируем это детерминированно — посчитаем, сколько задач достанется каждому из трёх воркеров, раздавая работу по кругу. Видно, что задач больше, чем воркеров: значит, каждый поток переиспользуется, беря несколько задач подряд.

import queue
from collections import Counter

tasks = list(range(10))          # 10 задач
WORKERS = 3                       # пул из 3 воркеров

q = queue.Queue()
for t in tasks:
    q.put(t)

done_by = Counter()
results = {}
# по кругу: воркер берёт следующую задачу, пока очередь не пуста
worker = 0
while not q.empty():
    t = q.get()
    results[t] = t * t           # "работа": возвести в квадрат
    done_by[worker] += 1
    worker = (worker + 1) % WORKERS

print("Задач всего:", len(tasks), "| воркеров:", WORKERS)
for w in range(WORKERS):
    print(f"  воркер {w}: выполнил {done_by[w]} задач")
print("Результаты:", [results[t] for t in tasks])

Вывод:

Задач всего: 10 | воркеров: 3
  воркер 0: выполнил 4 задач
  воркер 1: выполнил 3 задач
  воркер 2: выполнил 3 задач
Результаты: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Десять задач на трёх воркеров — каждый берёт по три-четыре. Ни один поток не создавался под отдельную задачу: одни и те же три воркера прокрутили весь объём.

ThreadPoolExecutor: пул из коробки

Руками такое писать незачем — в стандартной библиотеке есть concurrent.futures.ThreadPoolExecutor. Вы задаёте max_workers, отдаёте задачи через submit или map — и пул сам разруливает очередь и переиспользование. Этот код исполняется в настоящих ОС-потоках, поэтому показываем его как пример для чтения:

from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    return download(url)        # медленная сетевая операция

urls = [...]                    # сотни адресов
with ThreadPoolExecutor(max_workers=8) as pool:
    # map сохраняет порядок входа: результаты придут в том же порядке, что urls
    for url, html in zip(urls, pool.map(fetch, urls)):
        save(url, html)
# выход из with дожидается завершения всех задач и закрывает пул

Здесь восемь воркеров обслужат хоть тысячу адресов: одновременно качается максимум восемь, остальные ждут. Если бы вы делали поток на адрес, на тысяче URL поднялась бы тысяча потоков. Метод map удобен тем, что сохраняет порядок: результат для i-го входа окажется на i-м месте, как бы ни планировались потоки.

Как это работает под капотом

При создании ThreadPoolExecutor заводит внутреннюю потокобезопасную очередь и (лениво) до max_workers рабочих потоков. Каждый воркер крутит вечный цикл: взять задачу из очереди → выполнить → положить результат в её Future → повторить. Когда задач нет, воркеры спят на пустой очереди, не нагружая процессор. submit кладёт в очередь упаковку «функция + аргументы + Future» и немедленно возвращает этот Future — объект-обещание результата (про него отдельный урок).

Важно понимать ограничение именно потокового пула в CPython: из-за GIL в любой момент Python-байткод исполняет лишь один поток. Поэтому потоковый пул ускоряет I/O-задачи (сеть, диск, ожидание) — пока один поток ждёт ответа, GIL отпущен и работают другие. А вот для тяжёлых вычислений (перемножение матриц, сжатие) потоки не дадут ускорения — там берут ProcessPoolExecutor, который поднимает отдельные процессы со своим интерпретатором.

ПодходКогда уместен
Поток на задачузадач единицы, они долгие и редкие
ThreadPoolExecutorмного I/O-задач (сеть, диск, ожидание ответа)
ProcessPoolExecutorтяжёлые CPU-вычисления, упирающиеся в GIL

Частые ошибки

  • Слишком большой пул. «Поставлю 500 воркеров — будет быстрее» — нет: лишние потоки только грузят планировщик и память. Для I/O ориентируются на десятки, для CPU — на число ядер.
  • Потоковый пул для счётных задач. Из-за GIL потоки не ускорят перемалывание чисел. Нужен ProcessPoolExecutor или векторные библиотеки.
  • Не закрыли пул. Без with или shutdown() программа может не завершиться, удерживая живые потоки. Контекстный менеджер дожидается задач и аккуратно гасит пул.
  • Проглоченное исключение. Если задача упала, ошибка «лежит» в её Future и всплывёт только при чтении результата. Молча потерять её легко — всегда забирайте результат или проверяйте future.exception().

Итоги

  • Пул создаёт потоки один раз и переиспользует их, убирая дорогую возню «создал-убил» на каждую задачу.
  • Число воркеров ограничивает параллелизм, делая нагрузку предсказуемой под всплесками.
  • ThreadPoolExecutor(max_workers=N) — готовый пул; map сохраняет порядок результатов.
  • Потоковый пул ускоряет I/O; для CPU-задач из-за GIL берут пул процессов.
Проверьте себя
1. Почему пул потоков выгоднее, чем создавать новый поток на каждую задачу?
AПул отключает GIL
BПотоки создаются один раз и переиспользуются, а их число ограничивает параллелизм — нет дорогой возни «создал-убил» и неконтролируемого роста
CПул всегда быстрее ровно в число воркеров раз
DПоток на задачу вообще не работает в Python
2. Для каких задач потоковый ThreadPoolExecutor в CPython реально ускоряет работу?
AДля тяжёлых вычислений вроде перемножения матриц
BДля I/O-задач (сеть, диск, ожидание ответа), потому что во время ожидания GIL отпущен
CДля любых задач одинаково
DНи для каких, потоки в Python бесполезны
3. Что гарантирует метод pool.map по сравнению с as_completed?
AЧто задачи выполнятся строго по очереди в один поток
BЧто результаты вернутся в порядке входных аргументов, независимо от того, как планировались потоки
CЧто исключения никогда не возникнут
DЧто пул не будет переиспользовать воркеры