Чистый поток в Node.js: понимание основных концепций Stream

Node.js

Разработчики, знакомые с Node.js, могут знать, что концепцию потока (Stream) сложно понять и с ней не так просто работать.

Эта статья призвана помочь вам понять концепцию потоков и способы их использования. Не волнуйся, ты разберешься.

Что такое поток?

Потоки — одна из фундаментальных концепций, управляющих приложениями Node.js. Это метод обработки данных для последовательного чтения и записи ввода в вывод.

Потоки — это эффективный способ обработки чтения и записи файлов, сетевого взаимодействия или любого сквозного обмена информацией.

Потоки уникальны тем, что вместо того, чтобы считывать файлы в память по одному, как традиционная программа, они считывают данные блок за блоком, обрабатывая их содержимое, а не сохраняя все в памяти.

Это делает потоковую обработкуБольшие объемы данныхЭто очень полезно, когда, например, файл может быть больше, чем ваша свободная память, и невозможно прочитать весь файл в память, чтобы обработать его, и тогда в игру вступают потоки.

Возьмем в качестве примера потоковые сервисы, такие как YouTube или Netflix: вместо того, чтобы загружать сразу полное видео и аудио, браузер обрабатывает видео как непрерывный поток фрагментов, которые пользователь может просмотреть немедленно.

Однако потоковая передача предназначена не только для обработки мультимедиа или больших данных, она также обеспечивает «составимость» кода. Проектирование с учетом возможности компоновки означает, что несколько компонентов могут быть объединены определенным образом для получения одного и того же результата. В Node.js мощные фрагменты можно создавать, используя потоки для импорта или экспорта данных из других небольших фрагментов.

Зачем использовать потоки

Потоки имеют два основных преимущества перед другими методами обработки данных:

  1. Эффективность памяти:Может обрабатываться без загрузки большого количества данных в память
  2. Эффективность времени:Начните обработку, как только у вас появятся данные, вместо того, чтобы ждать, пока все данные будут переданы

4 потока в Node.js

  1. Доступный для записи поток:Поток, в который могут быть записаны данные. Напримерfs.createWriteStream()Данные могут быть записаны в файлы с использованием потоков.
  2. Читаемый поток:Поток, из которого данные могут быть прочитаны. Напримерfs.createReadStream()Его можно прочитать из файла.
  3. Дуплексный поток:Поток, доступный как для чтения, так и для записи. напримерnet.Socket.
  4. Трансформировать поток:Поток данных, который можно изменять или преобразовывать при записи и чтении. Например, в операции сжатия файла сжатые данные могут быть записаны в файл, а распакованные данные считаны из файла.

Если вы использовали Node.js, вы, вероятно, сталкивались с потоками. Например, на HTTP-сервере на основе Node.jsrequestчитаемый поток,responseявляется доступным для записи потоком. а такжеfsМодуль, который обрабатывает файловые потоки как для чтения, так и для записи. Всякий раз, когда вы используете Express, вы взаимодействуете с клиентами, используя потоки, которые также используются в различных драйверах подключения к базе данных, поскольку сокеты TCP, стеки TLS и другие соединения основаны на потоках Node.js.

Как создать читаемый поток

Импортируйте модуль и инициализируйте его:

const Stream = require('stream')
const readableStream = new Stream.Readable()

После инициализации на него можно отправлять данные:

readableStream.push('ping!')
readableStream.push('pong!')

асинхронный итератор

Настоятельно рекомендуется использовать асинхронные итераторы при работе с потоками.. Асинхронная итерация — это протокол для асинхронного извлечения содержимого контейнера данных, что означает, что текущая «задача» может быть приостановлена ​​перед извлечением элемента данных. Кроме того, стоит упомянуть, что внутренняя реализация асинхронного итератора потока используетreadableмероприятие.

При чтении данных из читаемого потока можно использовать асинхронный итератор:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

Также можно собрать содержимое читаемого потока в строку:

import { Readable } from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', { encoding: 'utf8' });
assert.equal(await readableToString2(readable), 'Good morning!');

Обратите внимание, что в этом примере мы должны использовать асинхронную функцию, потому что мы хотим вернуть обещание.

Помните, что нельзя использовать асинхронные функции сEventEmitterЗапутан, потому что отклонения, выдаваемые обработчиками событий, в настоящее время не могут быть перехвачены, что приводит к трудно отслеживаемым ошибкам и утечкам памяти. В настоящее время рекомендуется всегда инкапсулировать содержимое асинхронной функции вtry/catchблокировать и обрабатывать ошибки, но это подвержено ошибкам.этот запрос на вытягиваниеИменно для решения этой проблемы, если его можно добавить в основной код Node.

Readable.from(): создать читаемый поток из итераций.

stream.Readable.from(iterable, [options])это служебный метод для создания читаемого потока из итератора, где итерируемый содержит данные. Iterable может быть итерируемым синхронно или асинхронно.optionsЭто необязательно и может использоваться для указания кодирования текста.

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

Два вида режимов чтения

согласно с Streams API, читаемый поток имеет два режима работы:flowing а такжеpaused. Доступные для чтения потоки могут находиться в объектном или необъектном режиме, независимо от того, находится ли поток в потоковом режиме или в режиме паузы.

  • существуетпроточный режимданные автоматически читаются из базовой системы и передаются черезEventEmitterИнтерфейс предоставляется приложению на максимально возможной скорости.

  • существуетрежим паузы, должен явно вызыватьstream.read()метод для чтения фрагментов данных из потока.

в проточном режимеПосередине надо считать данные из потока, можно прослушатьdataсобытия и связывание обратных вызовов. Доступный для чтения поток испускается, когда доступен фрагмент данныхdataсобытие и выполнить обратный вызов. код показывает, как показано ниже:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8\. 

// 处理 stream 事件 --> data, end, 和 error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");

вызов функцииfs.createReadStream()Предоставляется читаемый поток. В начале поток находится в состоянии покоя. просто слушайdataсобытие и привязать обратный вызов, и он начинает течь. Затем блок данных считывается и передается обратному вызову. Исполнитель потока может решитьdataЧастота, с которой генерируется событие. Например, HTTP-запрос может выполняться через каждые несколько КБ считанных данных.dataмероприятие. Когда вы читаете данные из файла, вы можете столкнуться с проблемой чтения каждой строки.dataмероприятие.

Поток испускается, когда больше нет данных для чтения (достигает хвоста)endмероприятие. В приведенном выше коде мы прослушиваем это событие, чтобы получить уведомление, когда оно закончится.

Кроме того, если есть ошибка, поток выдаст ошибку и уведомит.

в режиме паузы, вам просто нужно повторно вызывать экземпляр потокаread(), пока каждый фрагмент данных не будет прочитан следующим образом:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

read()Функция считывает некоторые данные из внутреннего буфера и возвращает значение. Когда нечего читать, возвращаетсяnull. Таким образом, вwhileВ цикле проверяемnullи завершить цикл. осторожность,readableСобытия генерируются, когда часть данных может быть прочитана из потока.


всеReadableпоток данных срежим паузыдля запуска, но может быть переключен напроточный режим:

  • Добавить кdataобработчик события
  • передача stream.resume()метод
  • передача stream.pipe()способ отправки данных вWritable

ReadableЕсть несколько способов вернуться в режим паузы:

  • Если нет цели канала, вызовитеstream.pause()метод
  • Если есть целевые каналы, удалите все целевые каналы. позвонивstream.unpipe()метод для удаления нескольких целей конвейера.

Важно помнить, что если не предоставлен механизм для потребления или игнорирования данных, в противном случаеReadableДанные не будут генерироваться. Если механизм потребления отключен или отменен,ReadableБудупытатьсяПрекратите генерировать данные. добавить одинreadableОбработчик события автоматически остановит поток и передастreadable.read()данные о потреблении. если удаленоreadableОбработчик событий, то если естьdataОбработчик события, поток запустится снова.

Как создать поток с возможностью записи

Чтобы записать данные в доступный для записи поток, вам нужно вызвать экземпляр потокаwrite(). Следующим образом:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

Приведенный выше код прост и понятен. Он просто считывает фрагменты данных из входного потока и используетwrite()Пишите в целевое место. Функция возвращает логическое значение, указывающее, была ли операция успешной. Если это правда, запись прошла успешно, и вы можете продолжить запись данных. Если он возвращает false, что-то пошло не так, и в настоящее время ничего нельзя записать. Доступный для записи поток будет испускатьсяdrainСобытия, чтобы уведомить вас, когда вы можете начать писать больше данных.

передачаwritable.end()метод указывает, что больше данные не будут записаныWritable. Если предусмотрена необязательная функция обратного вызова, она будет использоваться какfinishФункция прослушивания событий.

// 写入 'hello, ' 然后以 'world!' 结束
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 不允许写更多内容!

С доступными для записи потоками вы можете читать данные из доступных для чтения потоков:

const Stream = require('stream')

const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

writableStream.end()

Вы также можете писать в доступный для записи поток, используя асинхронный итератор, который также рекомендуется:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // 处理反压
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // 等待完成,如果有错误则抛出
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished()Версия по умолчанию основана на обратном вызове, но может бытьutil.promisify()Преобразование в версию на основе Promise (строка A).

В этом примере используются следующие два шаблона:

Запись в доступный для записи поток при обработке обратного давления (короткие скачки нагрузки, из-за которых система получает данные намного быстрее, чем может их обработать) (строка B):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

Закройте доступный для записи поток и дождитесь завершения записи (строка C):

writable.end();
await finished(writable);

pipeline()

Канал — это механизм, который принимает выходные данные одного потока в качестве входных данных другого потока. Обычно он используется для получения данных из одного потока и передачи вывода этого потока в другой поток. Ограничений на конвейерные операции нет, другими словами, конвейеры используются для поэтапной обработки потоковых данных.

Представлен узел 10.xstream.pipeline(). Это модульный метод для передачи по конвейеру между потоками, пересылки сообщений об ошибках и очистки данных, а также обеспечения обратного вызова после завершения конвейера.

Вот пример использования пайплайна:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 轻松管理多个管道流,并且在管道全部完成时得到通知
// 一个用来高效压缩超大视频文件的管道

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

следует использоватьpipelineвместоpipe,потому чтоpipeне безопасно.

Потоковый модуль

Потоковый модуль Node.js— это основа, на которой строятся все потоковые API.

Модуль Stream — это встроенный модуль, который по умолчанию предоставляется в Node.js. Stream — это экземпляр класса EventEmitter, который используется в Node для асинхронной обработки событий. Поэтому потоки по своей сути основаны на событиях.

Чтобы использовать модуль потока, просто:

const stream = require('stream');

streamМодули полезны для создания новых типов экземпляров потока. Обычно нет необходимости использоватьstreamмодули для потребления потоков.

Потоковый API Node.js

Благодаря своим преимуществам многие основные модули Node.js обеспечивают функциональность собственного потока обработки, в частности, их:

  • net.SocketОсновной API-интерфейс узла на основе потока, который лежит в основе большинства следующих API-интерфейсов.
  • process.stdinВозвращает поток, подключенный к стандартному вводу
  • process.stdoutВозвращает поток, подключенный к стандартному выводу
  • process.stderrВернуть поток, подключенный к stderr
  • fs.createReadStream()Создать файлочитаемый поток
  • fs.createWriteStream()Создать поток с возможностью записи в файл
  • net.connect()Инициализировать потоковое соединение
  • http.request() вернутьhttp.ClientRequestЭкземпляр класса, который является доступным для записи потоком
  • zlib.createGzip()Сжать данные в поток, используя gzip (алгоритм сжатия)
  • zlib.createGunzip()Распаковать gzip-поток
  • zlib.createDeflate()Сжатие данных в поток с помощью deflate (алгоритм сжатия)
  • zlib.createInflate()Распаковать дефляционный поток

Шпаргалка по потокам

Типы Функция
Readable поставщик данных
Writable получатель данных
Transform поставщик и получатель
Duplex Поставщик и получатель (независимые)

Для получения дополнительной информации обратитесь к документации:Stream (nodejs.org)

Streams

const Readable = require('stream').Readable
const Writable = require('stream').Writable
const Transform = require('stream').Transform

Трубопровод

clock()              // 可读流
  .pipe(xformer())   // 转换流
  .pipe(renderer())  // 可写流

метод

stream.push(/*...*/)         // Emit a chunk
stream.emit('error', error)  // Raise an error
stream.push(null)            // Close a stream

мероприятие

const st = source() // 假设 source() 是可读流
st.on('data', (data) => { console.log('<-', data) })
st.on('error', (err) => { console.log('!', err.message) })
st.on('close', () => { console.log('** bye') })
st.on('finish', () => { console.log('** bye') })

Текущий режим

// 开启和关闭 flowing 模式
st.resume()
st.pause()
// 自动开启 flowing 模式
st.on('data', /*...*/)

читаемый поток

function clock () {
  const stream = new Readable({
    objectMode: true,
    read() {} // 自己实现 read() 方法,如果要按需读取
  })

  setInterval(() => {
    stream.push({ time: new Date() })
  }, 1000)

  return stream
}

Читаемые потоки — это генераторы данных, которые используютstream.push()ввод данных.

трансформировать поток

function xformer () {
  let count = 0

  return new Transform({
    objectMode: true,
    transform: (data, _, done) => {
      done(null, { ...data, index: count++ })
    }
  })
}

Передайте преобразованный блок данных вdone(null, chunk).

доступный для записи поток

function renderer () {
  return new Writable({
    objectMode: true,
    write: (data, _, done) => {
      console.log('<-', data)
      done()
    }
  })
}

нанизать все вместе

clock()              // 可读流
  .pipe(xformer())   // 转换流
  .pipe(renderer())  // 可写流

Вот некоторые важные события, связанные с доступными для записи потоками:

  • error– Отправляется при возникновении ошибки в операции записи/конвейера.
  • pipeline– Поток с возможностью записи генерирует это событие, когда поток с возможностью чтения передается в поток с возможностью записи.
  • unpipe- когда вы вызываете читаемый потокunpipeи испускается, когда он перестает подавать его в поток назначения.

Суммировать

Это все об основах потоковой передачи. Потоки, конвейеры и цепочки являются основными и наиболее мощными функциями Node.js. Потоки действительно могут помочь вам написать чистый и эффективный код для управления вводом-выводом.

Кроме того, существуетСтратегический план Node.jsназываетсяBOB, цель состоит в том, чтобы улучшить интерфейс потоковой передачи данных Node.js как для использования во внутреннем ядре Node.js, так и для предоставления API в будущем.

Для получения дополнительной технической галантереи, пожалуйста, обратите внимание на публичный аккаунт WeChat: 1024 Translation Station.

微信公众号:1024译站