Анализ использования и принципа реализации Nodejs Stream pipe

Node.js
Анализ использования и принципа реализации Nodejs Stream pipe

С помощью потоковой передачи мы можем разделить большой фрагмент данных на мелкие части и передавать их понемногу, не читая их все сразу.В Linux мы можем использовать для этого символ |, подобно модулю Stream в Nodejs. предоставляет нам метод pipe() для реализации.

1. Базовый пример Nodejs Stream pipe

Я выбрал Koa для реализации этой простой демонстрации, потому что кто-то ранее задавал вопрос в группе обмена «Nodejs Technology Stack», как вернуть поток в Koa, кстати, я воспользуюсь этой возможностью, чтобы упомянуть об этом ниже.

1.1 Когда Stream pipe не используется

В Nodejs операции ввода-вывода являются асинхронными. Во-первых, используйте метод promisify модуля util, чтобы преобразовать форму обратного вызова fs.readFile в форму Promise. Этот код кажется хорошим, но его опыт не очень хорош, потому что он Чтение данных в память за один раз, а затем их возврат также потребляют память, когда файл данных большой, поэтому это не рекомендуется.

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx => {
  try {
    ctx.body = await readFile(resolve(__dirname, 'test.json'));
  } catch(err) { ctx.body = err };
});

app.listen(3000);

1.2 Использование потокового канала

Далее давайте посмотрим, как реагировать на данные в фреймворке Koa через Stream.

...
app.use(async ctx => {
  try {
    const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
    ctx.body = readable;
  } catch(err) { ctx.body = err };
});

В приведенном выше вы можете напрямую создать читаемый поток в Koa и назначить его ctx.body.Вам может быть интересно, почему нет метода pipe, потому что фреймворк инкапсулировал его за вас, так что не смущайтесь внешним видом , Взгляните на соответствующий исходный код:

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
  ...
  let body = ctx.body;
  if (body instanceof Stream) return body.pipe(res);
  ...
}

В этом нет никакой магии, фреймворк оценивает уровень при возврате, потому что res — это объект потока, доступный для записи, если body также является объектом Stream (тело — это поток, доступный для чтения в это время), тогда используйте body.pipe(res) Respond in поток.

1.3 Использование потока VS без использования потока

Когда я увидел картину, я должен сказать, что картина действительно милая, источникwoo woo woo.cn blog on.com/VA Joy/afraid/634…

2 Анализ процесса вызова и принцип реализации pipe

Основной реализацией потокового реагирования на данные является использование метода канала для реализации ввода и вывода. В центре внимания этого раздела также находится изучение реализации канала. Лучший способ открыть его — прочитать исходный код.

2.1 Следуй за лозой

На уровне приложения мы вызвали метод fs.createReadStream() и пошли по следу, чтобы найти реализацию метода канала читаемого объекта потока, созданного этим методом.Ниже приведена только реализация основного кода, основанная на Nodejs v12. х исходный код.

2.1.1 /lib/fs.js

Экспортируйте метод createReadStream, в котором создается читаемый объект потока ReadStream, а ReadStream поступает из файла internal/fs/streams, продолжайте смотреть вниз.

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// 懒加载,主要在用到的时候用来实例化 ReadStream、WriteStream ... 等对象
function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options); // 创建一个可读流
}

module.exports = fs = {
  createReadStream, // 导出 createReadStream 方法
  ...
}

2.1.2 /lib/internal/fs/streams.js

Конструктор ReadStream определен в этом методе, а методы open, _read, _destroy и другие определены в прототипе, и нет никакого метода канала, который мы ищем.

ноНаследование достигается через метод ObjectSetPrototypeOf, ReadStream наследует функцию, определенную в прототипе Readable, а затем продолжает находить реализацию Readable.

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');

function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  ...
  Readable.call(this, options);
  ...
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() { ... };

ReadStream.prototype._read = function(n) { ... };;

ReadStream.prototype._destroy = function(err, cb) { ... };
...

module.exports = {
  ReadStream,
  WriteStream
};

2.1.3 /lib/stream.js

В реализации stream.js есть примечание:Импортировать поток до Readable/Writable/Duplex/... , причина в том, чтобы избежать перекрестных ссылок (требуется), почему это так?

Первый шаг Stream.js реплицируется для потока.

Следующие модули _stream_readable, Writable, Duplex... также будут по очереди ссылаться на файл stream.js. Конкретная реализация будет показана ниже.

Поток импортирует внутренние/потоки/устаревшие

Приведенный выше файл /lib/internal/fs/streams.js получает объект Readable из модуля потока, который является определением Stream.Readable ниже.

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...

2.1.4 /lib/internal/streams/legacy.js

Приведенный выше Stream равен internal/streams/legacy. Сначала он наследует модуль Events, а затем определяет метод pipe на прототипе. Когда я впервые увидел его, я подумал, что реализация здесь, но после просмотра реализации _stream_readable , я обнаружил, что _stream_readable повторно реализует метод канала после наследования от Stream, поэтому вопрос в том, что делает метод канала этого модуля? Когда он будет использоваться? Перевести имя файла "legacy=legacy"? Я не совсем понимаю, это осталось? Если у вас есть явный босс, вы можете дать подсказки.Вы также можете добавить мой WeChat в фон общедоступного аккаунта «Технологический стек Nodejs» для обсуждения!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {
  ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
  ...
};

module.exports = Stream;

2.1.5 /lib/_stream_readable.js

Конструктор Readable определен в реализации _stream_readable.js и наследуется от Stream.Этот поток является файлом /lib/stream.js, о котором мы упоминали выше, а файл /lib/stream.js загружает файл internal/streams/legacy и переопределить метод трубы, определенный в нем.

После серии приведенных выше анализов, наконец, найден канал, в котором читаемые потоки, а также дальнейшее понимание вызывающего процесса при создании читаемого потока позволит добиться основного взгляда на этот метод ниже.

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...
  Stream.call(this, options); // 继承自 Stream 构造函数的定义
}
...

2.2 Анализ реализации _stream_readable

2.2.1 Объявление конструктора доступным для чтения

Объявить конструктор Readable наследует конструктор и прототип Stream.

Поток — это файл /lib/stream.js, который, как анализировалось выше, наследует событие events, и на данный момент также имеет атрибуты, определенные в прототипе событий, такие как on, emit и другие методы.

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...

  Stream.call(this, options);
}

2.2.2 Объявить метод канала и подписаться на событие данных

Объявите метод pipe в прототипе Stream, подпишитесь на событие данных, src — это объект потока, доступный для чтения, а dest — объект потока, доступный для записи.

Когда мы используем метод канала, мы также прослушиваем событие данных и записываем данные во время чтения данных.

Взгляните на несколько основных реализаций метода ondata():

  • dest.write(chunk): получить данные записи фрагмента, если внутренний буфер меньше, чем highWaterMark, настроенный при создании потока, вернуть true, в противном случаеВозврат false должен остановить запись данных в поток до тех пор, пока не будет запущено событие «слив»..
  • src.pause(): Доступный для чтения поток остановит событие данных, что означает, что запись данных в это время приостановлена.

ЗачемВызов src.pause() предназначен для предотвращения слишком быстрого чтения данных для записи., когда вы знаете, что уже слишком поздно писать, это зависит от того, когда dest.write(chunk) возвращает false, что основано на атрибуте highWaterMark, переданном при создании потока, значение по умолчанию — 16384 (16 КБ), а значение по умолчанию для поток в объектном режиме равен 16.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      src.pause();
    }
  }
  ...
};

2.2.3 Подпишитесь на событие слива и продолжайте передавать данные

Как упоминалось выше, в событии данных, если вызов dest.write(chunk) возвращает false, он вызовет src.pause(), чтобы остановить поток данных. Когда он снова будет включен?

Если вы можете продолжать записывать события в поток, будет срабатывать событие слива, а когда dest.write(chunk) равно false, если ondrain не существует, будет зарегистрировано событие слива.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source.  This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src);
        dest.on('drain', ondrain);
      }
      src.pause();
    }
  }
  ...
};

// 当可写入流 dest 耗尽时,它将会在可读流对象 source 上减少 awaitDrain 计数器
// 为了确保所有需要缓冲的写入都完成,即 state.awaitDrain === 0 和 src 可读流上的 data 事件存在,切换流到流动模式
function pipeOnDrain(src) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// stream.read() 从内部缓冲拉取并返回数据。如果没有可读的数据,则返回 null。在可读流上 src 还有一个 readable 属性,如果可以安全地调用 readable.read(),则为 true
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

2.2.4 Запуск события данных

Вызовите метод возобновления () для чтения, чтобы вызвать событие «данные» для чтения и войти в режим потоковой передачи.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  ...

Затем возобновление экземпляра (определенного в прототипе Readable) вызовет метод возобновления(), который вызовет возобновление_() внутри метода и, наконец, выполнит stream.read(0) для однократного чтения пустых данных (размер установлен 0 ), будет запущен метод _read() для экземпляра, а затем будет запущено событие данных.

function resume(stream, state) {
  ...
  process.nextTick(resume_, stream, state);
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  ...
}

2.2.5 Подписка на конечное событие

конечное событие: срабатывает, когда в доступном для чтения потоке нет данных, доступных для потребления, вызывается функция onend и выполняется метод dest.end(), чтобы указать, что нет данных для записи в доступный для записи поток, и закрыть (закройте fd доступного для записи потока), последующие вызовы stream.write() приведут к ошибке.

Readable.prototype.pipe = function(dest, options) {
  ...
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  ...

  function onend() {
    debug('onend');
    dest.end();
  }
}

2.2.6 Инициировать событие канала

В методе канала событие канала будет инициировано в конце, передавая доступный для чтения объект потока.

Readable.prototype.pipe = function(dest, options) {
  ...
  const source = this;
  dest.emit('pipe', src);
  ...
};

При использовании прикладного уровня вы можете подписаться на событие канала в доступном для записи потоке и сделать некоторые выводы.Подробности см. в этом примере, приведенном на официальном сайте.stream_event_pipe

2.2.7 поддерживает привязные звонки

Наконец, верните dest, поддержите аналогичное использование unix: A.pipe (B) .pipe (C)

Readable.prototype.pipe = function(dest, options) {
  return dest;
};

3. Резюме

Эта статья обычно делится на две части:

  • Первая часть относительно проста и объясняет, как метод канала Nodejs Stream применяется в Koa2.
  • Вторая часть по-прежнему посвящена методу канала Nodejs Stream, поиску его реализации и простому анализу исходного кода.На самом деле, суть метода канала заключается в прослушивании события данных и записи данных в доступный для записи поток. внутренний буфер больше, чем созданный поток. Когда highWaterMark сконфигурирован, поток данных должен быть остановлен до тех пор, пока не будет инициировано или завершено событие слива, и, конечно же, необходимо отслеживать такие события, как конец и ошибка, чтобы выполнить некоторую обработку.

4. Reference

  • nodejs.cn/api/stream.html
  • cnodejs.org/topic/56ba030271204e03637a3870
  • github.com/nodejs/node/blob/master/lib/_stream_readable.js