pipe, Transform и backpressure
pipe соединяет источник с приёмником и сам соблюдает обратное давление; Transform — поток, преобразующий данные на лету; pipeline() делает это надёжно.
Backpressure (обратное давление) — механизм, при котором быстрый источник притормаживает, если медленный приёмник не успевает: данные не копятся в памяти бесконтрольно. pipe соединяет Readable с Writable и соблюдает backpressure автоматически.
В прошлом уроке мы читали и писали потоки вручную, развешивая обработчики data, end, error и следя за переполнением буфера. Это работает, но многословно и легко ошибиться. На практике потоки соединяют в конвейер: источник → (преобразование) → приёмник. Инструменты для этого — pipe(), Transform и pipeline().
Этот урок — про то, как одной строкой соединить потоки, как вставить в конвейер своё преобразование и почему без обратного давления программа «съедает» память.
Зачем это на практике
- Копирование/сжатие файла: читать → gzip → писать, не держа файл в памяти целиком.
- Отдача файла по HTTP:
fs.createReadStream(file).pipe(response)— сервер стримит файл клиенту. - Преобразование на лету: построчная обработка лога, перевод текста в верхний регистр, парсинг CSV — кусок за куском.
- Защита от перегрузки памяти: медленный диск или медленный клиент не должны заставлять Node буферизовать гигабайты.
Код использует серверный API Node (fs, zlib, stream) — кнопки «Запустить» нет, проверяйте в локальном Node.
pipe: соединяем потоки
readable.pipe(writable) подключает источник к приёмнику: всё, что выходит из readable, идёт в writable. Это заменяет ручную развеску data/end и, что важнее, само соблюдает обратное давление.
const fs = require('fs');
// скопировать файл потоково
fs.createReadStream('input.bin')
.pipe(fs.createWriteStream('output.bin'));
Одна строка вместо десятка. pipe переводит источник в режим flowing, передаёт каждый кусок в приёмник, а когда источник закончится — вызывает end() у приёмника. pipe возвращает целевой поток, поэтому вызовы можно сцеплять в цепочку:
const zlib = require('zlib');
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // сжать
.pipe(fs.createWriteStream('input.txt.gz'));
Здесь данные текут источник → gzip → файл, и на каждом стыке память под контролем. zlib.createGzip() — это Transform-поток, к которому мы переходим.
Transform: преобразование на лету
Transform — поток, который одновременно читающий и пишущий: в него пишут исходные данные, из него читают преобразованные. Это идеальное «звено» в середине конвейера. Чтобы создать свой Transform, реализуют метод transform(chunk, encoding, callback): он получает кусок, что-то с ним делает и через callback отдаёт результат дальше.
const { Transform } = require('stream');
const upper = new Transform({
transform(chunk, encoding, callback) {
// превратить кусок в верхний регистр
const result = chunk.toString().toUpperCase();
callback(null, result); // (ошибка, преобразованные данные)
}
});
fs.createReadStream('input.txt')
.pipe(upper)
.pipe(fs.createWriteStream('shout.txt'));
Каждый кусок проходит через upper и выходит в верхнем регистре. Первый аргумент callback — ошибка (или null), второй — результат. Если для куска нечего отдавать прямо сейчас, вызывают callback() без второго аргумента. Так устроены десятки готовых Transform-потоков: gzip, шифрование, парсеры. Важная тонкость текстовых Transform: кусок может разрезать строку или UTF-8-символ посередине, поэтому для построчной обработки данные сначала собирают, а уже потом режут по \n.
Что такое backpressure и почему он важен
Представьте: вы читаете файл с быстрого SSD и пишете на медленный сетевой диск. Источник выдаёт куски быстрее, чем приёмник успевает их записывать. Куда деваются непринятые данные? Они копятся в буфере приёмника. Если источник не остановить, буфер раздуется на гигабайты — и процесс упадёт по нехватке памяти.
Backpressure — это сигнал «помедленнее» от приёмника к источнику. Механика на уровне потоков такая: метод write() приёмника возвращает false, когда его буфер переполнен. Получив false, источник должен сделать pause() и ждать события drain («буфер освободился»), затем resume().
// ПЛОХО: backpressure игнорируется — буфер приёмника растёт без предела
src.on('data', (chunk) => {
dst.write(chunk); // результат write() не проверяется!
});
// ХОРОШО: pipe делает ровно эту проверку за вас
src.pipe(dst);
Главная ценность pipe именно в этом: он сам слушает результат write(), приостанавливает источник на false и возобновляет на drain. Поэтому потоковое копирование через pipe работает в постоянной памяти независимо от размера данных — а ручной data+write без проверки результата эту гарантию теряет.
pipeline: надёжная замена pipe
У pipe есть слабое место — обработка ошибок. Если в середине цепочки a.pipe(b).pipe(c) упадёт b, остальные потоки не закроются автоматически: возникает утечка ресурсов (открытые файлы, сокеты). Поэтому в современном Node для соединения используют функцию pipeline() из модуля stream: она соединяет потоки, правильно прокидывает ошибки и гарантированно закрывает все звенья.
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) console.error('Конвейер упал:', err.message);
else console.log('Готово');
}
);
Последний аргумент — колбэк, который вызовется один раз: с ошибкой, если что-то сломалось на любом звене, или без неё при успехе. Есть и промис-версия из stream/promises, удобная с async/await. Правило простое: для одного-двух потоков на чтение можно pipe, но для конвейеров в продакшене предпочитайте pipeline — он не теряет ошибки и не оставляет висящих ресурсов.
Как это работает под капотом
Под капотом pipe — это аккуратная подписка на события. Он вешает на источник data и для каждого куска зовёт dst.write(chunk). Если write вернул false, pipe вызывает src.pause(); на событие drain приёмника — src.resume(). На end источника он зовёт dst.end(). То есть весь «танец» с обратным давлением и завершением спрятан внутри.
Чего pipe не делает — не следит за ошибками промежуточных звеньев и не уничтожает их при сбое. pipeline добавляет именно это: оборачивает каждое звено, и при ошибке на любом из них вызывает destroy() у всех, освобождая ресурсы, после чего сообщает об ошибке один раз. Поэтому pipeline — не просто синтаксический сахар, а корректная по ресурсам версия конвейера.
Частые ошибки
- Ручной data+write без проверки результата. Если игнорировать булев результат
write(), backpressure не соблюдается и буфер растёт без предела. Используйтеpipe/pipeline. - pipe без обработки ошибок. При сбое звена в цепочке
pipeостальные потоки не закрываются — утечка. Для конвейеров беритеpipeline. - Forgotten callback в Transform. Если в
transformне вызватьcallback, поток «зависнет»: следующий кусок не придёт, конвейер встанет. - Режут строки по границе chunk. Transform получает произвольные куски; строка или UTF-8-символ может оказаться разрезанным. Накапливайте остаток и режьте по
\nаккуратно. - Путают, кто кому pipe. Всегда
readable.pipe(writable)— из источника в приёмник, не наоборот.
Итоги
readable.pipe(writable)соединяет потоки одной строкой и автоматически соблюдает обратное давление.Transform— звено-преобразователь: методtransform(chunk, enc, cb)получает кусок и отдаёт результат черезcb(err, data).- Backpressure — сигнал «помедленнее»:
write()возвращаетfalseпри переполнении, источник ждётdrain; без этого память растёт неограниченно. pipeне закрывает потоки при ошибке промежуточного звена — это его слабость.pipeline()соединяет потоки, корректно прокидывает ошибки и закрывает все звенья — предпочтительный способ для продакшена.