Потоки: Readable и Writable

Поток — это данные, приходящие или уходящие частями (chunks), а не целиком: так Node обрабатывает гигабайты в мегабайтах памяти.

Stream (поток) — абстракция для работы с данными по кускам во времени. Readable — источник, из которого данные читают; Writable — приёмник, в который данные пишут. Поток наследует EventEmitter и общается событиями.

Простейший способ прочитать файл — fs.readFile: он загружает весь файл в память и отдаёт буфер. Для файла на пару килобайт это нормально. Но что если файл — это лог на 5 ГБ или видео? Загрузить его целиком невозможно: не хватит памяти. Решение — обрабатывать данные частями, по мере поступления. Именно это делают потоки.

Этот урок — про то, зачем нужна потоковая модель, какие бывают потоки, в каких режимах работает Readable и какими событиями всё это управляется.

Зачем это на практике

  • Большие файлы: посчитать строки в гигабайтном логе, читая его кусками по 64 КБ, а не целиком.
  • Сеть: HTTP-запрос и ответ в Node — это потоки; тело приходит частями по мере прибытия пакетов.
  • Раннее начало работы: начать отдавать ответ клиенту, не дожидаясь, пока соберётся весь результат.
  • Конвейеры: прочитать → сжать → записать, не держа промежуточные данные в памяти полностью.

Примеры используют модули fs и stream — это серверный API Node, поэтому кнопки «Запустить» под кодом нет. Запускайте их в локальном Node.

Четыре типа потоков

В Node поток всегда одного из типов. Понимать различие важно, чтобы выбрать правильный инструмент:

ТипРольПример
Readableисточник — из него читаютfs.createReadStream, тело запроса
Writableприёмник — в него пишутfs.createWriteStream, ответ сервера
Duplexи чтение, и запись (две стороны)TCP-сокет
Transformдуплекс, который преобразует данныесжатие gzip (следующий урок)

В этом уроке — два базовых: Readable (чтение) и Writable (запись).

Чтение: события data, end, error

Readable-поток сообщает о происходящем событиями. Три ключевых:

dataпришёл очередной кусок (chunk) — обычно Buffer
endданные закончились, кусков больше не будет
errorпроизошла ошибка (нет файла, оборвалось соединение)
const fs = require('fs');
const stream = fs.createReadStream('big.log');

let bytes = 0;
stream.on('data', (chunk) => {
  bytes += chunk.length;        // обрабатываем кусок, не весь файл
});
stream.on('end', () => {
  console.log('Готово, всего байт:', bytes);
});
stream.on('error', (err) => {
  console.error('Ошибка чтения:', err.message);
});

Файл читается частями: на каждый кусок срабатывает data, в конце один раз — end. В любой момент мы держим в памяти только текущий chunk (по умолчанию около 64 КБ), а не весь файл. Так гигабайтный лог обрабатывается в мегабайтах памяти.

Обработчик error обязателен. Если поток столкнётся с ошибкой, а слушателя error нет, Node выбросит необработанное исключение и процесс упадёт.

Запись: метод write и событие finish

Writable-поток принимает данные методом write(chunk), а по окончании ему говорят end(). Когда все данные физически записаны, поток испускает событие finish.

const fs = require('fs');
const out = fs.createWriteStream('out.txt');

out.write('Первая строка\n');
out.write('Вторая строка\n');
out.end('Последняя строка\n'); // end можно передать финальный кусок

out.on('finish', () => {
  console.log('Файл записан полностью');
});
out.on('error', (err) => {
  console.error('Ошибка записи:', err.message);
});

Важно: write() не гарантирует, что данные уже на диске — они могут попасть во внутренний буфер. Именно finish сигнализирует, что всё дописано. После end() писать в поток нельзя — попытка write() вызовет ошибку.

Режимы flowing и paused

Readable-поток существует в одном из двух режимов, и это частый источник путаницы.

  • paused (по умолчанию): данные не текут сами. Вы вытягиваете их вручную методом read() или дожидаетесь события readable.
  • flowing: данные текут автоматически, поток сам испускает события data как можно быстрее.

Поток переключается в режим flowing, как только вы делаете одно из трёх: вешаете обработчик на событие data, вызываете resume() или подключаете поток через pipe() (следующий урок). Вернуть в paused можно методом pause().

const stream = fs.createReadStream('data.txt');

// paused: тянем данные сами, когда поток сигналит 'readable'
stream.on('readable', () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log('Кусок длиной', chunk.length);
  }
});

Зачем два режима? Flowing проще и подходит для большинства задач — «дайте мне всё по порядку». Paused даёт точный контроль: вы сами решаете, когда и сколько прочитать, что важно при ручном управлении нагрузкой. Главная ловушка: если повесить обработчик data на поток, переключить его в flowing и при этом тормозить обработку — данные продолжат течь и копиться. Об этой проблеме (backpressure) и о том, как pipe решает её автоматически, — следующий урок.

Как это работает под капотом

У каждого потока есть внутренний буфер и порог highWaterMark (по умолчанию 64 КБ для байтовых потоков, 16 объектов — для объектных). Readable читает из источника наперёд, складывая прочитанное в буфер, пока не упрётся в порог; затем приостанавливает чтение из источника и ждёт, пока вы заберёте накопленное. У Writable наоборот: данные кладутся в буфер и пишутся в приёмник по мере готовности.

Метод write() возвращает булево значение. true значит «буфер ещё не полон, можно писать дальше»; false — «буфер переполнен, притормози и дождись события drain». Это и есть механизм обратного давления на уровне одного потока. Вручную его соблюдать утомительно, поэтому на практике потоки соединяют так, чтобы давление учитывалось автоматически — через pipe и pipeline.

Частые ошибки

  • Нет обработчика error. Поток без слушателя error при сбое роняет весь процесс необработанным исключением. Вешайте error всегда.
  • Декодируют текст по chunk наугад. Граница куска может разрезать многобайтовый UTF-8-символ. Либо собирайте куски в массив и склеивайте Buffer.concat в конце, либо используйте StringDecoder / задайте кодировку через setEncoding.
  • Путают end и finish. end — событие читающего потока (данные кончились); finish — событие пишущего (всё дописано). Это разные стороны.
  • Пишут после end(). После writable.end() поток закрыт; write() вызовет ошибку.
  • Считают, что write() уже сохранил данные. write() может лишь положить данные в буфер. Гарантию даёт событие finish, а не сам вызов.

Итоги

  • Поток обрабатывает данные частями (chunks) во времени — так Node работает с большими файлами и сетью в малой памяти.
  • Типы: Readable (читают), Writable (пишут), Duplex и Transform (две стороны / преобразование).
  • Readable общается событиями data (кусок), end (конец), error (сбой); Writable — write(), end(), событие finish.
  • Режимы: paused (тянем сами через read()) и flowing (данные текут сами); обработчик data, resume() или pipe() включают flowing.
  • Порог highWaterMark и булев результат write() — это основа обратного давления (backpressure) на уровне потока.
Проверьте себя
1. Зачем читать большой файл потоком (createReadStream), а не целиком через readFile?
AПоток обрабатывает файл частями (chunks), удерживая в памяти лишь текущий кусок, поэтому гигабайтный файл умещается в мегабайты памяти
BПоток читает файл быстрее за счёт пропуска части данных
CreadFile не умеет читать файлы больше 1 МБ
DПоток автоматически сжимает данные при чтении
2. Что переведёт Readable-поток в режим flowing?
AПодписка на событие data, вызов resume() или подключение через pipe()
BТолько явный вызов flow(true)
CПоток всегда стартует в flowing и его нельзя приостановить
DВызов read() переключает в flowing навсегда
3. Чем отличаются события end и finish?
Aend — событие Readable-потока (данные на чтение закончились), finish — событие Writable (все данные дописаны)
BЭто синонимы одного и того же события
Cend срабатывает при ошибке, а finish — при успехе
Dfinish бывает только у Readable, а end — только у Writable