Источники и приёмники данных
Урок систематизирует, откуда конвейер берёт данные и куда их кладёт.
Источник (source) — система, откуда конвейер извлекает данные. Приёмник (sink) — система, куда конвейер их записывает.
Типы источников
Дата-инженер постоянно подключается к разным системам. Их полезно разложить по типам, потому что от типа зависит способ извлечения.
- Базы данных (PostgreSQL, MySQL) — читаем по SQL или через журнал изменений.
- REST/GraphQL API (рекламные кабинеты, CRM) — забираем по HTTP с пагинацией и лимитами.
- Файлы (CSV, JSON, Parquet в S3) — выгрузки и логи.
- Очереди и стримы (Kafka) — потоки событий в реальном времени.
Тип источника определяет не только способ подключения, но и стратегию извлечения. Базу можно читать инкрементально по полю updated_at; API часто отдаёт данные постранично и ограничивает частоту запросов; файлы появляются пачками по расписанию; очередь — это бесконечный поток, который никогда не «заканчивается». Хороший дата-инженер сначала отвечает на вопрос «что это за источник и как он отдаёт данные», и только потом пишет код извлечения.
Типы приёмников
Куда едут данные после обработки:
| Приёмник | Зачем |
| Хранилище (warehouse) | аналитика, дашборды, отчёты |
| Озеро данных (data lake) | дешёвое хранение сырья и Parquet-файлов |
| Поисковый движок (OpenSearch) | полнотекстовый поиск и логи |
| Кэш / витрина | быстрый доступ приложения к готовым агрегатам |
Как работает под капотом
Главное свойство хорошего приёмника — устойчивость к повтору. Если конвейер запустился дважды, в приёмнике не должно появиться дублей. Для этого используют upsert: «обнови, если запись с таким ключом есть, иначе вставь». В SQLite это делается через INSERT OR REPLACE по первичному ключу.
CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, city TEXT);
INSERT INTO customers VALUES (1, 'Анна', 'Москва'), (2, 'Борис', 'Казань');
-- повторный запуск конвейера: id=1 обновился, id=3 добавился, дублей нет
INSERT OR REPLACE INTO customers VALUES (1, 'Анна', 'Сочи'), (3, 'Вера', 'Пермь');
SELECT * FROM customers ORDER BY id;А вот извлечение из «файлового источника» на чистом Python — разбираем строки лога и группируем по статусу.
from collections import Counter
log = ["GET 200", "POST 500", "GET 200", "GET 404", "POST 200"]
codes = Counter(line.split()[1] for line in log)
for code, n in sorted(codes.items()):
print(code, n)Вывод:
200 3 404 1 500 1
Этот разбор лога — миниатюрный Extract из файлового источника: мы прочитали сырые строки, выделили нужное поле и агрегировали. Дальше результат отправился бы в приёмник через upsert из примера выше. Связка «извлечь из источника → положить в приёмник идемпотентно» — это скелет любого конвейера, и оба его конца мы только что увидели в коде.
Частые ошибки
- Читать большую таблицу целиком каждый раз. Источник нужно забирать инкрементально (только новое), иначе нагрузка растёт линейно с объёмом.
- Приёмник без ключа. Если в целевой таблице нет уникального ключа, повторный запуск создаёт дубли — upsert становится невозможен.
- Игнорировать лимиты API. Источники-API часто ограничивают частоту запросов; без пауз и ретраев извлечение упадёт.
Итог
- Источники бывают: базы, API, файлы, очереди — способ извлечения зависит от типа.
- Приёмники: хранилища, озёра, поисковые движки, витрины.
- Хороший приёмник поддерживает upsert по ключу, чтобы повтор не создавал дублей.