Производитель-потребитель
Как развязать «быстрых» и «медленных» через очередь, чтобы они не ждали друг друга и не теряли данные.
Производитель-потребитель — паттерн, в котором одни задачи (производители) кладут работу в общую очередь, а другие (потребители) её разбирают; очередь развязывает стороны по скорости и берёт на себя синхронизацию.
Зачем это нужно на практике
Почти любая реальная система состоит из частей, которые работают с разной скоростью. Веб-сервер мгновенно принимает HTTP-запрос на отправку письма, но само письмо уходит через SMTP секунду-две. Парсер выкачивает страницы пачками, а запись в базу идёт по одной. Если соединить такие части напрямую — быстрый будет простаивать, ожидая медленного, а под нагрузкой задачи начнут теряться. Паттерн «производитель-потребитель» вставляет между ними буфер-очередь: быстрый кладёт задачу и сразу освобождается, медленный берёт из очереди в своём темпе.
Классические примеры: обработка загруженных файлов, рассылка уведомлений, конвейер «скачать → распарсить → сохранить», пул сетевых соединений, логирование (приложение пишет строку в очередь, отдельный поток сбрасывает её на диск). Везде одна идея — развязать приём работы и её выполнение.
Очередь как точка синхронизации
Главная сложность многопоточности — общие данные, которые правят несколько потоков одновременно. Если бы производитель и потребитель толкались вокруг обычного list, мы бы ловили гонки: один дописывает, другой в этот момент читает длину, и всё рассыпается. Поэтому берут потокобезопасную очередь. В Python это queue.Queue: все её операции (put, get) защищены замком внутри, поэтому несколько потоков могут пользоваться ей одновременно безопасно.
Очередь решает сразу три задачи: хранит ещё не обработанные элементы, синхронизирует доступ и реализует обратное давление (backpressure). Если задать maxsize, очередь становится ограниченной: когда буфер заполнен, put блокирует производителя, пока потребитель не разберёт хоть что-то. Так быстрый автоматически притормаживает под медленного, и память не растёт бесконечно.
Ниже — суть паттерна без потоков, по шагам: производитель кладёт заказ, потребитель тут же его забирает. Так виден сам контракт очереди и стабильный результат.
import queue
q = queue.Queue(maxsize=3) # ограниченный буфер = обратное давление
orders = ["pizza", "sushi", "burger", "salad", "soup"]
served = []
# производитель кладёт заказ, потребитель сразу забирает — шаг за шагом
for order in orders:
q.put(order) # производитель
dish = q.get() # потребитель
served.append(dish)
print("Подано блюд:", len(served))
print("Порядок подачи:", served)
print("Очередь пуста:", q.empty())
Вывод:
Подано блюд: 5 Порядок подачи: ['pizza', 'sushi', 'burger', 'salad', 'soup'] Очередь пуста: True
Балансировка скоростей
Самое интересное начинается, когда стороны работают с разной скоростью. Смоделируем это детерминированно: за один «такт» производитель пытается положить две задачи, а потребитель забирает одну. Буфер ограничен четырьмя ячейками — и хорошо видно, как он наполняется, упирается в потолок, а потом плавно опустошается, когда производитель закончил.
import queue
buf = queue.Queue(maxsize=4)
produced = 0
consumed = 0
to_produce = 8
log = []
tick = 0
while consumed < to_produce:
tick += 1
# производитель кладёт до 2 задач, но только если есть место
for _ in range(2):
if produced < to_produce and not buf.full():
buf.put(produced)
produced += 1
# потребитель забирает одну, если есть
if not buf.empty():
buf.get()
consumed += 1
log.append((tick, produced, consumed, buf.qsize()))
print("такт | произведено | потреблено | в буфере")
for t, p, c, s in log:
print(f" {t:>2} | {p:>2} | {c:>2} | {s}")
Вывод:
такт | произведено | потреблено | в буфере 1 | 2 | 1 | 1 2 | 4 | 2 | 2 3 | 6 | 3 | 3 4 | 7 | 4 | 3 5 | 8 | 5 | 3 6 | 8 | 6 | 2 7 | 8 | 7 | 1 8 | 8 | 8 | 0
Буфер растёт, пока производитель опережает потребителя, но maxsize не даёт ему раздуться: на такте 4 место кончилось, и producer положил лишь одну задачу вместо двух. Это и есть обратное давление в действии.
Как это работает под капотом
В настоящем коде производители и потребители — это потоки. queue.Queue внутри держит обычный deque, замок (Lock) и два условных уведомления (Condition): «не пусто» и «не полно». Когда потребитель зовёт get на пустой очереди, он не крутит цикл впустую, а засыпает на условии «не пусто»; как только производитель сделает put, очередь будит одного из спящих. Симметрично работает put на полной очереди. Поэтому ожидание почти не тратит процессор.
Завершение оформляют через «ядовитую пилюлю» (poison pill): когда работа кончилась, в очередь кладут особый маркер (например None) — по одному на каждого потребителя. Потребитель, увидев маркер, выходит из цикла. Так не нужно насильно убивать потоки. Вот как выглядит реальный многопоточный вариант (его не запускаем — это код для чтения):
import threading, queue
q = queue.Queue(maxsize=10)
def producer():
for i in range(100):
q.put(i) # заблокируется, если буфер полон
q.put(None) # ядовитая пилюля для потребителя
def consumer():
while True:
item = q.get()
if item is None: # сигнал «работа кончилась»
q.task_done()
break
handle(item)
q.task_done() # отметить, что элемент обработан
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
q.join() # ждать, пока всё обработано
Метод task_done вместе с q.join() позволяет дождаться, когда все положенные элементы реально обработаны, а не просто вынуты.
Частые ошибки
- Неограниченная очередь под нагрузкой. Без
maxsizeбыстрый производитель забьёт всю память, если потребитель отстаёт. Для конвейеров почти всегда нужен предел. - Потребители не получают сигнал остановки. Если забыть про ядовитые пилюли, потоки-потребители повиснут навечно на
get. Маркеров должно быть ровно столько, сколько потребителей. - Самодельная очередь на списке. Обычный
listне потокобезопасен;append/popиз разных потоков ведут к гонкам и потере данных. Беритеqueue.Queue. - Тяжёлая работа под общим замком. Очередь синхронизирует только саму себя; всю настоящую обработку делайте после
get, вне любых общих блокировок, иначе потеряете весь параллелизм.
Итоги
- Паттерн развязывает стороны с разной скоростью через общий буфер-очередь.
- Потокобезопасная очередь (
queue.Queue) берёт на себя синхронизацию и засыпание потоков. maxsizeдаёт обратное давление: производитель притормаживает, память под контролем.- Завершение — через ядовитые пилюли по числу потребителей, а не через убийство потоков.