pipe, Transform и backpressure

pipe соединяет источник с приёмником и сам соблюдает обратное давление; Transform — поток, преобразующий данные на лету; pipeline() делает это надёжно.

Backpressure (обратное давление) — механизм, при котором быстрый источник притормаживает, если медленный приёмник не успевает: данные не копятся в памяти бесконтрольно. pipe соединяет Readable с Writable и соблюдает backpressure автоматически.

В прошлом уроке мы читали и писали потоки вручную, развешивая обработчики data, end, error и следя за переполнением буфера. Это работает, но многословно и легко ошибиться. На практике потоки соединяют в конвейер: источник → (преобразование) → приёмник. Инструменты для этого — pipe(), Transform и pipeline().

Этот урок — про то, как одной строкой соединить потоки, как вставить в конвейер своё преобразование и почему без обратного давления программа «съедает» память.

Зачем это на практике

  • Копирование/сжатие файла: читать → gzip → писать, не держа файл в памяти целиком.
  • Отдача файла по HTTP: fs.createReadStream(file).pipe(response) — сервер стримит файл клиенту.
  • Преобразование на лету: построчная обработка лога, перевод текста в верхний регистр, парсинг CSV — кусок за куском.
  • Защита от перегрузки памяти: медленный диск или медленный клиент не должны заставлять Node буферизовать гигабайты.

Код использует серверный API Node (fs, zlib, stream) — кнопки «Запустить» нет, проверяйте в локальном Node.

pipe: соединяем потоки

readable.pipe(writable) подключает источник к приёмнику: всё, что выходит из readable, идёт в writable. Это заменяет ручную развеску data/end и, что важнее, само соблюдает обратное давление.

const fs = require('fs');

// скопировать файл потоково
fs.createReadStream('input.bin')
  .pipe(fs.createWriteStream('output.bin'));

Одна строка вместо десятка. pipe переводит источник в режим flowing, передаёт каждый кусок в приёмник, а когда источник закончится — вызывает end() у приёмника. pipe возвращает целевой поток, поэтому вызовы можно сцеплять в цепочку:

const zlib = require('zlib');

fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())              // сжать
  .pipe(fs.createWriteStream('input.txt.gz'));

Здесь данные текут источник → gzip → файл, и на каждом стыке память под контролем. zlib.createGzip() — это Transform-поток, к которому мы переходим.

Transform: преобразование на лету

Transform — поток, который одновременно читающий и пишущий: в него пишут исходные данные, из него читают преобразованные. Это идеальное «звено» в середине конвейера. Чтобы создать свой Transform, реализуют метод transform(chunk, encoding, callback): он получает кусок, что-то с ним делает и через callback отдаёт результат дальше.

const { Transform } = require('stream');

const upper = new Transform({
  transform(chunk, encoding, callback) {
    // превратить кусок в верхний регистр
    const result = chunk.toString().toUpperCase();
    callback(null, result); // (ошибка, преобразованные данные)
  }
});

fs.createReadStream('input.txt')
  .pipe(upper)
  .pipe(fs.createWriteStream('shout.txt'));

Каждый кусок проходит через upper и выходит в верхнем регистре. Первый аргумент callback — ошибка (или null), второй — результат. Если для куска нечего отдавать прямо сейчас, вызывают callback() без второго аргумента. Так устроены десятки готовых Transform-потоков: gzip, шифрование, парсеры. Важная тонкость текстовых Transform: кусок может разрезать строку или UTF-8-символ посередине, поэтому для построчной обработки данные сначала собирают, а уже потом режут по \n.

Что такое backpressure и почему он важен

Представьте: вы читаете файл с быстрого SSD и пишете на медленный сетевой диск. Источник выдаёт куски быстрее, чем приёмник успевает их записывать. Куда деваются непринятые данные? Они копятся в буфере приёмника. Если источник не остановить, буфер раздуется на гигабайты — и процесс упадёт по нехватке памяти.

Backpressure — это сигнал «помедленнее» от приёмника к источнику. Механика на уровне потоков такая: метод write() приёмника возвращает false, когда его буфер переполнен. Получив false, источник должен сделать pause() и ждать события drain («буфер освободился»), затем resume().

// ПЛОХО: backpressure игнорируется — буфер приёмника растёт без предела
src.on('data', (chunk) => {
  dst.write(chunk); // результат write() не проверяется!
});

// ХОРОШО: pipe делает ровно эту проверку за вас
src.pipe(dst);

Главная ценность pipe именно в этом: он сам слушает результат write(), приостанавливает источник на false и возобновляет на drain. Поэтому потоковое копирование через pipe работает в постоянной памяти независимо от размера данных — а ручной data+write без проверки результата эту гарантию теряет.

pipeline: надёжная замена pipe

У pipe есть слабое место — обработка ошибок. Если в середине цепочки a.pipe(b).pipe(c) упадёт b, остальные потоки не закроются автоматически: возникает утечка ресурсов (открытые файлы, сокеты). Поэтому в современном Node для соединения используют функцию pipeline() из модуля stream: она соединяет потоки, правильно прокидывает ошибки и гарантированно закрывает все звенья.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) console.error('Конвейер упал:', err.message);
    else console.log('Готово');
  }
);

Последний аргумент — колбэк, который вызовется один раз: с ошибкой, если что-то сломалось на любом звене, или без неё при успехе. Есть и промис-версия из stream/promises, удобная с async/await. Правило простое: для одного-двух потоков на чтение можно pipe, но для конвейеров в продакшене предпочитайте pipeline — он не теряет ошибки и не оставляет висящих ресурсов.

Как это работает под капотом

Под капотом pipe — это аккуратная подписка на события. Он вешает на источник data и для каждого куска зовёт dst.write(chunk). Если write вернул false, pipe вызывает src.pause(); на событие drain приёмника — src.resume(). На end источника он зовёт dst.end(). То есть весь «танец» с обратным давлением и завершением спрятан внутри.

Чего pipe не делает — не следит за ошибками промежуточных звеньев и не уничтожает их при сбое. pipeline добавляет именно это: оборачивает каждое звено, и при ошибке на любом из них вызывает destroy() у всех, освобождая ресурсы, после чего сообщает об ошибке один раз. Поэтому pipeline — не просто синтаксический сахар, а корректная по ресурсам версия конвейера.

Частые ошибки

  • Ручной data+write без проверки результата. Если игнорировать булев результат write(), backpressure не соблюдается и буфер растёт без предела. Используйте pipe/pipeline.
  • pipe без обработки ошибок. При сбое звена в цепочке pipe остальные потоки не закрываются — утечка. Для конвейеров берите pipeline.
  • Forgotten callback в Transform. Если в transform не вызвать callback, поток «зависнет»: следующий кусок не придёт, конвейер встанет.
  • Режут строки по границе chunk. Transform получает произвольные куски; строка или UTF-8-символ может оказаться разрезанным. Накапливайте остаток и режьте по \n аккуратно.
  • Путают, кто кому pipe. Всегда readable.pipe(writable) — из источника в приёмник, не наоборот.

Итоги

  • readable.pipe(writable) соединяет потоки одной строкой и автоматически соблюдает обратное давление.
  • Transform — звено-преобразователь: метод transform(chunk, enc, cb) получает кусок и отдаёт результат через cb(err, data).
  • Backpressure — сигнал «помедленнее»: write() возвращает false при переполнении, источник ждёт drain; без этого память растёт неограниченно.
  • pipe не закрывает потоки при ошибке промежуточного звена — это его слабость.
  • pipeline() соединяет потоки, корректно прокидывает ошибки и закрывает все звенья — предпочтительный способ для продакшена.
Проверьте себя
1. Что такое backpressure и почему он важен?
AЭто сигнал «помедленнее» от медленного приёмника к быстрому источнику; без него данные копятся в буфере и память растёт без предела
BЭто ускорение записи за счёт сжатия данных в буфере
CЭто ошибка, возникающая при разрыве сетевого соединения
DЭто режим, в котором поток читает данные в обратном порядке
2. В чём ключевое преимущество pipeline() над цепочкой pipe()?
Apipeline корректно прокидывает ошибки и гарантированно закрывает все звенья при сбое, тогда как pipe оставляет потоки открытыми
Bpipeline работает быстрее, потому что отключает backpressure
Cpipe нельзя использовать с Transform-потоками, а pipeline можно
Dpipeline не требует Readable-источника
3. Что обязательно сделать в методе transform(chunk, encoding, callback) у Transform-потока?
AВызвать callback (первым аргументом — ошибку или null, вторым — преобразованные данные), иначе поток зависнет
BВернуть преобразованный chunk через return
CЗаписать результат напрямую в файл внутри метода
DНичего — Node сам отдаёт исходный chunk дальше