Streams вглубь: consumer groups

Streams — это упорядоченный, неудаляемый журнал сообщений; consumer groups превращают его в надёжную очередь задач с подтверждением обработки.

Stream — структура «append-only log»: записи с авто-id <ms>-<seq> добавляются в конец и не пропадают после чтения. Consumer group распределяет записи между потребителями и отслеживает, что каждое сообщение обработано (подтверждено через XACK).

Базовое знакомство со Streams у вас есть: XADD пишет, XRANGE читает диапазон, XLEN считает длину. Этот урок — про то, ради чего Streams и придумали: про надёжные очереди задач, в которых ни одно сообщение не теряется, обработку можно масштабировать на несколько воркеров, а после падения воркера его незавершённую работу подхватит другой. Этого не дают ни списки, ни Pub/Sub.

XADD и XREAD: запись и чтение журнала

XADD key * field value ... добавляет запись; * просит Redis сгенерировать id из метки времени. Возвращается присвоенный id.

XADD orders * task send-email to [email protected]   # => "1719500000000-0"
XADD orders * task resize-img id 42        # => "1719500000001-0"
XLEN orders                                # => 2

XREAD читает записи новее заданного id. С BLOCK он ждёт появления новых данных, не нагружая CPU опросом.

# ждать до 5 секунд новые записи после последней виденной
XREAD BLOCK 5000 COUNT 10 STREAMS orders $

Специальный id $ означает «только то, что придёт после этого вызова». Но XREAD сам по себе не помнит, кто что прочитал, и не делит работу между воркерами — для этого нужны группы.

Зачем нужны consumer groups

Представьте очередь задач и три воркера. Хочется, чтобы каждую задачу обработал ровно один воркер (а не все трое), чтобы при росте нагрузки можно было добавить воркеров, и чтобы задача, которую воркер взял, но не доделал (упал), не потерялась. Именно это и даёт consumer group: группа помнит, до какого id она раздала записи, делит новые записи между потребителями и держит список «взято, но не подтверждено» (PEL, Pending Entries List).

XGROUP, XREADGROUP, XACK

XGROUP CREATE создаёт группу. Параметр после ключа — стартовый id: 0 — читать с начала истории, $ — только новые записи.

XGROUP CREATE orders workers $ MKSTREAM   # группа workers, MKSTREAM создаст поток если нет

XREADGROUP GROUP <group> <consumer> ... STREAMS <key> > выдаёт потребителю ещё не розданные записи. Специальный id > означает «новые для этой группы». Каждая выданная запись попадает в PEL потребителя — пока он её не подтвердит.

# воркер worker-1 берёт до 10 новых задач
XREADGROUP GROUP workers worker-1 COUNT 10 STREAMS orders >
# ... обработали задачу с id 1719500000000-0 ...
XACK orders workers 1719500000000-0       # => 1  подтвердили, ушла из PEL

XACK убирает запись из списка незавершённых: «обработано, можно забыть». Не подтвердил — запись остаётся в PEL и будет ждать.

Как это работает под капотом

Группа хранит курсор «последний розданный id» (last-delivered-id) и по одному PEL на каждого потребителя. XREADGROUP с > двигает курсор вперёд и кладёт выданное в PEL. Если воркер упал, не подтвердив, запись остаётся в его PEL; другой воркер может посмотреть «зависшие» через XPENDING и забрать их себе командой XCLAIM (или автоматически — XAUTOCLAIM). Так достигается гарантия «не потеряем»: пока нет XACK, сообщение числится за кем-то и подлежит повторной обработке. Смоделируем ядро механики на стандартной библиотеке Python:

from collections import OrderedDict

class MiniStream:
    def __init__(self):
        self.entries = OrderedDict()   # id -> поля записи
        self.seq = 0
        self.delivered = {}            # consumer -> список взятых, но не подтверждённых
        self.last_id = 0               # докуда группа уже раздала (курсор)
    def xadd(self, fields):
        self.seq += 1
        eid = f"{self.seq}-0"
        self.entries[eid] = fields
        return eid
    def xreadgroup(self, consumer, count=1):
        out = []
        for eid in self.entries:
            seq = int(eid.split("-")[0])
            if seq > self.last_id and len(out) < count:
                out.append(eid)
                self.delivered.setdefault(consumer, []).append(eid)
                self.last_id = seq
        return out
    def xack(self, consumer, eid):
        if eid in self.delivered.get(consumer, []):
            self.delivered[consumer].remove(eid)
            return 1
        return 0
    def pending(self):
        return {c: ids for c, ids in self.delivered.items() if ids}

s = MiniStream()
s.xadd({"task": "send-email", "to": "[email protected]"})
s.xadd({"task": "send-email", "to": "[email protected]"})
s.xadd({"task": "resize-img", "id": "42"})

m1 = s.xreadgroup("worker-1", count=2)
m2 = s.xreadgroup("worker-2", count=2)
print("worker-1 получил:", m1)
print("worker-2 получил:", m2)
s.xack("worker-1", m1[0])               # подтвердили только первую
print("Не подтверждено (PEL):", s.pending())

Вывод:

worker-1 получил: ['1-0', '2-0']
worker-2 получил: ['3-0']
Не подтверждено (PEL): {'worker-1': ['2-0'], 'worker-2': ['3-0']}

Видно главное: записи поделились между воркерами (никто не получил чужую), а всё, что взяли, но не подтвердили через xack, осталось в PEL. В настоящем Redis именно эти «зависшие» записи другой воркер заберёт через XCLAIM после таймаута — так падение обработчика не теряет сообщение.

Отличие от Pub/Sub и списков

СвойствоPub/SubList (LPUSH/BRPOP)Stream + group
Сообщение хранится после доставкинет (fire-and-forget)нет (BRPOP удаляет)да (журнал)
Подтверждение обработки (ack)нетнетда (XACK + PEL)
Несколько воркеров делят нагрузкунет, все получают копиюда, но без учёта незавершённыхда, с учётом и переназначением
Переживёт падение воркерасообщение потеряновзятый элемент потерянда (XCLAIM зависших)

Коротко: Pub/Sub — это «кто слушал, тот услышал» без памяти; список — простая очередь, но взятый и потерянный элемент не вернуть; Stream с группой — надёжная очередь с историей, подтверждениями и восстановлением.

Частые ошибки

  • Забыть XACK. Без подтверждения записи копятся в PEL вечно. PEL растёт, «зависшие» накапливаются — обязательно подтверждайте обработанное.
  • Создать группу с $, ожидая историю. XGROUP CREATE ... $ начинает с конца — старые записи группа не увидит. Чтобы обработать накопленное, создавайте с 0.
  • Думать, что > отдаёт повторно. Id > даёт только новые записи. Свои незавершённые из PEL читают, передав конкретный id (например, 0) вместо >.
  • Безграничный рост потока. Журнал не чистится сам. Ограничивайте длину при записи: XADD orders MAXLEN ~ 100000 * ..., иначе поток съест память.
  • Опрос вместо BLOCK. Гонять XREADGROUP в цикле без BLOCK зря жжёт CPU. Передавайте BLOCK, чтобы ждать новые записи эффективно.

Итоги

  • Stream — упорядоченный неудаляемый журнал; записи получают id <ms>-<seq> и остаются после чтения.
  • Consumer group делит записи между потребителями, помнит курсор раздачи и держит PEL — список взятых, но не подтверждённых сообщений.
  • XREADGROUP ... > выдаёт новые записи в PEL, XACK их подтверждает; зависшие после падения воркера забирают через XCLAIM/XAUTOCLAIM.
  • В отличие от Pub/Sub (без памяти) и списков (взятое теряется), Stream с группой даёт надёжную доставку с подтверждением и восстановлением.
  • Не забывайте XACK, ограничивайте длину через MAXLEN и используйте BLOCK вместо активного опроса.
Проверьте себя
1. Зачем в consumer group нужен XACK?
AЧтобы добавить новое сообщение в поток
BЧтобы подтвердить обработку и убрать запись из списка незавершённых (PEL)
CЧтобы создать саму группу потребителей
DЧтобы удалить поток целиком
2. Чем Stream с consumer group принципиально отличается от Pub/Sub?
APub/Sub быстрее и поэтому надёжнее
BStream хранит сообщения и поддерживает подтверждение обработки, а Pub/Sub доставляет без памяти (fire-and-forget)
CВ Pub/Sub каждое сообщение обрабатывает только один подписчик
DStream не умеет работать с несколькими потребителями
3. Воркер взял задачу через XREADGROUP, но упал, не сделав XACK. Что произойдёт с этой задачей?
AОна удалится из потока вместе с воркером
BОстанется в PEL воркера, и другой потребитель сможет забрать её через XCLAIM
CАвтоматически вернётся в Pub/Sub-канал
DБудет потеряна безвозвратно
4. Вы создали группу командой XGROUP CREATE orders workers $. Почему воркеры не видят уже накопленные старые записи?
AПотому что $ означает «начинать с конца потока» — только новые записи
BПотому что группа всегда читает только последнюю запись
CПотому что старые записи требуют отдельного XACK
DПотому что MKSTREAM стёр историю