Паттерн producer-consumer и очередь задач

Самый востребованный паттерн конкурентности: одни производят работу, другие её разбирают.

Producer-consumer — паттерн, где потоки-производители кладут элементы в общую очередь, а потоки-потребители забирают и обрабатывают их, что развязывает скорости производства и потребления.

Очередь между ними играет роль буфера. Производители и потребители не знают друг о друге и работают в своём темпе: если производители временно быстрее, задачи копятся в очереди; если медленнее — потребители ждут. Это классическая «развязка» компонентов.

Структура паттерна

[Producer 1] --\
[Producer 2] ---> [ Очередь задач ] ---> [Consumer 1]
[Producer 3] --/                      \--> [Consumer 2]

producer.put(task)        consumer.get() -> обработка

Реализация на queue.Queue

Потокобезопасная очередь делает всю синхронизацию за нас: get() блокируется на пустой очереди, put() кладёт элемент, а сигнал завершения — «отравленная пилюля» (None) — говорит потребителю остановиться.

import threading, queue

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)          # положили задачу
    q.put(None)           # сигнал «работа кончилась»

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print("обработал", item)

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Моделируем развязку скоростей

import collections

q = collections.deque()
produced = [10, 20, 30]

# producer кладёт всё разом (он быстрее)
for x in produced:
    q.append(x)
print("в очереди:", list(q))

# consumer разбирает в своём темпе
while q:
    item = q.popleft()
    print("обработал:", item * 2)

Вывод:

в очереди: [10, 20, 30]
обработал: 20
обработал: 40
обработал: 60

Зачем ограничивать размер очереди

Если производители всегда быстрее потребителей, неограниченная очередь будет расти, пока не кончится память. Поэтому в проде используют ограниченную очередь (queue.Queue(maxsize=N)): когда она полна, put() блокируется, заставляя производителей притормозить. Это встроенный механизм обратного давления (backpressure).

Как работает под капотом

Внутри queue.Queue — список плюс блокировка и условные переменные. get() на пустой очереди вызывает wait() и усыпляет поток; put() добавляет элемент и через notify() будит ожидающего потребителя. Симметрично для полной ограниченной очереди. Поэтому вам не нужно писать ни одной блокировки руками — вся корректная синхронизация уже внутри. Метод task_done() и join() позволяют дождаться обработки всех задач.

Частые ошибки

  • Неограниченная очередь под быстрый producer. Память кончится — ставьте maxsize.
  • Нет сигнала завершения. Потребители в while True зависнут навсегда; шлите «отравленную пилюлю» каждому.
  • Делить очередь руками через список и блокировки. Велик риск ошибки; берите queue.Queue.

Итог

  • Producer-consumer развязывает скорости через буферную очередь.
  • queue.Queue берёт всю синхронизацию на себя.
  • Сигнал завершения («отравленная пилюля») корректно останавливает потребителей.
  • Ограничение размера очереди даёт обратное давление и бережёт память.
Проверьте себя
1. Какую роль играет очередь в паттерне producer-consumer?
AУскоряет процессор
BСлужит буфером, развязывая скорости производителей и потребителей
CЗаменяет потоки процессами
DОтключает синхронизацию
2. Зачем ограничивать размер очереди (maxsize) в producer-consumer?
AЧтобы запретить многопоточность
BЧтобы при переполнении put() притормаживал производителей (backpressure) и не кончилась память
CЧтобы ускорить потребителей
DЭто не имеет смысла
3. Как корректно остановить потребителя в цикле while True?
AНикак, он работает вечно
BПоложить в очередь сигнал завершения («отравленную пилюлю», например None) для каждого потребителя
CУдалить очередь
DПерезагрузить компьютер