Redis Streams: надёжные очереди событий
Когда Pub/Sub теряет сообщения, а списки не дают подтверждений — на сцену выходят Streams. Это полноценный журнал событий внутри Redis.
Stream — это append-only журнал: сообщения не исчезают после прочтения, у каждого есть ID, а consumer groups дают подтверждения и распределение нагрузки. Это очередь промышленного уровня.
Redis Streams — это структура-журнал, добавляемая только в конец (append-only log). В отличие от Pub/Sub, сообщения хранятся и доступны для повторного чтения. В отличие от простых списков, есть подтверждения доставки и группы потребителей. Это самый надёжный механизм обмена сообщениями в Redis.
Базовые команды
# Добавить событие (ID генерируется автоматически)
XADD events * type "signup" user "42"
"1718900000000-0"
# Прочитать все события
XRANGE events - +
# Прочитать новые, начиная с конца
XREAD COUNT 10 STREAMS events $
# Длина потока
XLEN events
XADD добавляет запись (* = авто-ID на основе времени), XRANGE читает диапазон, XREAD читает новые сообщения, XLEN — длина.
Consumer groups: распределение и подтверждения
Главная мощь Streams — группы потребителей. Группа делит поток между несколькими воркерами: каждое сообщение получает один участник группы. После обработки воркер подтверждает сообщение через XACK. Неподтверждённые сообщения видны в «pending»-списке и могут быть переназначены упавшего воркера другому.
# Создать группу
XGROUP CREATE events workers $
# Воркер читает свою порцию
XREADGROUP GROUP workers worker1 COUNT 1 STREAMS events >
# Подтвердить обработку
XACK events workers 1718900000000-0
# Посмотреть зависшие (необработанные)
XPENDING events workers
Stream с consumer group
Издатели Stream (журнал) Группа "workers"
-------- --------------- ----------------
XADD --> [e1][e2][e3][e4][e5] --> worker1: e1, e3
--> worker2: e2, e4
--> worker3: e5
Каждое событие — одному воркеру. XACK подтверждает.
Без XACK событие висит в pending и может быть переназначено.
Pub/Sub vs списки vs Streams
| Свойство | Pub/Sub | Список | Stream |
|---|---|---|---|
| Хранение | нет | да | да |
| Повторное чтение | нет | нет | да |
| Подтверждения | нет | нет | да (XACK) |
| Группы воркеров | нет | вручную | да |
| Доставка | at-most-once | at-most-once | at-least-once |
Демонстрация: stream с consumer group на Python
# Моделируем stream и группу потребителей
stream = [] # журнал событий (append-only)
counter = [0]
pending = {} # доставленные, но не подтверждённые
acked = []
def xadd(data):
counter[0] += 1
entry_id = f"{counter[0]}-0"
stream.append((entry_id, data))
return entry_id
def xreadgroup(consumer, n=1):
# выдать следующие неназначенные события этому воркеру
delivered = []
for eid, data in stream:
if eid not in pending and eid not in acked:
pending[eid] = consumer
delivered.append((eid, data))
if len(delivered) >= n:
break
return delivered
def xack(eid):
pending.pop(eid, None)
acked.append(eid)
# Издатель добавляет события
for ev in ["signup", "login", "purchase"]:
xadd({"type": ev})
print("В потоке событий:", len(stream))
# worker1 берёт и подтверждает
got = xreadgroup("worker1", n=1)
print("worker1 получил:", got)
xack(got[0][0])
print("worker1 подтвердил (XACK)")
# worker2 берёт, но НЕ подтверждает (упал)
got = xreadgroup("worker2", n=1)
print("worker2 получил:", got, "-> и упал без XACK")
print("\nЗависшие без подтверждения (pending):", list(pending.keys()))
print("История событий цела — Stream хранит всё, в отличие от Pub/Sub.")
Событие worker2 осталось в pending — его можно переназначить другому воркеру. А весь журнал событий сохранён и доступен для повторного чтения.
Как работает под капотом
Внутри Stream — это radix-дерево, индексирующее записи по их ID (timestamp-последовательность). Каждая запись имеет уникальный возрастающий ID, что даёт строгий порядок. Consumer group хранит «последний выданный ID» и PEL (Pending Entries List) — список доставленных, но не подтверждённых записей по каждому воркеру. Именно PEL обеспечивает at-least-once: пока нет XACK, запись считается необработанной и может быть востребована через XCLAIM.
Частые ошибки
- Не делать XACK. Pending-список будет расти, память течь, сообщения «зависать».
- Неограниченный рост потока. Без
XADD ... MAXLEN ~ NилиXTRIMжурнал растёт бесконечно. - Считать доставку exactly-once. Streams дают at-least-once; обработчик должен быть идемпотентным.
Best practices
- Используйте Streams, когда нужны хранение событий, подтверждения и группы воркеров.
- Всегда подтверждайте обработку через
XACKи сторожем переназначайте зависшие черезXCLAIM/XAUTOCLAIM. - Ограничивайте длину через
MAXLEN, чтобы поток не съел память.
Итог: Streams — надёжный append-only журнал событий с хранением, ID, подтверждениями (XACK) и consumer groups. Это очередь промышленного уровня внутри Redis: at-least-once доставка, повторное чтение, распределение нагрузки. За это отвечают radix-дерево и PEL.