Как запустить одну функцию в несколько потоков через ThreadPoolExecutor?
Качаю 50 страниц по сети, по очереди это жутко медленно. Слышал, что можно распараллелить через потоки и concurrent.futures, но не понимаю, как скормить одну функцию в пул с разными аргументами и собрать результаты. И ещё все пугают каким-то GIL — потоки в Python вообще работают или это бессмысленно?
2 ответа
Для этого как раз и есть ThreadPoolExecutor из concurrent.futures — он сам управляет пулом потоков, тебе не надо вручную создавать Thread и звать join.
Самый простой способ — метод .map(): он применяет функцию к каждому элементу и возвращает результаты по порядку:
from concurrent.futures import ThreadPoolExecutor
import time
def download(url):
time.sleep(1) # имитация сетевого запроса
return f"готово: {url}"
urls = [f"site{i}.com" for i in range(50)]
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(download, urls))
print(results[:3])
max_workers=10 — сколько потоков работают одновременно. 50 задач прогонятся пачками по 10. Вместо ~50 секунд получишь ~5.
Если нужно обрабатывать результаты по мере готовности (а не по порядку) или передавать разные аргументы — используй submit + as_completed:
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(download, u): u for u in urls}
for future in as_completed(futures):
print(future.result()) # .result() поднимет исключение, если оно было
Теперь про GIL (Global Interpreter Lock). Это блокировка, из-за которой в любой момент Python-байткод исполняет только один поток. Звучит так, будто потоки бесполезны, но есть нюанс: при операциях ввода-вывода (сеть, диск) поток отпускает GIL во время ожидания. Поэтому:
- I/O-bound задачи (запросы, файлы, БД) — потоки отлично ускоряют, потоки ждут параллельно. Твой случай ровно такой.
- CPU-bound (тяжёлые вычисления) — потоки НЕ помогут из-за GIL, бери
ProcessPoolExecutor(отдельные процессы).
Главные грабли — общее изменяемое состояние. Если несколько потоков пишут в один список/счётчик, будут гонки. Либо защищай threading.Lock, либо (лучше) пусть каждая задача просто возвращает результат, а ты собираешь их снаружи, как в примерах выше.
Дополню про ту самую гонку, чтобы было нагляднее, почему «просто += в потоках» — это баг:
from concurrent.futures import ThreadPoolExecutor
import threading
counter = 0
lock = threading.Lock()
def inc():
global counter
with lock: # без lock результат будет случайным < 100000
counter += 1
with ThreadPoolExecutor(max_workers=8) as ex:
for _ in range(100000):
ex.submit(inc)
print(counter) # 100000 — стабильно, благодаря lock
counter += 1 не атомарна (чтение → +1 → запись), потоки затирают друг друга. Но вообще лучший паттерн — вообще не шарить состояние: пусть функция возвращает значение, а агрегацию делай в основном потоке. Меньше блокировок — меньше боли.