Потокобезопасная очередь в Python

В этой статье вы узнаете, как использовать синхронизированную очередь для безопасного обмена данными между несколькими потоками.

Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue из модуля queue реализует всю необходимую семантику блокировки.

Создаем новую очередь

Чтобы создать новую очередь, нужно использовать конструктор Queue следующим образом:

from queue import Queue


queue = Queue()

Чтобы создать очередь с ограничением по размеру, можно использовать параметр maxsize. Например, ниже создается очередь, которая может хранить до 10 элементов:

from queue import Queue


queue = Queue(maxsize=10)

Добавляем элемент в очередь

Чтобы добавить элемент в очередь, нужно использовать метод put() следующим образом:

# ...
queue.put(item)

Как только очередь заполнится, вы не сможете добавить в нее элемент. Вызов метода put() будет блокироваться до тех пор, пока в очереди не освободится место.

Если вы не хотите, чтобы метод put() блокировался, если очередь переполнена, вы можете передать аргумент block=False:

queue.put(item, block=False)

В примере ниже метод put() вызовет исключение queue.Full, если очередь переполнена:

try:
   queue.put(item, block=False)
except queue.Full as e:
   # обработка исключения

Чтобы добавить элемент в ограниченную по размеру очередь и заблокировать его по таймауту, вы можете использовать параметр таймаута следующим образом:

try:
   queue.put(item, timeout=3)
except queue.Full as e:
   # обработка исключения

Получаем элемент из очереди

Чтобы получить элемент из очереди, вы можете использовать метод get():

item = queue.get()

Метод get() будет блокироваться до тех пор, пока элемент не будет доступен для получения из очереди.

Чтобы получить элемент из очереди без блокировки, можно передать аргумент block=False:

try:
   queue.get(block=False)
except queue.Empty:
   # обработка исключения

Чтобы получить элемент из очереди и блокировать его с ограничением по времени, можно использовать метод get() с таймаутом:

try:
   item = queue.get(timeout=10)
except queue.Empty:
   # ...

Получаем размер очереди

Метод qsize() возвращает количество элементов в очереди:

size = queue.size()

Кроме того, метод empty() возвращает True, если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True, если очередь заполнена, или False в противном случае.

Помечаем задачу как выполненную

Элемент, который вы добавляете в очередь, представляет собой единицу работы или задачу.

Когда поток вызывает метод get() для получения элемента из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.

После завершения поток может вызвать метод task_done(), чтобы указать, что он полностью обработал задачу:

item = queue.get()

# обработка элемента
# ...

# помечаем элемент как выполненный
queue.task_done()

Ждем завершение всех задач в очереди

Чтобы дождаться завершения всех задач в очереди, можно вызвать метод join() на объекте очереди:

queue.join()

Пример потокобезопасной очереди

Следующий пример демонстрирует использование потокобезопасной очереди для обмена данными между двумя потоками:

import time
from queue import Empty, Queue
from threading import Thread


def producer(queue):
    for i in range(1, 6):
        print(f'Вставляем элемент {i} в очередь')
        time.sleep(1)
        queue.put(i)


def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Обрабатываем элемент {item}')
            time.sleep(2)
            queue.task_done()


def main():
    queue = Queue()

    # создаем поток-производитель и запускаем его
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # создаем поток-потребитель и запускаем его
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # дожидаемся, пока все задачи добавятся в очередь
    producer_thread.join()

    # дожидаемся, пока все задачи в очереди будут завершены
    queue.join()


if __name__ == '__main__':
    main()

Как это работает

1. Сначала мы создаем функцию producer(), которая добавляет в очередь числа от 1 до 11. На каждой итерации она задерживается на одну секунду:

def producer(queue):
    for i in range(1, 6):
        print(f'Вставляем элемент {i} в очередь')
        time.sleep(1)
        queue.put(i)

2. Создаем функцию consumer(), которая получает элемент из очереди и обрабатывает его. Она задерживается на две секунды после обработки каждого элемента в очереди:

def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Обрабатываем элемент {item}')
            time.sleep(2)
            queue.task_done()

queue.task_done() указывает на то, что функция обработала элемент очереди.

3. Создаем функцию main(), которая создает два потока. Один поток добавляет номер в очередь каждую секунду, а другой обрабатывает элемент в очереди каждые две секунды:

def main():
    queue = Queue()

    # создаем поток-производитель и запускаем его
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # создаем поток-потребитель и запускаем его
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # дожидаемся, пока все задачи добавятся в очередь
    producer_thread.join()

    # дожидаемся, пока все задачи в очереди будут завершены
    queue.join()
  • Создаем новую очередь, вызвав конструктор Queue().
  • Создаем новый поток с именем producer_thread и немедленно запускаем его.
  • Создаем поток-демон consumer_thread и немедленно запускаем его.
  • Ждем добавления всех номеров в очередь с помощью метода join().
  • Ждем завершения всех задач в очереди с помощью метода join().

Поток-производитель добавляет число в очередь каждую секунду, а поток-потребитель обрабатывает число из очереди каждые две секунды. Он также отображает числа в очереди каждую секунду.

Вывод

Вставляем 1 элемент в очередь
Вставляем 2 элемент в очередь
Обрабатываем элемент 1
Вставляем 3 элемент в очередь
Обрабатываем элемент 2
Вставляем 4 элемент в очередь
Вставляем 5 элемент в очередь
Обрабатываем элемент 3
Обрабатываем элемент 4
Обрабатываем элемент 5
codechick

СodeСhick.io - простой и эффективный способ изучения программирования.

2024 ©