Streams вглубь: consumer groups
Streams — это упорядоченный, неудаляемый журнал сообщений; consumer groups превращают его в надёжную очередь задач с подтверждением обработки.
Stream — структура «append-only log»: записи с авто-id
<ms>-<seq>добавляются в конец и не пропадают после чтения. Consumer group распределяет записи между потребителями и отслеживает, что каждое сообщение обработано (подтверждено черезXACK).
Базовое знакомство со Streams у вас есть: XADD пишет, XRANGE читает диапазон, XLEN считает длину. Этот урок — про то, ради чего Streams и придумали: про надёжные очереди задач, в которых ни одно сообщение не теряется, обработку можно масштабировать на несколько воркеров, а после падения воркера его незавершённую работу подхватит другой. Этого не дают ни списки, ни Pub/Sub.
XADD и XREAD: запись и чтение журнала
XADD key * field value ... добавляет запись; * просит Redis сгенерировать id из метки времени. Возвращается присвоенный id.
XADD orders * task send-email to [email protected] # => "1719500000000-0"
XADD orders * task resize-img id 42 # => "1719500000001-0"
XLEN orders # => 2
XREAD читает записи новее заданного id. С BLOCK он ждёт появления новых данных, не нагружая CPU опросом.
# ждать до 5 секунд новые записи после последней виденной
XREAD BLOCK 5000 COUNT 10 STREAMS orders $
Специальный id $ означает «только то, что придёт после этого вызова». Но XREAD сам по себе не помнит, кто что прочитал, и не делит работу между воркерами — для этого нужны группы.
Зачем нужны consumer groups
Представьте очередь задач и три воркера. Хочется, чтобы каждую задачу обработал ровно один воркер (а не все трое), чтобы при росте нагрузки можно было добавить воркеров, и чтобы задача, которую воркер взял, но не доделал (упал), не потерялась. Именно это и даёт consumer group: группа помнит, до какого id она раздала записи, делит новые записи между потребителями и держит список «взято, но не подтверждено» (PEL, Pending Entries List).
XGROUP, XREADGROUP, XACK
XGROUP CREATE создаёт группу. Параметр после ключа — стартовый id: 0 — читать с начала истории, $ — только новые записи.
XGROUP CREATE orders workers $ MKSTREAM # группа workers, MKSTREAM создаст поток если нет
XREADGROUP GROUP <group> <consumer> ... STREAMS <key> > выдаёт потребителю ещё не розданные записи. Специальный id > означает «новые для этой группы». Каждая выданная запись попадает в PEL потребителя — пока он её не подтвердит.
# воркер worker-1 берёт до 10 новых задач
XREADGROUP GROUP workers worker-1 COUNT 10 STREAMS orders >
# ... обработали задачу с id 1719500000000-0 ...
XACK orders workers 1719500000000-0 # => 1 подтвердили, ушла из PEL
XACK убирает запись из списка незавершённых: «обработано, можно забыть». Не подтвердил — запись остаётся в PEL и будет ждать.
Как это работает под капотом
Группа хранит курсор «последний розданный id» (last-delivered-id) и по одному PEL на каждого потребителя. XREADGROUP с > двигает курсор вперёд и кладёт выданное в PEL. Если воркер упал, не подтвердив, запись остаётся в его PEL; другой воркер может посмотреть «зависшие» через XPENDING и забрать их себе командой XCLAIM (или автоматически — XAUTOCLAIM). Так достигается гарантия «не потеряем»: пока нет XACK, сообщение числится за кем-то и подлежит повторной обработке. Смоделируем ядро механики на стандартной библиотеке Python:
from collections import OrderedDict
class MiniStream:
def __init__(self):
self.entries = OrderedDict() # id -> поля записи
self.seq = 0
self.delivered = {} # consumer -> список взятых, но не подтверждённых
self.last_id = 0 # докуда группа уже раздала (курсор)
def xadd(self, fields):
self.seq += 1
eid = f"{self.seq}-0"
self.entries[eid] = fields
return eid
def xreadgroup(self, consumer, count=1):
out = []
for eid in self.entries:
seq = int(eid.split("-")[0])
if seq > self.last_id and len(out) < count:
out.append(eid)
self.delivered.setdefault(consumer, []).append(eid)
self.last_id = seq
return out
def xack(self, consumer, eid):
if eid in self.delivered.get(consumer, []):
self.delivered[consumer].remove(eid)
return 1
return 0
def pending(self):
return {c: ids for c, ids in self.delivered.items() if ids}
s = MiniStream()
s.xadd({"task": "send-email", "to": "[email protected]"})
s.xadd({"task": "send-email", "to": "[email protected]"})
s.xadd({"task": "resize-img", "id": "42"})
m1 = s.xreadgroup("worker-1", count=2)
m2 = s.xreadgroup("worker-2", count=2)
print("worker-1 получил:", m1)
print("worker-2 получил:", m2)
s.xack("worker-1", m1[0]) # подтвердили только первую
print("Не подтверждено (PEL):", s.pending())
Вывод:
worker-1 получил: ['1-0', '2-0']
worker-2 получил: ['3-0']
Не подтверждено (PEL): {'worker-1': ['2-0'], 'worker-2': ['3-0']}
Видно главное: записи поделились между воркерами (никто не получил чужую), а всё, что взяли, но не подтвердили через xack, осталось в PEL. В настоящем Redis именно эти «зависшие» записи другой воркер заберёт через XCLAIM после таймаута — так падение обработчика не теряет сообщение.
Отличие от Pub/Sub и списков
| Свойство | Pub/Sub | List (LPUSH/BRPOP) | Stream + group |
| Сообщение хранится после доставки | нет (fire-and-forget) | нет (BRPOP удаляет) | да (журнал) |
| Подтверждение обработки (ack) | нет | нет | да (XACK + PEL) |
| Несколько воркеров делят нагрузку | нет, все получают копию | да, но без учёта незавершённых | да, с учётом и переназначением |
| Переживёт падение воркера | сообщение потеряно | взятый элемент потерян | да (XCLAIM зависших) |
Коротко: Pub/Sub — это «кто слушал, тот услышал» без памяти; список — простая очередь, но взятый и потерянный элемент не вернуть; Stream с группой — надёжная очередь с историей, подтверждениями и восстановлением.
Частые ошибки
- Забыть XACK. Без подтверждения записи копятся в PEL вечно. PEL растёт, «зависшие» накапливаются — обязательно подтверждайте обработанное.
- Создать группу с
$, ожидая историю.XGROUP CREATE ... $начинает с конца — старые записи группа не увидит. Чтобы обработать накопленное, создавайте с0. - Думать, что
>отдаёт повторно. Id>даёт только новые записи. Свои незавершённые из PEL читают, передав конкретный id (например,0) вместо>. - Безграничный рост потока. Журнал не чистится сам. Ограничивайте длину при записи:
XADD orders MAXLEN ~ 100000 * ..., иначе поток съест память. - Опрос вместо BLOCK. Гонять
XREADGROUPв цикле безBLOCKзря жжёт CPU. ПередавайтеBLOCK, чтобы ждать новые записи эффективно.
Итоги
- Stream — упорядоченный неудаляемый журнал; записи получают id
<ms>-<seq>и остаются после чтения. - Consumer group делит записи между потребителями, помнит курсор раздачи и держит PEL — список взятых, но не подтверждённых сообщений.
XREADGROUP ... >выдаёт новые записи в PEL,XACKих подтверждает; зависшие после падения воркера забирают черезXCLAIM/XAUTOCLAIM.- В отличие от Pub/Sub (без памяти) и списков (взятое теряется), Stream с группой даёт надёжную доставку с подтверждением и восстановлением.
- Не забывайте
XACK, ограничивайте длину черезMAXLENи используйтеBLOCKвместо активного опроса.