Краткое изложение использования и реализации Node.js Stream

Node.js Java внешний интерфейс сервер Операционная система JavaScript Android iOS искусственный интеллект дизайн товар
Краткое изложение использования и реализации Node.js Stream

концепция потока

  • Поток — это упорядоченный набор байтовых передач данных с началом и концом.
  • Его не волнует общее содержимое файла, его интересует только то, читаются ли данные из файла, и обработка после чтения данных.
  • Потоки — это абстрактный интерфейс, реализуемый многими объектами в Node. Например, объекты запросов и ответов HTTP-сервера являются потоками.

В Node.js есть четыре основных типа потоков.

  1. Readable — читаемый поток (например, fs.createReadStream()).
  2. Writable — доступный для записи поток (например, fs.createWriteStream()).
  3. Duplex — поток чтения-записи (дуплексный поток) (например, net.Socket).
  4. Transform — поток преобразования Дуплексные потоки, которые могут изменять и преобразовывать данные во время чтения и записи (например, zlib.createDeflate())

Почему поток

Если вы читаете файл и используете fs.readFileSync для его синхронного чтения, программа заблокируется, и все данные будут записаны в память. Используя fs.readFile для чтения, программа не будет блокироваться, но все данные все равно будут записываться в память за один раз, а затем позволять потребителям их читать. Если файл большой, использование памяти становится проблемой. Потоковая передача в этом случае более выгодна. По сравнению с однократной записью в память поток будет сначала записываться в буфер, а затем считываться потребителем, вместо того, чтобы записывать весь файл в память, экономя место в памяти.

1. Когда поток не используется, весь файл будет записан в память, а затем память будет записана в целевой файл.

2. При использовании потока можно управлять скоростью чтения и записи потока.

Использование потоков и реализация

Читаемый поток createReadStream

Использование читаемых потоков

  1. Создайте читаемый поток

    var rs = fs.createReadStream(path,[options]);
    

    1.) путь Путь для чтения файла

    2.)options

    • помечает операцию, которую необходимо выполнить для открытия файла, по умолчанию 'r'
    • кодировка по умолчанию равна нулю
    • начальная индексная позиция, чтобы начать чтение
    • end Позиция индекса конечного показания (включая конечное положение)
    • Размер буфера чтения highWaterMark по умолчанию составляет 64 КБ.

    Если вы укажете кодировку utf8, highWaterMark будет больше 3 байт

  2. Слушайте события данных

    Поток переключается в потоковый режим, данные будут считываться максимально быстро

    rs.on('data', function (data) {
        console.log(data);
    });
    
  3. Слушайте конечное событие

    Это событие будет запущено после чтения данных

    rs.on('end', function () {
        console.log('读取完成');
    });
    
  4. Прослушивание событий ошибок

    rs.on('error', function (err) {
        console.log(err);
    });
    
  5. Слушайте близкое событие

    Тот же эффект, что и при указании {encoding:'utf8'}, установка кодировки

    rs.setEncoding('utf8');
    
  6. Пауза и возобновление данных триггера

    С помощью метода pause() и метода возобновления()

    rs.on('data', function (data) {
        rs.pause();
        console.log(data);
    });
    setTimeout(function () {
        rs.resume();
    },2000);
    

Простая реализация читаемых потоков

  1. Имитирует читаемый поток
    let fs = require('fs');
    let EventEmitter = require('events');
    class ReadStream extends EventEmitter {
      constructor(path, options = {}) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0; 
        this.pos = this.start; // pos会随着读取的位置改变
        this.end = options.end || null; // null表示没传递
        this.encoding = options.encoding || null;
        this.flags = options.flags || 'r';
    
        // 参数的问题
        this.flowing = null; // 非流动模式
        // 弄一个buffer读出来的数
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open(); 
        // {newListener:[fn]}
        // 次方法默认同步调用的
        this.on('newListener', (type) => { // 等待着 它监听data事件
          if (type === 'data') {
            this.flowing = true;
            this.read();// 开始读取 客户已经监听了data事件
          }
        })
      }
      pause(){
        this.flowing = false;
      }
      resume(){
        this.flowing =true;
        this.read();
      }
      read(){ // 默认第一次调用read方法时还没有获取fd,所以不能直接读
        if(typeof this.fd !== 'number'){
           return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
        }
        // 当获取到fd时 开始读取文件了
        // 第一次应该读2个 第二次应该读2个
        // 第二次pos的值是4 end是4
        // 一共4个数 123 4
        let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead真实的读到了几个
          // 读取完毕
          this.pos += byteRead; // 都出来两个位置就往后搓两位
          // this.buffer默认就是三个
          let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
          this.emit('data', b);
          if ((byteRead === this.highWaterMark)&&this.flowing){
            return this.read(); // 继续读
          }
          // 这里就是没有更多的逻辑了
          if (byteRead < this.highWaterMark){
            // 没有更多了
            this.emit('end'); // 读取完毕
            this.destroy(); // 销毁即可
          }
        });
      }
      // 打开文件用的
      destroy() {
        if (typeof this.fd != 'number') { return this.emit('close'); }
        fs.close(this.fd, () => {
          // 如果文件打开过了 那就关闭文件并且触发close事件
          this.emit('close');
        });
      }
      open() {
        fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型)
          if (err) {
            if (this.autoClose) { // 如果需要自动关闭我在去销毁fd
              this.destroy(); // 销毁(关闭文件,触发关闭事件)
            }
            this.emit('error', err); // 如果有错误触发error事件
            return;
          }
          this.fd = fd; // 保存文件描述符
          this.emit('open', this.fd); // 触发文件的打开的方法
        });
      }
    }
    module.exports = ReadStream;
    
  2. проверять
    let ReadStream = require('./ReadStream');
    let rs = new ReadStream('./2.txt', {
      highWaterMark: 3, // 字节
      flags:'r',
      autoClose:true, // 默认读取完毕后自动关闭
      start:0,
      //end:3,// 流是闭合区间 包start也包end
      encoding:'utf8'
    });
    // 默认创建一个流 是非流动模式,默认不会读取数据
    // 我们需要接收数据 我们要监听data事件,数据会总动的流出来
    rs.on('error',function (err) {
      console.log(err)
    });
    rs.on('open',function () {
      console.log('文件打开了');
    });
    // 内部会自动的触发这个事件 rs.emit('data');
    rs.on('data',function (data) {
      console.log(data);
      rs.pause(); // 暂停触发on('data')事件,将流动模式又转化成了非流动模式
    });
    setTimeout(()=>{rs.resume()},5000)
    rs.on('end',function () {
      console.log('读取完毕了');
    });
    rs.on('close',function () {
      console.log('关闭')
    });
    

доступный для записи поток createWriteStream

Использование записываемых потоков

  1. Создать доступный для записи поток

    var ws = fs.createWriteStream(path,[options]);
    

    1.) Путь к файлу, записанный путем

    2.)options

    • помечает операцию, которая должна быть выполнена для открытия файла, по умолчанию 'w'
    • кодировка по умолчанию utf8
    • Размер буфера записи highWaterMark по умолчанию составляет 16 КБ.
  2. метод записи

    ws.write(chunk,[encoding],[callback]);
    

    1.) Буфер данных/строка, записанная фрагментом

    2.) Кодирование полезно, когда фрагмент формата кодирования представляет собой строку, необязательную

    3.) callback Обратный вызов после успешной записи

    Возвращаемое значение является логическим значением, ложным, когда системный буфер заполнен, и истинным, когда он не заполнен.

  3. метод конца

    ws.end(chunk,[encoding],[callback]);
    

    Указывает, что нет данных для записи в Writable next. Путем передачи необязательных параметров блока и кодирования можно записать еще один фрагмент данных перед закрытием потока. Если передана необязательная функция обратного вызова, она будет используется в качестве обратного вызова для функции события "finish"

  4. метод слива

    • Когда поток не опустошен, вызов write() буферизует блок данных и возвращает false. Событие 'drain' запускается после того, как все кэшированные в данный момент блоки данных будут опустошены (принято операционной системой для вывода).

    • Рекомендуется, чтобы после того, как write() вернула false, никакие блоки данных не могли быть записаны до тех пор, пока не будет запущено событие 'drain'.

    let fs = require('fs');
    let ws = fs.createWriteStream('./2.txt',{
      flags:'w',
      encoding:'utf8',
      highWaterMark:3
    });
    let i = 10;
    function write(){
     let  flag = true;
     while(i&&flag){
          flag = ws.write("1");
          i--;
         console.log(flag);
     }
    }
    write();
    ws.on('drain',()=>{
      console.log("drain");
      write();
    });
    
  5. метод отделки

    Событие «finish» будет запущено после вызова метода stream.end() и передачи данных буфера в базовую систему.

    var writer = fs.createWriteStream('./2.txt');
    for (let i = 0; i < 100; i++) {
      writer.write(`hello, ${i}!\n`);
    }
    writer.end('结束\n');
    writer.on('finish', () => {
      console.error('所有的写入已经完成!');
    });
    

Простая реализация записываемого потока

  1. Имитация записываемого потока
    let fs = require('fs');
    let EventEmitter = require('events');
    class WriteStream extends EventEmitter {
      constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.pos = this.start;
        this.mode = options.mode || 0o666;
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.open(); // fd 异步的  触发一个open事件当触发open事件后fd肯定就存在了
    
        // 写文件的时候 需要的参数有哪些
        // 第一次写入是真的往文件里写
        this.writing = false; // 默认第一次就不是正在写入
        // 缓存我用简单的数组来模拟一下
        this.cache = [];
        // 维护一个变量 表示缓存的长度
        this.len = 0;
        // 是否触发drain事件
        this.needDrain = false;
      }
      clearBuffer() {
        let buffer = this.cache.shift();
        if (buffer) { // 缓存里有
          this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
        } else {// 缓存里没有了
          if (this.needDrain) { // 需要触发drain事件
            this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了
            this.needDrain = false;
            this.emit('drain');
          }
        }
      }
      _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
        if (typeof this.fd != 'number') {
          return this.once('open', () => this._write(chunk, encoding, clearBuffer));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
          this.pos += byteWritten;
          this.len -= byteWritten; // 每次写入后就要再内存中减少一下
          clearBuffer(); // 第一次就写完了
        })
      }
      write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容
        // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
        this.len += chunk.length; // 维护缓存的长度 3
        let ret = this.len < this.highWaterMark;
        if (!ret) {
          this.needDrain = true; // 表示需要触发drain事件
        }
        if (this.writing) { // 正在写入应该放到内存中
          this.cache.push({
            chunk,
            encoding,
          });
        } else { // 第一次
          this.writing = true;
          this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法
        }
        return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了
      }
      destroy() {
        if (typeof this.fd != 'number') {
          this.emit('close');
        } else {
          fs.close(this.fd, () => {
            this.emit('close');
          });
        }
      }
      open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
          if (err) {
            this.emit('error', err);
            if (this.autoClose) {
              this.destroy(); // 如果自动关闭就销毁文件描述符
            }
            return;
          }
          this.fd = fd;
          this.emit('open', this.fd);
        });
      }
    }
    module.exports = WriteStream;
    
  2. проверять
    let WS = require('./WriteStream')
    let ws = new WS('./2.txt', {
      flags: 'w', // 默认文件不存在会创建
      highWaterMark: 1, // 设置当前缓存区的大小
      encoding: 'utf8', // 文件里存放的都是二进制
      start: 0,
      autoClose: true, // 自动关闭
      mode: 0o666, // 可读可写
    });
    // drain的触发时机,只有当highWaterMark填满时,才可能触发drain
    // 当嘴里的和地下的都吃完了,就会触发drain方法
    let i = 9;
    function write() {
      let flag = true;
      while (flag && i >= 0) {
        i--;
        flag = ws.write('111'); // 987 // 654 // 321 // 0
        console.log(flag)
      }
    }
    write();
    ws.on('drain', function () {
      console.log('干了');
      write();
    });
    

трубный метод

Метод трубы - это смысл трубы, которая может контролировать скорость

  • Будет отслеживать on('data') rs и вызывать метод ws.write для прочитанного содержимого.
  • Вызов написанного метода вернет логический тип
  • Если он возвращает false, вызовите rs.pause(), чтобы приостановить чтение.
  • После ожидания записи доступного для записи потока on('drain') возобновляет чтение Применение трубного метода.
let fs = require('fs');
let rs = fs.createReadStream('./2.txt',{
  highWaterMark:1
});
let ws = fs.createWriteStream('./1.txt',{
  highWaterMark:3
});
rs.pipe(ws); // 会控制速率(防止淹没可用内存)