Производитель-потребитель

Как развязать «быстрых» и «медленных» через очередь, чтобы они не ждали друг друга и не теряли данные.

Производитель-потребитель — паттерн, в котором одни задачи (производители) кладут работу в общую очередь, а другие (потребители) её разбирают; очередь развязывает стороны по скорости и берёт на себя синхронизацию.

Зачем это нужно на практике

Почти любая реальная система состоит из частей, которые работают с разной скоростью. Веб-сервер мгновенно принимает HTTP-запрос на отправку письма, но само письмо уходит через SMTP секунду-две. Парсер выкачивает страницы пачками, а запись в базу идёт по одной. Если соединить такие части напрямую — быстрый будет простаивать, ожидая медленного, а под нагрузкой задачи начнут теряться. Паттерн «производитель-потребитель» вставляет между ними буфер-очередь: быстрый кладёт задачу и сразу освобождается, медленный берёт из очереди в своём темпе.

Классические примеры: обработка загруженных файлов, рассылка уведомлений, конвейер «скачать → распарсить → сохранить», пул сетевых соединений, логирование (приложение пишет строку в очередь, отдельный поток сбрасывает её на диск). Везде одна идея — развязать приём работы и её выполнение.

Очередь как точка синхронизации

Главная сложность многопоточности — общие данные, которые правят несколько потоков одновременно. Если бы производитель и потребитель толкались вокруг обычного list, мы бы ловили гонки: один дописывает, другой в этот момент читает длину, и всё рассыпается. Поэтому берут потокобезопасную очередь. В Python это queue.Queue: все её операции (put, get) защищены замком внутри, поэтому несколько потоков могут пользоваться ей одновременно безопасно.

Очередь решает сразу три задачи: хранит ещё не обработанные элементы, синхронизирует доступ и реализует обратное давление (backpressure). Если задать maxsize, очередь становится ограниченной: когда буфер заполнен, put блокирует производителя, пока потребитель не разберёт хоть что-то. Так быстрый автоматически притормаживает под медленного, и память не растёт бесконечно.

Ниже — суть паттерна без потоков, по шагам: производитель кладёт заказ, потребитель тут же его забирает. Так виден сам контракт очереди и стабильный результат.

import queue

q = queue.Queue(maxsize=3)   # ограниченный буфер = обратное давление

orders = ["pizza", "sushi", "burger", "salad", "soup"]
served = []

# производитель кладёт заказ, потребитель сразу забирает — шаг за шагом
for order in orders:
    q.put(order)                 # производитель
    dish = q.get()               # потребитель
    served.append(dish)

print("Подано блюд:", len(served))
print("Порядок подачи:", served)
print("Очередь пуста:", q.empty())

Вывод:

Подано блюд: 5
Порядок подачи: ['pizza', 'sushi', 'burger', 'salad', 'soup']
Очередь пуста: True

Балансировка скоростей

Самое интересное начинается, когда стороны работают с разной скоростью. Смоделируем это детерминированно: за один «такт» производитель пытается положить две задачи, а потребитель забирает одну. Буфер ограничен четырьмя ячейками — и хорошо видно, как он наполняется, упирается в потолок, а потом плавно опустошается, когда производитель закончил.

import queue

buf = queue.Queue(maxsize=4)
produced = 0
consumed = 0
to_produce = 8
log = []

tick = 0
while consumed < to_produce:
    tick += 1
    # производитель кладёт до 2 задач, но только если есть место
    for _ in range(2):
        if produced < to_produce and not buf.full():
            buf.put(produced)
            produced += 1
    # потребитель забирает одну, если есть
    if not buf.empty():
        buf.get()
        consumed += 1
    log.append((tick, produced, consumed, buf.qsize()))

print("такт | произведено | потреблено | в буфере")
for t, p, c, s in log:
    print(f"  {t:>2} |     {p:>2}      |    {c:>2}     |   {s}")

Вывод:

такт | произведено | потреблено | в буфере
   1 |      2      |     1     |   1
   2 |      4      |     2     |   2
   3 |      6      |     3     |   3
   4 |      7      |     4     |   3
   5 |      8      |     5     |   3
   6 |      8      |     6     |   2
   7 |      8      |     7     |   1
   8 |      8      |     8     |   0

Буфер растёт, пока производитель опережает потребителя, но maxsize не даёт ему раздуться: на такте 4 место кончилось, и producer положил лишь одну задачу вместо двух. Это и есть обратное давление в действии.

Как это работает под капотом

В настоящем коде производители и потребители — это потоки. queue.Queue внутри держит обычный deque, замок (Lock) и два условных уведомления (Condition): «не пусто» и «не полно». Когда потребитель зовёт get на пустой очереди, он не крутит цикл впустую, а засыпает на условии «не пусто»; как только производитель сделает put, очередь будит одного из спящих. Симметрично работает put на полной очереди. Поэтому ожидание почти не тратит процессор.

Завершение оформляют через «ядовитую пилюлю» (poison pill): когда работа кончилась, в очередь кладут особый маркер (например None) — по одному на каждого потребителя. Потребитель, увидев маркер, выходит из цикла. Так не нужно насильно убивать потоки. Вот как выглядит реальный многопоточный вариант (его не запускаем — это код для чтения):

import threading, queue

q = queue.Queue(maxsize=10)

def producer():
    for i in range(100):
        q.put(i)          # заблокируется, если буфер полон
    q.put(None)           # ядовитая пилюля для потребителя

def consumer():
    while True:
        item = q.get()
        if item is None:  # сигнал «работа кончилась»
            q.task_done()
            break
        handle(item)
        q.task_done()     # отметить, что элемент обработан

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
q.join()                  # ждать, пока всё обработано

Метод task_done вместе с q.join() позволяет дождаться, когда все положенные элементы реально обработаны, а не просто вынуты.

Частые ошибки

  • Неограниченная очередь под нагрузкой. Без maxsize быстрый производитель забьёт всю память, если потребитель отстаёт. Для конвейеров почти всегда нужен предел.
  • Потребители не получают сигнал остановки. Если забыть про ядовитые пилюли, потоки-потребители повиснут навечно на get. Маркеров должно быть ровно столько, сколько потребителей.
  • Самодельная очередь на списке. Обычный list не потокобезопасен; append/pop из разных потоков ведут к гонкам и потере данных. Берите queue.Queue.
  • Тяжёлая работа под общим замком. Очередь синхронизирует только саму себя; всю настоящую обработку делайте после get, вне любых общих блокировок, иначе потеряете весь параллелизм.

Итоги

  • Паттерн развязывает стороны с разной скоростью через общий буфер-очередь.
  • Потокобезопасная очередь (queue.Queue) берёт на себя синхронизацию и засыпание потоков.
  • maxsize даёт обратное давление: производитель притормаживает, память под контролем.
  • Завершение — через ядовитые пилюли по числу потребителей, а не через убийство потоков.
Проверьте себя
1. Зачем между производителем и потребителем ставят очередь, а не соединяют их напрямую?
AЧтобы код выглядел сложнее и солиднее
BЧтобы развязать стороны по скорости: быстрый не ждёт медленного, а данные не теряются
CОчередь ускоряет сам процессор
DБез очереди потоки в Python запрещены
2. Что делает параметр maxsize у queue.Queue под нагрузкой?
AНичего, это просто подсказка для документации
BОграничивает число потребителей
CСоздаёт обратное давление: при полном буфере put блокирует производителя, пока не освободится место
DУдаляет старые элементы, освобождая место для новых
3. Зачем в очередь кладут «ядовитую пилюлю» (например None)?
AЧтобы очистить очередь от мусора
BЧтобы корректно сообщить потребителям о завершении и дать им выйти из цикла get без принудительного убийства потоков
CЧтобы ускорить get
DЭто обязательный первый элемент любой очереди