Источники и приёмники данных

Урок систематизирует, откуда конвейер берёт данные и куда их кладёт.

Источник (source) — система, откуда конвейер извлекает данные. Приёмник (sink) — система, куда конвейер их записывает.

Типы источников

Дата-инженер постоянно подключается к разным системам. Их полезно разложить по типам, потому что от типа зависит способ извлечения.

  • Базы данных (PostgreSQL, MySQL) — читаем по SQL или через журнал изменений.
  • REST/GraphQL API (рекламные кабинеты, CRM) — забираем по HTTP с пагинацией и лимитами.
  • Файлы (CSV, JSON, Parquet в S3) — выгрузки и логи.
  • Очереди и стримы (Kafka) — потоки событий в реальном времени.

Тип источника определяет не только способ подключения, но и стратегию извлечения. Базу можно читать инкрементально по полю updated_at; API часто отдаёт данные постранично и ограничивает частоту запросов; файлы появляются пачками по расписанию; очередь — это бесконечный поток, который никогда не «заканчивается». Хороший дата-инженер сначала отвечает на вопрос «что это за источник и как он отдаёт данные», и только потом пишет код извлечения.

Типы приёмников

Куда едут данные после обработки:

ПриёмникЗачем
Хранилище (warehouse)аналитика, дашборды, отчёты
Озеро данных (data lake)дешёвое хранение сырья и Parquet-файлов
Поисковый движок (OpenSearch)полнотекстовый поиск и логи
Кэш / витринабыстрый доступ приложения к готовым агрегатам

Как работает под капотом

Главное свойство хорошего приёмника — устойчивость к повтору. Если конвейер запустился дважды, в приёмнике не должно появиться дублей. Для этого используют upsert: «обнови, если запись с таким ключом есть, иначе вставь». В SQLite это делается через INSERT OR REPLACE по первичному ключу.

CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, city TEXT);
INSERT INTO customers VALUES (1, 'Анна', 'Москва'), (2, 'Борис', 'Казань');

-- повторный запуск конвейера: id=1 обновился, id=3 добавился, дублей нет
INSERT OR REPLACE INTO customers VALUES (1, 'Анна', 'Сочи'), (3, 'Вера', 'Пермь');

SELECT * FROM customers ORDER BY id;

А вот извлечение из «файлового источника» на чистом Python — разбираем строки лога и группируем по статусу.

from collections import Counter

log = ["GET 200", "POST 500", "GET 200", "GET 404", "POST 200"]
codes = Counter(line.split()[1] for line in log)
for code, n in sorted(codes.items()):
    print(code, n)

Вывод:

200 3
404 1
500 1

Этот разбор лога — миниатюрный Extract из файлового источника: мы прочитали сырые строки, выделили нужное поле и агрегировали. Дальше результат отправился бы в приёмник через upsert из примера выше. Связка «извлечь из источника → положить в приёмник идемпотентно» — это скелет любого конвейера, и оба его конца мы только что увидели в коде.

Частые ошибки

  • Читать большую таблицу целиком каждый раз. Источник нужно забирать инкрементально (только новое), иначе нагрузка растёт линейно с объёмом.
  • Приёмник без ключа. Если в целевой таблице нет уникального ключа, повторный запуск создаёт дубли — upsert становится невозможен.
  • Игнорировать лимиты API. Источники-API часто ограничивают частоту запросов; без пауз и ретраев извлечение упадёт.

Итог

  • Источники бывают: базы, API, файлы, очереди — способ извлечения зависит от типа.
  • Приёмники: хранилища, озёра, поисковые движки, витрины.
  • Хороший приёмник поддерживает upsert по ключу, чтобы повтор не создавал дублей.
Проверьте себя
1. Что такое upsert в контексте приёмника данных?
AПолное удаление таблицы перед загрузкой
BОперация «обнови запись по ключу, если она есть, иначе вставь»
CИзвлечение данных из источника
DСжатие файлов перед загрузкой
2. Почему важно, чтобы в целевой таблице был уникальный ключ?
AБез него таблица занимает больше места
BБез него невозможен upsert, и повторный запуск создаёт дубли
CКлюч ускоряет извлечение из источника
DКлюч нужен только для красоты схемы