Статья переведена с:Node.js Streams: Everything you need to know
Среди разработчиков распространено мнение, что потоки Node.js не только сложны в применении, но и сложны для понимания. Теперь есть хорошие новости: с потоковой передачей Node.js больше не будет проблем. За последние несколько лет разработчики разработали множество сторонних пакетов Node.js для облегчения управления потоками Node.js. Но в этой статье я сосредоточусь на представлении собственного приложения потокового интерфейса Node.js.
«Потоки — лучшая и самая неправильно понятая идея Node».
— Доминик Тарр
что такое стрим
Потоки — это наборы данных, такие как массивы или строки. Разница в том, что потоки не обязательно использовать все сразу, и при этом они не должны помещаться в память. Эти две функции делают потоки очень эффективными при обработке больших объемов данных или одновременном возврате больших порций данных наружу.
Возможность компоновки потокового кода обеспечивает новые возможности для потоковой передачи больших объемов данных. Потоки Node.js реализуют функциональность каналов данных так же, как крошечные команды Linux объединяются в многофункциональные составные команды.
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
Многие встроенные модули Node.js реализуют потоковый интерфейс:
В API, показанном выше, некоторые нативные объекты Node.js являются потоками как для чтения, так и для записи, например потоки TCP Sockets, Zlib и Crypto.
Стоит отметить, что части поведения объектов тесно связаны между собой. Например: объекты HTTP на стороне клиента — это потоки, доступные для чтения, а объекты HTTP на стороне сервера — потоки, доступные для записи. Это связано с тем, что в HTTP программа считывает данные из одного объекта (http.IncomingMessage), а затем записывает прочитанные данные в другой объект (http.ServerResponse).
Практический пример использования потоковой передачи
Теория звучит великолепно, но она не полностью передает всю тонкость течения. Давайте рассмотрим пример, в котором мы можем увидеть различные эффекты использования потоков или их отсутствия на объем памяти.
Сначала создадим большой файл:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
В приведенном выше примере кода модуль fs может читать и записывать файлы через потоковый интерфейс. Запишите данные в big.file, перебирая доступный для записи поток миллион раз.
Выполняется соответствующий код, в результате получается файл размером около 400 мегабайт.
Вот код сервера Node, предназначенный для управления этим большим файлом:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
Когда сервис получит запрос, программа отправит запросчику пакет данных через асинхронную функцию fs.readFile. На первый взгляд такой код не блокирует цикл обработки событий программы, правда ли это?
Что ж, когда мы запустим эту службу, а затем запросим эту службу, посмотрим, как изменится использование памяти.
Когда служба запущена, объем памяти, используемой службой, составляет 8,7 МБ.
Затем запросите эту услугу, обратите внимание на использование памяти:
Ничего себе ---- использование памяти внезапно подскочило до 434,8 Мб.
По сути, программа записывает все данные в память перед записью большого файла данных в объект ответа http. Эффективность этого кода очень неэффективна.
Объект ответа HTTP также является доступным для записи потоком.Если мы соединим доступный для чтения поток, представляющий содержимое big.file, с доступным для записи потоком соответствующего HTTP-объекта в конвейере, программа сможет пройти через два потоковых конвейера, не генерируя почти 400 мегабайт. памяти. , добиться того же результата.
Модуль fs в Node.js создает читаемый поток для чтения файлов с помощью метода createReadStream. Затем программа может направить читаемый поток в объект ответа http:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
Когда сервис запрашивается снова, происходит волшебство (обратите внимание на объем памяти):
что случилось
Когда клиент запрашивает большой файл, программа генерирует потоковый файл по частям за раз, что означает, что нам не нужно кэшировать данные в памяти. Использование памяти увеличилось только на 25 МБ.
Мы можем довести этот тест до предела. Восстановите big.file с пятью миллионами строк вместо одного миллиона, и восстановленный файл будет иметь размер 2 ГБ, что больше, чем размер кэша Node.js по умолчанию.
Используйте fs.readFile для чтения больших файлов памяти, лучше не изменять размер кеша программы по умолчанию. Но если вы используете fs.createReadStream, не будет проблем даже с запросом потока данных размером 2 ГБ. Объем памяти при использовании второго метода в качестве сервисной программы практически не меняется.
поток
В Node.js есть четыре типа потоков: Readable, Writable, Duplex и Transform.
- Поток для чтения: абстракция ресурсов, которые можно потреблять, например метод fs.createReadStream.
- Поток с возможностью записи: абстракция, в которой данные могут быть записаны в пункт назначения, например метод fs.createWriteStream.
- Дуплексный поток: как для чтения, так и для записи потоки, такие как сокеты TCP.
- Преобразование потока: в зависимости от дуплексного потока данные чтения или записи изменяются или преобразуются. Например, функция zlib.createGzip использует метод gzip для сжатия данных. мы можем думатьВход потока преобразования — это поток, доступный для записи, а выход — поток, доступный для чтения.. Это то, что вы слышали о преобразовании потоков «через потоки».
Все потоки являются экземплярами модуля EventEmitter, инициирующего события чтения и записи данных. Однако программы могут потреблять потоковые данные с помощью функции конвейера.
Функция трубы
Вот кусок магического кода, который вы должны запомнить:
readableSrc.pipe(writableDest)
В этом простом кодеНаправьте вывод читаемого потока (источника данных) в качестве ввода (назначения) записываемого потока.. Исходные данные должны быть потоком, доступным для чтения, а конечные данные должны быть потоком, доступным для записи. Они также могут быть одновременно дуплексным потоком или потоком перехода. На самом деле, если разработчик передает дуплексный поток в канал, мы можем связать его с вызовом канала, как это делает Linux:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
Функция канала возвращает целевой поток, что позволяет программе выполнять вышеуказанные связанные вызовы. Следующий код: поток a — это поток, доступный для чтения, потоки b и c — дуплексные потоки, а поток c — поток, доступный для записи.
a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d
Метод канала — это самый простой способ реализации потокового потребления. Обычно рекомендуется использовать конвейерную функцию (pipe) или поток потребления событий, но избегайте их смешивания. Обычно, когда вы используете конвейерные функции, вам не нужно использовать события. Но если программе нужно настроить потребление потоков, события могут быть хорошим выбором.
Потоковые события
В дополнение к чтению источника потока, доступного для чтения, и записи считанных данных в место назначения, доступное для записи. Pipes также может управлять некоторыми вещами автоматически. Например: он может обрабатывать исключения и заканчивать файлы, когда один поток быстрее или медленнее, чем другие.
Однако потоки можно потреблять непосредственно через события. Ниже приведена программа, эквивалентная методу канала, которая использует упрощенный код, эквивалентный событиям, для чтения или записи данных.
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
Существует ряд событий или функций, которые можно использовать с доступными для чтения и записи потоками.
Эти события или функции обычно каким-то образом связаны, событиями в читаемых потоках являются:
- событие данных, срабатывающее, когда поток доставляется потребителю
- конечное событие, срабатывающее, когда данные в потоке не потребляются
Важные события о доступных для записи потоках: -drain событие, сигнал, когда доступный для записи поток принимает данные -финишное событие, срабатывающее, когда все данные были сброшены на дно системы
События и функции можно комбинировать вместе для настройки потоков или их оптимизации. Чтобы потреблять читаемые потоки, программы могут использовать методы pipe/unpipe или методы read/unshift/resume. Чтобы потреблять доступный для записи поток, программа может использовать его в качестве пункта назначения конвейера/удаления канала или записывать данные с помощью метода записи и вызывать метод завершения после завершения записи.
Режимы Paused и Flowing в доступных для чтения потоках
В читаемых потоках есть два режима, которые влияют на использование программой читаемых потоков:
- Доступно для чтения или в режиме паузы
- либо в потоковом режиме
Эти режимы также известны как режимы вытягивания и выталкивания.
Все доступные для чтения потоки по умолчанию запускаются в режиме паузы и переключаются либо в режим потоковой передачи, либо в режим паузы, когда это необходимо программе. Иногда этот переход происходит спонтанно.
Когда доступный для чтения поток находится в режиме паузы, мы можем использовать метод чтения для чтения данных потока по запросу. Однако для читаемого потока в потоковом режиме мы должны потреблять данные, прослушивая события.
В потоковом режиме данные могут быть потеряны, если они не используются. Вот почему событие данных необходимо для обработки данных, когда в программе есть текущий читаемый поток. Фактически, простое добавление события данных переводит поток из режима паузы в режим потоковой передачи и отвязывает программу от прослушивателя событий, переводя поток из режима потоковой передачи в режим паузы. Некоторые из них предназначены для обратной совместимости со старыми версиями потокового интерфейса Node.
Разработчики могут использовать метод возобновления и метод паузы, чтобы вручную реализовать преобразование между двумя режимами потока.
Когда программа потребляет читаемый поток с помощью метода канала, разработчикам не нужно заботиться о преобразовании режима потока, потому что канал (конвейер) будет реализован автоматически.
##процесс реализации
Когда мы стримим в Node.js, есть две разные задачи:
- Наследовать задачу потока
- Задачи потребления потоков
До сих пор мы обсуждали только потоки потребления. Давайте реализуем несколько примеров!
Реализация потока требует от нас введения в программу модуля потока.
реализовать доступный для записи поток
Разработчики могут использовать конструктор Writeable в модуле потока для реализации доступных для записи потоков.
const { Writable } = require('stream');
Разработчики могут реализовывать доступные для записи потоки разными способами. Например: наследуя конструктор с возможностью записи
class myWritableStream extends Writable {
}
Однако я предпочитаю реализацию с использованием конструктора. Просто создайте объект через интерфейс с возможностью записи и передайте некоторые параметры: требуемый параметр функции — это функция записи, передающая блок данных для записи.
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
Функция записи имеет три параметра:
- чанк обычно представляет собой массив буферов, если только разработчик не настроил поток вручную.
- Параметр кодирования требуется в тестовых примерах, но обычно разработчики могут его игнорировать.
- callback — функция обратного вызова, вызываемая разработчиком после того, как программа обработает блок данных. Обычно это сигнал о том, что операция записи прошла успешно или нет. Если это сигнал исключения записи, вызовите функцию обратного вызова, где произошло исключение.
В классе outStream программа просто преобразует данные в строковый тип и распечатывает их, а также вызывает функцию обратного вызова, когда нет исключения, чтобы отметить успешное выполнение программы. Это простой, но не особо эффективный эхо-поток, программа будет выводить любые входные данные.
Чтобы использовать этот поток, мы можем использовать его с process.stdin, который является доступным для чтения потоком, передающим поток process.stdin в outStream.
Когда программа выполняется, любые данные, введенные через process.stdin, будут распечатаны функцией console.log в outStream.
Но эта функциональность доступна через встроенные модули Node.js, так что это не очень практично. По функциям он очень похож на process.stdout, мы можем добиться того же, используя следующий код:
process.stdin.pipe(process.stdout);
Реализовать читаемый поток
Чтобы реализовать читаемый поток, разработчикам необходимо ввести интерфейс Readable и создавать объекты через этот интерфейс:
const { Readable } = require('stream');
const inStream = new Readable({});
Это самый простой способ реализовать читаемый поток, в котором разработчики могут передавать данные непосредственно для потребления.
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
Когда программа отправляет пустой объект, это означает, что больше нет данных для передачи читаемому потоку.
Разработчики могут передавать читаемые потоки в process.stdout, чтобы они потребляли читаемые потоки.
Выполняя этот код, программа может считывать данные из читаемого потока и распечатывать данные. Очень просто, но неэффективно.
Суть приведенного выше кода такова: поместить данные в поток, а затем передать поток в process.stdout для потребления. На самом деле программа может отправлять данные по запросу, когда потребитель запрашивает поток, что более эффективно, чем предыдущий. Реализуя функцию чтения в читаемом потоке:
const inStream = new Readable({
read(size) {
// there is a demand on the data... Someone wants to read it.
}
});
Вызывая функцию чтения на читаемом потоке, программа может передать часть данных в очередь. Например: каждый раз, когда письмо помещается в очередь, порядковый номер письма начинается с 65 (представляющего A), а порядковый номер каждого нажатия увеличивается на 1:
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
Когда потребитель потребляет читаемый поток, активируется функция чтения, и программа отправляет больше букв. Цикл завершается помещением пустых объектов в очередь. В приведенном выше коде, когда порядковый номер буквы превышает 90, цикл завершается.
Функция этого кода эквивалентна коду, реализованному ранее, но когда потребитель хочет прочитать поток, программа может более эффективно передавать данные по запросу. Поэтому эта реализация рекомендуется.
Реализовать дуплекс, преобразовать поток
Дуплексный поток: реализуйте поток для чтения и поток для записи соответственно для одного и того же объекта, как если бы объект наследовал два интерфейса для чтения и записи.
Ниже приведен дуплексный поток, который объединяет поток для чтения и уже реализованный выше пример потока с возможностью записи:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Реализуя объект дуплексного потока, программа может читать буквы от A до Z и печатать их по порядку. Разработчик направляет читаемый поток стандартного ввода в дуплексный поток, а затем направляет дуплексный поток в поток stdout, доступный для записи, печатая буквы от A до Z.
Поток для чтения и поток для записи в дуплексном потоке полностью независимы.Дуплексный поток — это просто объект, который имеет функции как читаемого, так и записываемого потока. Понимание этого имеет решающее значение.
Потоки преобразования более интересны, чем дуплексные потоки, потому что их результат вычисляется из входного потока.
Для дуплексного потока нет необходимости реализовывать функции чтения и записи, разработчику нужно реализовать только функцию преобразования, поскольку функция преобразования уже реализовала функцию чтения и функцию записи.
Далее следует преобразовать входные буквы в формат верхнего регистра, а затем передать преобразованные данные в доступный для записи поток:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
В этом примере разработчик реализует функцию дуплексного потока выше только через функцию преобразования. В функции преобразования программа преобразует данные в верхний регистр и помещает их в доступный для записи поток.
шаблон объекта потока
По умолчанию потоки принимают только буферные и строковые данные. Но разработчики могут заставить поток принимать любые данные Javascript, установив значение флага objectMode.
Следующий пример демонстрирует это. Преобразует строку, разделенную запятыми, в объект Javscript через набор потоков, поэтому "a,b,c,d" становится {a:b,c:d}.
const { Transform } = require('stream');
const commaSplitter = new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk.toString().trim().split(','));
callback();
}
});
const arrayToObject = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const obj = {};
for(let i=0; i < chunk.length; i+=2) {
obj[chunk[i]] = chunk[i+1];
}
this.push(obj);
callback();
}
});
const objectToString = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) + '\n');
callback();
}
});
process.stdin
.pipe(commaSplitter)
.pipe(arrayToObject)
.pipe(objectToString)
.pipe(process.stdout)
Поток преобразования commaSplitter преобразует входную строку (например: "a,b,c,d") в массив (["a", "b", "c", "d"]). Установите флаг writeObjectMode, поскольку данные, передаваемые в функцию преобразования, представляют собой объект, а не строку.
Затем читаемый поток, выводимый commaSplitter, передается в преобразованный поток arrayToObject. Поскольку объект получен, флаг writableObjectMode также должен быть установлен в arrayToObject. Так как объект нужно протолкнуть в программу (преобразовать входящий массив в объект), это также является причиной того, что в программе установлен флаг readableObjectMode. Наконец, поток преобразования objectToString получает объекты, но выводит строки. Поэтому в программе установлен только флаг writableObjectModel. Выходной читаемый поток представляет собой обычную строку (сериализованный массив).
Встроенный поток преобразования узла
Node имеет множество встроенных потоков преобразования, таких как потоки lib и crypto.
Следующий код представляет собой код для сжатия файла с использованием потока zlib.createGzip() в сочетании с доступными для чтения и записи потоками модуля fs:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
Программа передает доступный для чтения поток прочитанного файла во встроенный поток преобразования zlib Node и, наконец, в доступный для записи поток, который создает сжатый файл. Таким образом, разработчики могут сжимать любой файл, если они передают в программу путь к сжимаемому файлу в качестве параметра.
Разработчики могут использовать конвейерные функции в сочетании с событиями, что является еще одной причиной выбора конвейерных функций. Например: разработчик заставляет программу распечатать маркер, чтобы показать, что сценарий выполняется, и распечатать сообщение «Готово» после выполнения сценария. Функция pipe возвращает целевой поток, и программа может зарегистрировать цепочку событий после получения целевого потока:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
Разработчики могут легко манипулировать потоком через функцию конвейера и даже при необходимости выполнять некоторые настраиваемые взаимодействия с целевым потоком, обрабатываемым функцией конвейера, посредством событий.
Сила конвейерных функций заключается в том, что они объединяют несколько конвейерных функций понятным образом. Например: в отличие от предыдущего примера, разработчик может передать поток преобразования, чтобы определить, что скрипт выполняется.
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
reportProgress — это просто поток перехода, в котором определяется, что скрипт выполняется. Стоит отметить, что код использует функцию обратного вызова для отправки данных в преобразование. Это эквивалентно функциональности this.push() в предыдущем примере.
Существует еще множество сценариев применения объединения потоков. Например: разработчик должен сначала зашифровать файл, а затем сжать файл или сжать, а затем зашифровать. Для этого программа просто передает файлы в поток по порядку, используя модуль crypto:
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
Приведенный выше код реализует сжатые и зашифрованные файлы, и только пользователи, знающие пароль, могут использовать зашифрованные файлы. Потому что разработчик не может распаковать зашифрованный сжатый файл обычным инструментом распаковки.
Для любого файла, сжатого приведенным выше кодом, разработчику нужно использовать только потоки crypto и zlib в обратном порядке, код выглядит следующим образом:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));
Предполагая, что переданный файл является сжатым файлом, вышеуказанная программа сначала сгенерирует читаемый поток, затем передаст его в поток шифрования createDecipher(), а затем передаст файл выходного потока в поток createGunzip() zlib и, наконец, записать в файл.
Вышеизложенное является моим кратким изложением темы, спасибо за чтение и с нетерпением жду встречи с вами в следующий раз.