Эта статья взята изNode.js CheatSheet | Основы синтаксиса Node.js, использование фреймворка и практические навыки, также читайтеJavaScript CheatSheetилиСовременные основы веб-разработки и инженерные практикиУзнайте больше JavaScript / Node.js практического применения.
Stream — это базовая концепция в Node.js, аналогичная EventEmitter, фокусирующаяся на управляемой событиями обработке данных в конвейерах ввода-вывода; по аналогии с массивами или картами, Stream также представляет собой набор данных, но он представляет данные, которые не обязательно находятся в памяти. . Node.js Stream делится на следующие типы:
- Поток для чтения: Поток для чтения, генератор данных, например process.stdin.
- Поток с возможностью записи: поток с возможностью записи, потребитель данных, например, process.stdout или process.stderr.
- Дуплексный поток: двунаправленный поток, чтение и запись
- Поток преобразования: поток преобразования, преобразователь данных
Сам поток предоставляет набор спецификаций интерфейса, и многие встроенные модули в Node.js следуют этой спецификации, например, знаменитыйfs
Модуль использует интерфейс Stream для чтения и записи файлов; аналогично, каждый HTTP-запрос является потоком, доступным для чтения, а ответ HTTP — потоком, доступным для записи.
Readable Stream
const stream = require('stream');
const fs = require('fs');
const readableStream = fs.createReadStream(process.argv[2], {
encoding: 'utf8'
});
// 手动设置流数据编码
// readableStream.setEncoding('utf8');
let wordCount = 0;
readableStream.on('data', function(data) {
wordCount += data.split(/\s{1,}/).length;
});
readableStream.on('end', function() {
// Don't count the end of the file.
console.log('%d %s', --wordCount, process.argv[2]);
});
Когда мы создаем читаемый поток, он еще не начал передавать данные, он становится динамическим только тогда, когда добавляется прослушиватель событий для данных. После этого он прочитает небольшой фрагмент данных и передаст его нашей функции обратного вызова.data
Частота срабатывания события также определяется реализатором, например, при чтении файла оно может срабатывать один раз на строку, при обработке HTTP-запроса оно может срабатывать только один раз для нескольких КБ данных. может относиться кnodejs/readable-stream/_stream_readableСвязанная реализация в , обнаружила, что функция on вызовет метод возобновления, который, в свою очередь, вызовет функцию потока для чтения потока:
// function on
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false) this.resume();
}
...
// function flow
while (state.flowing && stream.read() !== null) {}
мы также можем контролироватьreadable
событие, а затем вручную прочитать данные:
let data = '';
let chunk;
readableStream.on('readable', function() {
while ((chunk = readableStream.read()) != null) {
data += chunk;
}
});
readableStream.on('end', function() {
console.log(data);
});
Readable Stream также включает следующие часто используемые методы:
- Readable.pause(): этот метод приостанавливает поток. Другими словами, он больше не будет запускать событие данных.
- Readable.resume(): этот метод противоположен предыдущему и возобновляет поток приостановленного потока.
- Readable.unpipe(): этот метод удаляет пункт назначения. Если передан аргумент, он остановит доступный для чтения поток к определенному месту назначения, в противном случае он удалит все места назначения.
В ежедневной разработке мы можем использоватьstream-wormholeЧтобы имитировать потребление читаемого потока:
sendToWormhole(readStream, true);
Writable Stream
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
});
writableStream.end();
когдаend()
Когда называются, все данные написаны, то поток триггерыfinish
мероприятие. Обратите внимание на звонокend()
После этого вы больше не сможете записывать данные в доступный для записи поток.
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
Writable Stream также содержит некоторые важные события, связанные с Readable Stream:
- ошибка: срабатывает, когда возникает ошибка записи или ссылки
- pipe: это событие срабатывает, когда доступный для чтения поток связан с доступным для записи потоком.
- unpipe: срабатывает, когда unpipe вызывается для читаемого потока
Труба | трубопровод
const fs = require('fs');
const inputFile = fs.createReadStream('REALLY_BIG_FILE.x');
const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x');
// 当建立管道时,才发生了流的流动
inputFile.pipe(outputFile);
Несколько конвейеров вызываются последовательно, то есть строится Цепочка:
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('output.txt'));
Конвейеры также часто используются для обработки файлов на веб-серверах.Взяв в качестве примера приложение в Egg.js, мы можем получить файловый поток из контекста и передать его в доступный для записи файловый поток:
📎 Полная ссылка на кодBackend Boilerplate/egg
const awaitWriteStream = require('await-stream-ready').write;
const sendToWormhole = require('stream-wormhole');
...
const stream = await ctx.getFileStream();
const filename =
md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase();
//文件生成绝对路径
const target = path.join(this.config.baseDir, 'app/public/uploads', filename);
//生成一个文件写入文件流
const writeStream = fs.createWriteStream(target);
try {
//异步把文件流写入
await awaitWriteStream(stream.pipe(writeStream));
} catch (err) {
//如果出现错误,关闭管道
await sendToWormhole(stream);
throw err;
}
...
Ссылаться наВведение в распределенные системы, видно, что в типичном сценарии обработки потока нам неизбежно приходится иметь дело с так называемой проблемой противодавления. Будь то Writable Stream или Readable Stream, данные фактически хранятся во внутреннем буфере, доступ к которому можно получить черезwritable.writableBuffer
илиreadable.readableBuffer
читать. Когда хранилище данных для обработки превышаетhighWaterMark
Или когда текущий поток записи занят, функция записи вернетfalse
.pipe
Функция автоматически включит для нас механизм противодавления:
Когда механизм потока Node.js обнаруживает, что функция записи вернуласьfalse
, система противодавления автоматически вмешается и приостановит операцию передачи данных текущего Readable Stream до тех пор, пока потребитель не будет готов.
+===============+
| Your_Data |
+=======+=======+
|
+-------v-----------+ +-------------------+ +=================+
| Readable Stream | | Writable Stream +---------> .write(chunk) |
+-------+-----------+ +---------^---------+ +=======+=========+
| | |
| +======================+ | +------------------v---------+
+-----> .pipe(destination) >---+ | Is this chunk too big? |
+==^=======^========^==+ | Is the queue busy? |
^ ^ ^ +----------+-------------+---+
| | | | |
| | | > if (!chunk) | |
^ | | emit .end(); | |
^ ^ | > else | |
| ^ | emit .write(); +---v---+ +---v---+
| | ^----^-----------------< No | | Yes |
^ | +-------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---^---------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^---^-----------------^---< Buffering | |
| |============| |
+> emit .drain(); | <Buffer> | |
+> emit .resume(); +------------+ |
| <Buffer> | |
+------------+ add chunk to queue |
| <--^-------------------<
+============+
Duplex Stream
Duplex Stream можно рассматривать как совокупность потоков чтения и записи, включающую в себя два независимых друг от друга потока чтения и записи, имеющих независимые внутренние кэши.Операции чтения и записи также могут выполняться асинхронно:
Duplex Stream
------------------|
Read <----- External Source
You ------------------|
Write -----> External Sink
------------------|
Мы можем моделировать простые операции с сокетами, используя Duplex:
const { Duplex } = require('stream');
class Duplexer extends Duplex {
constructor(props) {
super(props);
this.data = [];
}
_read(size) {
const chunk = this.data.shift();
if (chunk == 'stop') {
this.push(null);
} else {
if (chunk) {
this.push(chunk);
}
}
}
_write(chunk, encoding, cb) {
this.data.push(chunk);
cb();
}
}
const d = new Duplexer({ allowHalfOpen: true });
d.on('data', function(chunk) {
console.log('read: ', chunk.toString());
});
d.on('readable', function() {
console.log('readable');
});
d.on('end', function() {
console.log('Message Complete');
});
d.write('....');
В разработке нам часто требуется напрямую выводить поток для чтения в поток для записи, в это время в него также можно внедрить PassThrough для облегчения дополнительного мониторинга:
const { PassThrough } = require('stream');
const fs = require('fs');
const duplexStream = new PassThrough();
// can be piped from reaable stream
fs.createReadStream('tmp.md').pipe(duplexStream);
// can pipe to writable stream
duplexStream.pipe(process.stdout);
// 监听数据,这里直接输出的是 Buffer<Buffer 60 60 ... >
duplexStream.on('data', console.log);
Transform Stream
Реализован поток преобразования_transform
Дуплексный поток метода, который имеет функции чтения и записи, а также может преобразовывать поток:
Transform Stream
--------------|--------------
You Write ----> ----> Read You
--------------|--------------
Здесь мы реализуем простой кодировщик Base64:
const util = require('util');
const Transform = require('stream').Transform;
function Base64Encoder(options) {
Transform.call(this, options);
}
util.inherits(Base64Encoder, Transform);
Base64Encoder.prototype._transform = function(data, encoding, callback) {
callback(null, data.toString('base64'));
};
process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);