Change Streams и реактивность
Часто приложению нужно реагировать на изменения данных мгновенно: обновить кэш, отправить пуш, синхронизировать поисковый индекс. Раньше для этого опрашивали базу в цикле; Change Streams делают это правильно — база сама присылает события.
Change Stream — поток событий об изменениях данных в реальном времени. Через
watch()вы подписываетесь наinsert,update,replace,deleteи получаете каждое изменение почти сразу после того, как оно произошло.
Зачем это на практике
Представьте интернет-магазин. Когда меняется цена товара, надо: обновить кэш, переиндексировать товар в поиске, разослать события другим микросервисам, обновить дашборд. Наивное решение — polling: каждые несколько секунд опрашивать коллекцию «что изменилось?». Это создаёт лишнюю нагрузку, реагирует с задержкой и легко пропускает быстрые изменения.
Change Streams решают задачу элегантно: вы открываете «подписку» на коллекцию, и MongoDB сама присылает каждое изменение в виде события. На этом строят уведомления, инвалидацию кэша, материализованные представления, синхронизацию с Elasticsearch и обмен событиями между сервисами.
Базовое использование: watch()
Подписаться можно на уровне коллекции, базы или всего кластера:
// поток изменений по конкретной коллекции
const stream = db.collection("products").watch()
for await (const change of stream) {
console.log(change.operationType) // insert | update | delete | replace
console.log(change.documentKey) // { _id: ... } затронутого документа
}
// можно слушать всю базу или весь кластер
db.watch() // все коллекции базы
client.watch() // весь деплой
Каждое событие — это документ. Главные поля:
| Поле | Что содержит |
_id | resume token — позиция в потоке (см. ниже) |
operationType | тип операции: insert / update / replace / delete |
documentKey | _id затронутого документа |
fullDocument | сам документ (для insert — всегда; для update — по запросу) |
updateDescription | что именно изменилось: updatedFields и removedFields |
Фильтрация и полный документ при обновлении
В watch() можно передать конвейер агрегации — например, ловить только нужные операции или поля:
// только обновления, и сразу подтягиваем актуальный документ целиком
const stream = db.collection("products").watch(
[ { $match: { operationType: "update" } } ],
{ fullDocument: "updateLookup" }
)
По умолчанию при update событие содержит лишь updateDescription (дельту изменений), но не весь документ. Опция fullDocument: "updateLookup" заставляет MongoDB дополнительно подтянуть текущую версию документа. Важная тонкость: это именно текущая версия на момент чтения, а не состояние ровно в момент изменения — если документ успели поменять ещё раз, вы увидите более свежую версию.
Resume token — устойчивость к разрывам
Соединение может разорваться: рестарт сервиса, сетевой сбой, переключение primary. Чтобы не потерять и не продублировать события, у каждого события есть resume token (это поле _id события). Сохранив последний обработанный токен, вы можете возобновить поток ровно с того места:
let resumeToken = loadTokenFromDisk() // из БД/файла — пережить рестарт
const stream = db.collection("products").watch([], {
resumeAfter: resumeToken // продолжить после последнего обработанного события
})
for await (const change of stream) {
handle(change)
resumeToken = change._id
saveTokenToDisk(resumeToken) // персистим прогресс
}
Без сохранённого токена после рестарта вы начнёте слушать «с текущего момента» и пропустите всё, что произошло, пока сервис был недоступен. С токеном — продолжите без пропусков (при условии, что нужные операции ещё в oplog).
Как это работает под капотом
Change Streams — это надстройка над тем же oplog, что обеспечивает репликацию. Поэтому ключевое требование: они работают только на replica set или шардированном кластере, у standalone-сервера oplog нет, и watch() не поддерживается. По сути это управляемый курсор по oplog с гарантией согласованного порядка событий: в шардированном кластере mongos сливает потоки с разных шардов и упорядочивает их по кластерному времени (cluster time). Resume token как раз кодирует позицию в этом упорядоченном потоке.
Change Streams уважают права доступа (вы видите только то, на что есть права) и переживают выборы primary — поток продолжится с нового primary. В версиях 6.0+ можно включить pre/post images, чтобы получать состояние документа до и после изменения (включая полный документ при delete), но для этого коллекцию надо настроить заранее.
Частые ошибки
- Ждать
fullDocumentпри update «бесплатно». Без опцииupdateLookupсобытиеupdateвернёт только дельту полей, а не весь документ. Если нужен весь документ — включитеfullDocument: "updateLookup". - Не сохранять resume token. Сервис перезапустился — и события за время простоя потеряны. Персистите токен после каждого обработанного события.
- Запуск на standalone. Change Streams требуют oplog; на одиночном
mongodбез replica set они недоступны. Для разработки поднимите хотя бы одноузловой replica set. - Считать, что
updateLookupдаёт состояние на момент изменения. Он возвращает текущую версию документа; при частых изменениях это может быть уже не то состояние, что вызвало событие. Нужна точность «до/после» — используйте pre/post images. - Игнорировать окно oplog. Если сервис простоял дольше, чем oplog хранит операции, возобновление по старому токену не сработает — события «прокрутились». Следите за размером oplog и лагом обработки.
Итоги
- Change Streams дают поток изменений в реальном времени — замена опросу базы (polling).
watch()доступен на уровне коллекции, базы и всего деплоя; событие несётoperationType,documentKey, при необходимостиfullDocument.- На них строят уведомления, инвалидацию кэша, синхронизацию поиска и обмен событиями между сервисами.
- Resume token позволяет продолжить поток после разрыва без потери событий — сохраняйте его персистентно.
- Работают только на replica set / шардированном кластере (опираются на oplog) и переживают выборы primary.