Redis Streams: надёжные очереди событий

Когда Pub/Sub теряет сообщения, а списки не дают подтверждений — на сцену выходят Streams. Это полноценный журнал событий внутри Redis.

Stream — это append-only журнал: сообщения не исчезают после прочтения, у каждого есть ID, а consumer groups дают подтверждения и распределение нагрузки. Это очередь промышленного уровня.

Redis Streams — это структура-журнал, добавляемая только в конец (append-only log). В отличие от Pub/Sub, сообщения хранятся и доступны для повторного чтения. В отличие от простых списков, есть подтверждения доставки и группы потребителей. Это самый надёжный механизм обмена сообщениями в Redis.

Базовые команды

# Добавить событие (ID генерируется автоматически)
XADD events * type "signup" user "42"
"1718900000000-0"

# Прочитать все события
XRANGE events - +

# Прочитать новые, начиная с конца
XREAD COUNT 10 STREAMS events $

# Длина потока
XLEN events

XADD добавляет запись (* = авто-ID на основе времени), XRANGE читает диапазон, XREAD читает новые сообщения, XLEN — длина.

Consumer groups: распределение и подтверждения

Главная мощь Streams — группы потребителей. Группа делит поток между несколькими воркерами: каждое сообщение получает один участник группы. После обработки воркер подтверждает сообщение через XACK. Неподтверждённые сообщения видны в «pending»-списке и могут быть переназначены упавшего воркера другому.

# Создать группу
XGROUP CREATE events workers $

# Воркер читает свою порцию
XREADGROUP GROUP workers worker1 COUNT 1 STREAMS events >

# Подтвердить обработку
XACK events workers 1718900000000-0

# Посмотреть зависшие (необработанные)
XPENDING events workers
   Stream с consumer group

   Издатели          Stream (журнал)        Группа "workers"
   --------          ---------------        ----------------
   XADD --> [e1][e2][e3][e4][e5]  --> worker1: e1, e3
                                   --> worker2: e2, e4
                                   --> worker3: e5
   Каждое событие — одному воркеру. XACK подтверждает.
   Без XACK событие висит в pending и может быть переназначено.

Pub/Sub vs списки vs Streams

СвойствоPub/SubСписокStream
Хранениенетдада
Повторное чтениенетнетда
Подтверждениянетнетда (XACK)
Группы воркеровнетвручнуюда
Доставкаat-most-onceat-most-onceat-least-once

Демонстрация: stream с consumer group на Python

# Моделируем stream и группу потребителей
stream = []          # журнал событий (append-only)
counter = [0]
pending = {}         # доставленные, но не подтверждённые
acked = []

def xadd(data):
    counter[0] += 1
    entry_id = f"{counter[0]}-0"
    stream.append((entry_id, data))
    return entry_id

def xreadgroup(consumer, n=1):
    # выдать следующие неназначенные события этому воркеру
    delivered = []
    for eid, data in stream:
        if eid not in pending and eid not in acked:
            pending[eid] = consumer
            delivered.append((eid, data))
            if len(delivered) >= n:
                break
    return delivered

def xack(eid):
    pending.pop(eid, None)
    acked.append(eid)

# Издатель добавляет события
for ev in ["signup", "login", "purchase"]:
    xadd({"type": ev})
print("В потоке событий:", len(stream))

# worker1 берёт и подтверждает
got = xreadgroup("worker1", n=1)
print("worker1 получил:", got)
xack(got[0][0])
print("worker1 подтвердил (XACK)")

# worker2 берёт, но НЕ подтверждает (упал)
got = xreadgroup("worker2", n=1)
print("worker2 получил:", got, "-> и упал без XACK")

print("\nЗависшие без подтверждения (pending):", list(pending.keys()))
print("История событий цела — Stream хранит всё, в отличие от Pub/Sub.")

Событие worker2 осталось в pending — его можно переназначить другому воркеру. А весь журнал событий сохранён и доступен для повторного чтения.

Как работает под капотом

Внутри Stream — это radix-дерево, индексирующее записи по их ID (timestamp-последовательность). Каждая запись имеет уникальный возрастающий ID, что даёт строгий порядок. Consumer group хранит «последний выданный ID» и PEL (Pending Entries List) — список доставленных, но не подтверждённых записей по каждому воркеру. Именно PEL обеспечивает at-least-once: пока нет XACK, запись считается необработанной и может быть востребована через XCLAIM.

Частые ошибки

  • Не делать XACK. Pending-список будет расти, память течь, сообщения «зависать».
  • Неограниченный рост потока. Без XADD ... MAXLEN ~ N или XTRIM журнал растёт бесконечно.
  • Считать доставку exactly-once. Streams дают at-least-once; обработчик должен быть идемпотентным.

Best practices

  • Используйте Streams, когда нужны хранение событий, подтверждения и группы воркеров.
  • Всегда подтверждайте обработку через XACK и сторожем переназначайте зависшие через XCLAIM/XAUTOCLAIM.
  • Ограничивайте длину через MAXLEN, чтобы поток не съел память.

Итог: Streams — надёжный append-only журнал событий с хранением, ID, подтверждениями (XACK) и consumer groups. Это очередь промышленного уровня внутри Redis: at-least-once доставка, повторное чтение, распределение нагрузки. За это отвечают radix-дерево и PEL.

Проверьте себя
1. Чем Redis Streams принципиально отличается от Pub/Sub?
AStreams работает только с числами
BStreams хранит сообщения, поддерживает повторное чтение, подтверждения (XACK) и consumer groups
CStreams быстрее, но теряет сообщения как Pub/Sub
DStreams не поддерживает несколько потребителей
2. Что обеспечивает at-least-once доставку в consumer group Streams?
AСообщения дублируются при добавлении
BPEL (Pending Entries List): пока воркер не сделал XACK, сообщение считается необработанным и может быть переназначено
CКаждое сообщение отправляется всем воркерам сразу
DStreams автоматически повторяет XADD