ETL-пайплайн: CSV в базу данных
Урок собирает первый полноценный ETL-пайплайн: из CSV — в таблицу базы данных.
ETL-пайплайн — последовательность Extract → Transform → Load, превращающая сырой источник в готовую таблицу хранилища.
Постановка задачи
Есть выгрузка заказов в CSV: id, customer, amount, status. Нужно отфильтровать отменённые, привести суммы к числам, посчитать выручку по клиентам и загрузить результат в таблицу витрины. Разберём каждый из трёх шагов отдельно, а потом соберём вместе.
Почему важно держать шаги раздельными, мы уже обсуждали, но в коде это видно особенно ясно: Extract зависит от формата источника (CSV), Transform — это чистая бизнес-логика на Python, Load — это SQL для конкретного приёмника. Поменялся источник (стал API вместо файла) — переписываем только Extract. Поменялась витрина — только Load. Логика подсчёта выручки при этом остаётся нетронутой. Такая развязка делает пайплайн гибким и тестируемым.
orders.csv ──Extract──> список словарей ──Transform──> агрегаты ──Load──> revenue_by_customerШаг Extract и Transform на Python
Извлекаем из CSV и преобразуем средствами стандартной библиотеки — это реально исполнимо в браузере.
import csv, io
from collections import defaultdict
csv_text = """id,customer,amount,status
1,Анна,300,paid
2,Борис,500,paid
3,Анна,200,cancelled
4,Анна,400,paid"""
# Extract
rows = list(csv.DictReader(io.StringIO(csv_text)))
# Transform: убираем отменённые, считаем выручку по клиентам
revenue = defaultdict(int)
for r in rows:
if r["status"] == "paid":
revenue[r["customer"]] += int(r["amount"])
for customer, total in sorted(revenue.items()):
print(customer, total)Вывод:
Анна 700 Борис 500
Шаг Load на SQL
Загружаем посчитанные агрегаты в витрину. Делаем загрузку идемпотентной через delete-insert: повтор не создаст дублей.
CREATE TABLE revenue_by_customer (customer TEXT PRIMARY KEY, revenue INTEGER);
-- идемпотентная загрузка результата Transform
DELETE FROM revenue_by_customer WHERE customer IN ('Анна','Борис');
INSERT INTO revenue_by_customer VALUES ('Анна', 700), ('Борис', 500);
SELECT customer, revenue
FROM revenue_by_customer
ORDER BY revenue DESC;Как работает под капотом
В Airflow эти три шага стали бы тремя задачами одного DAG, связанными стрелками. Код требует airflow, помечен text.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract(): ... # прочитать orders.csv
def transform(): ... # отфильтровать и агрегировать
def load(): ... # delete-insert в витрину
with DAG("orders_etl", start_date=datetime(2026,6,1), schedule="@daily", catchup=False) as dag:
e = PythonOperator(task_id="extract", python_callable=extract)
t = PythonOperator(task_id="transform", python_callable=transform)
l = PythonOperator(task_id="load", python_callable=load)
e >> t >> lЧастые ошибки
- Смешать преобразование и загрузку в один шаг. Раздельные Extract/Transform/Load проще тестировать и перезапускать по отдельности.
- Грузить через чистый INSERT. Без delete-insert или upsert повторный прогон ETL продублирует выручку.
- Не приводить типы. Из CSV суммы приходят строками; забыли
int()— и агрегаты посчитаются неверно или упадут.
Итог
- ETL-пайплайн состоит из раздельных шагов Extract, Transform, Load.
- Преобразование удобно делать на Python (csv, collections), загрузку — на SQL.
- Загрузку делают идемпотентной (delete-insert/upsert), чтобы повтор не дублировал данные.