Потоки: Readable и Writable
Поток — это данные, приходящие или уходящие частями (chunks), а не целиком: так Node обрабатывает гигабайты в мегабайтах памяти.
Stream (поток) — абстракция для работы с данными по кускам во времени. Readable — источник, из которого данные читают; Writable — приёмник, в который данные пишут. Поток наследует EventEmitter и общается событиями.
Простейший способ прочитать файл — fs.readFile: он загружает весь файл в память и отдаёт буфер. Для файла на пару килобайт это нормально. Но что если файл — это лог на 5 ГБ или видео? Загрузить его целиком невозможно: не хватит памяти. Решение — обрабатывать данные частями, по мере поступления. Именно это делают потоки.
Этот урок — про то, зачем нужна потоковая модель, какие бывают потоки, в каких режимах работает Readable и какими событиями всё это управляется.
Зачем это на практике
- Большие файлы: посчитать строки в гигабайтном логе, читая его кусками по 64 КБ, а не целиком.
- Сеть: HTTP-запрос и ответ в Node — это потоки; тело приходит частями по мере прибытия пакетов.
- Раннее начало работы: начать отдавать ответ клиенту, не дожидаясь, пока соберётся весь результат.
- Конвейеры: прочитать → сжать → записать, не держа промежуточные данные в памяти полностью.
Примеры используют модули fs и stream — это серверный API Node, поэтому кнопки «Запустить» под кодом нет. Запускайте их в локальном Node.
Четыре типа потоков
В Node поток всегда одного из типов. Понимать различие важно, чтобы выбрать правильный инструмент:
| Тип | Роль | Пример |
Readable | источник — из него читают | fs.createReadStream, тело запроса |
Writable | приёмник — в него пишут | fs.createWriteStream, ответ сервера |
Duplex | и чтение, и запись (две стороны) | TCP-сокет |
Transform | дуплекс, который преобразует данные | сжатие gzip (следующий урок) |
В этом уроке — два базовых: Readable (чтение) и Writable (запись).
Чтение: события data, end, error
Readable-поток сообщает о происходящем событиями. Три ключевых:
data | пришёл очередной кусок (chunk) — обычно Buffer |
end | данные закончились, кусков больше не будет |
error | произошла ошибка (нет файла, оборвалось соединение) |
const fs = require('fs');
const stream = fs.createReadStream('big.log');
let bytes = 0;
stream.on('data', (chunk) => {
bytes += chunk.length; // обрабатываем кусок, не весь файл
});
stream.on('end', () => {
console.log('Готово, всего байт:', bytes);
});
stream.on('error', (err) => {
console.error('Ошибка чтения:', err.message);
});
Файл читается частями: на каждый кусок срабатывает data, в конце один раз — end. В любой момент мы держим в памяти только текущий chunk (по умолчанию около 64 КБ), а не весь файл. Так гигабайтный лог обрабатывается в мегабайтах памяти.
Обработчик error обязателен. Если поток столкнётся с ошибкой, а слушателя error нет, Node выбросит необработанное исключение и процесс упадёт.
Запись: метод write и событие finish
Writable-поток принимает данные методом write(chunk), а по окончании ему говорят end(). Когда все данные физически записаны, поток испускает событие finish.
const fs = require('fs');
const out = fs.createWriteStream('out.txt');
out.write('Первая строка\n');
out.write('Вторая строка\n');
out.end('Последняя строка\n'); // end можно передать финальный кусок
out.on('finish', () => {
console.log('Файл записан полностью');
});
out.on('error', (err) => {
console.error('Ошибка записи:', err.message);
});
Важно: write() не гарантирует, что данные уже на диске — они могут попасть во внутренний буфер. Именно finish сигнализирует, что всё дописано. После end() писать в поток нельзя — попытка write() вызовет ошибку.
Режимы flowing и paused
Readable-поток существует в одном из двух режимов, и это частый источник путаницы.
- paused (по умолчанию): данные не текут сами. Вы вытягиваете их вручную методом
read()или дожидаетесь событияreadable. - flowing: данные текут автоматически, поток сам испускает события
dataкак можно быстрее.
Поток переключается в режим flowing, как только вы делаете одно из трёх: вешаете обработчик на событие data, вызываете resume() или подключаете поток через pipe() (следующий урок). Вернуть в paused можно методом pause().
const stream = fs.createReadStream('data.txt');
// paused: тянем данные сами, когда поток сигналит 'readable'
stream.on('readable', () => {
let chunk;
while ((chunk = stream.read()) !== null) {
console.log('Кусок длиной', chunk.length);
}
});
Зачем два режима? Flowing проще и подходит для большинства задач — «дайте мне всё по порядку». Paused даёт точный контроль: вы сами решаете, когда и сколько прочитать, что важно при ручном управлении нагрузкой. Главная ловушка: если повесить обработчик data на поток, переключить его в flowing и при этом тормозить обработку — данные продолжат течь и копиться. Об этой проблеме (backpressure) и о том, как pipe решает её автоматически, — следующий урок.
Как это работает под капотом
У каждого потока есть внутренний буфер и порог highWaterMark (по умолчанию 64 КБ для байтовых потоков, 16 объектов — для объектных). Readable читает из источника наперёд, складывая прочитанное в буфер, пока не упрётся в порог; затем приостанавливает чтение из источника и ждёт, пока вы заберёте накопленное. У Writable наоборот: данные кладутся в буфер и пишутся в приёмник по мере готовности.
Метод write() возвращает булево значение. true значит «буфер ещё не полон, можно писать дальше»; false — «буфер переполнен, притормози и дождись события drain». Это и есть механизм обратного давления на уровне одного потока. Вручную его соблюдать утомительно, поэтому на практике потоки соединяют так, чтобы давление учитывалось автоматически — через pipe и pipeline.
Частые ошибки
- Нет обработчика error. Поток без слушателя
errorпри сбое роняет весь процесс необработанным исключением. Вешайтеerrorвсегда. - Декодируют текст по chunk наугад. Граница куска может разрезать многобайтовый UTF-8-символ. Либо собирайте куски в массив и склеивайте
Buffer.concatв конце, либо используйтеStringDecoder/ задайте кодировку черезsetEncoding. - Путают end и finish.
end— событие читающего потока (данные кончились);finish— событие пишущего (всё дописано). Это разные стороны. - Пишут после end(). После
writable.end()поток закрыт;write()вызовет ошибку. - Считают, что write() уже сохранил данные.
write()может лишь положить данные в буфер. Гарантию даёт событиеfinish, а не сам вызов.
Итоги
- Поток обрабатывает данные частями (chunks) во времени — так Node работает с большими файлами и сетью в малой памяти.
- Типы:
Readable(читают),Writable(пишут),DuplexиTransform(две стороны / преобразование). - Readable общается событиями
data(кусок),end(конец),error(сбой); Writable —write(),end(), событиеfinish. - Режимы: paused (тянем сами через
read()) и flowing (данные текут сами); обработчикdata,resume()илиpipe()включают flowing. - Порог
highWaterMarkи булев результатwrite()— это основа обратного давления (backpressure) на уровне потока.