Практика эксплуатации Node.js в середине потока

Node.js задняя часть внешний интерфейс JavaScript
Практика эксплуатации Node.js в середине потока

Эта статья взята изNode.js CheatSheet | Основы синтаксиса Node.js, использование фреймворка и практические навыки, также читайтеJavaScript CheatSheetилиСовременные основы веб-разработки и инженерные практикиУзнайте больше JavaScript / Node.js практического применения.

Stream — это базовая концепция в Node.js, аналогичная EventEmitter, фокусирующаяся на управляемой событиями обработке данных в конвейерах ввода-вывода; по аналогии с массивами или картами, Stream также представляет собой набор данных, но он представляет данные, которые не обязательно находятся в памяти. . Node.js Stream делится на следующие типы:

  • Поток для чтения: Поток для чтения, генератор данных, например process.stdin.
  • Поток с возможностью записи: поток с возможностью записи, потребитель данных, например, process.stdout или process.stderr.
  • Дуплексный поток: двунаправленный поток, чтение и запись
  • Поток преобразования: поток преобразования, преобразователь данных

Сам поток предоставляет набор спецификаций интерфейса, и многие встроенные модули в Node.js следуют этой спецификации, например, знаменитыйfsМодуль использует интерфейс Stream для чтения и записи файлов; аналогично, каждый HTTP-запрос является потоком, доступным для чтения, а ответ HTTP — потоком, доступным для записи.

Readable Stream

const stream = require('stream');
const fs = require('fs');

const readableStream = fs.createReadStream(process.argv[2], {
  encoding: 'utf8'
});

// 手动设置流数据编码
// readableStream.setEncoding('utf8');

let wordCount = 0;

readableStream.on('data', function(data) {
  wordCount += data.split(/\s{1,}/).length;
});

readableStream.on('end', function() {
  // Don't count the end of the file.
  console.log('%d %s', --wordCount, process.argv[2]);
});

Когда мы создаем читаемый поток, он еще не начал передавать данные, он становится динамическим только тогда, когда добавляется прослушиватель событий для данных. После этого он прочитает небольшой фрагмент данных и передаст его нашей функции обратного вызова.dataЧастота срабатывания события также определяется реализатором, например, при чтении файла оно может срабатывать один раз на строку, при обработке HTTP-запроса оно может срабатывать только один раз для нескольких КБ данных. может относиться кnodejs/readable-stream/_stream_readableСвязанная реализация в , обнаружила, что функция on вызовет метод возобновления, который, в свою очередь, вызовет функцию потока для чтения потока:

// function on
if (ev === 'data') {
  // Start flowing on next tick if stream isn't explicitly paused
  if (this._readableState.flowing !== false) this.resume();
}
...
// function flow
while (state.flowing && stream.read() !== null) {}

мы также можем контролироватьreadableсобытие, а затем вручную прочитать данные:

let data = '';
let chunk;
readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});
readableStream.on('end', function() {
  console.log(data);
});

Readable Stream также включает следующие часто используемые методы:

  • Readable.pause(): этот метод приостанавливает поток. Другими словами, он больше не будет запускать событие данных.
  • Readable.resume(): этот метод противоположен предыдущему и возобновляет поток приостановленного потока.
  • Readable.unpipe(): этот метод удаляет пункт назначения. Если передан аргумент, он остановит доступный для чтения поток к определенному месту назначения, в противном случае он удалит все места назначения.

В ежедневной разработке мы можем использоватьstream-wormholeЧтобы имитировать потребление читаемого потока:

sendToWormhole(readStream, true);

Writable Stream

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

writableStream.end();

когдаend()Когда называются, все данные написаны, то поток триггерыfinishмероприятие. Обратите внимание на звонокend()После этого вы больше не сможете записывать данные в доступный для записи поток.

const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

Writable Stream также содержит некоторые важные события, связанные с Readable Stream:

  • ошибка: срабатывает, когда возникает ошибка записи или ссылки
  • pipe: это событие срабатывает, когда доступный для чтения поток связан с доступным для записи потоком.
  • unpipe: срабатывает, когда unpipe вызывается для читаемого потока

Труба | трубопровод

const fs = require('fs');

const inputFile = fs.createReadStream('REALLY_BIG_FILE.x');
const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x');

// 当建立管道时,才发生了流的流动
inputFile.pipe(outputFile);

Несколько конвейеров вызываются последовательно, то есть строится Цепочка:

const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

Конвейеры также часто используются для обработки файлов на веб-серверах.Взяв в качестве примера приложение в Egg.js, мы можем получить файловый поток из контекста и передать его в доступный для записи файловый поток:

📎 Полная ссылка на кодBackend Boilerplate/egg

const awaitWriteStream = require('await-stream-ready').write;
const sendToWormhole = require('stream-wormhole');
...
const stream = await ctx.getFileStream();

const filename =
  md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase();
//文件生成绝对路径

const target = path.join(this.config.baseDir, 'app/public/uploads', filename);

//生成一个文件写入文件流
const writeStream = fs.createWriteStream(target);
try {
  //异步把文件流写入
  await awaitWriteStream(stream.pipe(writeStream));
} catch (err) {
  //如果出现错误,关闭管道
  await sendToWormhole(stream);
  throw err;
}
...

Ссылаться наВведение в распределенные системы, видно, что в типичном сценарии обработки потока нам неизбежно приходится иметь дело с так называемой проблемой противодавления. Будь то Writable Stream или Readable Stream, данные фактически хранятся во внутреннем буфере, доступ к которому можно получить черезwritable.writableBufferилиreadable.readableBufferчитать. Когда хранилище данных для обработки превышаетhighWaterMarkИли когда текущий поток записи занят, функция записи вернетfalse.pipeФункция автоматически включит для нас механизм противодавления:

image

Когда механизм потока Node.js обнаруживает, что функция записи вернуласьfalse, система противодавления автоматически вмешается и приостановит операцию передачи данных текущего Readable Stream до тех пор, пока потребитель не будет готов.

+===============+
|   Your_Data   |
+=======+=======+
        |
+-------v-----------+          +-------------------+         +=================+
|  Readable Stream  |          |  Writable Stream  +--------->  .write(chunk)  |
+-------+-----------+          +---------^---------+         +=======+=========+
        |                                |                           |
        |     +======================+   |        +------------------v---------+
        +----->  .pipe(destination)  >---+        |    Is this chunk too big?  |
              +==^=======^========^==+            |    Is the queue busy?      |
                 ^       ^        ^               +----------+-------------+---+
                 |       |        |                          |             |
                 |       |        |  > if (!chunk)           |             |
                 ^       |        |      emit .end();        |             |
                 ^       ^        |  > else                  |             |
                 |       ^        |      emit .write();  +---v---+     +---v---+
                 |       |        ^----^-----------------<  No   |     |  Yes  |
                 ^       |                               +-------+     +---v---+
                 ^       |                                                 |
                 |       ^   emit .pause();        +=================+     |
                 |       ^---^---------------------+  return false;  <-----+---+
                 |                                 +=================+         |
                 |                                                             |
                 ^   when queue is empty   +============+                      |
                 ^---^-----------------^---<  Buffering |                      |
                     |                     |============|                      |
                     +> emit .drain();     |  <Buffer>  |                      |
                     +> emit .resume();    +------------+                      |
                                           |  <Buffer>  |                      |
                                           +------------+  add chunk to queue  |
                                           |            <--^-------------------<
                                           +============+

Duplex Stream

Duplex Stream можно рассматривать как совокупность потоков чтения и записи, включающую в себя два независимых друг от друга потока чтения и записи, имеющих независимые внутренние кэши.Операции чтения и записи также могут выполняться асинхронно:

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|
                    Write ----->               External Sink
                          ------------------|

Мы можем моделировать простые операции с сокетами, используя Duplex:

const { Duplex } = require('stream');

class Duplexer extends Duplex {
  constructor(props) {
    super(props);
    this.data = [];
  }

  _read(size) {
    const chunk = this.data.shift();
    if (chunk == 'stop') {
      this.push(null);
    } else {
      if (chunk) {
        this.push(chunk);
      }
    }
  }

  _write(chunk, encoding, cb) {
    this.data.push(chunk);
    cb();
  }
}

const d = new Duplexer({ allowHalfOpen: true });
d.on('data', function(chunk) {
  console.log('read: ', chunk.toString());
});
d.on('readable', function() {
  console.log('readable');
});
d.on('end', function() {
  console.log('Message Complete');
});
d.write('....');

В разработке нам часто требуется напрямую выводить поток для чтения в поток для записи, в это время в него также можно внедрить PassThrough для облегчения дополнительного мониторинга:

const { PassThrough } = require('stream');
const fs = require('fs');

const duplexStream = new PassThrough();

// can be piped from reaable stream
fs.createReadStream('tmp.md').pipe(duplexStream);

// can pipe to writable stream
duplexStream.pipe(process.stdout);

// 监听数据,这里直接输出的是 Buffer<Buffer 60 60  ... >
duplexStream.on('data', console.log);

Transform Stream

Реализован поток преобразования_transformДуплексный поток метода, который имеет функции чтения и записи, а также может преобразовывать поток:

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

Здесь мы реализуем простой кодировщик Base64:

const util = require('util');
const Transform = require('stream').Transform;

function Base64Encoder(options) {
  Transform.call(this, options);
}

util.inherits(Base64Encoder, Transform);

Base64Encoder.prototype._transform = function(data, encoding, callback) {
  callback(null, data.toString('base64'));
};

process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);