Пул потоков в Python

В этой статье вы узнаете, как использовать ThreadPoolExecutor для разработки многопоточных программ.

В статье о многопоточности вы уже узнали, как управлять несколькими потоками в программе с помощью класса Thread модуля threading.

Класс Thread полезен, когда вы хотите создавать потоки вручную. Однако ручное управление потоками неэффективно, поскольку частое создание и уничтожение множества потоков требует больших вычислительных затрат.

Вместо ручного управления можно повторно использовать потоки, если предполагается, что в программе будет выполняться много специальных задач. Пул потоков позволяет этого добиться.

Пул потоков

Пул потоков (Thread Pool) — это шаблон для автоматического эффективного управления потоками в программе. 

Каждый поток в пуле называется рабочим потоком или просто рабочим (worker). Пул потоков позволяет повторно использовать рабочие потоки после выполнения задач. Он также защищает от неожиданных сбоев — например, исключений.

Обычно пул потоков позволяет настраивать количество рабочих потоков и предоставляет определенное соглашение об именовании для каждого рабочего потока.

Для создания пулов потоков используется класс ThreadPoolExecutor из модуля concurrent.futures.

Класс ThreadPoolExecutor

Класс ThreadPoolExecutor расширяет класс Executor и возвращает объект Future.

Класс Executor

У класса Executor есть три метода для управления пулом потоков:

  • submit() — отправляет функцию на выполнение и возвращает объект Future. Метод submit() принимает функцию и выполняет ее асинхронно.
  • map() — асинхронно выполняет функции для каждого элемента итерабельной таблицы.
  • shutdown() — завершает работу исполнителя.

Когда вы создаете новый экземпляр класса ThreadPoolExecutor, Python запускает исполнителя.

После завершения работы с исполнителем вы должны явно вызвать метод shutdown(), чтобы освободить ресурс, удерживаемый исполнителем. Чтобы явно не вызывать метод shutdown(), можно воспользоваться менеджером контекста.

Объект Future

Future — это объект, который представляет собой конечный результат асинхронной операции. У класса Future есть два полезных метода:

  • result() — возвращает результат асинхронной операции.
  • exception() — возвращает исключение асинхронной операции в случае возникновения исключения.

Пример ThreadPoolExecutor

Следующая программа использует один поток:

from time import sleep, perf_counter

def task(id):
    print(f'Начинаем задачу {id}...')
    sleep(1)
    return f'Закончили задачу {id}'

start = perf_counter()

print(task(1))
print(task(2))

finish = perf_counter()

print(f"Выполнение заняло {finish-start} секунд.")

Вывод

Начинаем задачу 1...
Закончили задачу 1
Начинаем задачу 2...
Закончили задачу 2
Работа заняла 2.0144479 секунд.

Как это работает

1. Создаем функцию task(), которой требуется около одной секунды для завершения работы. Функция task() вызывает функцию sleep() для имитации задержки:

def task(id):
    print(f'Начинаем задачу {id}...')
    sleep(1)
    return f'Закончили задачу {id}'

2. Дважды вызываем функцию task() и выводим результат. До и после вызова функции task() мы используем функцию perf_counter() для измерения времени начала и окончания работы:

start = perf_counter()

print(task(1))
print(task(2))

finish = perf_counter()

3. Выводим количество времени, которое заняло выполнение двух функций task().

print(f"Выполнение заняло {finish-start} секунд.")

Поскольку выполнение функции task() занимает одну секунду, ее вызов дважды займет около 2 секунд.

Используем метод submit()

Для одновременного выполнения функции task() можно использовать класс ThreadPoolExecutor:

from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor

def task(id):
    print(f'Начинаем задачу {id}...')
    sleep(1)
    return f'Закончили задачу {id}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(task, 1)
    f2 = executor.submit(task, 2)

    print(f1.result())
    print(f2.result())    

finish = perf_counter()

print(f"Выполнение заняло {finish-start} секунд.")

Вывод

Начинаем задачу 1...
Начинаем задачу 2...
Закончили задачу 1
Закончили задачу 2
Выполнение заняло 1.0177214 секунд.

Вывод показывает, что программе потребовалась примерно 1 секунда для завершения.

Как это работает

Сфокусируемся на пуле потоков.

1. Импортируем класс ThreadPoolExecutor из модуля concurrent.futures:

from concurrent.futures import ThreadPoolExecutor

2. Создаем пул потоков с помощью ThreadPoolExecutor, используя контекстный менеджер:

with ThreadPoolExecutor() as executor:

3. Дважды вызываем функцию task(), передавая ее в метод submit() исполнителя:

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(task, 1)
    f2 = executor.submit(task, 2)

    print(f1.result())
    print(f2.result()) 

Метод submit() возвращает объект Future. В этом примере у нас есть два объекта Future — f1 и f2. Чтобы получить результат от объекта Future, мы вызвали его метод result().

Используем метод map()

В следующей программе используется класс ThreadPoolExecutor. Однако в ней вместо метода submit() мы используем метод map() для выполнения функции:

from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor


def task(id):
    print(f'Начинаем задачу {id}...')
    sleep(1)
    return f'Завершили задачу {id}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    results = executor.map(task, [1,2])
    for result in results:
        print(result)

finish = perf_counter()

print(f"Выполнение заняло {finish-start} секунд.")

Как это работает

1. Вызываем метод map() объекта-исполнителя, чтобы выполнить функцию задачи для каждого идентификатора в списке [1,2]. Метод map() возвращает итератор, содержащий результат вызова функции.

results = executor.map(task, [1,2])

2. Перебираем результаты и выводим их на экран:

for result in results:
    print(result)

Используем ThreadPoolExecutor на практике

Давайте отработаем пул потоков. Для этого напишем программу, которая загружает несколько изображений из Википедии с помощью пула потоков.

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
import time
import os

def download_image(url):
    image_data = None
    with urlopen(url) as f:
        image_data = f.read()

    if not image_data:
        raise Exception(f"Ошибка: невозможно скачать изображение {url}")

    filename = os.path.basename(url)
    with open(filename, 'wb') as image_file:
        image_file.write(image_data)
        print(f'{filename} скачан...')

start = time.perf_counter()

urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG',
        'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg']

with ThreadPoolExecutor() as executor:
      executor.map(download_image, urls)

finish = time.perf_counter()    

print(f'Выполнение заняло {finish-start} секунд.')

Как это работает

1. Создаем функцию download_image(), которая загружает изображение с URL и сохраняет его в файл:

def download_image(url):
    image_data = None
    with urlopen(url) as f:
        image_data = f.read()

    if not image_data:
        raise Exception(f"Ошибка: невозможно скачать изображение {url}")

    filename = os.path.basename(url)
    with open(filename, 'wb') as image_file:
        image_file.write(image_data)
        print(f'{filename} скачан...')

Функция download_image() использует функцию urlopen() из модуля urllib.request для загрузки изображения по URL. 

2. Вызываем функцию download_image(), используя пул потоков, вызвав метод map() объекта ThreadPoolExecutor:

with ThreadPoolExecutor() as executor:
      executor.map(download_image, urls)

Что нужно запомнить

  • Пул потоков — это шаблон для эффективного управления несколькими потоками.
  • Для управления пулом потоков в Python используйте класс ThreadPoolExecutor.
  • Чтобы передать задачу в пул потоков для выполнения, вызовите метод submit() класса ThreadPoolExecutor. Метод вернет объект Future.
  • Для выполнения функции в пуле потоков с каждым элементом списка используйте метод map() класса ThreadPoolExecutor .
codechick

СodeСhick.io - простой и эффективный способ изучения программирования.

2024 ©