Эта серия статей являетсяШаблоны проектирования Node.js, второе изданиеОригинальный перевод и примечания к чтению, сериализованные и обновленные на GitHub,Синхронизировать ссылку на перевод.
Добро пожаловать, обратите внимание на мою колонку, в колонке будут синхронизированы следующие записи блога:
- Колонка самородков Encounter
- Зная о программировании мышления Encounter
- интерфейсный сайт столбца segmentfault
Coding with Streams
Streams
даNode.js
Один из важнейших компонентов и узоров. В сообществе есть поговорка «Стримить все (Steam — это все)», и одного этого достаточно, чтобы описатьNode.js
положение в.Dominic Tarr
в видеNode.js
Крупнейший участник сообщества, который определяет поток какNode.js
Самая лучшая и самая сложная концепция для понимания.
сделатьNode.js
изStreams
Есть и другие причины такой привлекательности; кроме того,Streams
Не только о технических характеристиках, таких как производительность или эффективность, но, что более важно, об их элегантности и о том, как они связаны сNode.js
То, как концепция дизайна подходит идеально.
В этой главе вы узнаете следующее:
-
Streams
заNode.js
важность. - Как создать и использовать
Streams
. -
Streams
Как парадигма программирования, а не только дляI/O
С точки зрения его применения и мощных функций в различных сценариях применения. - Конвейерный режим и подключение в разных конфигурациях
Streams
.
Откройте для себя важность потоков
на событийных платформах, таких какNode.js
), обработкаI / O
Самый эффективный метод — обработка в реальном времени: как только есть входная информация, она будет обработана немедленно, а как только появится результат, который нужно вывести, сразу же будет выведена обратная связь.
В этом разделе мы сначала представимNode.js
изStreams
и его преимущества. Помните, что это всего лишь обзор, так как использование и комбинация будут подробно описаны позже в этой главе.Streams
.
Сравнение потоков и буферов
Почти каждый асинхронный API, который мы видели в этой книге, используетBuffer
модель. Для операций ввода,Buffer
Схема соберет все данные из ресурса вBuffer
области; как только весь ресурс будет прочитан, результат передается в функцию обратного вызова. На следующем рисунке показан реальный пример этой парадигмы:
Из приведенного выше рисунка мы видим, чтоt1Иногда некоторые данные принимаются от ресурса и сохраняются в буфере. существуетt2В данный момент последний фрагмент данных принят в другой блок данных, и операция чтения завершена, в это время потребителю отправляется содержимое всего буфера.
с другой стороны,Streams
Позволяет обрабатывать данные по мере их поступления. Как показано ниже:
Это изображение показываетStreams
Как получать каждый новый блок данных от ресурса и сразу предоставлять его потребителю, теперь потребителю не нужно ждать, пока все данные соберутся в буфер, перед обработкой каждого блока данных.
Но в чем разница между этими двумя методами? Мы можем обобщить их на два пункта:
- космическая эффективность
- эффективность времени
также,Node.js
изStreams
Имеет еще одно важное преимущество:компонуемость. Теперь давайте посмотрим, как эти свойства влияют на то, как мы разрабатываем и пишем приложения.
космическая эффективность
Во-первых,Streams
Позволяет нам делать вещи, которые кажутся невозможными, путем буферизации данных и их одновременной обработки. Например, предположим, что нам нужно прочитать очень большой файл, скажем, сотниMB
даже тысячаMB
. По-видимому, ожидание полного чтения файла возвращает большиеBuffer
изAPI
Не хорошая идея. Представьте, что если несколько больших файлов будут читаться одновременно, нашему приложению может легко не хватить памяти. Кроме,V8
серединаBuffer
не может быть больше, чем0x3FFFFFFF
байт (менее1GB
). Таким образом, мы можем столкнуться со стеной до того, как закончится физическая память.
Сжимайте файлы с помощью Buffered API
В качестве конкретного примера рассмотрим простой интерфейс командной строки (CLI
) приложение, которое используетGzip
Отформатируйте сжатый файл. использоватьBuffered
изAPI
, такое приложение вNode.js
Примерно так (обработка исключений для краткости опущена):
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
zlib.gzip(buffer, (err, buffer) => {
fs.writeFile(file + '.gz', buffer, err => {
console.log('File successfully compressed');
});
});
});
Теперь мы можем попробовать поместить предыдущий код в файл с именемgzip.js
файл, а затем выполните следующую команду:
node gzip <path to file>
Если мы выберем достаточно большой файл, скажем, больше, чем1GB
файл, мы получаем сообщение об ошибке, говорящее о том, что файл, который мы хотим прочитать, больше, чем максимально допустимый размер буфера, например:
RangeError: File size is greater than possible Buffer:0x3FFFFFFF
В приведенном выше примере большой файл не найден, но скорость чтения больших файлов действительно намного ниже.
Как мы и предполагали, используяBuffer
Очевидно, что читать большие файлы неправильно.
Сжимайте файлы с помощью потоков
мы должны исправить нашиGzip
приложение, и самый простой способ заставить его работать с большими файлами — использоватьStreams
изAPI
. Давайте посмотрим, как этого добиться. Давайте заменим содержимое модуля, который мы только что создали, следующим кодом:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('File successfully compressed'));
«Правда?» — спросите вы. Да, как мы сказали, из-заStreams
интерфейс и компонуемость, поэтому мы также можем писать такой более краткий, элегантный и изысканный код. Мы подробно рассмотрим это позже, но сейчас важно понять, что программы могут без проблем работать с файлами любого размера, в идеале с постоянным использованием памяти. Попробуйте (но учтите, что сжатие большого файла может занять некоторое время).
эффективность времени
Теперь рассмотрим сжатый файл и загрузим его на удаленныйHTTP
Примеры серверных приложений, которые удаленноHTTP
Сервер, в свою очередь, распаковывает и сохраняет его в файловой системе. Если наш клиент используетBuffered
изAPI
реализовано, загрузка начнется только после того, как весь файл будет прочитан и сжат. С другой стороны, распаковка начнется на сервере только после получения всех данных. Лучшее решение для достижения того же результата включает использованиеStreams
. На клиентской машинеStreams
Блоки данных могут быть сжаты и отправлены до тех пор, пока они считываются из файловой системы, в то время как на сервере каждый блок данных может быть распакован, пока он получен от удаленного узла. Мы демонстрируем это, создавая вышеупомянутое приложение, начиная со стороны сервера.
Мы создаемgzipReceive.js
модуль, код выглядит следующим образом:
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const server = http.createServer((req, res) => {
const filename = req.headers.filename;
console.log('File request received: ' + filename);
req
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream(filename))
.on('finish', () => {
res.writeHead(201, {
'Content-Type': 'text/plain'
});
res.end('That\'s it\n');
console.log(`File saved: ${filename}`);
});
});
server.listen(3000, () => console.log('Listening'));
Сервер получает чанки из сети, распаковывает их и сохраняет чанки, как только они получены, благодаряNode.js
изStreams
.
Клиенты нашего приложения будут вводить файл с именемgzipSend.js
модуль следующим образом:
В предыдущем коде мы снова используемStreams
Чтение данных из файла, затем сжатие и отправка каждого блока данных при чтении из файловой системы.
Теперь, чтобы запустить приложение, мы сначала запускаем сервер с помощью следующей команды:
node gzipReceive
Затем мы можем указать файл для отправки и адрес сервера, указав (например,localhost
) для запуска клиента:
node gzipSend <path to file> localhost
Если мы выберем достаточно большой файл, нам будет легче увидеть, как данные передаются от клиента к серверу, но почему в этом режиме мы используемStreams
, чем при использованииBuffered
изAPI
более эффективным? Изображение ниже должно дать нам подсказку:
Процесс обработки файла проходит следующие этапы:
- Клиент читает из файловой системы
- Сжатые данные клиента
- Клиент отправляет данные на сервер
- Сервер получает данные
- Сервер распаковывает данные
- Сервер записывает данные на диск
Чтобы завершить обработку, мы должны пройти каждый этап в конвейерном порядке до конца. На изображении выше мы видим, что с помощьюBuffered
изAPI
, процесс полностью последователен. Чтобы сжать данные, мы сначала должны дождаться, пока будет прочитан весь файл, затем, чтобы отправить данные, мы должны дождаться, пока весь файл будет прочитан и сжат, и так далее. когда мы используемStreams
, конвейер запустится, как только мы получим первый блок данных, не дожидаясь чтения всего файла. Но еще более удивительно то, что когда становится доступным очередной фрагмент данных, нет необходимости ждать завершения предыдущего набора задач, вместо этого параллельно запускается еще одна сборочная линия. Поскольку каждая задача, которую мы выполняем, является асинхронной, что идеально, ее можно выполнить с помощьюNode.js
выполнять параллельноStreams
связанные операции; единственным ограничением является то, что каждый этап должен гарантировать порядок поступления блоков данных.
Как видно из предыдущего рисунка, при использованииStreams
В результате весь процесс занимает меньше времени, поскольку нам не нужно ждать, пока все данные будут прочитаны и обработаны.
композиционность
Код, который мы видели до сих пор, говорил нам, как использоватьpipe()
способ сборкиStreams
блок данных,Streams
Позволяет нам подключать различные процессорные блоки, каждый из которых несет единую ответственность (это согласуется сNode.js
Стиль). Это возможно, потому чтоStreams
имеет единый интерфейс иAPI
с точки зрения различныхStreams
Он также может хорошо взаимодействовать. Единственным условием является следующий из конвейераStreams
должны поддерживать предыдущиеStreams
Результирующий тип данных может быть двоичным, текстовым или даже объектным, как мы увидим в следующих главах.
чтобы доказатьStreams
Преимущество композиционности, мы можем попытаться построитьgzipReceive / gzipSend
Добавьте функцию шифрования в приложение.
Для этого нам просто нужно добавить еще одинStreams
для обновления клиента. В частности, поcrypto.createChipher()
Возвращенный поток. Результирующий код должен выглядеть так:
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');
const file = process.argv[2];
const server = process.argv[3];
const options = {
hostname: server,
port: 3000,
path: '/',
method: 'PUT',
headers: {
filename: path.basename(file),
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip'
}
};
const req = http.request(options, res => {
console.log('Server response: ' + res.statusCode);
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_shared_secret'))
.pipe(req)
.on('finish', () => {
console.log('File successfully sent');
});
Точно так же мы обновляем код на стороне сервера, чтобы он мог расшифровать блок данных перед его распаковкой:
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const server = http.createServer((req, res) => {
const filename = req.headers.filename;
console.log('File request received: ' + filename);
req
.pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream(filename))
.on('finish', () => {
res.writeHead(201, {
'Content-Type': 'text/plain'
});
res.end('That\'s it\n');
console.log(`File saved: ${filename}`);
});
});
server.listen(3000, () => console.log('Listening'));
cryptoЭто один из основных модулей Node.js, предоставляющий ряд алгоритмов шифрования.
Всего несколькими строками кода мы добавили в наше приложение уровень шифрования. Нам просто нужно просто передать существующийStreams
Модуль и уровень шифрования объединены вместе, и все. Точно так же мы можем добавлять и объединять другиеStreams
, как будто играешь с кубиками Лего.
Очевидно, что основным преимуществом этого подхода является возможность повторного использования, но, как видно из кода, представленного до сих пор,Streams
Также можно получить более четкий, модульный и лаконичный код. По этим причинам потоки часто используются не только для обработки чистых данных.I / O
, а также средство упрощения и модульности кода.
Начало работы с потоками
В предыдущих главах мы видели, почемуStreams
такой мощный, и этоNode.js
везде, даже вNode.js
Он также присутствует в основном модуле . Например, мы видели, что,fs
модуль имеет для чтения из файлаcreateReadStream()
и для записи в файлcreateWriteStream()
,HTTP
Объекты запроса и ответа по существуStreams
,иzlib
модули позволяют использоватьStreams
РежимAPI
Сжимайте и распаковывайте блоки данных.
Теперь мы знаем, почемуStreams
Это так важно, давайте сделаем шаг назад и начнем изучать его более подробно.
Структура потоков
Node.js
каждый изStreams
обаStreams
Реализация одного из четырех основных абстрактных классов, доступных в основном модуле:
stream.Readable
stream.Writable
stream.Duplex
stream.Transform
каждыйstream
класс такжеEventEmitter
экземпляр . По факту,Streams
Можно генерировать несколько типов событий, напримерend
События будут в читаемомStreams
Запускается, когда чтение завершено, или считывается ошибка, или во время процесса возникает исключение.
Обратите внимание, что для краткости мы часто опускаем правильную обработку ошибок в примерах, представленных в этой главе. Однако в производственной среде всегда рекомендуется регистрировать прослушиватели событий ошибок для всех потоков.
Streams
Одна из причин его гибкости заключается в том, что он может обрабатывать не только двоичные данные, но и почти любыеJavaScript
ценность. По факту,Streams
Могут поддерживаться два режима работы:
- Двоичный режим: в блоках данных (например,
buffers
илиstrings
) потоковые данные - Шаблон объекта: обрабатывать потоковые данные как ряд дискретных объектов (это позволяет нам использовать практически любой
JavaScript
ценность)
Эти два режима работы позволяют не только использоватьI / O
потоки, но и как инструмент элегантного комбинирования блоков обработки в функциональном стиле, как мы увидим позже в этой главе.
В этой главе мы будем в основном использовать потоковый интерфейс Node.js, представленный в Node.js 0.11, также известный как версия 3. См. отличный пост в блоге StrongLoop по адресу https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/ для получения более подробной информации об отличиях от старого интерфейса.
Читаемые потоки
читаемыйStreams
представляет собой источник данных, вNode.js
, оно используетstream
в модулеReadableabstract
реализация класса.
Чтение информации из потоков
читаемый изStreams
Существует два способа получения данных:non-flowing
узор иflowing
модель. Разберем эти закономерности более подробно.
нетекучий режим (non-flowing mode)
из читаемогоStreams
Режим чтения данных по умолчанию заключается в присоединении к нему прослушивателя событий readable, который указывает на доступность новых данных для чтения. Затем в цикле мы считываем все данные до тех пор, пока внутреннийbuffer
опустеет. Это можно использоватьread()
завершается метод, который синхронно считывает данные из внутреннего буфера и возвращаетBuffer
илиString
объект.read()
Методы используются по следующей схеме:
readable.read([size]);
Используя этот подход, данные легко доступны непосредственно изStreams
Выписка по требованию.
Чтобы проиллюстрировать, как это работает, создадим файл с именемreadStdin.js
, который реализует простую программу, которая считывает данные со стандартного ввода (читаемый поток) и выводит все данные на стандартный вывод:
process.stdin
.on('readable', () => {
let chunk;
console.log('New data available');
while ((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read: (${chunk.length}) "${chunk.toString()}"`
);
}
})
.on('end', () => process.stdout.write('End of stream'));
read()
метод является синхронной операцией, он начинается с читаемогоStreams
внутреннийBuffers
Извлеките блоки данных из области. еслиStreams
Работает в двоичном режиме, возвращаемый блок данных по умолчаниюBuffer
объект.
В читаемом потоке, работающем в двоичном режиме, мы можем прочитать строку вместо объекта Buffer, вызвав setEncoding(encoding) в потоке и предоставив допустимый формат кодирования (например, utf8).
Данные считываются из читаемого слушателя, который вызывается всякий раз, когда появляются новые данные. Когда во внутреннем буфере больше нет данных,read()
метод возвращаетnull
; в этом случае мы должны дождаться другого читаемого события, сообщающего нам, что мы можем прочитать снова, или дождаться представленияStreams
процесс чтения завершенend
Событие запущено. Когда поток работает в бинарном режиме, мы также можем передатьread()
метод проходитsize
параметр, чтобы указать размер данных, которые мы хотим прочитать. Это особенно полезно при реализации сетевых протоколов или разборе определенных форматов данных.
Теперь мы готовы бежатьreadStdin
модуль и эксперимент. Давайте наберем в консоли несколько символов и нажмемEnter
чтобы просмотреть данные, выводимые на стандартный вывод. Чтобы завершить поток и, таким образом, сгенерировать обычное конечное событие, нам нужно вставитьEOF
(конец файла) символ (вWindows
использовать наCtrl + Z
или вLinux
использовать наCtrl + D
).
Мы также можем попробовать связать нашу программу с другими программами, это можно сделать с помощью оператора канала (|
), который перенаправляет стандартный вывод программы на стандартный ввод другой программы. Например, мы можем запустить следующую команду:
cat <path to a file> | node readStdin
Это отличный пример того, как потоковая парадигма является универсальным интерфейсом, который позволяет нашим программам взаимодействовать независимо от того, на каком языке они написаны.
проточный режим
отStreams
Еще один способ считывания — присоединить прослушиватель кdata
событие; это будетStreams
переключить наflowing
режим, в котором данные не используютсяread()
функцию для извлечения, но как только данные поступятdata
Слушатель вталкивается в слушателя. Например, мы создали ранееreadStdin
Приложение будет использовать потоковый режим:
process.stdin
.on('data', chunk => {
console.log('New data available');
console.log(
`Chunk read: (${chunk.length}) "${chunk.toString()}"`
);
})
.on('end', () => process.stdout.write('End of stream'));
flowing
Устаревший режимStreams
интерфейс (также известный какStreams1
), который менее гибкий,API
меньше. вместе сStreams2
введение интерфейса,flowing
режим не является рабочим режимом по умолчанию, чтобы включить его, слушатель должен быть подключен кdata
событие или явный вызовresume()
метод. временно прерватьStreams
курокdata
событие, мы можем назватьpause()
метод, заставляющий любые входящие данные кэшироваться внутриbuffer
середина.
Вызов pause() не приведет к переключению Streams обратно в нетекущий режим.
Реализовать читаемые потоки
Теперь мы знаем, какStreams
читать данные, следующим шагом будет узнать, как реализовать новыйReadable
поток данных. Для этого необходимо передать наследованиеstream.Readable
прототип для создания нового класса. Бетонный поток должен обеспечивать_read()
Реализация метода:
readable._read(size)
Readable
Внутри класса вызовет_read()
метод, который, в свою очередь, запускает
использоватьpush()
Заполнить внутренний буфер:
Обратите внимание, что read() — это метод, вызываемый потребителями Stream, а _read() — это метод, реализованный подклассами Stream, и его нельзя вызывать напрямую. Символы подчеркивания обычно указывают на то, что метод является закрытым и не должен вызываться напрямую.
Чтобы продемонстрировать, как реализовать новый читаемыйStreams
, мы можем попробовать реализовать случайную строкуStreams
. Давайте создадимrandomStream.js
новый модуль, который будет содержать нашу строкуgenerator
код:
const stream = require('stream');
const Chance = require('chance');
const chance = new Chance();
class RandomStream extends stream.Readable {
constructor(options) {
super(options);
}
_read(size) {
const chunk = chance.string(); //[1]
console.log(`Pushing chunk of size: ${chunk.length}`);
this.push(chunk, 'utf8'); //[2]
if (chance.bool({
likelihood: 5
})) { //[3]
this.push(null);
}
}
}
module.exports = RandomStream;
В верхней части файла мы загрузим наши зависимости. за исключением того, что мы загружаеммодуль шанса npmКроме этого, ничего особенного, это библиотека для генерации всех видов случайных значений, от чисел и строк до целых предложений.
Следующим шагом является создание файла с именемRandomStream
новый класс и указатьstream.Readable
как его родительский класс. В предыдущем коде мы вызываем конструктор родительского класса для инициализации его внутреннего состояния и передачи полученногоoptions
параметры в качестве входных данных. пройти черезoptions
Возможные параметры, передаваемые объектом, включают следующее:
- для
Buffers
преобразовать вStrings
изencoding
параметр (по умолчаниюnull
) - Включить ли объектный режим (
objectMode
По умолчаниюfalse
) - хранится внутри
buffer
Верхний предел данных в зоне, как только этот верхний предел превышен, данные будут приостановленыdata source
читать(highWaterMark
По умолчанию16KB
)
хорошо, теперь давайте объясним, что мы переписалиstream.Readable
Категория_read()
метод:
- Этот метод использует
chance
Генерировать случайные строки. - это будет струна
push
внутреннийbuffer
. Обратите внимание, что поскольку мыpush
даString
, и мы также указываем кодировку какutf8
(если блок данных просто бинарныйBuffer
, не требуется). - от
5%
вероятность случайного прерыванияstream
случайных строк, сгенерированныхpush
null
в интерьерBuffer
ПредставлятьEOF
,Сейчасstream
конец.
Мы также можем видеть в_read()
дано на входе функцииsize
параметр игнорируется, поскольку это рекомендуемый параметр. Мы можем просто поместить все доступные данные вpush
к внутреннемуbuffer
, но если в одном и том же вызове есть несколько push-уведомлений, то мы должны проверитьpush()
возвращаться лиfalse
, так как это означает, что внутреннийbuffer
достигhighWaterMark
предел, мы должны прекратить добавлять больше данных.
ВышеупомянутоеRandomStream
модуль, теперь мы готовы его использовать. Давайте создадимgenerateRandom.js
новый модуль, в этом модуле мы создаем экземпляр новогоRandomStream
объект и извлечь из него некоторые данные:
const RandomStream = require('./randomStream');
const randomStream = new RandomStream();
randomStream.on('readable', () => {
let chunk;
while ((chunk = randomStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`);
}
});
Теперь, когда все готово, давайте попробуем новый кастомstream
. просто выполнить как обычноgenerateRandom
Модуль для просмотра случайных строк, протекающих по экрану.
Доступные для записи потоки
записываемыйstream
Представляет конечную точку данных вNode.js
, оно используетstream
в модулеWritable
абстрактный класс для реализации.
писать в поток
поместить некоторые данные в записываемыеstream
это простая вещь, все, что нам нужно сделать, это использоватьwrite()
метод, который имеет следующий формат:
writable.write(chunk, [encoding], [callback])
encoding
Параметр является необязательным и находится вchunk
даString
тип (по умолчаниюutf8
,еслиchunk
даBuffer
, затем игнорируется); когда блок данных сбрасывается в базовый ресурс,callback
будет называться,callback
Параметры также являются необязательными.
Чтобы указать, что больше данные не будут записаныstream
, мы должны использоватьend()
метод:
writable.end([chunk], [encoding], [callback])
мы можем пройтиend()
метод предоставляет последний фрагмент данных. при этих обстоятельствах,callbak
функция эквивалентнаfinish
Событие регистрирует слушателя, когда весь блок данных записанstream
in, событие будет запущено.
Теперь давайте создадим небольшую выходную последовательность случайных строк, создавHTTP
сервер, чтобы продемонстрировать, как это работает:
const Chance = require('chance');
const chance = new Chance();
require('http').createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'text/plain'
}); //[1]
while (chance.bool({
likelihood: 95
})) { //[2]
res.write(chance.string() + '\n'); //[3]
}
res.end('\nThe end...\n'); //[4]
res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));
мы создалиHTTP服务器
, и записать данные вres
объект,res
объектhttp.ServerResponse
Экземпляр , также доступный для записиstream
. Вот объяснение того, что происходит с приведенным выше кодом:
- Сначала мы пишем
HTTP response
голова. Пожалуйста, обрати внимание,writeHead()
нетWritable
часть интерфейса, на самом деле этот методhttp.ServerResponse
Вспомогательные методы, предоставляемые классом. - мы начинаем
5%
Цикл завершается с вероятностью (вероятность входа в тело цикла равнаchance.bool()
продукция, которая95%
). - Внутри цикла мы записываем случайную строку в
stream
. - Как только мы вышли из цикла, мы вызываем
stream
изend()
, указывая не более Блок данных будет записан. Кроме того, мы предоставляем последнюю строку для записи в поток перед завершением. - Наконец, мы регистрируем
finish
прослушиватель событий, когда все блоки данных сбрасываются на нижний слойsocket
, это событие будет запущено.
Мы можем назвать этот маленький модуль именемentropyServer.js
, а затем выполнить его. Чтобы протестировать этот сервер, мы можем обратитьсяhttp:// localhost:8080
Откройте браузер или из терминала используйтеcurl
команда следующим образом:
curl localhost:8080
В этот момент сервер должен начать спрашивать выбранный вамиHTTP客户端
Отправьте случайную строку (обратите внимание, что некоторые браузеры могут буферизовать данные, и поведение потоковой передачи может быть незаметным).
Обратное давление
Подобно жидкостям, текущим в реальных трубопроводных системах,Node.js
изstream
также могут страдать от узких мест, данные могут записываться быстрее, чемstream
потребление. Механизмы для решения этой проблемы включают буферизацию входных данных; однако, если данныеstream
Без какой-либо обратной связи с производителем у нас может возникнуть ситуация, когда все больше и больше данных накапливается во внутреннем буфере, вызывая утечку памяти.
Чтобы этого не произошло, при внутреннемbuffer
ПревосходитьhighWaterMark
При ограничении,writable.write()
вернусьfalse
. доступный для записиstream
имеютhighWaterMark
собственность, котораяwrite()
метод начинает возвращатьсяfalse
внутреннийBuffer
ограничение размера области, один разBuffer
Размер экстента превышает этот предел, что указывает на то, что приложение должно прекратить запись. Когда буфер опустошается, вызывается триггерdrain
событие, можно безопасно уведомить, что можно безопасно начать запись снова. Этот механизм называетсяback-pressure
.
Механизмы, описанные в этом разделе, также применимы к читаемым потокам. На самом деле противодавление также существует в читаемых потоках и срабатывает, когда метод push(), вызываемый внутри _read(), возвращает false. Однако это специфическая проблема для разработчиков потоков, поэтому мы не будем часто ею заниматься.
Мы можем изменить ранее созданныйentropyServer
модуль для демонстрации возможности записиstream
изback-pressure
:
const Chance = require('chance');
const chance = new Chance();
require('http').createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'text/plain'
});
function generateMore() { //[1]
while (chance.bool({
likelihood: 95
})) {
const shouldContinue = res.write(
chance.string({
length: (16 * 1024) - 1
}) //[2]
);
if (!shouldContinue) { //[3]
console.log('Backpressure');
return res.once('drain', generateMore);
}
}
res.end('\nThe end...\n', () => console.log('All data was sent'));
}
generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));
Наиболее важные шаги в предыдущем коде можно резюмировать следующим образом:
- Мы инкапсулируем основную логику в
generateMore()
в функции. - Чтобы получить немного
back-pressure
случайно, мы увеличиваем размер блока данных до16KB-1Byte
, что очень близко к значению по умолчаниюhighWaterMark
предел. - После записи блока данных мы проверяем
res.write()
Возвращаемое значение. если он вернетсяfalse
, а это означает, что внутреннийbuffer
заполнен, мы должны прекратить отправлять больше данных. В этом случае мы выходим из функции, а затем регистрируем нового издателя для записи событий, когдаdrain
Вызывается при возникновении событияgenerateMore
.
Если мы сейчас попытаемся снова запустить сервер, то используемcurl
генерировать клиентский запрос, вполне вероятно, что будет какой-тоback-pressure
, потому что сервер генерирует данные с очень высокой скоростью, даже быстрее, чем базовыйsocket
Быстрее.
Реализовать доступные для записи потоки
Мы можем унаследоватьstream.Writable
класс для реализации нового записываемого потока, а для_write()
метод обеспечивает реализацию. Реализовать наш собственный перезаписываемыйStreams
своего рода.
Давайте создадим записываемыйstream
, который получает объекты в следующем формате:
{
path: <path to a file>
content: <string or buffer>
}
Роль этого класса такова: для каждого объекта нашstream
должно бытьcontent
раздел сохраняется в файл, созданный по заданному пути. мы можем сразу увидеть, что мыstream
Вход - это объект, а неStrings
илиBuffers
, а это значит, что нашstream
Должен работать в объектном режиме.
вызывающий модульtoFileStream.js
:
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
class ToFileStream extends stream.Writable {
constructor() {
super({
objectMode: true
});
}
_write(chunk, encoding, callback) {
mkdirp(path.dirname(chunk.path), err => {
if (err) {
return callback(err);
}
fs.writeFile(chunk.path, chunk.content, callback);
});
}
}
module.exports = ToFileStream;
В качестве первого шага мы загружаем все необходимые нам зависимости. Обратите внимание, что нам нужны модулиmkdirp
, как вы должны знать из предыдущих глав, он должен использоватьnpm
Установить.
Мы создаем новый класс, который начинается сstream.Writable
расширен.
Нам пришлось вызвать родительский конструктор, чтобы инициализировать его внутреннее состояние; мы также предоставилиoption
object в качестве параметра, чтобы указать, что поток работает в объектном режиме (objectMode:true
).stream.Writable
Другие принятые варианты следующие:
-
highWaterMark
(Значение по умолчанию16KB
):контрольback-pressure
верхний предел . -
decodeStrings
(по умолчаниюtrue
): в строке, переданной_write()
метод, автоматически декодирует строку в двоичныйbuffer
Площадь. Этот параметр игнорируется в объектном режиме.
Наконец, мы_write()
метод обеспечивает реализацию. Как видите, этот метод принимает блок данных, кодировку (только в двоичном режиме,stream
опцииdecodeStrings
Установить какfalse
со смыслом).
Кроме того, метод принимает callback-функцию, которую нужно вызвать по завершении операции, необязательно передавать результат операции, но мы все же можем передатьerror
объект, в результате чегоstream
курокerror
событие.
Теперь, чтобы попробовать то, что мы только что построилиstream
, мы можем создатьwriteToFile.js
, и выполните некоторые операции записи в потоке:
const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));
Таким образом, мы создали и использовали наш первый настраиваемый поток с возможностью записи. Запустите новый модуль, как обычно, чтобы проверить его вывод; вы увидите три новых файла, созданных после выполнения.
Двойной поток
Двойнойstream
Как для чтения, так и для записи. Когда мы хотим описать объект, который является одновременно источником данных и получателем данных (например,socket
), что оказалось очень полезным. Наследование дуплексного потокаstream.Readable
иstream.Writable
метод, поэтому он не нов для нас. Это означает, что мы можемread()
илиwrite()
данные, или может контролироватьreadable
иdrain
событие.
Создание пользовательского двойникаstream
, мы должны_read()
и_write()
Предоставьте реализацию. Перейти кDuplex()
конструкторoptions
Объект внутренне перенаправляется наReadable
иWritable
конструктор.options
Содержание параметров такое же, как обсуждалось ранее,options
добавилallowHalfOpen
значение (по умолчаниюtrue
), если установленоfalse
, будет иметь место до тех пор, покаstream
вечеринка (Readable
иWritable
)Заканчивать,stream
Это конец.
Чтобы двойной поток работал в объектном режиме с одной стороны и в бинарном режиме с другой, нам нужно вручную установить следующие свойства в конструкторе потока:
this._writableState.objectMode
this._readableState.objectMode
Преобразованные потоки
преобразованныйStreams
это особый тип двойногоStreams
.
в простом двойникеStreams
в отstream
Нет прямой связи между считываемыми данными и записываемыми в них данными (по крайней мере,stream
является непознаваемым). подумай об одномTCP socket
, он просто отправляет данные и получает данные от удаленных узлов.TCP socket
Он не осознает, что существует какая-либо связь между входом и выходом.
На приведенной ниже диаграмме показана двойнаяStreams
Поток данных в:
С другой стороны, преобразованныйStreams
Примените некоторое преобразование к каждому блоку данных, полученному со стороны, доступной для записи, затем сделайте преобразованные данные доступными со стороны, доступной для чтения.
На изображении ниже показано, как данные преобразуютсяStreams
Средний поток:
Снаружи преобразованныйStreams
интерфейс с двойнымStreams
интерфейс точно такой же. Но когда мы хотим построить новый двойникStreams
, мы должны предоставить_read()
и_write()
метод, и чтобы реализовать новый поток преобразования, мы должны заполнить еще пару методов:_transform()
и_flush()
).
Давайте продемонстрируем, как использовать пример для создания нового преобразованногоStreams
.
Потоки, реализующие преобразование
Реализуем преобразованиеStreams
, который заменит все вхождения данной строки. Для этого мы должны создатьreplaceStream.js
новый модуль. Перейдем непосредственно к тому, как это реализовать:
const stream = require('stream');
const util = require('util');
class ReplaceStream extends stream.Transform {
constructor(searchString, replaceString) {
super();
this.searchString = searchString;
this.replaceString = replaceString;
this.tailPiece = '';
}
_transform(chunk, encoding, callback) {
const pieces = (this.tailPiece + chunk) //[1]
.split(this.searchString);
const lastPiece = pieces[pieces.length - 1];
const tailPieceLen = this.searchString.length - 1;
this.tailPiece = lastPiece.slice(-tailPieceLen); //[2]
pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);
this.push(pieces.join(this.replaceString)); //[3]
callback();
}
_flush(callback) {
this.push(this.tailPiece);
callback();
}
}
module.exports = ReplaceStream;
Как всегда, мы будем собирать модуль, начиная с его зависимостей. На этот раз мы не используем сторонние модули.
Затем мы создалиstream.Transform
Новый класс, наследуемый от базового класса. Конструктор этого класса принимает два параметра:searchString
иreplaceString
. Как вы можете себе представить, они позволяют нам определить текст для сопоставления и строку для использования в качестве замены. Мы также инициализируем_transform()
используемый методtailPiece
внутренняя переменная.
Теперь давайте проанализируем_transform()
метод, который является сердцем нашего нового класса._transform()
методы и записываемыеstream
из_write()
методы имеют почти такой же формат, но вместо записи данных в базовый ресурс используютthis.push()
втолкнуть его внутрьbuffer
, то же самое, что мы имели бы в читаемом потоке_read()
метод выполняется. Это показывает преобразованныйStreams
как на самом деле соединяются две стороны.
ReplaceStream
из_transform()
Методы реализуют ядро нашего нового класса. Обычно поиск и заменаbuffer
Строки в зоне — простая задача, однако при потоковой передаче данных ситуация совершенно иная, и возможные совпадения могут быть распределены по нескольким фрагментам данных. Программу, стоящую за кодом, можно объяснить следующим образом:
- Наш алгоритм использует
searchString
Функция действует как разделитель для разделения блоков. - Затем он берет последний элемент результирующего массива после разделения
lastPiece
и извлеките его последний символsearchString.length - 1
. Результаты сохраняются вtailPiece
переменная, она будет использоваться как префикс для следующего блока данных. - Наконец, все из
split()
Полученный фрагментreplaceString
объединены вместе как разделители и помещены внутрьbuffer
Площадь.
когдаstream
В конце концов, у нас может быть еще один последнийtailPiece
Переменная не была помещена во внутренний буфер. это точно_flush()
цель метода; он заключается вstream
вызывается перед завершением, и именно здесь у нас, наконец, есть шанс завершить поток или передать любые оставшиеся данные, прежде чем полностью завершить поток.
_flush()
Методу нужна только функция обратного вызова в качестве параметра, мы должны обязательно вызвать эту функцию обратного вызова, когда все операции будут выполнены. Готово с этим, мы закончили нашReplaceStream
своего рода.
Теперь пришло время попробовать что-то новоеstream
. Мы можем создать еще один с именемreplaceStreamTest.js
модуль для записи некоторых данных, а затем чтения результата преобразования:
const ReplaceStream = require('./replaceStream');
const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));
rs.write('Hello W');
rs.write('orld!');
rs.end();
Чтобы немного усложнить этот пример, мы распределяем условия поиска по двум разным фрагментам, а затем используемflowing
режим, мы начинаем с того жеstream
Считайте данные в , записывая каждый преобразованный блок. Выполнение предыдущей программы должно привести к следующему результату:
Hel
lo Node.js
!
Стоит упомянуть пятый тип потока: stream.PassThrough. В отличие от других классов потоков, которые мы рассмотрели, PassThrough не является абстрактным и может быть создан непосредственно без реализации каких-либо методов. По сути, это трансформируемый поток, который выводит каждый фрагмент данных без какого-либо преобразования.
Соединение потоков с помощью каналов
Unix
Понятие трубы такоеDouglas Mcllroy
Изобретен; это позволяет выходу программы быть соединенным со входом следующей. Взгляните на следующую команду:
echo Hello World! | sed s/World/Node.js/g
В предыдущей командеecho
будетHello World!
записывается в стандартный вывод, который затем перенаправляется наsed
стандартный ввод команды (из-за оператора канала|
). потомsed
использоватьNode.js
заменить любойWorld
, и выводит результат на стандартный вывод (в этот раз на консоль).
Аналогичным образом можно использовать читаемыйStreams
изpipe()
метод будетNode.js
изStreams
Вместе они имеют следующие интерфейсы:
readable.pipe(writable, [options])
очень интуитивно,pipe()
метод будет начинаться с читаемогоStreams
Извлеките данные, испускаемые в предоставленной записиStreams
середина. Кроме того, когда читаемыйStreams
проблемаend
событие (если мы не укажем{end:false}
в видеoptions
), записываемыйStreams
закончится автоматически.pipe()
Метод возвращает записываемый объект, переданный в качестве параметраStreams
, если такойstream
также читаем (например, двойной или конвертируемыйStreams
), позволяет нам создавать связанные вызовы.
поставить дваStreams
При соединении вместе данные могут автоматически передаваться на записываемыйStreams
, так что не надо звонитьread()
илиwrite()
метод; но самое главное не требует контроляback-pressure
, так как он обрабатывается автоматически.
В качестве простого примера (примеров будет множество) мы можем создать файл с именемreplace.js
Новый модуль, который принимает поток текста со стандартного ввода, применяет преобразование замены и возвращает данные на стандартный вывод:
const ReplaceStream = require('./replaceStream');
process.stdin
.pipe(new ReplaceStream(process.argv[2], process.argv[3]))
.pipe(process.stdout);
Вышеприведенная программа передает данные из стандартного ввода вReplaceStream
, затем возвращается к стандартному выводу. Теперь, чтобы попрактиковаться в этом маленьком приложении, мы можем использоватьUnix
Канал перенаправляет некоторые данные на свой стандартный ввод следующим образом:
echo Hello World! | node replace World Node.js
Запуск вышеуказанной программы выведет следующие результаты:
Hello Node.js
Этот простой пример демонстрируетStreams
(особенно текстStreams
) — это общий интерфейс, а каналы — это в значительной степени общий способ их составления и соединения.
error
События не распространяются автоматически по конвейеру. Например, посмотрите на следующий фрагмент кода:
stream1
.pipe(stream2)
.on('error', function() {});
В предыдущих цепочках вызовов мы будем собирать данные только из
stream2
ошибка, это связано с тем, что мы добавилиerorr
прослушиватель событий. Это означает, что если мы хотим захватить изstream1
Любые сгенерированные ошибки, мы должны напрямую подключить другой прослушиватель ошибок. Позже мы увидим еще один паттерн, с помощью которого можно достичь общего отлова ошибок (слияниеStreams
). Кроме того, следует отметить, что если цельStreams
(читатьStreams
) выдает ошибку, будетStreams
уведомить одинerror
, что затем приводит к прерыванию конвейера.
Как потоки проходят через трубы
Пока мы создаем на заказStreams
путь не следует точноNode
заданной схеме; фактически, изstream
наследование базового класса является нарушениемsmall surface area
, и нужен пример кода. это не значитStreams
Плохо спроектированный, на самом деле, мы не должны забывать, потому чтоStreams
даNode.js
часть ядра, поэтому они должны быть максимально гибкими и широко расширяемымиStreams
Так что модули пользовательского уровня могут в полной мере использовать их.
Однако в большинстве случаев нам не нужны все возможности и расширяемость, которые может дать наследование прототипов, но зачастую все, что нам нужно, — это определить новыеStreams
Режим быстрого развития.Node.js
Сообщество, конечно же, создало решение и для этого. Прекрасным примером являетсяthrough2, который позволяет нам просто создавать преобразованияStreams
небольшая библиотека. пройти черезthrough2
, мы можем создать новый кабриолет, вызвав простую функциюStreams
:
const transform = through2([options], [_transform], [_flush]);
сродни,from2также позволяет нам создать удобочитаемый видStreams
:
const readable = from2([options], _read);
Затем мы покажем их использование в оставшейся части этой главы, когда увидим преимущества использования этих небольших библиотек.
throughиfromоснован на
Stream1
Каноническая библиотека верхнего уровня.
Асинхронный поток управления на основе потоков
Из приведенных нами примеров должно быть ясно, чтоStreams
Его можно использовать не только для обработкиI / O
, и может использоваться как элегантный шаблон программирования для работы с данными любого типа. Но на этом преимущества не заканчиваются, вы также можете воспользоватьсяStreams
для реализации асинхронного потока управления, как мы увидим в этом разделе.
Последовательное исполнение
по умолчанию,Streams
Данные будут обрабатываться последовательно, например, преобразовыватьсяStreams
из_transform()
Функция выполняется в предыдущем блоке данныхcallback()
Только после этого будет вызван следующий блок данных. ЭтоStreams
Важным свойствомStreams
Реализуйте элегантные традиционные шаблоны потока управления.
Код всегда лучше, чем слишком много объяснений, поэтому давайте рассмотрим пример использования потоков для последовательного выполнения асинхронных задач. Давайте создадим функцию для объединения набора полученных файлов в качестве входных данных, соблюдая указанный порядок. Мы создаемconcatFiles.js
, и начнем с его зависимостей:
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');
мы будем использоватьthrough2
чтобы упростить преобразованиеStreams
создается и используетfrom2-array
Создать читаемый из массива объектовStreams
.
Далее мы можем определитьconcatFiles()
функция:
function concatFiles(destination, files, callback) {
const destStream = fs.createWriteStream(destination);
fromArray.obj(files) //[1]
.pipe(through.obj((file, enc, done) => { //[2]
const src = fs.createReadStream(file);
src.pipe(destStream, {end: false});
src.on('end', done); //[3]
}))
.on('finish', () => { //[4]
destStream.end();
callback();
});
}
module.exports = concatFiles;
Предыдущая функция передаетсяfiles
массив преобразуется вStreams
для достиженияfiles
Последовательная итерация массива. Процедура, за которой следует функция, объясняется следующим образом:
- Во-первых, мы используем
from2-array
отfiles
массив создает читаемыйStreams
. - Далее мы используем
through
для создания трансформированногоStreams
для обработки каждого файла в последовательности. Для каждого файла мы создаем читаемыйStreams
, и передать его в представление выходного файлаdestStream
середина. После того, как исходный файл закончил чтение, передайтеpipe()
указывается во втором параметре метода{end:false}
, мы следим за тем, чтобы не закрытьdestStream
. - Когда все содержимое исходного файла перенесено в
destStream
когда мы звонимthrough.obj
публичныйdone
функция передачи текущей обработки завершена, в нашем случае это необходимо для запуска обработки следующего файла. - После обработки всех файлов
finish
событие запущено. мы можем, наконец, закончитьdestStream
и позвониconcatFiles()
изcallback()
функция, эта функция представляет собой завершение всей операции.
Теперь мы можем опробовать только что созданный нами небольшой модуль. Давайте создадимconcat.js
новый файл для завершения примера:
const concatFiles = require('./concatFiles');
concatFiles(process.argv[2], process.argv.slice(3), () => {
console.log('Files concatenated successfully');
});
Теперь мы можем запустить приведенную выше программу с целевым файлом в качестве первого аргумента командной строки, за которым следует список файлов для объединения, например:
node concat allTogether.txt file1.txt file2.txt
Выполнение этой команды создастallTogether.txt
новый файл для последовательного сохраненияfile1.txt
иfile2.txt
Содержание.
использоватьconcatFiles()
функцию, мы можем просто использоватьStreams
Реализует последовательное выполнение асинхронных операций. как и мыChapter3 Asynchronous Control Flow Patters with Callbacks
Как видно из, при использовании чистогоJavaScript
внедрить или использоватьasync
и другие внешние библиотеки, вам необходимо использовать или реализовать итераторы. Теперь мы предоставляем другой метод, который достигает того же эффекта, и, как мы видим, он реализован очень элегантно и читабельно.
Шаблоны. Используя потоки или комбинацию потоков, можно легко последовательно выполнять набор асинхронных задач.
Внеочередное параллельное выполнение
мы только что виделиStreams
Обрабатывайте каждый блок данных последовательно, но иногда это может быть невозможно, потому что он не используется полностью.Node.js
параллелизма. Если нам нужно выполнять медленную асинхронную операцию над каждым фрагментом данных, то совершенно необходимо распараллелить выполнение этого набора асинхронных задач. Конечно, этот шаблон можно применять только в том случае, если нет связи между каждым блоком данных, что часто может происходить в шаблоне объекта.Streams
, но для бинарного режимаStreams
Параллельное выполнение вне очереди используется редко.
Примечание. Потоки с неупорядоченным параллельным выполнением нельзя использовать, когда важен порядок обработки данных.
Чтобы распараллелить конвертируемыйStreams
реализации, мы можем использоватьChapter3 Asynchronous Control Flow Patters with Callbacks
Те же шаблоны, упомянутые для параллельного выполнения вне порядка, а затем внести некоторые изменения, чтобы заставить их работать дляStreams
. Посмотрим, как это изменится.
Реализация неупорядоченных параллельных потоков
Перейдем сразу к делу на примере: мы создаемparallelStream.js
модуль, затем настроить нормальный конвертируемыйStreams
, а затем привести ряд методов, преобразующих потоки:
const stream = require('stream');
class ParallelStream extends stream.Transform {
constructor(userTransform) {
super({objectMode: true});
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
done();
}
_flush(done) {
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = ParallelStream;
Давайте проанализируем этот новый пользовательский класс. Как видите, конструктор принимаетuserTransform()
в качестве аргумента, который затем сохраняется как переменная экземпляра; мы также вызываем родительский конструктор и по умолчанию включаем объектный режим.
Далее см._transform()
метод, в этом методе мы выполняемuserTransform()
функцию, а затем увеличиваем количество запущенных в данный момент задач; наконец, мы вызываемdone()
для уведомления о том, что текущий шаг преобразования завершен._transform()
Метод показывает, как параллельно обрабатывать другую задачу. нам не нужно ждатьuserTransform()
После выполнения метода вызовите его сноваdone()
. Вместо этого мы немедленно выполняемdone()
метод. С другой стороны, мы предоставляем специальную функцию обратного вызова дляuserTransform()
метод, этоthis._onComplete()
метод, чтобы мы моглиuserTransform()
Получите уведомление, когда это будет сделано.
существуетStreams
Перед завершением он будет называться_flush()
метод, поэтому, если еще есть запущенные задачи, мы можем сделать это, не вызываяdone()
функция обратного вызова для задержкиfinish
триггер события. Вместо этого мы назначаем егоthis.terminateCallback
Переменная. чтобы понятьStreams
Как правильно завершать см._onComplete()
метод.
Когда каждый набор асинхронных задач наконец завершается,_onComplete()
метод будет вызван. Во-первых, он проверяет, запущена ли задача, и, если нет, вызываетthis.terminateCallback()
функция, которая приведет кStreams
конец, триггер_flush()
методfinish
событие.
Используйте только что построенныйParallelStream
класс для простого создания трансформируемого параллельного выполнения вне очередиStreams
instance, но с оговоркой: он не сохраняет порядок получения элементов. На самом деле асинхронные операции потенциально могут выполняться и отправлять данные в любое время, независимо от того, когда они начинаются. Итак, мы знаем, что для двоичного режимаStreams
не применяется, потому что двоичныйStreams
Требования более высокого порядка.
Реализовать приложение для мониторинга URL
Теперь воспользуемсяParallelStream
Модуль реализует конкретный пример. Давайте представим следующее: мы хотим создать простой сервис для мониторинга большогоURL
Состояние списка, представим себе следующее, все этиURL
содержится в отдельном файле, и каждыйURL
занимает пустую строку.
Streams
может обеспечить эффективное и элегантное решение для этого сценария. Особенно, когда мы используем то, что только что написалиParallelStream
класс, чтобы проверить их не по порядкуURL
.
Далее создадим простойcheckUrls.js
модульное приложение.
const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');
fs.createReadStream(process.argv[2]) //[1]
.pipe(split()) //[2]
.pipe(new ParallelStream((url, enc, done, push) => { //[3]
if(!url) return done();
request.head(url, (err, response) => {
push(url + ' is ' + (err ? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt')) //[4]
.on('finish', () => console.log('All urls were checked'))
;
Как мы видим, с потоками наш код выглядит очень элегантно и интуитивно понятно. Давайте посмотрим, как это работает:
- Во-первых, мы создаем читаемый файл с заданным файловым аргументом
Streams
, чтобы файл можно было прочитать позже. - мы проходимsplitвходной файл
Streams
Контент выводит конвертируемыйStreams
в конвейер, и каждая строка блока данных отделена. - Тогда пришло время использовать наш
ParallelStream
ПроверятьURL
, мы отправляемHEAD
запрос, а затем дождаться запрошенногоresponse
. Когда запрос возвращается, мы помещаем результат запросаpush
прибытьstream
середина. - Наконец, передайте результат в
results.txt
в файле.
node checkUrls urlList.txt
файл здесьurlList.txt
содержит наборURL
,Например:
http://www.mariocasciaro.me/
http://loige.co/
http://thiswillbedownforsure.com/
Когда приложение завершит выполнение, мы увидим файлresults.txt
создается с результатом операции, например:
http://thiswillbedownforsure.com is down
http://loige.co is up
http://www.mariocasciaro.me is up
Порядок выходных результатов, вероятно, будет таким же, как указан во входном файле.URL
порядок разный. ЭтоStreams
Отличительные особенности внеочередного параллельного выполнения задач.
Из любопытства мы могли бы попробовать заменить ParallelStream на обычный поток through2 и сравнить их поведение и производительность (упражнение, которое вы, возможно, захотите сделать). Мы увидим, что использование through2 медленнее, потому что каждый URL будет проверяться по порядку, а порядок результатов в файле results.txt сохраняется.
Нестандартное предельное параллельное выполнение
При запуске файла, содержащего тысячи или миллионы URL-адресовcheckUrls
приложение, у нас определенно будут проблемы. Наше приложение будет создавать неконтролируемое количество одновременных подключений, параллельно отправлять большие объемы данных и потенциально нарушать стабильность приложения и доступность всей системы. Мы уже знаем, что неупорядоченное ограниченное параллельное выполнение контролируемых нагрузок является отличным решением.
Давайте создадимlimitedParallelStream.js
модуль, чтобы увидеть, как он работает, этот модуль адаптирован из созданного в предыдущем разделеparallelStream.js
модуль.
Посмотрим на его конструктор:
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) {
super({objectMode: true});
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
this.continueCallback = null;
}
// ...
}
нам нужноconcurrency
Переменные используются в качестве входных данных для ограничения количества параллелизма, на этот раз мы хотим сохранить две функции обратного вызова,continueCallback
для любых ожидающих_transform
метод,terminateCallback
Обратный вызов для метода _flush.
см. далее_transform()
метод:
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
if(this.running < this.concurrency) {
done();
} else {
this.continueCallback = done;
}
}
на этот раз в_transform()
метод, мы должны вызватьdone()
Перед проверкой, достигнут ли предел максимального количества параллелей, если предел не достигнут, может быть запущена обработка следующего элемента. Если мы достигли предела максимального количества параллелей, мы можем просто положитьdone()
обратный вызов сохраняется вcontinueCallback
переменная, чтобы она вызывалась сразу после завершения задачи.
_flush()
метод сParallelStream
Класс остался точно таким же, поэтому переходим сразу к реализации_onComplete()
метод:
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback();
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
Всякий раз, когда задача завершается, мы вызываем любой сохраненныйcontinueCallback()
приведет кstream
Разблокировать, запускает обработку следующего элемента.
ЭтоlimitedParallelStream
модуль. мы можем сейчасcheckUrls
модуль использовать его вместоparallelStream
, и ограничьте параллелизм наших задач установленным нами значением.
Последовательное параллельное выполнение
параллель, которую мы ранее создалиStreams
Можно испортить порядок данных, но в некоторых случаях это недопустимо. Иногда на самом деле существуют бизнес-сценарии, требующие, чтобы каждый блок данных отправлялся в том же порядке, в котором он был получен. Мы все еще можем работать параллельноtransform
функция. Все, что нам нужно сделать, это отсортировать данные, выдаваемые каждой задачей, так, чтобы они следовали тому же порядку, что и полученные данные.
Эта техника предполагает использованиеbuffer
, который переупорядочивает фрагменты по мере того, как каждая запущенная задача испускает. Для краткости мы не собираемся предоставлять такуюstream
реализации, так как объем этой книги довольно многословен; все, что нам нужно сделать, это повторно использоватьnpm
один из доступных пакетов на , например.through2-parallel.
Мы можем модифицировать существующийcheckUrls
модуль для быстрой проверки поведения упорядоченного параллельного выполнения. Предположим, мы хотим, чтобы наши результаты были такими, как во входном файлеURL
написано в том же порядке. Мы можем использоватьthrough2-parallel
реализовать:
const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');
fs.createReadStream(process.argv[2])
.pipe(split())
.pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
if(!url) return done();
request.head(url, (err, response) => {
this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
done();
});
}))
.pipe(fs.createWriteStream('results.txt'))
.on('finish', () => console.log('All urls were checked'))
;
Как мы можем видеть,through2-parallel
интерфейс сthrough2
Интерфейсы очень похожи, разница только вthrough2-parallel
также может предоставить намtransform
Функция указывает предел параллелизма. Если мы попытаемся запустить эту новую версиюcheckUrls
,посмотримresults.txt
файл перечисляет результаты в том же порядке, что и во входном файле
Порядок, в котором появляются URL-адреса, тот же.
На этом мы резюмируем использованиеStreams
Реализовать анализ асинхронного потока управления, затем мы исследуем шаблон конвейера.
конвейерный режим
Как и в реальной жизни,Node.js
изStreams
Соединения труб также могут быть выполнены в соответствии с различными режимами. На самом деле, мы можем объединить два разныхStreams
сливаться в одинStreams
, положитьStreams
Разделите на два или более конвейера или перенаправьте потоки в зависимости от условий. В этом разделе мы рассмотримNode.js
изStreams
Самая важная сантехническая технология.
Комбинированные потоки
В этой главе мы подчеркиваемStreams
Предоставляет простую инфраструктуру для модуляции и повторного использования нашего кода, но упускает важную часть: что, если мы хотим разделить на модули и повторно использовать целые конвейеры? Если мы хотим объединить несколькоStreams
, делая их похожими на внешниеStreams
, так что мне теперь делать? На изображении ниже показано, что это означает:
На изображении выше мы видим, как объединяются несколько потоков:
- Когда мы пишем объединенное
Streams
, мы на самом деле пишем комбинированныйStreams
первая единицаStreamA
. - когда мы объединяем
Streams
При чтении информации из , мы фактически читаем из объединенногоStreams
читать в последней единице .
комбинированныйStreams
обычно несколькоStreams
, соединив конец записи первого блока и конец чтения последнего блока.
Чтобы создать несколько потоков из двух разных потоков (потоки для чтения и потоки для записи), мы можем использовать модуль npm, например.duplexer2.
Но это еще не все. В самом деле, объединенныйStreams
Он также должен быть захвачен на любом участке трубопровода.Streams
Ошибка, генерируемая устройством. Мы говорили, что любые ошибки не распространяются автоматически по конвейеру. Итак, у нас должно быть правильное управление ошибками, нам придется явно прикреплять прослушиватель ошибок к каждомуStreams
. Тем не менее, комбинированныйStreams
На самом деле это черный ящик, что означает, что у нас нет доступа ни к одному модулю в середине конвейера, поэтому для перехвата исключений из любого модуля в конвейере комбинированныйStreams
Также выступает в роли агрегатора.
В общем, комбинированныйStreams
Имеет два основных преимущества:
- Внутри трубы находится черный ящик, невидимый для пользователя.
- Упрощает управление ошибками, потому что нам не нужно прикреплять прослушиватель ошибок к каждому блоку в конвейере, а только к комбинированному
Streams
Просто прикрепите его сами.
комбинированныйStreams
является очень общей и распространенной практикой, поэтому, если у нас нет особых потребностей, мы можем просто повторно использовать существующее решение, напримерmultipipeилиcombine-stream.
Реализация составных потоков
Чтобы проиллюстрировать простой пример, давайте рассмотрим следующие две комбинацииStreams
Случай:
- Сжимайте и шифруйте данные
- Распаковать и расшифровать данные
используя что-то вродеmultipipe
таких как библиотеки, мы можем объединить некоторые существующие основные библиотеки, комбинируяStreams
(документcombinedStreams.js
), чтобы легко построить комбинированныйStreams
:
const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
return combine(
zlib.createGzip(),
crypto.createCipher('aes192', password)
);
};
module.exports.decryptAndDecompress = password => {
return combine(
crypto.createDecipher('aes192', password),
zlib.createGunzip()
);
};
Например, теперь мы можем использовать эти комбинированные потоки данных, как невидимые для нас черные ящики, для создания небольшого приложения, которое архивирует файлы со сжатием и шифрованием. Начнем сarchive.js
сделать это в новом модуле:
const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
.pipe(compressAndEncryptStream(process.argv[2]))
.pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));
Мы можем сделать это, построив комбинированныйStream
Давайте еще улучшим предыдущий код, но на этот раз не просто для получения невидимого для внешнего мира черного ящика, а для отлова исключений. На самом деле, как мы уже упоминали, код, подобный приведенному ниже, поймает только последнийStream
Ошибка, выдаваемая юнитом:
fs.createReadStream(process.argv[3])
.pipe(compressAndEncryptStream(process.argv[2]))
.pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
.on('error', function(err) {
// 只会捕获最后一个单元的错误
console.log(err);
});
Однако, положив всеStreams
Вместе мы можем элегантно решить эту проблему. рефакторингarchive.js
следующее:
const combine = require('multipipe');
const fs = require('fs');
const compressAndEncryptStream =
require('./combinedStreams').compressAndEncrypt;
combine(
fs.createReadStream(process.argv[3])
.pipe(compressAndEncryptStream(process.argv[2]))
.pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
).on('error', err => {
// 使用组合的Stream可以捕获任意位置的错误
console.log(err);
});
Как мы видим, теперь мы можем прикрепить обработчик ошибок непосредственно к составленномуStreams
, он получит любойerror
событие.
Теперь, чтобы запуститьarchive
модули, достаточно указать в аргументах командной строкиpassword
иfile
Параметры, то есть параметры модуля сжатия:
node archive mypassword /path/to/a/file.text
На этом примере мы ясно продемонстрировали, что комбинированноеStream
Как важно, с одной стороны, это позволяет нам создавать многократно используемые композиции потоков, с другой – упрощает управление ошибками пайплайна.
разделенныйStreams
Мы можем сделать это, поместив один читаемыйStream
передан в несколько записываемыхStream
выполнитьStream
филиал. Это вступает в игру, когда мы хотим отправить одни и те же данные в разные места назначения, например, в два разных сокета или два разных файла. Его также можно использовать, когда мы хотим выполнить различные преобразования одних и тех же данных или когда мы хотим разделить данные на основе некоторых критериев. как показано на рисунке:
существуетNode.js
разделены вStream
это маленькая вещь. Например.
Реализовать генератор нескольких контрольных сумм
Давайте создадим файл, который выводит данный файлsha1
иmd5
Хеширующий гаджет. Назовем этот новый модульgenerateHashes.js
, см. следующий код:
const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');
Пока ничего особенного В следующей части модуля мы создадим удобочитаемыйStream
, и разветвите его на два разных потока, чтобы получить два других файла, один из которых содержитsha1
хэш, другой содержитmd5
Контрольная сумма:
const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
.pipe(sha1Stream)
.pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
.pipe(md5Stream)
.pipe(fs.createWriteStream(inputFile + '.md5'));
это очень просто:inputStream
Переменные передаются бок о бок вsha1Stream
, другая сторона вводится вmd5Stream
. Но будьте осторожны:
-
когда
inputStream
когда закончите,md5Stream
иsha1Stream
завершится автоматически, если не будет вызваноpipe()
указано, когдаend
Вариантыfalse
. -
Stream
Обе ветви 's будут принимать один и тот же фрагмент данных, поэтому мы должны быть очень осторожны, когда делаем что-то с побочными эффектами для данных, потому что это повлияет на другую ветвь. -
Противодавление создается за пределами черного ящика, от
inputStream
Скорость потока данных регулируется в соответствии со скоростью потока ветки, которая получает самую медленную.
комбинированныйStreams
Слияние отличается от разделения путем объединения набора читаемыхStreams
объединены в один записываемыйStream
, как показано на рисунке:
поставить несколькоStreams
Объединение в одно обычно является простой операцией, однако мы должны позаботиться о том, чтобы иметь дело сend
событий, потому что конвейерная система, использующая параметр автоматического завершения, завершает поток назначения, как только заканчивается один из источников. Обычно это приводит к ошибке, так как другие незавершенные источники будут продолжать писать в завершенныйStream
. Обходной путь для этого — использовать параметры при передаче нескольких источников в одно место назначения.{end:false}
, и только после прочтения всех исходниковStream
звонитьend()
.
Сжать несколько исходных файлов в один архив
В качестве простого примера давайте реализуем небольшую программу, которая создает архив на основе содержимого двух разных каталогов. С этой целью мы введем два новыхnpm
Модуль:
- tarиспользуется для создания архивов
- fstreamБиблиотека для создания потоков объектов из файлов файловой системы.
Создаем новый модульmergeTar.js
, начните инициализацию следующим образом:
var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);
В предыдущем коде мы просто загружаем все зависимости и инициализируем включаемый объектный файл и два исходных каталога (sourceA
иsourceB
) Переменные.
Далее мы создаемtar
изStream
и направить вывод на записываемыйStream
:
const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));
Теперь приступаем к инициализации источникаStream
let endCount = 0;
function onEnd() {
if (++endCount === 2) {
pack.end();
}
}
const sourceStreamA = fstream.Reader({
type: "Directory",
path: sourceA
})
.on('end', onEnd);
const sourceStreamB = fstream.Reader({
type: "Directory",
path: sourceB
})
.on('end', onEnd);
В предыдущем коде мы создали два исходных каталога (sourceStreamA
иsourceStreamB
) читать вStream
затем для каждого источникаStream
, мы добавляемend
Подписчик событий, который срабатывает только тогда, когда оба каталога полностью прочитаныpack
изend
событие.
Наконец, объедините дваStream
:
sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});
Мы сжимаем оба исходных файла вpack
этоStream
, и установивpipe()
изoption
Параметры{end:false}
Настроить конечную точкуStream
автоматический триггерend
событие.
Итак, мы выполнили нашу простуюTAR
программа. Мы можем попробовать запустить эту утилиту, указав объектный файл в качестве первого аргумента командной строки, а затем два исходных каталога:
node mergeTar dest.tar /path/to/sourceA /path/to/sourceB
существуетnpm
В мы можем найти что-то, что можно упроститьStream
Объединенный модуль:
Обратите внимание, что цель притокаStream
Данные перемешиваются случайным образом, что является приемлемым свойством для некоторых типов потоков объектов (как мы видели в предыдущем примере), но при работе с бинарнымиStream
Когда обычно этого не хочется.
Тем не менее, мы можем последовательно объединяться по шаблонуStream
; он состоит из слияния источников один за другимStream
, когда закончится предыдущий, начните отправлять второй блок данных (типа подключения всех источниковStream
выход одинаковый). существуетnpm
На , мы можем найти некоторые пакеты, которые также обрабатывают эту ситуацию. Один из них являетсяmultistream.
Мультиплексирование и демультиплексирование
сливатьсяStream
режим имеет специальный режим, мы не очень хотим объединять несколькоStream
объединены вместе и вместо этого используют общий канал для передачи набора данныхStream
. Не так, как раньше, потому что исходные данныеStream
Поддержание логического разделения внутри общего канала, что позволяет нам снова разделить данные, как только они достигнут другого конца общего канала.Stream
. как показано на рисунке:
поставить несколькоStream
объединены в одинStream
Операция по передаче называется мультиплексированием, а обратная операция (т.е.Stream
Получите данные для реконструкции оригиналаStream
) называется демультиплексированием. Устройства, выполняющие эти операции, называются мультиплексорами и демультиплексорами соответственно (. Это широко изучаемая тема в информатике и телекоммуникациях, так как она является предметом практически любого типа средств связи, таких как телефония, радио, телевидение и, конечно же, одна из основ самого Интернета Мы не будем вдаваться в подробности объема этой книги, потому что это очень большая тема.
В этом разделе мы хотим продемонстрировать, как использовать общиеNode.js Streams
для передачи нескольких логически разделенныхStream
, а затем поделитьсяStream
Другой конец снова разделяется, то есть мультиплексирование и демультиплексирование реализуются один раз.
Создайте журнал удаленного ведения журнала
В качестве примера мы хотели бы иметь небольшую программу, которая запускает дочерний процесс и перенаправляет как его стандартный вывод, так и стандартную ошибку на удаленный сервер, который принимает их и сохраняет в виде двух отдельных файлов. Таким образом, в этом случае общая средаTCP
соединение, а два мультиплексируемых канала являются дочерними процессамиstdout
иstderr
. Мы будем использовать технологию коммутации пакетов, котораяIP
,TCP
илиUDP
Те же методы используются в таких протоколах, как , включая инкапсуляцию данных в пакеты, что позволяет нам указывать различную исходную информацию, что очень полезно для мультиплексирования, маршрутизации, управления потоком и проверки на наличие поврежденных данных.
Как показано на рисунке, протокол этого примера примерно такой, данные инкапсулируются в пакеты со следующей структурой:
Реализация мультиплексирования на стороне клиента
Давайте сначала поговорим о клиенте, создайте файл с именемclient.js
модуль, который является частью нашего приложения, отвечает за запуск дочернего процесса и реализациюStream
Мультиплексирование.
Чтобы начать определение модуля, сначала загрузите зависимости:
const child_process = require('child_process');
const net = require('net');
Затем начните реализацию мультиплексной функции:
function multiplexChannels(sources, destination) {
let totalChannels = sources.length;
for(let i = 0; i < sources.length; i++) {
sources[i]
.on('readable', function() { // [1]
let chunk;
while ((chunk = this.read()) !== null) {
const outBuff = new Buffer(1 + 4 + chunk.length); // [2]
outBuff.writeUInt8(i, 0);
outBuff.writeUInt32BE(chunk.length, 1);
chunk.copy(outBuff, 5);
console.log('Sending packet to channel: ' + i);
destination.write(outBuff); // [3]
}
})
.on('end', () => { //[4]
if (--totalChannels === 0) {
destination.end();
}
});
}
}
multiplexChannels()
Функция принимает источник для повторного использованияStream
как ввод
и интерфейс мультиплексирования в качестве параметров, затем выполните следующие действия:
-
для каждого источника
Stream
, он регистрируетreadable
прослушиватель событий, мы используемnon-flowing
Режим чтения данных из потока. -
Каждый раз, когда считывается блок данных, мы инкапсулируем его в заголовок. Порядок заголовков следующий:
channel ID
составляет 1 байт (UInt8
), размер пакета 4 байта (UInt32BE
), а затем фактические данные. -
Когда пакет готов, мы записываем его в цель
Stream
. -
мы
end
событие регистрирует прослушиватель, чтобы, когда все источникиStream
когда закончите,end
Триггер события, уведомить цельStream
курокend
событие.
Обратите внимание, что наш протокол способен мультиплексировать до 256 различных исходных потоков, поскольку у нас есть только 1 байт для идентификации.
channel
.
const socket = net.connect(3000, () => { // [1]
const child = child_process.fork( // [2]
process.argv[2],
process.argv.slice(3), {
silent: true
}
);
multiplexChannels([child.stdout, child.stderr], socket); // [3]
});
В конце делаем следующее:
- мы создаем новый
TCP
Клиент подключается к адресуlocalhost:3000
. - Мы запускаем подпроцесс, используя первый аргумент командной строки в качестве пути, в то время как мы предоставляем остальные
process.argv
Массивы в качестве аргументов дочернего процесса. мы указываем параметры{silent:true}
, чтобы дочерний процесс не наследовал родительскийstdout
иstderr
. - Мы используем
mutiplexChannels()
функция будетstdout
иstderr
мультиплексируется вsocket
внутри.
Реализовать демультиплексирование на стороне сервера
Теперь смотрим на сервер, создаемserver.js
модуль, здесь мы будем подключатьStream
Демультиплексируйте и направьте их в два разных файла.
Сначала создайте файл с именемdemultiplexChannel()
Функция:
function demultiplexChannel(source, destinations) {
let currentChannel = null;
let currentLength = null;
source
.on('readable', () => { //[1]
let chunk;
if(currentChannel === null) { //[2]
chunk = source.read(1);
currentChannel = chunk && chunk.readUInt8(0);
}
if(currentLength === null) { //[3]
chunk = source.read(4);
currentLength = chunk && chunk.readUInt32BE(0);
if(currentLength === null) {
return;
}
}
chunk = source.read(currentLength); //[4]
if(chunk === null) {
return;
}
console.log('Received packet from: ' + currentChannel);
destinations[currentChannel].write(chunk); //[5]
currentChannel = null;
currentLength = null;
})
.on('end', () => { //[6]
destinations.forEach(destination => destination.end());
console.log('Source channel closed');
})
;
}
Приведенный выше код может показаться сложным, но при внимательном чтении это не так; из-заNode.js
удобочитаемыйStream
С помощью функции pull мы можем легко реализовать демультиплексирование нашего небольшого протокола следующим образом:
- мы начинаем использовать
non-flowing
Режим чтения данных из потока. - Во-первых, если мы не читали
channel ID
, пытаемся прочитать из потока 1 байт и преобразовать его в число. - Следующим шагом является чтение длины заголовка. Нам нужно прочитать 4 байта, поэтому возможно, что внутренне
Buffer
Данных пока недостаточно, поэтомуthis.read()
звонок возвращаетсяnull
. В этом случае мы просто прерываем синтаксический анализ и повторяем следующий.readable
событие. - Когда мы, наконец, сможем прочитать размер данных, мы узнаем из внутреннего
Buffer
сколько данных загружается, поэтому мы пытаемся прочитать все данные. - Когда мы прочитаем все данные, мы можем записать их в правильный канал назначения, обязательно не забудьте сбросить
currentChannel
иcurrentLength
переменные (эти переменные будут использоваться для разбора следующего пакета). - Наконец, когда источник
channel
В конце обязательно не забудьте вызвать цельStream
изend()
метод.
Поскольку мы можем демультиплексировать источникStream
, выполните следующий вызов:
net.createServer(socket => {
const stdoutStream = fs.createWriteStream('stdout.log');
const stderrStream = fs.createWriteStream('stderr.log');
demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
.listen(3000, () => console.log('Server started'))
;
В приведенном выше коде мы сначала3000
запустить один в портуTCP
server, то для каждого получаемого соединения мы создадим два доступных для записиStream
, указывая на два разных файла, один для stdout и один для stderr; это наши целиchannel
. Наконец, мы используемdemultiplexChannel()
Демультиплексировать поток сокетов какstdoutStream
иstderrStream
.
Запуск приложений мультиплексирования и демультиплексирования
Теперь мы готовы попробовать запустить наше новое приложение мультиплексирования/демультиплексирования, но сначала давайте создадим небольшойNode.js
программа для создания примера вывода; назовем ееgenerateData.js
:
console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");
Во-первых, давайте запустим сервер:
node server
Затем запустите клиент, вам нужно указать параметр файла в качестве подпроцесса:
node client generateData.js
Клиент запускается почти сразу, но когда процесс завершается,generateData
Стандартный ввод и стандартный вывод приложения проходят черезTCP
Соединение затем на стороне сервера демультиплексируется в два файла.
Обратите внимание, что когда мы используем
child_process.fork()
, наш клиент может начать другиеNode.js
модуль.
Мультиплексирование и демультиплексирование потоков объектов
Пример, который мы только что показали, демонстрирует, как мультиплексировать и демультиплексировать двоичные/текстовые файлы.Stream
, но стоит отметить, что те же правила применяются к объектамStream
. Большая разница в том, что с объектами у нас уже есть методы для передачи данных с использованием атомарных сообщений (объектов), поэтому мультиплексирование похоже на установку свойства.channel ID
так же просто, как и каждый объект, а демультиплексирование требует только чтения·channel ID
свойства и маршрутизировать каждый объект к правильной целиStream
.
Другой шаблон заключается в том, чтобы взять несколько свойств объекта и распределить их по нескольким целям.Stream
Шаблон С помощью этого шаблона мы можем реализовать сложные процессы, как показано на следующем рисунке:
Как показано выше, возьмите объектStream
выражатьanimals
, а затем в зависимости от вида животного:reptiles
,amphibians
иmammals
, а затем распространить на правильную цельStream
середина.
Суммировать
В этой главе у нас естьNode.js Streams
и варианты его использования, но в то же время должны открыть двери парадигмам программирования с почти безграничными возможностями. мы понимаем, почемуStream
одеялоNode.js
Сообщество их похвалило, и мы освоили их базовый функционал, что позволило нам делать с ним более интересные вещи. Мы анализируем некоторые сложные шаблоны и начинаем понимать, как различные конфигурацииStreams
Соединив вместе, освойте эти функции, которые делают потоковую передачу такой универсальной и мощной.
Если мы столкнулись с невозможностью использоватьStream
Для достижения функции мы можем передать другиеStreams
соединены вместе для достижения этогоNode.js
приятная особенность ;Streams
При работе с двоичными данными строки и объекты очень полезны и имеют разные характеристики.
В следующей главе мы сосредоточимся на традиционных шаблонах объектно-ориентированного проектирования. несмотря на то чтоJavaScript
Объектно-ориентированный язык в некоторой степени, но вNode.js
обычно предпочтительнее использовать функциональный или гибридный подход. Прочтите следующую главу, чтобы узнать ответ.