Реальный пример: поток заказов в аналитику

Финальный урок: собираем всё изученное в один работающий конвейер «заказы -> аналитика реального времени».

Цель — спроектировать сквозной поток: интернет-магазин публикует события заказов, а несколько независимых потребителей в реальном времени строят аналитику, антифрод-сигналы и поисковый индекс.

Постановка задачи

Магазин генерирует события жизненного цикла заказа: created, paid, shipped, cancelled. Нужно: считать выручку по городам в реальном времени, ловить подозрительные заказы, обновлять поисковый индекс товаров по продажам, и складывать всё в хранилище для исторической аналитики. Классический случай «один поток — много потребителей».

Архитектура конвейера

  [ Сервис заказов ]
        |  ключ = order_id  (порядок событий заказа)
        v
  ==== Kafka: топик "orders" (12 партиций, RF=3) ====
        |               |                 |
        v               v                 v
  [ Streams ]      [ Антифрод ]     [ Connect sink ]
  выручка/город    подозрит.        -> ClickHouse
        |          заказы              (история)
        v               |
  топик              топик
  "revenue.city"     "fraud.alerts"
        |
        v
  [ Connect sink ] -> дашборд

Решения по дизайну

РешениеПочему
ключ = order_idсобытия одного заказа по порядку в одной партиции
12 партицийзапас параллелизма под пиковую нагрузку
RF=3, acks=all, ISR=2не терять заказы при сбое брокера
Schema Registry (Avro)контракт событий, безопасная эволюция
Streams + окнавыручка по городам за 5-минутные окна
отдельные группы консьюмеровкаждый потребитель читает весь поток независимо

Пример событий

{"key":"ord-7","event":"created","city":"Москва","amount":2500,"ts":"12:00:01"}
{"key":"ord-7","event":"paid","city":"Москва","amount":2500,"ts":"12:00:06"}
{"key":"ord-7","event":"shipped","city":"Москва","ts":"12:30:00"}

Поток обработки

Сервис заказов с acks=all и идемпотентным продюсером пишет события в orders, ключ — order_id, поэтому история каждого заказа упорядочена. Streams-приложение читает orders, фильтрует paid, группирует по городу и считает выручку в 5-минутных окнах, публикуя в revenue.city. Антифрод-консьюмер (своя группа) сверяет частоту и суммы, шлёт сигналы в fraud.alerts. Connect-sink-коннекторы выгружают сырые заказы и агрегаты в ClickHouse и на дашборд. Каждый потребитель — независим, со своим оффсетом и группой.

Как работает под капотом

Конвейер опирается на всё, что мы прошли. Лог хранит заказы, поэтому новый потребитель (скажем, отчёт по возвратам) подключается завтра и читает историю с начала — менять источник не нужно. RF=3 и ISR=2 переживают падение брокера без потери заказов. at-least-once + идемпотентная запись в ClickHouse (UPSERT по order_id) не плодит дублей. Лаг каждой группы мониторится: если антифрод отстаёт, его масштабируют, добавив консьюмеров (до 12 — по числу партиций). Окна в Streams решают «бесконечность» потока, давая конкретную выручку за интервал. Так из кирпичиков — топиков, партиций, групп, гарантий, Streams и Connect — собирается живая система реального времени.

Частые ошибки

  • Один group.id на всех потребителей. Они начнут делить партиции и «воровать» события друг у друга вместо независимого чтения.
  • Без ключа = order_id. События заказа разъедутся по партициям, и shipped обгонит paid.
  • Sink без идемпотентности. Повторная доставка задвоит строки заказов в ClickHouse.

Итоги

  • Один поток orders кормит много независимых потребителей: аналитику, антифрод, поиск, историю.
  • Ключ order_id держит порядок, RF=3/acks=all защищают данные, группы дают независимость и масштаб.
  • Streams считает окна, Connect выгружает в хранилища — целостный конвейер реального времени из изученных кирпичиков.
Проверьте себя
1. Почему ключом события заказа выбран order_id?
AДля сжатия
BЧтобы все события одного заказа попали в одну партицию и шли строго по порядку
CЧтобы удалить дубликаты
DЧтобы ускорить запись на диск
2. Почему антифрод, аналитика и поиск используют РАЗНЫЕ group.id?
AТак быстрее
BЧтобы каждый независимо прочитал весь поток со своим оффсетом, а не делил партиции
CЭто требование Schema Registry
DИначе Kafka не запустится
3. Что позволяет добавить нового потребителя (отчёт по возвратам) завтра, не трогая источник?
AВысокий acks
BХранимый лог: новый потребитель подключается и читает историю заказов с начала
CБольшая JVM-куча
DОтключение репликации