ETL-пайплайн: CSV в базу данных

Урок собирает первый полноценный ETL-пайплайн: из CSV — в таблицу базы данных.

ETL-пайплайн — последовательность Extract → Transform → Load, превращающая сырой источник в готовую таблицу хранилища.

Постановка задачи

Есть выгрузка заказов в CSV: id, customer, amount, status. Нужно отфильтровать отменённые, привести суммы к числам, посчитать выручку по клиентам и загрузить результат в таблицу витрины. Разберём каждый из трёх шагов отдельно, а потом соберём вместе.

Почему важно держать шаги раздельными, мы уже обсуждали, но в коде это видно особенно ясно: Extract зависит от формата источника (CSV), Transform — это чистая бизнес-логика на Python, Load — это SQL для конкретного приёмника. Поменялся источник (стал API вместо файла) — переписываем только Extract. Поменялась витрина — только Load. Логика подсчёта выручки при этом остаётся нетронутой. Такая развязка делает пайплайн гибким и тестируемым.

orders.csv ──Extract──> список словарей ──Transform──> агрегаты ──Load──> revenue_by_customer

Шаг Extract и Transform на Python

Извлекаем из CSV и преобразуем средствами стандартной библиотеки — это реально исполнимо в браузере.

import csv, io
from collections import defaultdict

csv_text = """id,customer,amount,status
1,Анна,300,paid
2,Борис,500,paid
3,Анна,200,cancelled
4,Анна,400,paid"""

# Extract
rows = list(csv.DictReader(io.StringIO(csv_text)))

# Transform: убираем отменённые, считаем выручку по клиентам
revenue = defaultdict(int)
for r in rows:
    if r["status"] == "paid":
        revenue[r["customer"]] += int(r["amount"])

for customer, total in sorted(revenue.items()):
    print(customer, total)

Вывод:

Анна 700
Борис 500

Шаг Load на SQL

Загружаем посчитанные агрегаты в витрину. Делаем загрузку идемпотентной через delete-insert: повтор не создаст дублей.

CREATE TABLE revenue_by_customer (customer TEXT PRIMARY KEY, revenue INTEGER);

-- идемпотентная загрузка результата Transform
DELETE FROM revenue_by_customer WHERE customer IN ('Анна','Борис');
INSERT INTO revenue_by_customer VALUES ('Анна', 700), ('Борис', 500);

SELECT customer, revenue
FROM revenue_by_customer
ORDER BY revenue DESC;

Как работает под капотом

В Airflow эти три шага стали бы тремя задачами одного DAG, связанными стрелками. Код требует airflow, помечен text.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(): ...      # прочитать orders.csv
def transform(): ...    # отфильтровать и агрегировать
def load(): ...         # delete-insert в витрину

with DAG("orders_etl", start_date=datetime(2026,6,1), schedule="@daily", catchup=False) as dag:
    e = PythonOperator(task_id="extract", python_callable=extract)
    t = PythonOperator(task_id="transform", python_callable=transform)
    l = PythonOperator(task_id="load", python_callable=load)
    e >> t >> l

Частые ошибки

  • Смешать преобразование и загрузку в один шаг. Раздельные Extract/Transform/Load проще тестировать и перезапускать по отдельности.
  • Грузить через чистый INSERT. Без delete-insert или upsert повторный прогон ETL продублирует выручку.
  • Не приводить типы. Из CSV суммы приходят строками; забыли int() — и агрегаты посчитаются неверно или упадут.

Итог

  • ETL-пайплайн состоит из раздельных шагов Extract, Transform, Load.
  • Преобразование удобно делать на Python (csv, collections), загрузку — на SQL.
  • Загрузку делают идемпотентной (delete-insert/upsert), чтобы повтор не дублировал данные.
Проверьте себя
1. Почему шаги Extract, Transform и Load обычно держат раздельными?
AТак требует синтаксис SQL
BИх проще тестировать, перезапускать и отлаживать по отдельности
CИначе данные шифруются
DЭто ускоряет извлечение в 10 раз
2. Что произойдёт, если шаг Load использует чистый INSERT и пайплайн запустят дважды?
AНичего, данные не изменятся
BВыручка задублируется, потому что строки добавятся повторно
CБаза данных удалит таблицу
DPipeline станет быстрее
3. Почему при чтении сумм из CSV важно вызвать int()?
AИз CSV всё приходит строками, и без преобразования агрегаты посчитаются неверно
Bint() ускоряет чтение файла
CЭто требуется для подключения к Airflow
DCSV хранит числа в шестнадцатеричном виде