Пул потоков в Python
В этой статье вы узнаете, как использовать ThreadPoolExecutor для разработки многопоточных программ.
В статье о многопоточности вы уже узнали, как управлять несколькими потоками в программе с помощью класса Thread
модуля threading
.
Класс Thread
полезен, когда вы хотите создавать потоки вручную. Однако ручное управление потоками неэффективно, поскольку частое создание и уничтожение множества потоков требует больших вычислительных затрат.
Вместо ручного управления можно повторно использовать потоки, если предполагается, что в программе будет выполняться много специальных задач. Пул потоков позволяет этого добиться.
Пул потоков
Пул потоков (Thread Pool) — это шаблон для автоматического эффективного управления потоками в программе.
Каждый поток в пуле называется рабочим потоком или просто рабочим (worker). Пул потоков позволяет повторно использовать рабочие потоки после выполнения задач. Он также защищает от неожиданных сбоев — например, исключений.
Обычно пул потоков позволяет настраивать количество рабочих потоков и предоставляет определенное соглашение об именовании для каждого рабочего потока.
Для создания пулов потоков используется класс ThreadPoolExecutor
из модуля concurrent.futures.
Класс ThreadPoolExecutor
Класс ThreadPoolExecutor
расширяет класс Executor
и возвращает объект Future
.
Класс Executor
У класса Executor
есть три метода для управления пулом потоков:
submit()
— отправляет функцию на выполнение и возвращает объектFuture
. Методsubmit()
принимает функцию и выполняет ее асинхронно.map()
— асинхронно выполняет функции для каждого элемента итерабельной таблицы.shutdown()
— завершает работу исполнителя.
Когда вы создаете новый экземпляр класса ThreadPoolExecutor
, Python запускает исполнителя.
После завершения работы с исполнителем вы должны явно вызвать метод shutdown()
, чтобы освободить ресурс, удерживаемый исполнителем. Чтобы явно не вызывать метод shutdown()
, можно воспользоваться менеджером контекста.
Объект Future
Future
— это объект, который представляет собой конечный результат асинхронной операции. У класса Future
есть два полезных метода:
result()
— возвращает результат асинхронной операции.exception()
— возвращает исключение асинхронной операции в случае возникновения исключения.
Пример ThreadPoolExecutor
Следующая программа использует один поток:
from time import sleep, perf_counter
def task(id):
print(f'Начинаем задачу {id}...')
sleep(1)
return f'Закончили задачу {id}'
start = perf_counter()
print(task(1))
print(task(2))
finish = perf_counter()
print(f"Выполнение заняло {finish-start} секунд.")
Вывод
Начинаем задачу 1...
Закончили задачу 1
Начинаем задачу 2...
Закончили задачу 2
Работа заняла 2.0144479 секунд.
Как это работает
1. Создаем функцию task()
, которой требуется около одной секунды для завершения работы. Функция task()
вызывает функцию sleep()
для имитации задержки:
def task(id):
print(f'Начинаем задачу {id}...')
sleep(1)
return f'Закончили задачу {id}'
2. Дважды вызываем функцию task()
и выводим результат. До и после вызова функции task()
мы используем функцию perf_counter()
для измерения времени начала и окончания работы:
start = perf_counter()
print(task(1))
print(task(2))
finish = perf_counter()
3. Выводим количество времени, которое заняло выполнение двух функций task().
print(f"Выполнение заняло {finish-start} секунд.")
Поскольку выполнение функции task()
занимает одну секунду, ее вызов дважды займет около 2 секунд.
Используем метод submit()
Для одновременного выполнения функции task()
можно использовать класс ThreadPoolExecutor
:
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor
def task(id):
print(f'Начинаем задачу {id}...')
sleep(1)
return f'Закончили задачу {id}'
start = perf_counter()
with ThreadPoolExecutor() as executor:
f1 = executor.submit(task, 1)
f2 = executor.submit(task, 2)
print(f1.result())
print(f2.result())
finish = perf_counter()
print(f"Выполнение заняло {finish-start} секунд.")
Вывод
Начинаем задачу 1...
Начинаем задачу 2...
Закончили задачу 1
Закончили задачу 2
Выполнение заняло 1.0177214 секунд.
Вывод показывает, что программе потребовалась примерно 1 секунда для завершения.
Как это работает
Сфокусируемся на пуле потоков.
1. Импортируем класс ThreadPoolExecutor
из модуля concurrent.futures
:
from concurrent.futures import ThreadPoolExecutor
2. Создаем пул потоков с помощью ThreadPoolExecutor
, используя контекстный менеджер:
with ThreadPoolExecutor() as executor:
3. Дважды вызываем функцию task()
, передавая ее в метод submit()
исполнителя:
with ThreadPoolExecutor() as executor:
f1 = executor.submit(task, 1)
f2 = executor.submit(task, 2)
print(f1.result())
print(f2.result())
Метод submit()
возвращает объект Future. В этом примере у нас есть два объекта Future — f1
и f2
. Чтобы получить результат от объекта Future, мы вызвали его метод result()
.
Используем метод map()
В следующей программе используется класс ThreadPoolExecutor
. Однако в ней вместо метода submit()
мы используем метод map()
для выполнения функции:
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor
def task(id):
print(f'Начинаем задачу {id}...')
sleep(1)
return f'Завершили задачу {id}'
start = perf_counter()
with ThreadPoolExecutor() as executor:
results = executor.map(task, [1,2])
for result in results:
print(result)
finish = perf_counter()
print(f"Выполнение заняло {finish-start} секунд.")
Как это работает
1. Вызываем метод map()
объекта-исполнителя, чтобы выполнить функцию задачи для каждого идентификатора в списке [1,2]. Метод map()
возвращает итератор, содержащий результат вызова функции.
results = executor.map(task, [1,2])
2. Перебираем результаты и выводим их на экран:
for result in results:
print(result)
Используем ThreadPoolExecutor на практике
Давайте отработаем пул потоков. Для этого напишем программу, которая загружает несколько изображений из Википедии с помощью пула потоков.
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
import time
import os
def download_image(url):
image_data = None
with urlopen(url) as f:
image_data = f.read()
if not image_data:
raise Exception(f"Ошибка: невозможно скачать изображение {url}")
filename = os.path.basename(url)
with open(filename, 'wb') as image_file:
image_file.write(image_data)
print(f'{filename} скачан...')
start = time.perf_counter()
urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg',
'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg',
'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg',
'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG',
'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg']
with ThreadPoolExecutor() as executor:
executor.map(download_image, urls)
finish = time.perf_counter()
print(f'Выполнение заняло {finish-start} секунд.')
Как это работает
1. Создаем функцию download_image()
, которая загружает изображение с URL и сохраняет его в файл:
def download_image(url):
image_data = None
with urlopen(url) as f:
image_data = f.read()
if not image_data:
raise Exception(f"Ошибка: невозможно скачать изображение {url}")
filename = os.path.basename(url)
with open(filename, 'wb') as image_file:
image_file.write(image_data)
print(f'{filename} скачан...')
Функция download_image()
использует функцию urlopen()
из модуля urllib.request
для загрузки изображения по URL.
2. Вызываем функцию download_image()
, используя пул потоков, вызвав метод map()
объекта ThreadPoolExecutor
:
with ThreadPoolExecutor() as executor:
executor.map(download_image, urls)
Что нужно запомнить
- Пул потоков — это шаблон для эффективного управления несколькими потоками.
- Для управления пулом потоков в Python используйте класс
ThreadPoolExecutor
. - Чтобы передать задачу в пул потоков для выполнения, вызовите метод
submit()
классаThreadPoolExecutor
. Метод вернет объект Future. - Для выполнения функции в пуле потоков с каждым элементом списка используйте метод
map()
классаThreadPoolExecutor
.