«Шаблоны проектирования Node.js» Кодирование с помощью потоков

Node.js внешний интерфейс JavaScript
«Шаблоны проектирования Node.js» Кодирование с помощью потоков

Эта серия статей являетсяШаблоны проектирования Node.js, второе изданиеОригинальный перевод и примечания к чтению, сериализованные и обновленные на GitHub,Синхронизировать ссылку на перевод.

Добро пожаловать, обратите внимание на мою колонку, в колонке будут синхронизированы следующие записи блога:

Coding with Streams

StreamsдаNode.jsОдин из важнейших компонентов и узоров. В сообществе есть поговорка «Стримить все (Steam — это все)», и одного этого достаточно, чтобы описатьNode.jsположение в.Dominic Tarrв видеNode.jsКрупнейший участник сообщества, который определяет поток какNode.jsСамая лучшая и самая сложная концепция для понимания.

сделатьNode.jsизStreamsЕсть и другие причины такой привлекательности; кроме того,StreamsНе только о технических характеристиках, таких как производительность или эффективность, но, что более важно, об их элегантности и о том, как они связаны сNode.jsТо, как концепция дизайна подходит идеально.

В этой главе вы узнаете следующее:

  • StreamsзаNode.jsважность.
  • Как создать и использоватьStreams.
  • StreamsКак парадигма программирования, а не только дляI/OС точки зрения его применения и мощных функций в различных сценариях применения.
  • Конвейерный режим и подключение в разных конфигурацияхStreams.

Откройте для себя важность потоков

на событийных платформах, таких какNode.js), обработкаI / OСамый эффективный метод — обработка в реальном времени: как только есть входная информация, она будет обработана немедленно, а как только появится результат, который нужно вывести, сразу же будет выведена обратная связь.

В этом разделе мы сначала представимNode.jsизStreamsи его преимущества. Помните, что это всего лишь обзор, так как использование и комбинация будут подробно описаны позже в этой главе.Streams.

Сравнение потоков и буферов

Почти каждый асинхронный API, который мы видели в этой книге, используетBufferмодель. Для операций ввода,BufferСхема соберет все данные из ресурса вBufferобласти; как только весь ресурс будет прочитан, результат передается в функцию обратного вызова. На следующем рисунке показан реальный пример этой парадигмы:

Из приведенного выше рисунка мы видим, чтоt1Иногда некоторые данные принимаются от ресурса и сохраняются в буфере. существуетt2В данный момент последний фрагмент данных принят в другой блок данных, и операция чтения завершена, в это время потребителю отправляется содержимое всего буфера.

с другой стороны,StreamsПозволяет обрабатывать данные по мере их поступления. Как показано ниже:

Это изображение показываетStreamsКак получать каждый новый блок данных от ресурса и сразу предоставлять его потребителю, теперь потребителю не нужно ждать, пока все данные соберутся в буфер, перед обработкой каждого блока данных.

Но в чем разница между этими двумя методами? Мы можем обобщить их на два пункта:

  • космическая эффективность
  • эффективность времени

также,Node.jsизStreamsИмеет еще одно важное преимущество:компонуемость. Теперь давайте посмотрим, как эти свойства влияют на то, как мы разрабатываем и пишем приложения.

космическая эффективность

Во-первых,StreamsПозволяет нам делать вещи, которые кажутся невозможными, путем буферизации данных и их одновременной обработки. Например, предположим, что нам нужно прочитать очень большой файл, скажем, сотниMBдаже тысячаMB. По-видимому, ожидание полного чтения файла возвращает большиеBufferизAPIНе хорошая идея. Представьте, что если несколько больших файлов будут читаться одновременно, нашему приложению может легко не хватить памяти. Кроме,V8серединаBufferне может быть больше, чем0x3FFFFFFFбайт (менее1GB). Таким образом, мы можем столкнуться со стеной до того, как закончится физическая память.

Сжимайте файлы с помощью Buffered API

В качестве конкретного примера рассмотрим простой интерфейс командной строки (CLI) приложение, которое используетGzipОтформатируйте сжатый файл. использоватьBufferedизAPI, такое приложение вNode.jsПримерно так (обработка исключений для краткости опущена):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

Теперь мы можем попробовать поместить предыдущий код в файл с именемgzip.jsфайл, а затем выполните следующую команду:

node gzip <path to file>

Если мы выберем достаточно большой файл, скажем, больше, чем1GBфайл, мы получаем сообщение об ошибке, говорящее о том, что файл, который мы хотим прочитать, больше, чем максимально допустимый размер буфера, например:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

В приведенном выше примере большой файл не найден, но скорость чтения больших файлов действительно намного ниже.

Как мы и предполагали, используяBufferОчевидно, что читать большие файлы неправильно.

Сжимайте файлы с помощью потоков

мы должны исправить нашиGzipприложение, и самый простой способ заставить его работать с большими файлами — использоватьStreamsизAPI. Давайте посмотрим, как этого добиться. Давайте заменим содержимое модуля, который мы только что создали, следующим кодом:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

«Правда?» — спросите вы. Да, как мы сказали, из-заStreamsинтерфейс и компонуемость, поэтому мы также можем писать такой более краткий, элегантный и изысканный код. Мы подробно рассмотрим это позже, но сейчас важно понять, что программы могут без проблем работать с файлами любого размера, в идеале с постоянным использованием памяти. Попробуйте (но учтите, что сжатие большого файла может занять некоторое время).

эффективность времени

Теперь рассмотрим сжатый файл и загрузим его на удаленныйHTTPПримеры серверных приложений, которые удаленноHTTPСервер, в свою очередь, распаковывает и сохраняет его в файловой системе. Если наш клиент используетBufferedизAPIреализовано, загрузка начнется только после того, как весь файл будет прочитан и сжат. С другой стороны, распаковка начнется на сервере только после получения всех данных. Лучшее решение для достижения того же результата включает использованиеStreams. На клиентской машинеStreamsБлоки данных могут быть сжаты и отправлены до тех пор, пока они считываются из файловой системы, в то время как на сервере каждый блок данных может быть распакован, пока он получен от удаленного узла. Мы демонстрируем это, создавая вышеупомянутое приложение, начиная со стороны сервера.

Мы создаемgzipReceive.jsмодуль, код выглядит следующим образом:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

Сервер получает чанки из сети, распаковывает их и сохраняет чанки, как только они получены, благодаряNode.jsизStreams.

Клиенты нашего приложения будут вводить файл с именемgzipSend.jsмодуль следующим образом:

В предыдущем коде мы снова используемStreamsЧтение данных из файла, затем сжатие и отправка каждого блока данных при чтении из файловой системы.

Теперь, чтобы запустить приложение, мы сначала запускаем сервер с помощью следующей команды:

node gzipReceive

Затем мы можем указать файл для отправки и адрес сервера, указав (например,localhost) для запуска клиента:

node gzipSend <path to file> localhost

Если мы выберем достаточно большой файл, нам будет легче увидеть, как данные передаются от клиента к серверу, но почему в этом режиме мы используемStreams, чем при использованииBufferedизAPIболее эффективным? Изображение ниже должно дать нам подсказку:

Процесс обработки файла проходит следующие этапы:

  1. Клиент читает из файловой системы
  2. Сжатые данные клиента
  3. Клиент отправляет данные на сервер
  4. Сервер получает данные
  5. Сервер распаковывает данные
  6. Сервер записывает данные на диск

Чтобы завершить обработку, мы должны пройти каждый этап в конвейерном порядке до конца. На изображении выше мы видим, что с помощьюBufferedизAPI, процесс полностью последователен. Чтобы сжать данные, мы сначала должны дождаться, пока будет прочитан весь файл, затем, чтобы отправить данные, мы должны дождаться, пока весь файл будет прочитан и сжат, и так далее. когда мы используемStreams, конвейер запустится, как только мы получим первый блок данных, не дожидаясь чтения всего файла. Но еще более удивительно то, что когда становится доступным очередной фрагмент данных, нет необходимости ждать завершения предыдущего набора задач, вместо этого параллельно запускается еще одна сборочная линия. Поскольку каждая задача, которую мы выполняем, является асинхронной, что идеально, ее можно выполнить с помощьюNode.jsвыполнять параллельноStreamsсвязанные операции; единственным ограничением является то, что каждый этап должен гарантировать порядок поступления блоков данных.

Как видно из предыдущего рисунка, при использованииStreamsВ результате весь процесс занимает меньше времени, поскольку нам не нужно ждать, пока все данные будут прочитаны и обработаны.

композиционность

Код, который мы видели до сих пор, говорил нам, как использоватьpipe()способ сборкиStreamsблок данных,StreamsПозволяет нам подключать различные процессорные блоки, каждый из которых несет единую ответственность (это согласуется сNode.jsСтиль). Это возможно, потому чтоStreamsимеет единый интерфейс иAPIс точки зрения различныхStreamsОн также может хорошо взаимодействовать. Единственным условием является следующий из конвейераStreamsдолжны поддерживать предыдущиеStreamsРезультирующий тип данных может быть двоичным, текстовым или даже объектным, как мы увидим в следующих главах.

чтобы доказатьStreamsПреимущество композиционности, мы можем попытаться построитьgzipReceive / gzipSendДобавьте функцию шифрования в приложение. Для этого нам просто нужно добавить еще одинStreamsдля обновления клиента. В частности, поcrypto.createChipher()Возвращенный поток. Результирующий код должен выглядеть так:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

Точно так же мы обновляем код на стороне сервера, чтобы он мог расшифровать блок данных перед его распаковкой:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

cryptoЭто один из основных модулей Node.js, предоставляющий ряд алгоритмов шифрования.

Всего несколькими строками кода мы добавили в наше приложение уровень шифрования. Нам просто нужно просто передать существующийStreamsМодуль и уровень шифрования объединены вместе, и все. Точно так же мы можем добавлять и объединять другиеStreams, как будто играешь с кубиками Лего.

Очевидно, что основным преимуществом этого подхода является возможность повторного использования, но, как видно из кода, представленного до сих пор,StreamsТакже можно получить более четкий, модульный и лаконичный код. По этим причинам потоки часто используются не только для обработки чистых данных.I / O, а также средство упрощения и модульности кода.

Начало работы с потоками

В предыдущих главах мы видели, почемуStreamsтакой мощный, и этоNode.jsвезде, даже вNode.jsОн также присутствует в основном модуле . Например, мы видели, что,fsмодуль имеет для чтения из файлаcreateReadStream()и для записи в файлcreateWriteStream(),HTTPОбъекты запроса и ответа по существуStreamszlibмодули позволяют использоватьStreamsРежимAPIСжимайте и распаковывайте блоки данных.

Теперь мы знаем, почемуStreamsЭто так важно, давайте сделаем шаг назад и начнем изучать его более подробно.

Структура потоков

Node.jsкаждый изStreamsобаStreamsРеализация одного из четырех основных абстрактных классов, доступных в основном модуле:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

каждыйstreamкласс такжеEventEmitterэкземпляр . По факту,StreamsМожно генерировать несколько типов событий, напримерendСобытия будут в читаемомStreamsЗапускается, когда чтение завершено, или считывается ошибка, или во время процесса возникает исключение.

Обратите внимание, что для краткости мы часто опускаем правильную обработку ошибок в примерах, представленных в этой главе. Однако в производственной среде всегда рекомендуется регистрировать прослушиватели событий ошибок для всех потоков.

StreamsОдна из причин его гибкости заключается в том, что он может обрабатывать не только двоичные данные, но и почти любыеJavaScriptценность. По факту,StreamsМогут поддерживаться два режима работы:

  • Двоичный режим: в блоках данных (например,buffersилиstrings) потоковые данные
  • Шаблон объекта: обрабатывать потоковые данные как ряд дискретных объектов (это позволяет нам использовать практически любойJavaScriptценность)

Эти два режима работы позволяют не только использоватьI / Oпотоки, но и как инструмент элегантного комбинирования блоков обработки в функциональном стиле, как мы увидим позже в этой главе.

В этой главе мы будем в основном использовать потоковый интерфейс Node.js, представленный в Node.js 0.11, также известный как версия 3. См. отличный пост в блоге StrongLoop по адресу https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/ для получения более подробной информации об отличиях от старого интерфейса.

Читаемые потоки

читаемыйStreamsпредставляет собой источник данных, вNode.js, оно используетstreamв модулеReadableabstractреализация класса.

Чтение информации из потоков

читаемый изStreamsСуществует два способа получения данных:non-flowingузор иflowingмодель. Разберем эти закономерности более подробно.

нетекучий режим (non-flowing mode)

из читаемогоStreamsРежим чтения данных по умолчанию заключается в присоединении к нему прослушивателя событий readable, который указывает на доступность новых данных для чтения. Затем в цикле мы считываем все данные до тех пор, пока внутреннийbufferопустеет. Это можно использоватьread()завершается метод, который синхронно считывает данные из внутреннего буфера и возвращаетBufferилиStringобъект.read()Методы используются по следующей схеме:

readable.read([size]);

Используя этот подход, данные легко доступны непосредственно изStreamsВыписка по требованию.

Чтобы проиллюстрировать, как это работает, создадим файл с именемreadStdin.js, который реализует простую программу, которая считывает данные со стандартного ввода (читаемый поток) и выводит все данные на стандартный вывод:

process.stdin
  .on('readable', () => {
    let chunk;
    console.log('New data available');
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'));

read()метод является синхронной операцией, он начинается с читаемогоStreamsвнутреннийBuffersИзвлеките блоки данных из области. еслиStreamsРаботает в двоичном режиме, возвращаемый блок данных по умолчаниюBufferобъект.

В читаемом потоке, работающем в двоичном режиме, мы можем прочитать строку вместо объекта Buffer, вызвав setEncoding(encoding) в потоке и предоставив допустимый формат кодирования (например, utf8).

Данные считываются из читаемого слушателя, который вызывается всякий раз, когда появляются новые данные. Когда во внутреннем буфере больше нет данных,read()метод возвращаетnull; в этом случае мы должны дождаться другого читаемого события, сообщающего нам, что мы можем прочитать снова, или дождаться представленияStreamsпроцесс чтения завершенendСобытие запущено. Когда поток работает в бинарном режиме, мы также можем передатьread()метод проходитsizeпараметр, чтобы указать размер данных, которые мы хотим прочитать. Это особенно полезно при реализации сетевых протоколов или разборе определенных форматов данных.

Теперь мы готовы бежатьreadStdinмодуль и эксперимент. Давайте наберем в консоли несколько символов и нажмемEnterчтобы просмотреть данные, выводимые на стандартный вывод. Чтобы завершить поток и, таким образом, сгенерировать обычное конечное событие, нам нужно вставитьEOF(конец файла) символ (вWindowsиспользовать наCtrl + Zили вLinuxиспользовать наCtrl + D).

Мы также можем попробовать связать нашу программу с другими программами, это можно сделать с помощью оператора канала (|), который перенаправляет стандартный вывод программы на стандартный ввод другой программы. Например, мы можем запустить следующую команду:

cat <path to a file> | node readStdin

Это отличный пример того, как потоковая парадигма является универсальным интерфейсом, который позволяет нашим программам взаимодействовать независимо от того, на каком языке они написаны.

проточный режим

отStreamsЕще один способ считывания — присоединить прослушиватель кdataсобытие; это будетStreamsпереключить наflowingрежим, в котором данные не используютсяread()функцию для извлечения, но как только данные поступятdataСлушатель вталкивается в слушателя. Например, мы создали ранееreadStdinПриложение будет использовать потоковый режим:

process.stdin
  .on('data', chunk => {
    console.log('New data available');
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on('end', () => process.stdout.write('End of stream'));

flowingУстаревший режимStreamsинтерфейс (также известный какStreams1), который менее гибкий,APIменьше. вместе сStreams2введение интерфейса,flowingрежим не является рабочим режимом по умолчанию, чтобы включить его, слушатель должен быть подключен кdataсобытие или явный вызовresume()метод. временно прерватьStreamsкурокdataсобытие, мы можем назватьpause()метод, заставляющий любые входящие данные кэшироваться внутриbufferсередина.

Вызов pause() не приведет к переключению Streams обратно в нетекущий режим.

Реализовать читаемые потоки

Теперь мы знаем, какStreamsчитать данные, следующим шагом будет узнать, как реализовать новыйReadableпоток данных. Для этого необходимо передать наследованиеstream.Readableпрототип для создания нового класса. Бетонный поток должен обеспечивать_read()Реализация метода:

readable._read(size)

ReadableВнутри класса вызовет_read()метод, который, в свою очередь, запускает использоватьpush()Заполнить внутренний буфер:

Обратите внимание, что read() — это метод, вызываемый потребителями Stream, а _read() — это метод, реализованный подклассами Stream, и его нельзя вызывать напрямую. Символы подчеркивания обычно указывают на то, что метод является закрытым и не должен вызываться напрямую.

Чтобы продемонстрировать, как реализовать новый читаемыйStreams, мы можем попробовать реализовать случайную строкуStreams. Давайте создадимrandomStream.jsновый модуль, который будет содержать нашу строкуgeneratorкод:

const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8'); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

В верхней части файла мы загрузим наши зависимости. за исключением того, что мы загружаеммодуль шанса npmКроме этого, ничего особенного, это библиотека для генерации всех видов случайных значений, от чисел и строк до целых предложений.

Следующим шагом является создание файла с именемRandomStreamновый класс и указатьstream.Readableкак его родительский класс. В предыдущем коде мы вызываем конструктор родительского класса для инициализации его внутреннего состояния и передачи полученногоoptionsпараметры в качестве входных данных. пройти черезoptionsВозможные параметры, передаваемые объектом, включают следующее:

  • дляBuffersпреобразовать вStringsизencodingпараметр (по умолчаниюnull)
  • Включить ли объектный режим (objectModeПо умолчаниюfalse)
  • хранится внутриbufferВерхний предел данных в зоне, как только этот верхний предел превышен, данные будут приостановленыdata sourceчитать(highWaterMarkПо умолчанию16KB)

хорошо, теперь давайте объясним, что мы переписалиstream.ReadableКатегория_read()метод:

  • Этот метод используетchanceГенерировать случайные строки.
  • это будет струнаpushвнутреннийbuffer. Обратите внимание, что поскольку мыpushдаString, и мы также указываем кодировку какutf8(если блок данных просто бинарныйBuffer, не требуется).
  • от5%вероятность случайного прерыванияstreamслучайных строк, сгенерированныхpush nullв интерьерBufferПредставлятьEOF,Сейчасstreamконец.

Мы также можем видеть в_read()дано на входе функцииsizeпараметр игнорируется, поскольку это рекомендуемый параметр. Мы можем просто поместить все доступные данные вpushк внутреннемуbuffer, но если в одном и том же вызове есть несколько push-уведомлений, то мы должны проверитьpush()возвращаться лиfalse, так как это означает, что внутреннийbufferдостигhighWaterMarkпредел, мы должны прекратить добавлять больше данных.

ВышеупомянутоеRandomStreamмодуль, теперь мы готовы его использовать. Давайте создадимgenerateRandom.jsновый модуль, в этом модуле мы создаем экземпляр новогоRandomStreamобъект и извлечь из него некоторые данные:

const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

Теперь, когда все готово, давайте попробуем новый кастомstream. просто выполнить как обычноgenerateRandomМодуль для просмотра случайных строк, протекающих по экрану.

Доступные для записи потоки

записываемыйstreamПредставляет конечную точку данных вNode.js, оно используетstreamв модулеWritableабстрактный класс для реализации.

писать в поток

поместить некоторые данные в записываемыеstreamэто простая вещь, все, что нам нужно сделать, это использоватьwrite()метод, который имеет следующий формат:

writable.write(chunk, [encoding], [callback])

encodingПараметр является необязательным и находится вchunkдаStringтип (по умолчаниюutf8,еслиchunkдаBuffer, затем игнорируется); когда блок данных сбрасывается в базовый ресурс,callbackбудет называться,callbackПараметры также являются необязательными.

Чтобы указать, что больше данные не будут записаныstream, мы должны использоватьend()метод:

writable.end([chunk], [encoding], [callback])

мы можем пройтиend()метод предоставляет последний фрагмент данных. при этих обстоятельствах,callbakфункция эквивалентнаfinishСобытие регистрирует слушателя, когда весь блок данных записанstreamin, событие будет запущено.

Теперь давайте создадим небольшую выходную последовательность случайных строк, создавHTTPсервер, чтобы продемонстрировать, как это работает:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + '\n'); //[3]
  }
  res.end('\nThe end...\n'); //[4]
  res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

мы создалиHTTP服务器, и записать данные вresобъект,resобъектhttp.ServerResponseЭкземпляр , также доступный для записиstream. Вот объяснение того, что происходит с приведенным выше кодом:

  1. Сначала мы пишемHTTP responseголова. Пожалуйста, обрати внимание,writeHead()нетWritableчасть интерфейса, на самом деле этот методhttp.ServerResponseВспомогательные методы, предоставляемые классом.
  2. мы начинаем5%Цикл завершается с вероятностью (вероятность входа в тело цикла равнаchance.bool()продукция, которая95%).
  3. Внутри цикла мы записываем случайную строку вstream.
  4. Как только мы вышли из цикла, мы вызываемstreamизend(), указывая не более Блок данных будет записан. Кроме того, мы предоставляем последнюю строку для записи в поток перед завершением.
  5. Наконец, мы регистрируемfinishпрослушиватель событий, когда все блоки данных сбрасываются на нижний слойsocket, это событие будет запущено.

Мы можем назвать этот маленький модуль именемentropyServer.js, а затем выполнить его. Чтобы протестировать этот сервер, мы можем обратитьсяhttp:// localhost:8080Откройте браузер или из терминала используйтеcurlкоманда следующим образом:

curl localhost:8080

В этот момент сервер должен начать спрашивать выбранный вамиHTTP客户端Отправьте случайную строку (обратите внимание, что некоторые браузеры могут буферизовать данные, и поведение потоковой передачи может быть незаметным).

Обратное давление

Подобно жидкостям, текущим в реальных трубопроводных системах,Node.jsизstreamтакже могут страдать от узких мест, данные могут записываться быстрее, чемstreamпотребление. Механизмы для решения этой проблемы включают буферизацию входных данных; однако, если данныеstreamБез какой-либо обратной связи с производителем у нас может возникнуть ситуация, когда все больше и больше данных накапливается во внутреннем буфере, вызывая утечку памяти.

Чтобы этого не произошло, при внутреннемbufferПревосходитьhighWaterMarkПри ограничении,writable.write()вернусьfalse. доступный для записиstreamимеютhighWaterMarkсобственность, котораяwrite()метод начинает возвращатьсяfalseвнутреннийBufferограничение размера области, один разBufferРазмер экстента превышает этот предел, что указывает на то, что приложение должно прекратить запись. Когда буфер опустошается, вызывается триггерdrainсобытие, можно безопасно уведомить, что можно безопасно начать запись снова. Этот механизм называетсяback-pressure.

Механизмы, описанные в этом разделе, также применимы к читаемым потокам. На самом деле противодавление также существует в читаемых потоках и срабатывает, когда метод push(), вызываемый внутри _read(), возвращает false. Однако это специфическая проблема для разработчиков потоков, поэтому мы не будем часто ею заниматься.

Мы можем изменить ранее созданныйentropyServerмодуль для демонстрации возможности записиstreamизback-pressure:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log('Backpressure');
        return res.once('drain', generateMore);
      }
    }
    res.end('\nThe end...\n', () => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

Наиболее важные шаги в предыдущем коде можно резюмировать следующим образом:

  1. Мы инкапсулируем основную логику вgenerateMore()в функции.
  2. Чтобы получить немногоback-pressureслучайно, мы увеличиваем размер блока данных до16KB-1Byte, что очень близко к значению по умолчаниюhighWaterMarkпредел.
  3. После записи блока данных мы проверяемres.write()Возвращаемое значение. если он вернетсяfalse, а это означает, что внутреннийbufferзаполнен, мы должны прекратить отправлять больше данных. В этом случае мы выходим из функции, а затем регистрируем нового издателя для записи событий, когдаdrainВызывается при возникновении событияgenerateMore.

Если мы сейчас попытаемся снова запустить сервер, то используемcurlгенерировать клиентский запрос, вполне вероятно, что будет какой-тоback-pressure, потому что сервер генерирует данные с очень высокой скоростью, даже быстрее, чем базовыйsocketБыстрее.

Реализовать доступные для записи потоки

Мы можем унаследоватьstream.Writableкласс для реализации нового записываемого потока, а для_write()метод обеспечивает реализацию. Реализовать наш собственный перезаписываемыйStreamsсвоего рода.

Давайте создадим записываемыйstream, который получает объекты в следующем формате:

{
  path: <path to a file>
  content: <string or buffer>
}

Роль этого класса такова: для каждого объекта нашstreamдолжно бытьcontentраздел сохраняется в файл, созданный по заданному пути. мы можем сразу увидеть, что мыstreamВход - это объект, а неStringsилиBuffers, а это значит, что нашstreamДолжен работать в объектном режиме.

вызывающий модульtoFileStream.js:

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

В качестве первого шага мы загружаем все необходимые нам зависимости. Обратите внимание, что нам нужны модулиmkdirp, как вы должны знать из предыдущих глав, он должен использоватьnpmУстановить.

Мы создаем новый класс, который начинается сstream.Writableрасширен.

Нам пришлось вызвать родительский конструктор, чтобы инициализировать его внутреннее состояние; мы также предоставилиoptionobject в качестве параметра, чтобы указать, что поток работает в объектном режиме (objectMode:true).stream.WritableДругие принятые варианты следующие:

  • highWaterMark(Значение по умолчанию16KB):контрольback-pressureверхний предел .
  • decodeStrings(по умолчаниюtrue): в строке, переданной_write()метод, автоматически декодирует строку в двоичныйbufferПлощадь. Этот параметр игнорируется в объектном режиме.

Наконец, мы_write()метод обеспечивает реализацию. Как видите, этот метод принимает блок данных, кодировку (только в двоичном режиме,streamопцииdecodeStringsУстановить какfalseсо смыслом).

Кроме того, метод принимает callback-функцию, которую нужно вызвать по завершении операции, необязательно передавать результат операции, но мы все же можем передатьerrorобъект, в результате чегоstreamкурокerrorсобытие.

Теперь, чтобы попробовать то, что мы только что построилиstream, мы можем создатьwriteToFile.js, и выполните некоторые операции записи в потоке:

const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

Таким образом, мы создали и использовали наш первый настраиваемый поток с возможностью записи. Запустите новый модуль, как обычно, чтобы проверить его вывод; вы увидите три новых файла, созданных после выполнения.

Двойной поток

ДвойнойstreamКак для чтения, так и для записи. Когда мы хотим описать объект, который является одновременно источником данных и получателем данных (например,socket), что оказалось очень полезным. Наследование дуплексного потокаstream.Readableиstream.Writableметод, поэтому он не нов для нас. Это означает, что мы можемread()илиwrite()данные, или может контролироватьreadableиdrainсобытие.

Создание пользовательского двойникаstream, мы должны_read()и_write()Предоставьте реализацию. Перейти кDuplex()конструкторoptionsОбъект внутренне перенаправляется наReadableиWritableконструктор.optionsСодержание параметров такое же, как обсуждалось ранее,optionsдобавилallowHalfOpenзначение (по умолчаниюtrue), если установленоfalse, будет иметь место до тех пор, покаstreamвечеринка (ReadableиWritable)Заканчивать,streamЭто конец.

Чтобы двойной поток работал в объектном режиме с одной стороны и в бинарном режиме с другой, нам нужно вручную установить следующие свойства в конструкторе потока:

this._writableState.objectMode
this._readableState.objectMode

Преобразованные потоки

преобразованныйStreamsэто особый тип двойногоStreams.

в простом двойникеStreamsв отstreamНет прямой связи между считываемыми данными и записываемыми в них данными (по крайней мере,streamявляется непознаваемым). подумай об одномTCP socket, он просто отправляет данные и получает данные от удаленных узлов.TCP socketОн не осознает, что существует какая-либо связь между входом и выходом.

На приведенной ниже диаграмме показана двойнаяStreamsПоток данных в:

С другой стороны, преобразованныйStreamsПримените некоторое преобразование к каждому блоку данных, полученному со стороны, доступной для записи, затем сделайте преобразованные данные доступными со стороны, доступной для чтения.

На изображении ниже показано, как данные преобразуютсяStreamsСредний поток:

Снаружи преобразованныйStreamsинтерфейс с двойнымStreamsинтерфейс точно такой же. Но когда мы хотим построить новый двойникStreams, мы должны предоставить_read()и_write()метод, и чтобы реализовать новый поток преобразования, мы должны заполнить еще пару методов:_transform()и_flush()).

Давайте продемонстрируем, как использовать пример для создания нового преобразованногоStreams.

Потоки, реализующие преобразование

Реализуем преобразованиеStreams, который заменит все вхождения данной строки. Для этого мы должны создатьreplaceStream.jsновый модуль. Перейдем непосредственно к тому, как это реализовать:

const stream = require('stream');
const util = require('util');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

Как всегда, мы будем собирать модуль, начиная с его зависимостей. На этот раз мы не используем сторонние модули.

Затем мы создалиstream.TransformНовый класс, наследуемый от базового класса. Конструктор этого класса принимает два параметра:searchStringиreplaceString. Как вы можете себе представить, они позволяют нам определить текст для сопоставления и строку для использования в качестве замены. Мы также инициализируем_transform()используемый методtailPieceвнутренняя переменная.

Теперь давайте проанализируем_transform()метод, который является сердцем нашего нового класса._transform()методы и записываемыеstreamиз_write()методы имеют почти такой же формат, но вместо записи данных в базовый ресурс используютthis.push()втолкнуть его внутрьbuffer, то же самое, что мы имели бы в читаемом потоке_read()метод выполняется. Это показывает преобразованныйStreamsкак на самом деле соединяются две стороны.

ReplaceStreamиз_transform()Методы реализуют ядро ​​нашего нового класса. Обычно поиск и заменаbufferСтроки в зоне — простая задача, однако при потоковой передаче данных ситуация совершенно иная, и возможные совпадения могут быть распределены по нескольким фрагментам данных. Программу, стоящую за кодом, можно объяснить следующим образом:

  1. Наш алгоритм используетsearchStringФункция действует как разделитель для разделения блоков.
  2. Затем он берет последний элемент результирующего массива после разделенияlastPieceи извлеките его последний символsearchString.length - 1. Результаты сохраняются вtailPieceпеременная, она будет использоваться как префикс для следующего блока данных.
  3. Наконец, все изsplit()Полученный фрагментreplaceStringобъединены вместе как разделители и помещены внутрьbufferПлощадь.

когдаstreamВ конце концов, у нас может быть еще один последнийtailPieceПеременная не была помещена во внутренний буфер. это точно_flush()цель метода; он заключается вstreamвызывается перед завершением, и именно здесь у нас, наконец, есть шанс завершить поток или передать любые оставшиеся данные, прежде чем полностью завершить поток.

_flush()Методу нужна только функция обратного вызова в качестве параметра, мы должны обязательно вызвать эту функцию обратного вызова, когда все операции будут выполнены. Готово с этим, мы закончили нашReplaceStreamсвоего рода.

Теперь пришло время попробовать что-то новоеstream. Мы можем создать еще один с именемreplaceStreamTest.jsмодуль для записи некоторых данных, а затем чтения результата преобразования:

const ReplaceStream = require('./replaceStream');

const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));

rs.write('Hello W');
rs.write('orld!');
rs.end();

Чтобы немного усложнить этот пример, мы распределяем условия поиска по двум разным фрагментам, а затем используемflowingрежим, мы начинаем с того жеstreamСчитайте данные в , записывая каждый преобразованный блок. Выполнение предыдущей программы должно привести к следующему результату:

Hel
lo Node.js
!

Стоит упомянуть пятый тип потока: stream.PassThrough. В отличие от других классов потоков, которые мы рассмотрели, PassThrough не является абстрактным и может быть создан непосредственно без реализации каких-либо методов. По сути, это трансформируемый поток, который выводит каждый фрагмент данных без какого-либо преобразования.

Соединение потоков с помощью каналов

UnixПонятие трубы такоеDouglas McllroyИзобретен; это позволяет выходу программы быть соединенным со входом следующей. Взгляните на следующую команду:

echo Hello World! | sed s/World/Node.js/g

В предыдущей командеechoбудетHello World!записывается в стандартный вывод, который затем перенаправляется наsedстандартный ввод команды (из-за оператора канала|). потомsedиспользоватьNode.jsзаменить любойWorld, и выводит результат на стандартный вывод (в этот раз на консоль).

Аналогичным образом можно использовать читаемыйStreamsизpipe()метод будетNode.jsизStreamsВместе они имеют следующие интерфейсы:

readable.pipe(writable, [options])

очень интуитивно,pipe()метод будет начинаться с читаемогоStreamsИзвлеките данные, испускаемые в предоставленной записиStreamsсередина. Кроме того, когда читаемыйStreamsпроблемаendсобытие (если мы не укажем{end:false}в видеoptions), записываемыйStreamsзакончится автоматически.pipe()Метод возвращает записываемый объект, переданный в качестве параметраStreams, если такойstreamтакже читаем (например, двойной или конвертируемыйStreams), позволяет нам создавать связанные вызовы.

поставить дваStreamsПри соединении вместе данные могут автоматически передаваться на записываемыйStreams, так что не надо звонитьread()илиwrite()метод; но самое главное не требует контроляback-pressure, так как он обрабатывается автоматически.

В качестве простого примера (примеров будет множество) мы можем создать файл с именемreplace.jsНовый модуль, который принимает поток текста со стандартного ввода, применяет преобразование замены и возвращает данные на стандартный вывод:

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

Вышеприведенная программа передает данные из стандартного ввода вReplaceStream, затем возвращается к стандартному выводу. Теперь, чтобы попрактиковаться в этом маленьком приложении, мы можем использоватьUnixКанал перенаправляет некоторые данные на свой стандартный ввод следующим образом:

echo Hello World! | node replace World Node.js

Запуск вышеуказанной программы выведет следующие результаты:

Hello Node.js

Этот простой пример демонстрируетStreams(особенно текстStreams) — это общий интерфейс, а каналы — это в значительной степени общий способ их составления и соединения.

errorСобытия не распространяются автоматически по конвейеру. Например, посмотрите на следующий фрагмент кода:

stream1
  .pipe(stream2)
  .on('error', function() {});

В предыдущих цепочках вызовов мы будем собирать данные только изstream2ошибка, это связано с тем, что мы добавилиerorrпрослушиватель событий. Это означает, что если мы хотим захватить изstream1Любые сгенерированные ошибки, мы должны напрямую подключить другой прослушиватель ошибок. Позже мы увидим еще один паттерн, с помощью которого можно достичь общего отлова ошибок (слияниеStreams). Кроме того, следует отметить, что если цельStreams(читатьStreams) выдает ошибку, будетStreamsуведомить одинerror, что затем приводит к прерыванию конвейера.

Как потоки проходят через трубы

Пока мы создаем на заказStreamsпуть не следует точноNodeзаданной схеме; фактически, изstreamнаследование базового класса является нарушениемsmall surface area, и нужен пример кода. это не значитStreamsПлохо спроектированный, на самом деле, мы не должны забывать, потому чтоStreamsдаNode.jsчасть ядра, поэтому они должны быть максимально гибкими и широко расширяемымиStreamsТак что модули пользовательского уровня могут в полной мере использовать их.

Однако в большинстве случаев нам не нужны все возможности и расширяемость, которые может дать наследование прототипов, но зачастую все, что нам нужно, — это определить новыеStreamsРежим быстрого развития.Node.jsСообщество, конечно же, создало решение и для этого. Прекрасным примером являетсяthrough2, который позволяет нам просто создавать преобразованияStreamsнебольшая библиотека. пройти черезthrough2, мы можем создать новый кабриолет, вызвав простую функциюStreams:

const transform = through2([options], [_transform], [_flush]);

сродни,from2также позволяет нам создать удобочитаемый видStreams:

const readable = from2([options], _read);

Затем мы покажем их использование в оставшейся части этой главы, когда увидим преимущества использования этих небольших библиотек.

throughиfromоснован наStream1Каноническая библиотека верхнего уровня.

Асинхронный поток управления на основе потоков

Из приведенных нами примеров должно быть ясно, чтоStreamsЕго можно использовать не только для обработкиI / O, и может использоваться как элегантный шаблон программирования для работы с данными любого типа. Но на этом преимущества не заканчиваются, вы также можете воспользоватьсяStreamsдля реализации асинхронного потока управления, как мы увидим в этом разделе.

Последовательное исполнение

по умолчанию,StreamsДанные будут обрабатываться последовательно, например, преобразовыватьсяStreamsиз_transform()Функция выполняется в предыдущем блоке данныхcallback()Только после этого будет вызван следующий блок данных. ЭтоStreamsВажным свойствомStreamsРеализуйте элегантные традиционные шаблоны потока управления.

Код всегда лучше, чем слишком много объяснений, поэтому давайте рассмотрим пример использования потоков для последовательного выполнения асинхронных задач. Давайте создадим функцию для объединения набора полученных файлов в качестве входных данных, соблюдая указанный порядок. Мы создаемconcatFiles.js, и начнем с его зависимостей:

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

мы будем использоватьthrough2чтобы упростить преобразованиеStreamsсоздается и используетfrom2-arrayСоздать читаемый из массива объектовStreams. Далее мы можем определитьconcatFiles()функция:

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); //[3]
    }))
    .on('finish', () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

Предыдущая функция передаетсяfilesмассив преобразуется вStreamsдля достиженияfilesПоследовательная итерация массива. Процедура, за которой следует функция, объясняется следующим образом:

  1. Во-первых, мы используемfrom2-arrayотfilesмассив создает читаемыйStreams.
  2. Далее мы используемthroughдля создания трансформированногоStreamsдля обработки каждого файла в последовательности. Для каждого файла мы создаем читаемыйStreams, и передать его в представление выходного файлаdestStreamсередина. После того, как исходный файл закончил чтение, передайтеpipe()указывается во втором параметре метода{end:false}, мы следим за тем, чтобы не закрытьdestStream.
  3. Когда все содержимое исходного файла перенесено вdestStreamкогда мы звонимthrough.objпубличныйdoneфункция передачи текущей обработки завершена, в нашем случае это необходимо для запуска обработки следующего файла.
  4. После обработки всех файловfinishсобытие запущено. мы можем, наконец, закончитьdestStreamи позвониconcatFiles()изcallback()функция, эта функция представляет собой завершение всей операции.

Теперь мы можем опробовать только что созданный нами небольшой модуль. Давайте создадимconcat.jsновый файл для завершения примера:

const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
});

Теперь мы можем запустить приведенную выше программу с целевым файлом в качестве первого аргумента командной строки, за которым следует список файлов для объединения, например:

node concat allTogether.txt file1.txt file2.txt

Выполнение этой команды создастallTogether.txtновый файл для последовательного сохраненияfile1.txtиfile2.txtСодержание.

использоватьconcatFiles()функцию, мы можем просто использоватьStreamsРеализует последовательное выполнение асинхронных операций. как и мыChapter3 Asynchronous Control Flow Patters with CallbacksКак видно из, при использовании чистогоJavaScriptвнедрить или использоватьasyncи другие внешние библиотеки, вам необходимо использовать или реализовать итераторы. Теперь мы предоставляем другой метод, который достигает того же эффекта, и, как мы видим, он реализован очень элегантно и читабельно.

Шаблоны. Используя потоки или комбинацию потоков, можно легко последовательно выполнять набор асинхронных задач.

Внеочередное параллельное выполнение

мы только что виделиStreamsОбрабатывайте каждый блок данных последовательно, но иногда это может быть невозможно, потому что он не используется полностью.Node.jsпараллелизма. Если нам нужно выполнять медленную асинхронную операцию над каждым фрагментом данных, то совершенно необходимо распараллелить выполнение этого набора асинхронных задач. Конечно, этот шаблон можно применять только в том случае, если нет связи между каждым блоком данных, что часто может происходить в шаблоне объекта.Streams, но для бинарного режимаStreamsПараллельное выполнение вне очереди используется редко.

Примечание. Потоки с неупорядоченным параллельным выполнением нельзя использовать, когда важен порядок обработки данных.

Чтобы распараллелить конвертируемыйStreamsреализации, мы можем использоватьChapter3 Asynchronous Control Flow Patters with CallbacksТе же шаблоны, упомянутые для параллельного выполнения вне порядка, а затем внести некоторые изменения, чтобы заставить их работать дляStreams. Посмотрим, как это изменится.

Реализация неупорядоченных параллельных потоков

Перейдем сразу к делу на примере: мы создаемparallelStream.jsмодуль, затем настроить нормальный конвертируемыйStreams, а затем привести ряд методов, преобразующих потоки:

const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

Давайте проанализируем этот новый пользовательский класс. Как видите, конструктор принимаетuserTransform()в качестве аргумента, который затем сохраняется как переменная экземпляра; мы также вызываем родительский конструктор и по умолчанию включаем объектный режим.

Далее см._transform()метод, в этом методе мы выполняемuserTransform()функцию, а затем увеличиваем количество запущенных в данный момент задач; наконец, мы вызываемdone()для уведомления о том, что текущий шаг преобразования завершен._transform()Метод показывает, как параллельно обрабатывать другую задачу. нам не нужно ждатьuserTransform()После выполнения метода вызовите его сноваdone(). Вместо этого мы немедленно выполняемdone()метод. С другой стороны, мы предоставляем специальную функцию обратного вызова дляuserTransform()метод, этоthis._onComplete()метод, чтобы мы моглиuserTransform()Получите уведомление, когда это будет сделано.

существуетStreamsПеред завершением он будет называться_flush()метод, поэтому, если еще есть запущенные задачи, мы можем сделать это, не вызываяdone()функция обратного вызова для задержкиfinishтриггер события. Вместо этого мы назначаем егоthis.terminateCallbackПеременная. чтобы понятьStreamsКак правильно завершать см._onComplete()метод.

Когда каждый набор асинхронных задач наконец завершается,_onComplete()метод будет вызван. Во-первых, он проверяет, запущена ли задача, и, если нет, вызываетthis.terminateCallback()функция, которая приведет кStreamsконец, триггер_flush()методfinishсобытие.

Используйте только что построенныйParallelStreamкласс для простого создания трансформируемого параллельного выполнения вне очередиStreamsinstance, но с оговоркой: он не сохраняет порядок получения элементов. На самом деле асинхронные операции потенциально могут выполняться и отправлять данные в любое время, независимо от того, когда они начинаются. Итак, мы знаем, что для двоичного режимаStreamsне применяется, потому что двоичныйStreamsТребования более высокого порядка.

Реализовать приложение для мониторинга URL

Теперь воспользуемсяParallelStreamМодуль реализует конкретный пример. Давайте представим следующее: мы хотим создать простой сервис для мониторинга большогоURLСостояние списка, представим себе следующее, все этиURLсодержится в отдельном файле, и каждыйURLзанимает пустую строку.

Streamsможет обеспечить эффективное и элегантное решение для этого сценария. Особенно, когда мы используем то, что только что написалиParallelStreamкласс, чтобы проверить их не по порядкуURL.

Далее создадим простойcheckUrls.jsмодульное приложение.

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   //[4]
  .on('finish', () => console.log('All urls were checked'))
;

Как мы видим, с потоками наш код выглядит очень элегантно и интуитивно понятно. Давайте посмотрим, как это работает:

  1. Во-первых, мы создаем читаемый файл с заданным файловым аргументомStreams, чтобы файл можно было прочитать позже.
  2. мы проходимsplitвходной файлStreamsКонтент выводит конвертируемыйStreamsв конвейер, и каждая строка блока данных отделена.
  3. Тогда пришло время использовать нашParallelStreamПроверятьURL, мы отправляемHEADзапрос, а затем дождаться запрошенногоresponse. Когда запрос возвращается, мы помещаем результат запросаpushприбытьstreamсередина.
  4. Наконец, передайте результат вresults.txtв файле.
node checkUrls urlList.txt

файл здесьurlList.txtсодержит наборURL,Например:

  • http://www.mariocasciaro.me/
  • http://loige.co/
  • http://thiswillbedownforsure.com/

Когда приложение завершит выполнение, мы увидим файлresults.txtсоздается с результатом операции, например:

  • http://thiswillbedownforsure.com is down
  • http://loige.co is up
  • http://www.mariocasciaro.me is up

Порядок выходных результатов, вероятно, будет таким же, как указан во входном файле.URLпорядок разный. ЭтоStreamsОтличительные особенности внеочередного параллельного выполнения задач.

Из любопытства мы могли бы попробовать заменить ParallelStream на обычный поток through2 и сравнить их поведение и производительность (упражнение, которое вы, возможно, захотите сделать). Мы увидим, что использование through2 медленнее, потому что каждый URL будет проверяться по порядку, а порядок результатов в файле results.txt сохраняется.

Нестандартное предельное параллельное выполнение

При запуске файла, содержащего тысячи или миллионы URL-адресовcheckUrlsприложение, у нас определенно будут проблемы. Наше приложение будет создавать неконтролируемое количество одновременных подключений, параллельно отправлять большие объемы данных и потенциально нарушать стабильность приложения и доступность всей системы. Мы уже знаем, что неупорядоченное ограниченное параллельное выполнение контролируемых нагрузок является отличным решением.

Давайте создадимlimitedParallelStream.jsмодуль, чтобы увидеть, как он работает, этот модуль адаптирован из созданного в предыдущем разделеparallelStream.jsмодуль.

Посмотрим на его конструктор:

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

нам нужноconcurrencyПеременные используются в качестве входных данных для ограничения количества параллелизма, на этот раз мы хотим сохранить две функции обратного вызова,continueCallbackдля любых ожидающих_transformметод,terminateCallbackОбратный вызов для метода _flush. см. далее_transform()метод:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

на этот раз в_transform()метод, мы должны вызватьdone()Перед проверкой, достигнут ли предел максимального количества параллелей, если предел не достигнут, может быть запущена обработка следующего элемента. Если мы достигли предела максимального количества параллелей, мы можем просто положитьdone()обратный вызов сохраняется вcontinueCallbackпеременная, чтобы она вызывалась сразу после завершения задачи.

_flush()метод сParallelStreamКласс остался точно таким же, поэтому переходим сразу к реализации_onComplete()метод:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

Всякий раз, когда задача завершается, мы вызываем любой сохраненныйcontinueCallback()приведет кstreamРазблокировать, запускает обработку следующего элемента.

ЭтоlimitedParallelStreamмодуль. мы можем сейчасcheckUrlsмодуль использовать его вместоparallelStream, и ограничьте параллелизм наших задач установленным нами значением.

Последовательное параллельное выполнение

параллель, которую мы ранее создалиStreamsМожно испортить порядок данных, но в некоторых случаях это недопустимо. Иногда на самом деле существуют бизнес-сценарии, требующие, чтобы каждый блок данных отправлялся в том же порядке, в котором он был получен. Мы все еще можем работать параллельноtransformфункция. Все, что нам нужно сделать, это отсортировать данные, выдаваемые каждой задачей, так, чтобы они следовали тому же порядку, что и полученные данные.

Эта техника предполагает использованиеbuffer, который переупорядочивает фрагменты по мере того, как каждая запущенная задача испускает. Для краткости мы не собираемся предоставлять такуюstreamреализации, так как объем этой книги довольно многословен; все, что нам нужно сделать, это повторно использоватьnpmодин из доступных пакетов на , например.through2-parallel.

Мы можем модифицировать существующийcheckUrlsмодуль для быстрой проверки поведения упорядоченного параллельного выполнения. Предположим, мы хотим, чтобы наши результаты были такими, как во входном файлеURLнаписано в том же порядке. Мы можем использоватьthrough2-parallelреализовать:

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'))
;

Как мы можем видеть,through2-parallelинтерфейс сthrough2Интерфейсы очень похожи, разница только вthrough2-parallelтакже может предоставить намtransformФункция указывает предел параллелизма. Если мы попытаемся запустить эту новую версиюcheckUrls,посмотримresults.txtфайл перечисляет результаты в том же порядке, что и во входном файле Порядок, в котором появляются URL-адреса, тот же.

На этом мы резюмируем использованиеStreamsРеализовать анализ асинхронного потока управления, затем мы исследуем шаблон конвейера.

конвейерный режим

Как и в реальной жизни,Node.jsизStreamsСоединения труб также могут быть выполнены в соответствии с различными режимами. На самом деле, мы можем объединить два разныхStreamsсливаться в одинStreams, положитьStreamsРазделите на два или более конвейера или перенаправьте потоки в зависимости от условий. В этом разделе мы рассмотримNode.jsизStreamsСамая важная сантехническая технология.

Комбинированные потоки

В этой главе мы подчеркиваемStreamsПредоставляет простую инфраструктуру для модуляции и повторного использования нашего кода, но упускает важную часть: что, если мы хотим разделить на модули и повторно использовать целые конвейеры? Если мы хотим объединить несколькоStreams, делая их похожими на внешниеStreams, так что мне теперь делать? На изображении ниже показано, что это означает:

На изображении выше мы видим, как объединяются несколько потоков:

  • Когда мы пишем объединенноеStreams, мы на самом деле пишем комбинированныйStreamsпервая единицаStreamA.
  • когда мы объединяемStreamsПри чтении информации из , мы фактически читаем из объединенногоStreamsчитать в последней единице .

комбинированныйStreamsобычно несколькоStreams, соединив конец записи первого блока и конец чтения последнего блока.

Чтобы создать несколько потоков из двух разных потоков (потоки для чтения и потоки для записи), мы можем использовать модуль npm, например.duplexer2.

Но это еще не все. В самом деле, объединенныйStreamsОн также должен быть захвачен на любом участке трубопровода.StreamsОшибка, генерируемая устройством. Мы говорили, что любые ошибки не распространяются автоматически по конвейеру. Итак, у нас должно быть правильное управление ошибками, нам придется явно прикреплять прослушиватель ошибок к каждомуStreams. Тем не менее, комбинированныйStreamsНа самом деле это черный ящик, что означает, что у нас нет доступа ни к одному модулю в середине конвейера, поэтому для перехвата исключений из любого модуля в конвейере комбинированныйStreamsТакже выступает в роли агрегатора.

В общем, комбинированныйStreamsИмеет два основных преимущества:

  • Внутри трубы находится черный ящик, невидимый для пользователя.
  • Упрощает управление ошибками, потому что нам не нужно прикреплять прослушиватель ошибок к каждому блоку в конвейере, а только к комбинированномуStreamsПросто прикрепите его сами.

комбинированныйStreamsявляется очень общей и распространенной практикой, поэтому, если у нас нет особых потребностей, мы можем просто повторно использовать существующее решение, напримерmultipipeилиcombine-stream.

Реализация составных потоков

Чтобы проиллюстрировать простой пример, давайте рассмотрим следующие две комбинацииStreamsСлучай:

  • Сжимайте и шифруйте данные
  • Распаковать и расшифровать данные

используя что-то вродеmultipipeтаких как библиотеки, мы можем объединить некоторые существующие основные библиотеки, комбинируяStreams(документcombinedStreams.js), чтобы легко построить комбинированныйStreams:

const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
};
module.exports.decryptAndDecompress = password => {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
};

Например, теперь мы можем использовать эти комбинированные потоки данных, как невидимые для нас черные ящики, для создания небольшого приложения, которое архивирует файлы со сжатием и шифрованием. Начнем сarchive.jsсделать это в новом модуле:

const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));

Мы можем сделать это, построив комбинированныйStreamДавайте еще улучшим предыдущий код, но на этот раз не просто для получения невидимого для внешнего мира черного ящика, а для отлова исключений. На самом деле, как мы уже упоминали, код, подобный приведенному ниже, поймает только последнийStreamОшибка, выдаваемая юнитом:

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    // 只会捕获最后一个单元的错误
    console.log(err);
  });

Однако, положив всеStreamsВместе мы можем элегантно решить эту проблему. рефакторингarchive.jsследующее:

const combine = require('multipipe');
   const fs = require('fs');
   const compressAndEncryptStream =
     require('./combinedStreams').compressAndEncrypt;
   combine(
     fs.createReadStream(process.argv[3])
     .pipe(compressAndEncryptStream(process.argv[2]))
     .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
   ).on('error', err => {
     // 使用组合的Stream可以捕获任意位置的错误
     console.log(err);
   });

Как мы видим, теперь мы можем прикрепить обработчик ошибок непосредственно к составленномуStreams, он получит любойerrorсобытие. Теперь, чтобы запуститьarchiveмодули, достаточно указать в аргументах командной строкиpasswordиfileПараметры, то есть параметры модуля сжатия:

node archive mypassword /path/to/a/file.text

На этом примере мы ясно продемонстрировали, что комбинированноеStreamКак важно, с одной стороны, это позволяет нам создавать многократно используемые композиции потоков, с другой – упрощает управление ошибками пайплайна.

разделенныйStreams

Мы можем сделать это, поместив один читаемыйStreamпередан в несколько записываемыхStreamвыполнитьStreamфилиал. Это вступает в игру, когда мы хотим отправить одни и те же данные в разные места назначения, например, в два разных сокета или два разных файла. Его также можно использовать, когда мы хотим выполнить различные преобразования одних и тех же данных или когда мы хотим разделить данные на основе некоторых критериев. как показано на рисунке:

существуетNode.jsразделены вStreamэто маленькая вещь. Например.

Реализовать генератор нескольких контрольных сумм

Давайте создадим файл, который выводит данный файлsha1иmd5Хеширующий гаджет. Назовем этот новый модульgenerateHashes.js, см. следующий код:

const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');

Пока ничего особенного В следующей части модуля мы создадим удобочитаемыйStream, и разветвите его на два разных потока, чтобы получить два других файла, один из которых содержитsha1хэш, другой содержитmd5Контрольная сумма:

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));

это очень просто:inputStreamПеременные передаются бок о бок вsha1Stream, другая сторона вводится вmd5Stream. Но будьте осторожны:

  • когдаinputStreamкогда закончите,md5Streamиsha1Streamзавершится автоматически, если не будет вызваноpipe()указано, когдаendВариантыfalse.

  • StreamОбе ветви 's будут принимать один и тот же фрагмент данных, поэтому мы должны быть очень осторожны, когда делаем что-то с побочными эффектами для данных, потому что это повлияет на другую ветвь.

  • Противодавление создается за пределами черного ящика, отinputStreamСкорость потока данных регулируется в соответствии со скоростью потока ветки, которая получает самую медленную.

комбинированныйStreams

Слияние отличается от разделения путем объединения набора читаемыхStreamsобъединены в один записываемыйStream, как показано на рисунке:

поставить несколькоStreamsОбъединение в одно обычно является простой операцией, однако мы должны позаботиться о том, чтобы иметь дело сendсобытий, потому что конвейерная система, использующая параметр автоматического завершения, завершает поток назначения, как только заканчивается один из источников. Обычно это приводит к ошибке, так как другие незавершенные источники будут продолжать писать в завершенныйStream. Обходной путь для этого — использовать параметры при передаче нескольких источников в одно место назначения.{end:false}, и только после прочтения всех исходниковStreamзвонитьend().

Сжать несколько исходных файлов в один архив

В качестве простого примера давайте реализуем небольшую программу, которая создает архив на основе содержимого двух разных каталогов. С этой целью мы введем два новыхnpmМодуль:

  • tarиспользуется для создания архивов
  • fstreamБиблиотека для создания потоков объектов из файлов файловой системы.

Создаем новый модульmergeTar.js, начните инициализацию следующим образом:

var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);

В предыдущем коде мы просто загружаем все зависимости и инициализируем включаемый объектный файл и два исходных каталога (sourceAиsourceB) Переменные.

Далее мы создаемtarизStreamи направить вывод на записываемыйStream:

const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));

Теперь приступаем к инициализации источникаStream

let endCount = 0;

function onEnd() {
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({
    type: "Directory",
    path: sourceA
  })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({
    type: "Directory",
    path: sourceB
  })
  .on('end', onEnd);

В предыдущем коде мы создали два исходных каталога (sourceStreamAиsourceStreamB) читать вStreamзатем для каждого источникаStream, мы добавляемendПодписчик событий, который срабатывает только тогда, когда оба каталога полностью прочитаныpackизendсобытие.

Наконец, объедините дваStream:

sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});

Мы сжимаем оба исходных файла вpackэтоStream, и установивpipe()изoptionПараметры{end:false}Настроить конечную точкуStreamавтоматический триггерendсобытие.

Итак, мы выполнили нашу простуюTARпрограмма. Мы можем попробовать запустить эту утилиту, указав объектный файл в качестве первого аргумента командной строки, а затем два исходных каталога:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

существуетnpmВ мы можем найти что-то, что можно упроститьStreamОбъединенный модуль:

Обратите внимание, что цель притокаStreamДанные перемешиваются случайным образом, что является приемлемым свойством для некоторых типов потоков объектов (как мы видели в предыдущем примере), но при работе с бинарнымиStreamКогда обычно этого не хочется.

Тем не менее, мы можем последовательно объединяться по шаблонуStream; он состоит из слияния источников один за другимStream, когда закончится предыдущий, начните отправлять второй блок данных (типа подключения всех источниковStreamвыход одинаковый). существуетnpmНа , мы можем найти некоторые пакеты, которые также обрабатывают эту ситуацию. Один из них являетсяmultistream.

Мультиплексирование и демультиплексирование

сливатьсяStreamрежим имеет специальный режим, мы не очень хотим объединять несколькоStreamобъединены вместе и вместо этого используют общий канал для передачи набора данныхStream. Не так, как раньше, потому что исходные данныеStreamПоддержание логического разделения внутри общего канала, что позволяет нам снова разделить данные, как только они достигнут другого конца общего канала.Stream. как показано на рисунке:

поставить несколькоStreamобъединены в одинStreamОперация по передаче называется мультиплексированием, а обратная операция (т.е.StreamПолучите данные для реконструкции оригиналаStream) называется демультиплексированием. Устройства, выполняющие эти операции, называются мультиплексорами и демультиплексорами соответственно (. Это широко изучаемая тема в информатике и телекоммуникациях, так как она является предметом практически любого типа средств связи, таких как телефония, радио, телевидение и, конечно же, одна из основ самого Интернета Мы не будем вдаваться в подробности объема этой книги, потому что это очень большая тема.

В этом разделе мы хотим продемонстрировать, как использовать общиеNode.js Streamsдля передачи нескольких логически разделенныхStream, а затем поделитьсяStreamДругой конец снова разделяется, то есть мультиплексирование и демультиплексирование реализуются один раз.

Создайте журнал удаленного ведения журнала

В качестве примера мы хотели бы иметь небольшую программу, которая запускает дочерний процесс и перенаправляет как его стандартный вывод, так и стандартную ошибку на удаленный сервер, который принимает их и сохраняет в виде двух отдельных файлов. Таким образом, в этом случае общая средаTCPсоединение, а два мультиплексируемых канала являются дочерними процессамиstdoutиstderr. Мы будем использовать технологию коммутации пакетов, котораяIP,TCPилиUDPТе же методы используются в таких протоколах, как , включая инкапсуляцию данных в пакеты, что позволяет нам указывать различную исходную информацию, что очень полезно для мультиплексирования, маршрутизации, управления потоком и проверки на наличие поврежденных данных.

Как показано на рисунке, протокол этого примера примерно такой, данные инкапсулируются в пакеты со следующей структурой:

Реализация мультиплексирования на стороне клиента

Давайте сначала поговорим о клиенте, создайте файл с именемclient.jsмодуль, который является частью нашего приложения, отвечает за запуск дочернего процесса и реализациюStreamМультиплексирование.

Чтобы начать определение модуля, сначала загрузите зависимости:

const child_process = require('child_process');
const net = require('net');

Затем начните реализацию мультиплексной функции:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;

  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() { // [1]
        let chunk;
        while ((chunk = this.read()) !== null) {
          const outBuff = new Buffer(1 + 4 + chunk.length); // [2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff); // [3]
        }
      })
      .on('end', () => { //[4]
        if (--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

multiplexChannels()Функция принимает источник для повторного использованияStreamкак ввод и интерфейс мультиплексирования в качестве параметров, затем выполните следующие действия:

  1. для каждого источникаStream, он регистрируетreadableпрослушиватель событий, мы используемnon-flowingРежим чтения данных из потока.

  2. Каждый раз, когда считывается блок данных, мы инкапсулируем его в заголовок. Порядок заголовков следующий:channel IDсоставляет 1 байт (UInt8), размер пакета 4 байта (UInt32BE), а затем фактические данные.

  3. Когда пакет готов, мы записываем его в цельStream.

  4. мыendсобытие регистрирует прослушиватель, чтобы, когда все источникиStreamкогда закончите,endТриггер события, уведомить цельStreamкурокendсобытие.

Обратите внимание, что наш протокол способен мультиплексировать до 256 различных исходных потоков, поскольку у нас есть только 1 байт для идентификации.channel.

const socket = net.connect(3000, () => { // [1]
  const child = child_process.fork( // [2]
    process.argv[2],
    process.argv.slice(3), {
      silent: true
    }
  );
  multiplexChannels([child.stdout, child.stderr], socket); // [3]
});

В конце делаем следующее:

  1. мы создаем новыйTCPКлиент подключается к адресуlocalhost:3000.
  2. Мы запускаем подпроцесс, используя первый аргумент командной строки в качестве пути, в то время как мы предоставляем остальныеprocess.argvМассивы в качестве аргументов дочернего процесса. мы указываем параметры{silent:true}, чтобы дочерний процесс не наследовал родительскийstdoutиstderr.
  3. Мы используемmutiplexChannels()функция будетstdoutиstderrмультиплексируется вsocketвнутри.
Реализовать демультиплексирование на стороне сервера

Теперь смотрим на сервер, создаемserver.jsмодуль, здесь мы будем подключатьStreamДемультиплексируйте и направьте их в два разных файла.

Сначала создайте файл с именемdemultiplexChannel()Функция:

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;
  source
    .on('readable', () => { //[1]
      let chunk;
      if(currentChannel === null) {          //[2]
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }
    
      if(currentLength === null) {          //[3]
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }
    
      chunk = source.read(currentLength);        //[4]
      if(chunk === null) {
        return;
      }
    
      console.log('Received packet from: ' + currentChannel);
    
      destinations[currentChannel].write(chunk);      //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', () => {            //[6]
      destinations.forEach(destination => destination.end());
      console.log('Source channel closed');
    })
  ;
}

Приведенный выше код может показаться сложным, но при внимательном чтении это не так; из-заNode.jsудобочитаемыйStreamС помощью функции pull мы можем легко реализовать демультиплексирование нашего небольшого протокола следующим образом:

  1. мы начинаем использоватьnon-flowingРежим чтения данных из потока.
  2. Во-первых, если мы не читалиchannel ID, пытаемся прочитать из потока 1 байт и преобразовать его в число.
  3. Следующим шагом является чтение длины заголовка. Нам нужно прочитать 4 байта, поэтому возможно, что внутреннеBufferДанных пока недостаточно, поэтомуthis.read()звонок возвращаетсяnull. В этом случае мы просто прерываем синтаксический анализ и повторяем следующий.readableсобытие.
  4. Когда мы, наконец, сможем прочитать размер данных, мы узнаем из внутреннегоBufferсколько данных загружается, поэтому мы пытаемся прочитать все данные.
  5. Когда мы прочитаем все данные, мы можем записать их в правильный канал назначения, обязательно не забудьте сброситьcurrentChannelиcurrentLengthпеременные (эти переменные будут использоваться для разбора следующего пакета).
  6. Наконец, когда источникchannelВ конце обязательно не забудьте вызвать цельStreamизend()метод.

Поскольку мы можем демультиплексировать источникStream, выполните следующий вызов:

net.createServer(socket => {
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
  .listen(3000, () => console.log('Server started'))
;

В приведенном выше коде мы сначала3000запустить один в портуTCPserver, то для каждого получаемого соединения мы создадим два доступных для записиStream, указывая на два разных файла, один для stdout и один для stderr; это наши целиchannel. Наконец, мы используемdemultiplexChannel()Демультиплексировать поток сокетов какstdoutStreamиstderrStream.

Запуск приложений мультиплексирования и демультиплексирования

Теперь мы готовы попробовать запустить наше новое приложение мультиплексирования/демультиплексирования, но сначала давайте создадим небольшойNode.jsпрограмма для создания примера вывода; назовем ееgenerateData.js:

console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

Во-первых, давайте запустим сервер:

node server

Затем запустите клиент, вам нужно указать параметр файла в качестве подпроцесса:

node client generateData.js

Клиент запускается почти сразу, но когда процесс завершается,generateDataСтандартный ввод и стандартный вывод приложения проходят черезTCPСоединение затем на стороне сервера демультиплексируется в два файла.

Обратите внимание, что когда мы используемchild_process.fork(), наш клиент может начать другиеNode.jsмодуль.

Мультиплексирование и демультиплексирование потоков объектов

Пример, который мы только что показали, демонстрирует, как мультиплексировать и демультиплексировать двоичные/текстовые файлы.Stream, но стоит отметить, что те же правила применяются к объектамStream. Большая разница в том, что с объектами у нас уже есть методы для передачи данных с использованием атомарных сообщений (объектов), поэтому мультиплексирование похоже на установку свойства.channel IDтак же просто, как и каждый объект, а демультиплексирование требует только чтения·channel IDсвойства и маршрутизировать каждый объект к правильной целиStream.

Другой шаблон заключается в том, чтобы взять несколько свойств объекта и распределить их по нескольким целям.StreamШаблон С помощью этого шаблона мы можем реализовать сложные процессы, как показано на следующем рисунке:

Как показано выше, возьмите объектStreamвыражатьanimals, а затем в зависимости от вида животного:reptiles,amphibiansиmammals, а затем распространить на правильную цельStreamсередина.

Суммировать

В этой главе у нас естьNode.js Streamsи варианты его использования, но в то же время должны открыть двери парадигмам программирования с почти безграничными возможностями. мы понимаем, почемуStreamодеялоNode.jsСообщество их похвалило, и мы освоили их базовый функционал, что позволило нам делать с ним более интересные вещи. Мы анализируем некоторые сложные шаблоны и начинаем понимать, как различные конфигурацииStreamsСоединив вместе, освойте эти функции, которые делают потоковую передачу такой универсальной и мощной.

Если мы столкнулись с невозможностью использоватьStreamДля достижения функции мы можем передать другиеStreamsсоединены вместе для достижения этогоNode.jsприятная особенность ;StreamsПри работе с двоичными данными строки и объекты очень полезны и имеют разные характеристики.

В следующей главе мы сосредоточимся на традиционных шаблонах объектно-ориентированного проектирования. несмотря на то чтоJavaScriptОбъектно-ориентированный язык в некоторой степени, но вNode.jsобычно предпочтительнее использовать функциональный или гибридный подход. Прочтите следующую главу, чтобы узнать ответ.