Паттерн producer-consumer и очередь задач
Самый востребованный паттерн конкурентности: одни производят работу, другие её разбирают.
Producer-consumer — паттерн, где потоки-производители кладут элементы в общую очередь, а потоки-потребители забирают и обрабатывают их, что развязывает скорости производства и потребления.
Очередь между ними играет роль буфера. Производители и потребители не знают друг о друге и работают в своём темпе: если производители временно быстрее, задачи копятся в очереди; если медленнее — потребители ждут. Это классическая «развязка» компонентов.
Структура паттерна
[Producer 1] --\
[Producer 2] ---> [ Очередь задач ] ---> [Consumer 1]
[Producer 3] --/ \--> [Consumer 2]
producer.put(task) consumer.get() -> обработкаРеализация на queue.Queue
Потокобезопасная очередь делает всю синхронизацию за нас: get() блокируется на пустой очереди, put() кладёт элемент, а сигнал завершения — «отравленная пилюля» (None) — говорит потребителю остановиться.
import threading, queue
q = queue.Queue()
def producer():
for i in range(5):
q.put(i) # положили задачу
q.put(None) # сигнал «работа кончилась»
def consumer():
while True:
item = q.get()
if item is None:
break
print("обработал", item)
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()Моделируем развязку скоростей
import collections
q = collections.deque()
produced = [10, 20, 30]
# producer кладёт всё разом (он быстрее)
for x in produced:
q.append(x)
print("в очереди:", list(q))
# consumer разбирает в своём темпе
while q:
item = q.popleft()
print("обработал:", item * 2)Вывод:
в очереди: [10, 20, 30] обработал: 20 обработал: 40 обработал: 60
Зачем ограничивать размер очереди
Если производители всегда быстрее потребителей, неограниченная очередь будет расти, пока не кончится память. Поэтому в проде используют ограниченную очередь (queue.Queue(maxsize=N)): когда она полна, put() блокируется, заставляя производителей притормозить. Это встроенный механизм обратного давления (backpressure).
Как работает под капотом
Внутри queue.Queue — список плюс блокировка и условные переменные. get() на пустой очереди вызывает wait() и усыпляет поток; put() добавляет элемент и через notify() будит ожидающего потребителя. Симметрично для полной ограниченной очереди. Поэтому вам не нужно писать ни одной блокировки руками — вся корректная синхронизация уже внутри. Метод task_done() и join() позволяют дождаться обработки всех задач.
Частые ошибки
- Неограниченная очередь под быстрый producer. Память кончится — ставьте
maxsize. - Нет сигнала завершения. Потребители в
while Trueзависнут навсегда; шлите «отравленную пилюлю» каждому. - Делить очередь руками через список и блокировки. Велик риск ошибки; берите
queue.Queue.
Итог
- Producer-consumer развязывает скорости через буферную очередь.
queue.Queueберёт всю синхронизацию на себя.- Сигнал завершения («отравленная пилюля») корректно останавливает потребителей.
- Ограничение размера очереди даёт обратное давление и бережёт память.