← Все вопросы

Как запустить одну функцию в несколько потоков через ThreadPoolExecutor?

Задан 16 часов назад319 просмотров2 ответа
1

Качаю 50 страниц по сети, по очереди это жутко медленно. Слышал, что можно распараллелить через потоки и concurrent.futures, но не понимаю, как скормить одну функцию в пул с разными аргументами и собрать результаты. И ещё все пугают каким-то GIL — потоки в Python вообще работают или это бессмысленно?

2 ответа

8
✓ Принятый ответ — помог автору

Для этого как раз и есть ThreadPoolExecutor из concurrent.futures — он сам управляет пулом потоков, тебе не надо вручную создавать Thread и звать join.

Самый простой способ — метод .map(): он применяет функцию к каждому элементу и возвращает результаты по порядку:

from concurrent.futures import ThreadPoolExecutor
import time

def download(url):
    time.sleep(1)          # имитация сетевого запроса
    return f"готово: {url}"

urls = [f"site{i}.com" for i in range(50)]

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(download, urls))

print(results[:3])

max_workers=10 — сколько потоков работают одновременно. 50 задач прогонятся пачками по 10. Вместо ~50 секунд получишь ~5.

Если нужно обрабатывать результаты по мере готовности (а не по порядку) или передавать разные аргументы — используй submit + as_completed:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(download, u): u for u in urls}
    for future in as_completed(futures):
        print(future.result())   # .result() поднимет исключение, если оно было

Теперь про GIL (Global Interpreter Lock). Это блокировка, из-за которой в любой момент Python-байткод исполняет только один поток. Звучит так, будто потоки бесполезны, но есть нюанс: при операциях ввода-вывода (сеть, диск) поток отпускает GIL во время ожидания. Поэтому:

  • I/O-bound задачи (запросы, файлы, БД) — потоки отлично ускоряют, потоки ждут параллельно. Твой случай ровно такой.
  • CPU-bound (тяжёлые вычисления) — потоки НЕ помогут из-за GIL, бери ProcessPoolExecutor (отдельные процессы).

Главные грабли — общее изменяемое состояние. Если несколько потоков пишут в один список/счётчик, будут гонки. Либо защищай threading.Lock, либо (лучше) пусть каждая задача просто возвращает результат, а ты собираешь их снаружи, как в примерах выше.

3

Дополню про ту самую гонку, чтобы было нагляднее, почему «просто += в потоках» — это баг:

from concurrent.futures import ThreadPoolExecutor
import threading

counter = 0
lock = threading.Lock()

def inc():
    global counter
    with lock:          # без lock результат будет случайным < 100000
        counter += 1

with ThreadPoolExecutor(max_workers=8) as ex:
    for _ in range(100000):
        ex.submit(inc)

print(counter)   # 100000 — стабильно, благодаря lock

counter += 1 не атомарна (чтение → +1 → запись), потоки затирают друг друга. Но вообще лучший паттерн — вообще не шарить состояние: пусть функция возвращает значение, а агрегацию делай в основном потоке. Меньше блокировок — меньше боли.

Ваш ответ

Войдите, чтобы ответить на вопрос.
Поддержать проект