Node.js Операция Данные по запросу Используя интерфейс Sream API,streamЭто набор данных, и данные могут быть доступны не все сразу, они находятся в буфере и не должны быть в памяти. Подходит для обработки больших наборов данных или блоков данных из внешних источников данных Многие встроенные модули в Node реализуют потоковый интерфейс:
Собственные объекты Node.js в приведенном выше списке являются потоковыми объектами с возможностью чтения и записи. Некоторые объекты являются как доступными для чтения, так и записываемыми потоками, такими как сокеты TCP, zlib и криптопотоки.
Эти объекты тесно связаны. Когда ответ HTTP представляет собой поток, доступный для чтения на стороне клиента, соответствующий поток является потоком, доступным для записи, на стороне сервера. Это связано с тем, что в случае HTTP мы пишем из другого объекта (http.ServerResponse) на основе чтения из одного объекта (http.IncomingMessage).
потоки stdio (stdin, stdout, stderr) будут иметь обратные типы потоков в дочернем процессе. Таким образом, вы можете очень просто подключиться к другим потокам или потоку stdio основного процесса.
В Node.js есть 4 основных типа потоков:
- Читаемый поток (читаемый)
- Доступный для записи поток (доступный для записи)
- Дуплекс
- Преобразование потоков
- Доступный для чтения поток — это абстракция источника данных, который можно использовать, типичным примером является метод fs.createReadStream.
- Доступный для записи поток — это абстракция назначения, в которое могут быть записаны данные, типичным примером является метод fs.createWriteStream.
- Дуплексный поток доступен как для чтения, так и для записи, типичным примером является сокет TCP.
- Потоки преобразования основаны на дуплексных потоках, которые можно использовать для изменения или преобразования данных по мере их записи и чтения. zlib.createGzip — это пример потока преобразования, использующего gzip для сжатия данных. Вы можете думать о потоке преобразования как о функции, вход которой является потоком для записи, а выход — потоком для чтения.Возможно, вы также слышали, что потоки преобразования называются «сквозными потоками».
Все потоки являются экземплярами EventEmitter. Они генерируют события, когда данные доступны для чтения или записи. Однако мы также можем использовать потоковые данные просто с помощью метода канала.
трубный метод:
**readable**.pipe(**writableDest**)
Эта простая строка соединяет выход читаемого потока — исходные данные с входом записываемого потока — назначения. Источник должен быть потоком, доступным для чтения, а пункт назначения должен быть потоком, доступным для записи. Конечно, это также может быть дуплексный поток или поток преобразования.На самом деле, если соединение является дуплексным потоком, вы можете вызвать pipe в цепочке:
readable
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
Метод pipe возвращает поток назначения, что позволяет нам выполнять описанные выше связанные вызовы. Для потоков a (доступный для чтения), b и c (дуплексный) и d (доступный для записи)
a.pipe(b).pipe(c).pipe(d)
上面等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
трубка Метод является самым простым способом использования потоков. Как правило, рекомендуется использовать метод трубы или использовать события для обработки потоков, но избегайте смешивания двух. Обычно, когда вы используете метод трубы, вам не нужно использовать события, но если вам нужно справиться с потоками более индивидуально, вы можете просто использовать события.
Потоковые события
В дополнение к чтению данных из потоков, доступных для чтения, и записи данных в целевые объекты потока, доступные для записи, метод канала будет автоматически управлять несколькими вещами по пути. Например, он обрабатывает ошибки, конец файла и ситуации, когда один поток медленнее или быстрее другого.
Однако мы также можем напрямую использовать события для управления потоками. Ниже приведен упрощенный эквивалентный код для событий, в которых метод канала в основном используется для чтения и записи данных:
# readable.pipe(writable) 等于下面
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
Ниже приведены важные события и доступные методы для потоков чтения и записи:
Эти события и функции каким-то образом связаны, потому что они часто используются вместе.
Наиболее важными событиями в читаемом потоке являются:
-
data
Событие, когда поток передает блок данных потребителю, оно сработает. -
end
Событие, срабатывающее, когда данные из потока больше не потребляются.
Наиболее важными событиями в доступном для записи потоке являются:
-
drain
событие, которое является сигналом того, что доступный для записи поток может получить больше данных. -
finish
Событие, которое запускается, когда все данные передаются базовой системе.
События и функции можно комбинировать для настройки и оптимизации использования потоков. Используя читаемый поток, мы можем использоватьpipe
/ unpipe
Метод илиread
/ Unshift
/возобновить метод. Используя записываемый поток, мы можем назвать егоpipe
/ unpipe
пункт назначения или напишите егоwrite
вызов методаend
метод, когда мы закончим.
Режимы паузы и потока для читаемых потоков
Есть два основных режима читаемых потоков, которые влияют на то, как мы можем их использовать:
- Они могут быть
暂停(paused)
модель - или
流(flowing)
модель
Эти режимы иногда называют режимами вытягивания и выталкивания.
Все доступные для чтения потоки по умолчанию запускаются в режиме паузы, но при необходимости можно легко переключиться в режим потоковой передачи или вернуться в состояние паузы. Иногда преобразование происходит автоматически.
Когда доступный для чтения поток находится в режиме паузы, мы можем использовать метод read() для чтения данных из потока по требованию, однако в доступном для чтения потоке в потоковом режиме данные передаются постоянно, и нам нужно прослушивать события, чтобы использовать эти данные.
Режим притока, если пользователь обрабатывается, то фактические данные будут потеряны. Вот почему нам нужно событие DATA, когда у нас есть читаемый поток в потоковом режиме. Фактически, пока вы добавляете событие DATA, вы можете преобразовать режим паузы в режим потока, удалить событие DATA, и поток снова переключится в режим паузы. Некоторые из этих действий совместимы со старым интерфейсом потока узлов.
Ручное переключение между двумя схемами потока может быть использовано методами возобновления () и паузы ().
При использовании метода канала для чтения читаемого потока нам не нужно беспокоиться об этих режимах, поскольку канал управляет ими автоматически.
поток реализации
Когда мы говорим о потоках в Node.js, в основном есть две разные задачи:
- выполнитьпоток.
- использоватьпоток.
Потоковые реализации обычно требуютstream
модуль.
реализовать доступный для записи поток
Чтобы реализовать поток с возможностью записи, нам нужно использовать конструктор Writable из модуля потоков.
const { Writable } = require('stream');
У нас есть много способов реализовать записываемый поток. Например, мы можем наследовать конструктор Writable, если захотим.
class myWritableStream extends Writable {
}
Вот простой подход конструктора. Мы просто передаем некоторые параметры конструктору Writable и создаем объект. Единственная необходимая опция — это функция Writable, которая показывает, куда должен быть записан фрагмент данных.
const { Writable } = require('stream');
const outStream = new Writable({
**write**(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
Функция записи имеет 3 параметра:
- chunkОбычно это буфер, если только мы не настроим другой поток.
- encodingпараметр, требуемый в конкретном случае, обычно его можно игнорировать.
- callbackэто функция, которую необходимо вызвать после завершения обработки блока данных. Это признак успеха или неудачи записи данных. Чтобы сигнализировать об ошибке, вызовите функцию обратного вызова с объектом ошибки.
В outstream мы просто используем console.log для вывода фрагмента данных на консоль в виде строки, а затем вызываем обратный вызов без объекта ошибки, чтобы указать на успех. Это очень простой и, вероятно, не очень полезный эхо-поток, который выводит все полученные данные на консоль.
Чтобы использовать этот поток, мы можем напрямую использовать читаемый поток process.stdin, и мы можем передать process.stdin в outStream.
Выполняя приведенный выше код, все, что мы вводим в process.stdin, будет выведено на консоль с помощью console.log outStream.
Для достижения этого потока не очень полезно, поскольку он фактически достигается и встроенным узлом, который эквивалентен process.stdout. Следующая строка кода предназначена для stdin pipe to stdout, эффект может быть достигнут раньше:
process.stdin.pipe(process.stdout);
Реализовать читаемый поток
Чтобы реализовать читаемый поток, обратитесь к интерфейсу Readable и используйте его для создания нового объекта:
const { Readable } = require('stream');
const inStream = new Readable({});
Существует простой способ реализации читаемых потоков. Мы можем напрямую вытолкнуть данные для использования.
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
Когда мы отправляем нулевой объект, это означает, что мы хотим сигнализировать о том, что для потока больше нет данных.
Используя этот доступный для записи поток, вы можете направить его непосредственно в доступный для записи поток process.stdout.
Выполнение приведенного выше кода будет считывать все данные из inStream и выводить их в стандартный поток вывода. Просто и не очень полезно.
По сути, мы отправляем все данные в поток перед передачей в process.stdout . Лучший подход — нажимать по требованию. Мы можем сделать это, реализуя метод read() в конфигурации читаемого потока:
const inStream = new Readable({
**read**(size) {
// there is a demand on the data... Someone wants to read it.
}
});
При вызове метода чтения в доступном для чтения потоке реализация может поместить часть данных в очередь. Например, мы можем вводить по одной букве за раз, начиная с кода символа 65 (что означает A), и увеличивать его на 1 при каждом нажатии:
const inStream = new Readable({
read(size) {
**this.push**(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
**this.push**(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
При чтении данных из читаемого потока метод чтения будет вызываться постоянно, и мы будем проталкивать больше букв. Нам нужно условие, чтобы остановить этот цикл, поэтому оператор if выталкивает ноль, когда currentcharcode больше 90 (представляет Z).
Этот код эквивалентен более простому коду, с которого мы начали, но мы отправляем данные по запросу, когда пользователь запрашивает их. Вы должны делать это часто.