Change Streams и реактивность

Часто приложению нужно реагировать на изменения данных мгновенно: обновить кэш, отправить пуш, синхронизировать поисковый индекс. Раньше для этого опрашивали базу в цикле; Change Streams делают это правильно — база сама присылает события.

Change Stream — поток событий об изменениях данных в реальном времени. Через watch() вы подписываетесь на insert, update, replace, delete и получаете каждое изменение почти сразу после того, как оно произошло.

Зачем это на практике

Представьте интернет-магазин. Когда меняется цена товара, надо: обновить кэш, переиндексировать товар в поиске, разослать события другим микросервисам, обновить дашборд. Наивное решение — polling: каждые несколько секунд опрашивать коллекцию «что изменилось?». Это создаёт лишнюю нагрузку, реагирует с задержкой и легко пропускает быстрые изменения.

Change Streams решают задачу элегантно: вы открываете «подписку» на коллекцию, и MongoDB сама присылает каждое изменение в виде события. На этом строят уведомления, инвалидацию кэша, материализованные представления, синхронизацию с Elasticsearch и обмен событиями между сервисами.

Базовое использование: watch()

Подписаться можно на уровне коллекции, базы или всего кластера:

// поток изменений по конкретной коллекции
const stream = db.collection("products").watch()

for await (const change of stream) {
  console.log(change.operationType)   // insert | update | delete | replace
  console.log(change.documentKey)     // { _id: ... } затронутого документа
}

// можно слушать всю базу или весь кластер
db.watch()       // все коллекции базы
client.watch()   // весь деплой

Каждое событие — это документ. Главные поля:

ПолеЧто содержит
_idresume token — позиция в потоке (см. ниже)
operationTypeтип операции: insert / update / replace / delete
documentKey_id затронутого документа
fullDocumentсам документ (для insert — всегда; для update — по запросу)
updateDescriptionчто именно изменилось: updatedFields и removedFields

Фильтрация и полный документ при обновлении

В watch() можно передать конвейер агрегации — например, ловить только нужные операции или поля:

// только обновления, и сразу подтягиваем актуальный документ целиком
const stream = db.collection("products").watch(
  [ { $match: { operationType: "update" } } ],
  { fullDocument: "updateLookup" }
)

По умолчанию при update событие содержит лишь updateDescription (дельту изменений), но не весь документ. Опция fullDocument: "updateLookup" заставляет MongoDB дополнительно подтянуть текущую версию документа. Важная тонкость: это именно текущая версия на момент чтения, а не состояние ровно в момент изменения — если документ успели поменять ещё раз, вы увидите более свежую версию.

Resume token — устойчивость к разрывам

Соединение может разорваться: рестарт сервиса, сетевой сбой, переключение primary. Чтобы не потерять и не продублировать события, у каждого события есть resume token (это поле _id события). Сохранив последний обработанный токен, вы можете возобновить поток ровно с того места:

let resumeToken = loadTokenFromDisk()   // из БД/файла — пережить рестарт

const stream = db.collection("products").watch([], {
  resumeAfter: resumeToken   // продолжить после последнего обработанного события
})

for await (const change of stream) {
  handle(change)
  resumeToken = change._id
  saveTokenToDisk(resumeToken)   // персистим прогресс
}

Без сохранённого токена после рестарта вы начнёте слушать «с текущего момента» и пропустите всё, что произошло, пока сервис был недоступен. С токеном — продолжите без пропусков (при условии, что нужные операции ещё в oplog).

Как это работает под капотом

Change Streams — это надстройка над тем же oplog, что обеспечивает репликацию. Поэтому ключевое требование: они работают только на replica set или шардированном кластере, у standalone-сервера oplog нет, и watch() не поддерживается. По сути это управляемый курсор по oplog с гарантией согласованного порядка событий: в шардированном кластере mongos сливает потоки с разных шардов и упорядочивает их по кластерному времени (cluster time). Resume token как раз кодирует позицию в этом упорядоченном потоке.

Change Streams уважают права доступа (вы видите только то, на что есть права) и переживают выборы primary — поток продолжится с нового primary. В версиях 6.0+ можно включить pre/post images, чтобы получать состояние документа до и после изменения (включая полный документ при delete), но для этого коллекцию надо настроить заранее.

Частые ошибки

  • Ждать fullDocument при update «бесплатно». Без опции updateLookup событие update вернёт только дельту полей, а не весь документ. Если нужен весь документ — включите fullDocument: "updateLookup".
  • Не сохранять resume token. Сервис перезапустился — и события за время простоя потеряны. Персистите токен после каждого обработанного события.
  • Запуск на standalone. Change Streams требуют oplog; на одиночном mongod без replica set они недоступны. Для разработки поднимите хотя бы одноузловой replica set.
  • Считать, что updateLookup даёт состояние на момент изменения. Он возвращает текущую версию документа; при частых изменениях это может быть уже не то состояние, что вызвало событие. Нужна точность «до/после» — используйте pre/post images.
  • Игнорировать окно oplog. Если сервис простоял дольше, чем oplog хранит операции, возобновление по старому токену не сработает — события «прокрутились». Следите за размером oplog и лагом обработки.

Итоги

  • Change Streams дают поток изменений в реальном времени — замена опросу базы (polling).
  • watch() доступен на уровне коллекции, базы и всего деплоя; событие несёт operationType, documentKey, при необходимости fullDocument.
  • На них строят уведомления, инвалидацию кэша, синхронизацию поиска и обмен событиями между сервисами.
  • Resume token позволяет продолжить поток после разрыва без потери событий — сохраняйте его персистентно.
  • Работают только на replica set / шардированном кластере (опираются на oplog) и переживают выборы primary.
Проверьте себя
1. Почему Change Streams нельзя использовать на одиночном (standalone) mongod?
AИз-за лицензионных ограничений
BОни построены на oplog, которого у standalone-сервера нет — нужен replica set
CStandalone слишком медленный
DНужно минимум три шарда
2. Для чего нужен resume token в Change Stream?
AЧтобы ускорить запросы по индексу
BЧтобы возобновить поток ровно с места разрыва и не потерять/не продублировать события
CЧтобы аутентифицировать пользователя
DЧтобы выбрать shard key
3. Что нужно, чтобы событие update в Change Stream содержало весь документ целиком, а не только список изменённых полей?
AНичего, fullDocument приходит всегда
BУказать опцию fullDocument: "updateLookup"
CВключить шардирование коллекции
DДобавить арбитра в кластер