Незаменимый Stream в Node.js

Node.js задняя часть

1. Что такое поток

  • Поток в Node.js — это абстрактный интерфейс для обработки потоковых данных. Модуль потока предоставляет базовый API. Используя эти API, легко создавать объекты, реализующие потоковый интерфейс. Например, HTTP-запросы и process.stdout являются экземплярами потоков.
  • Потоки могут быть доступны для чтения, записи или и того, и другого.Обратите внимание, что все потоки являются экземплярами EventEmitter..

Тип потока

В Node.js существует четыре основных типа потоков:

  1. Readable — читаемый поток (например, fs.createReadStream()).
  2. Writable — доступный для записи поток (например, fs.createWriteStream()).
  3. Duplex — поток чтения-записи (дуплексный поток) (например, net.Socket).
  4. Transform — дуплексный поток, который может изменять и преобразовывать данные во время чтения и записи (например, zlib.createDeflate()).
var Stream = require('stream') //stream 模块引入方式

var Readable = Stream.Readable //可读的流
var Writable = Stream.Writable //可写的流
var Duplex = Stream.Duplex //可读写的流
var Transform = Stream.Transform //在读写过程中可以修改和变换数据的 Duplex 流

Потоковые операции в Node.js инкапсулированы в модуль Stream, на который также ссылаются несколько основных модулей. Например, в реализации исходного кода fs.createReadStream() и fs.createWriteStream() абстрактный интерфейс, предоставляемый модулем Stream, вызывается для реализации операции с потоковыми данными.

В-третьих, зачем использовать поток?

Давайте посмотрим, почему мы должны использовать Stream, на двух примерах.

Опыт1:

Вот пример чтения содержимого файла:

const fs = require('fs')

fs.readFile(file, function (err, content) { //读出来的content是Buffer
  console.log(content)
  console.log(content.toString())
})

Но если содержимое файла большое, например 500 МБ, результат выполнения приведенного выше кода будет следующим:

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:382:11)

Причина ошибки в том, что длина объекта буфера содержимого слишком велика, что приводит к сбою метода toString. Видно, что такой способ получения всего содержимого за один раз не подходит для работы с большими файлами.

Рассмотрите возможность использования потоков для чтения содержимого файла.

var fs = require('fs')

fs.createReadStream(bigFile).pipe(process.stdout) 

Fs.createReadStream создает удобочитаемый поток, соединяющий источник (вверх по течению, файл) и потребитель (вниз по течению, стандартный вывод).

Когда приведенный выше код выполняется, поток последовательно вызывает fs.read (в исходном коде класса ReadStream есть метод _read, этот метод _read внутренне вызывает fs.read для чтения файла) и преобразует содержимое файла файл Вынимать партиями и передавать его вниз по течению.

С точки зрения файла его содержимое берется последовательно порциями.

С точки зрения нисходящего потока он получает последовательность данных, поступающих один за другим.

Он может обрабатывать часть данных и отбрасывать их, если ему не нужно работать со всем сразу.

С точки зрения потока, он в любой момент времени хранит только часть данных в файле, но содержимое меняется.

Эта ситуация похожа на забор воды из бассейна шлангом.

Всякий раз, когда тратится немного воды, немного воды уходит из пруда.

Независимо от того, насколько большой бассейн, он хранит столько же воды, сколько и объем водопроводной трубы.

Опыт2:

Ниже приведен пример просмотра видео онлайн. Предположим, мы возвращаем видеоконтент пользователю через HTTP-запрос.

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
    fs.readFile(videoPath, (err, data) => {
    res.end(data);
});
}).listen(8080);

Но есть две очевидные проблемы с этим

  1. Видеофайл должен быть полностью прочитан, прежде чем его можно будет вернуть пользователю, поэтому время ожидания будет очень долгим.
  2. Видеофайлы все помещаются в память одновременно, и память этого не выдерживает.

При потоковой передаче вы можете считывать видеофайл в память по крупицам, а затем по крупицам возвращать его пользователю, читать часть и записывать часть. (с использованием функции Transfer-Encoding: сегментированной передачи протокола HTTP) взаимодействие с пользователем оптимизировано, а накладные расходы памяти значительно снижены.

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
    fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

Из приведенных выше двух примеров мы знаем, что потоковая обработка должна использоваться в случае больших данных.

4. Читаемый поток

Читаемые потоки — это абстракции от источников, предоставляющих данные.

Общие читаемые потоки:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • Сокеты TCP //сокеты — это дуплексный поток, который можно читать и писать
  • process.stdin //Стандартный ввод

Все доступные для чтения потоки реализуют интерфейс, определенный классом stream.Readable.

Два режима для читаемых потоков (текущий и приостановленный)

  1. В потоковом режиме читаемый поток автоматически считывает данные с нижнего уровня системы и предоставляет данные приложению как можно быстрее через события интерфейса EventEmitter (Все потоки являются экземплярами EventEmitter).

  2. В режиме паузы метод stream.read() должен вызываться явно, чтобы считать фрагменты данных из потока.

Создайте Readable поток потока.По умолчанию установлен режим non-flow (режим паузы), и по умолчанию данные не будут читаться. Все доступные для чтения потоки, чей исходный рабочий режим приостановлен, можно переключить в непрерывный режим следующими тремя способами:

  • Слушайте событие «данные»
  • Вызов метода stream.resume()
  • Вызов метода stream.pipe() для отправки данных в Writable

fs.createReadStream(path[ options]) реализация исходного кода

//文件名 ReadStream.js
let fs = require('fs');//读取文件
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//流操作都是基于事件的
  constructor(path, options = {}) {
    super();
    //需要的参数
    this.path = path;//读取文件的路径
    this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB
    this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true
    this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件
    this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变
    this.end = options.end || null; // null表示没传递
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'r';//以何种方式操作文件

    // 参数的问题
    this.flowing = null; // 默认为非流动模式
    // 建一个buffer存放读出来的数据
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.open(); 
    // {newListener:[fn]}
    // 次方法默认同步调用的
    this.on('newListener', (type) => { // 等待着 它监听data事件
      if (type === 'data') {//当监听到data事件时,把流设置为流动模式
        this.flowing = true;
        this.read();// 开始读取 客户已经监听了data事件
      }
    })
  }
  pause(){//将流从flowing模式切换为paused模式
    this.flowing = false;
  }
  resume(){//将流从paused模式切换为flowing模式
    this.flowing =true;
    this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
  }
  read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
    if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
       return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
    }
    // 当获取到fd时 开始读取文件了
    // 第一次应该读2个 第二次应该读2个
    // 第二次pos的值是4 end是4
    // 读取文件里一共4有个数为123 4,我们读取里面的123 4
    let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容
      // 读取完毕
      this.pos += byteRead; // 读出来两个,pos位置就往后移两位
      // this.buffer默认就是三个
      let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码
      this.emit('data', b);//触发data事件,将读到的内容输出给用户
      if ((byteRead === this.highWaterMark)&&this.flowing){
        return this.read(); // 继续读
      }
      // 这里就是没有更多的逻辑了
      if (byteRead < this.highWaterMark){
        // 没有更多了
        this.emit('end'); // 读取完毕
        this.destroy(); // 销毁即可
      }
    });
  }
  // 打开文件用的
  destroy() {
    if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件
    fs.close(this.fd, () => {
      // 如果文件打开过了 那就关闭文件并且触发close事件
      this.emit('close');
    });
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
      if (err) {
        if (this.autoClose) { // 如果需要自动关闭我再去销毁fd
          this.destroy(); // 销毁(关闭文件,触发关闭事件)
        }
        this.emit('error', err); // 如果有错误触发error事件
        return;
      }
      this.fd = fd; // 保存文件描述符
      this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法
    });
  }
  pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
    this.on('data',(data)=>{
      let flag = dest.write(data);
      if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
        this.pause();// 已经不能继续写了,等他写完了再恢复
      }
    });
    dest.on('drain',()=>{//当读取缓存区清空后
      console.log('写一下停一下')
      this.resume();//继续往dest写入数据
    });
  }
}
module.exports = ReadStream;//导出可读流

Используйте fs.createReadStream()

// 流:有序的有方向的,可以自己控制速率
// 读:读是将内容读取到内存中 
// 写:写是将内存或者文件的内容写入到文件内
// 读取的时候默认读 默认一次读取64k,encoding 读取出来的内容默认都是buffer
//let fs = require('fs');
//let rs = fs.createReadStream({...});//原生实现可读流
let ReadStream = require('./ReadStream');
let rs = new ReadStream('./2.txt', {
  highWaterMark: 3, // 字节
  flags:'r',//读文件
  autoClose:true, // 默认读取完毕后自动关闭文件描述符
  start:0,
  //end:3,// 流是闭合区间 包start也包end
  encoding:'utf8'
});
// 默认创建一个流 是非流动模式(上述源码中有写的),默认不会读取数据
// 如果我们需要接收数据,那我们要监听data事件,这样数据会自动的流出来
rs.on('error',function (err) {// 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
  console.log(err)
});
rs.on('open',function () {//文件被打开了,获取到了fd。内部会自动的触发这个事件 rs.emit('data'); 
  console.log('文件打开了');
});
rs.on('data',function (data) {//有数据流出来了
  console.log(data);
  rs.pause(); // 暂停触发on('data')事件,将流动模式又转化成了非流动模式
});
setTimeout(()=>{rs.resume()},3000);//三秒钟之后再将非流动模式转化为流动模式
rs.on('end',function () {// 读取完毕
  console.log('读取完毕了');
});
rs.on('close',function () {//close 事件将在流或其底层资源(比如一个文件)关闭后触发。close 事件触发后,该流将不会再触发任何事件。
  //console.log('关闭')
});

4. Доступный для записи поток

Поток с возможностью записи — это абстракция потока данных на устройство, которое используется для потребления данных, поступающих от восходящего потока.Данные могут быть записаны на устройство с помощью программы потока с возможностью записи.Обычными являются файлы на локальном диске или сетевые ответы. такие как TCP и HTTP.

Общие доступные для записи потоки:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

Все доступные для записи потоки реализуют интерфейс, определенный классом stream.Writable.

Использование записываемых потоков

Запишите данные в доступный для записи поток, вызвав метод write() экземпляра доступного для записи потока.

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
ws.write(chunk); // 写入数据
});

Прослушивание события данных потока для чтения приведет к тому, что поток для чтения перейдет в режим потоковой передачи.Мы вызываем метод write() потока для записи в событии обратного вызова, чтобы данные записывались на устройство destPath абстракции потока для записи. .

Метод write() имеет три параметра.

  • chunk {String| Buffer}, представляющий данные для записи
  • кодировка Когда записанные данные представляют собой строку, кодировку можно установить
  • callback Функция обратного вызова после записи данных

событие слива

Если вызов метода stream.write(chunk) возвращает false, это означает, что текущий буфер заполнен, и поток вызовет событие слива в соответствующее время (после того, как буфер опустеет).

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 写入数据
if (!flag) { // 如果缓存区已满暂停读取
rs.pause();
}
});
 
ws.on('drain', () => {
rs.resume(); // 缓存区已清空 继续读取写入
});

fs.createWriteStream(path[ options]) реализация исходного кода

// 文件 WriteStream.js
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.encoding = options.encoding || 'utf8';
    this.start = options.start || 0;
    this.pos = this.start;
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.open(); // fd 异步的  //触发一个open事件,当触发open事件后fd肯定就存在了

    // 写文件的时候 需要的参数有哪些
    // 第一次写入是真的往文件里写
    this.writing = false; // 默认第一次就不是正在写入
    // 用简单的数组来模拟一下缓存
    this.cache = [];
    // 维护一个变量,表示缓存的长度
    this.len = 0;
    // 是否触发drain事件
    this.needDrain = false;
  }
  clearBuffer() {
    let buffer = this.cache.shift();
    if (buffer) { // 如果缓存里有
      this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
    } else {// 如果缓存里没有了
      if (this.needDrain) { // 需要触发drain事件
        this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了
        this.needDrain = false;
        this.emit('drain');
      }
    }
  }
  _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
    if (typeof this.fd != 'number') {
      return this.once('open', () => this._write(chunk, encoding, clearBuffer));
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
      this.pos += byteWritten;
      this.len -= byteWritten; // 每次写入后就要在内存中减少一下
      clearBuffer(); // 第一次就写完了
    })
  }
  write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容
    // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.len += chunk.length; // 维护缓存的长度 3
    let ret = this.len < this.highWaterMark;
    if (!ret) {
      this.needDrain = true; // 表示需要触发drain事件
    }
    if (this.writing) { // 表示正在写入,应该放到内存中
      this.cache.push({
        chunk,
        encoding,
      });
    } else { // 第一次
      this.writing = true;
      this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法
    }
    return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了
  }
  destroy() {
    if (typeof this.fd != 'number') {
      this.emit('close');
    } else {
      fs.close(this.fd, () => {
        this.emit('close');
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.destroy(); // 如果自动关闭就销毁文件描述符
        }
        return;
      }
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }
}
module.exports = WriteStream;

Используйте fs.createWriteStream()

// 可写流有缓存区的概念
// 1.第一次写入是真的向文件里写,第二次在写入的时候是放到了缓存区里
// 2.写入时会返回一个boolean类型,返回为false时表示缓存区满了,不要再写入了
// 3.当内存和正在写入的内容消耗完后,会触发一个drain事件
//let fs = require('fs');
//let rs = fs.createWriteStream({...});//原生实现可写流
let WS = require('./WriteStream')
let ws = new WS('./2.txt', {
  flags: 'w', // 写入文件,默认文件不存在会创建
  highWaterMark: 1, // 设置当前缓存区的大小
  encoding: 'utf8', // 文件里存放的都是二进制
  start: 0,
  autoClose: true, // 自动关闭文件描述符
  mode: 0o666, // 可读可写
});
// drain的触发时机,只有当highWaterMark填满时,才可能触发drain
// 当嘴里的和地下的都吃完了,就会触发drain方法
let i = 9;
function write() {
  let flag = true;
  while (flag && i >= 0) {
    i--;
    flag = ws.write('111'); // 987 // 654 // 321 // 0
    console.log(flag)
  }
}
write();
ws.on('drain', function () {
  console.log('dry');
  write();
});

Суммировать

stream(流)分为可读流(flowing mode和paused mode)、可写流、可读写流,Node.js 提供了多种流对象。 Например, HTTP-запросы и process.stdout являются экземплярами потоков. Модуль потока предоставляет базовый API. Используя эти API, легко создавать объекты, реализующие потоковый интерфейс.它们底层都调用了stream模块并进行封装。