чистый поток
Во время фестиваля Цинмин идет много дождей, и, разумеется, каждый Цинмин идет дождь. В этот мокрый и снежный день не удобно выходить на улицу, давайте останемся дома и будем учиться и делиться друг с другом! Как еще может быть! Ха-ха
дружеское напоминание: в этой статье может быть некоторый контент API, который будет скучным, скучным и скучным.
Но волнение позади нас будет за гранью вашего воображения, потому что мы должны делать это вручную, откуда вы знаете, насколько медленная лошадь, если вы не садитесь на нее лично, Спешите, вперед, вперед, вперед!
Друзья, которые использовали node, знают, что роль потоков очень мощная, доступная для чтения и записи и всемогущая.
По сравнению с модулем fs, поток больше подходит для чтения большого файла.Одноразовое чтение будет занимать много памяти и эффективность очень низкая.Однако поток разбивает данные на сегменты, которые будут читаться сегментом. по сегментам, и эффективность будет очень высокой. После долгих разговоров давайте сначала перейдем к концепции, давайте посмотрим, кто это
концепция
- Поток — это упорядоченный набор байтовых передач данных с началом и концом.
- Его не волнует общее содержимое файла, его интересует только то, читаются ли данные из файла, и обработка после чтения данных.
- Потоки — это абстрактный интерфейс, реализуемый многими объектами в Node. Например, объекты запросов и ответов HTTP-сервера являются потоками.
Многое из содержимого узла применяется к потокам. Например, req модуля http — это поток для чтения, res — поток для записи, а socket — поток для чтения и записи. Давайте поговорим о братьях потока для чтения и потока для записи.
Доступные для чтения и записи потоки также используют модуль fs для операций с файлами.
Итак, давайте начнем считаемый потокДля начала давайте посмотрим, какие методы (Api)
читаемый поток
Прежде всего, вы должны уметь им пользоваться, и ключ в том, чтобы им пользоваться.
Создайте читаемый поток
const fs = require('fs'); // 引入fs核心模块
// fs.createReadStream(path, options)
// 返回的是一个可读流对象
let rs = fs.createReadStream('1.txt', {
flags: 'r', // 文件的读取操作,默认是'r':读取
encoding: 'utf8', // 设置编码格式,默认是null, null代表的是buffer
autoClose: true, // 读取完毕后自动关闭
highWaterMark: 3, // 默认是读取64k 64 * 1024字节
start: 0,
end: 3 // 文件结束位置索引,和正常的截取slice有所不同,包前又包后(包括自己结束的位置)
});
// 默认情况下,不会将文件中的内容输出
// 内部会先创建一个buffer先读取3字节
// 1.txt文件内容为 123456789
Вышеприведенный код описывает как создать читаемый поток.Запоминать столько элементов опций кажется головной болью.На самом деле в общем-то элементы конфигурации не нужно нам писать.Теперь все довольны.
Зная, как его создать, давайте взглянем на события прослушивания в читаемом объекте потока rs.
Слушайте события данных
Поток для чтения Этот режим по умолчанию не является потоковым режимом (режим паузы), он ничего не делает и просто ждет здесь.
Всем известно, что потоки основаны на событиях, поэтому мы можем прослушивать события, а если мы прослушиваем события данных, мы можем преобразовать непотоковой режим в потоковый режим.
// 流动模式会疯狂的触发data事件,直到读取完毕
// 根据上面设置的highWaterMark一次读3个字节
rs.on('data', data => { // 非流动模式 -> 流动模式
console.log(data); // 触发2次data事件, 分别打出123和4 从0到3共4个(包括末尾)
});
// 题外话:
// 监听data事件的时候,如果没有设置编码格式,data返回的是buffer类型
// so我们可以为data设置encoding为utf8
rs.setEncoding('utf8'); // 等同于options里的encoding: 'utf8'
Когда мы прочитаем весь контент, который хотим прочитать, мы также можем прослушать конечное событие, чтобы определить, когда мы закончили чтение.
Слушайте конечное событие
rs.on('end', () => {
console.log('完毕了');
});
// 此时除了会打印data事件里的123, 4之外还会打印 完毕了
// 如下表示:
// 123
// 4
// 完毕了
Помимо событий data и end, в читаемом потоке также можно отслеживать события error, open и close, так как они не так полезны, как первые два, давайте запишем их вместе.
Слушайте события ошибки/открытия/закрытия
// error
rs.on('error', err => {
console.log(err);
});
// open
rs.on('open', () => {
console.log('文件打开了');
});
// close
rs.on('close', () => {
console.log('文件关闭了');
});
// 根据上面监听data、end事件,下面打印的内容是
/*
* 文件打开了
123
4
end
文件关闭了
* */
Все виды событий мониторинга умеют писать, и, наконец, посмотрите на два метода, это пауза и возобновление, пауза и возобновление данных триггера.
Пауза и возобновление
// pause
rs.on('data', data => {
console.log(data); // 只会读取一次就暂停了,此时只读到了123
rs.pause(); // 暂停读取,会暂停data事件触发
});
// resume
setInterval(() => {
rs.resume(); // 恢复data事件, 继续读取,变为流动模式
// 恢复data事件后,还会调用rs.pause,要想再继续触发,把setTimeout换成setInterval持续触发
}, 3000);
// 打印如下:
/*
* 文件打开了
123
4 // 隔了3秒后打印
end
文件关闭了
* */
После разговора об использовании читаемых потоков, давайте приложим настойчивые усилия (должны) посмотреть на его собратьевдоступный для записи потокНу, в конце концов, для самой большой в мире группы программистов должен быть глубокий уровень улучшения от начального до профессионального (сдаться)! Добавьте немного масла, поехали.
доступный для записи поток
Не говори глупостей, просто делай
Создать доступный для записи поток
const fs = require('fs');
// fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
flags: 'w', // 文件的操作, 'w'写入文件,不存在则创建
mode: 0o666,
autoClose: true,
highWaterMark: 3, // 默认写是16k
encoding: 'utf8'
});
Существует два метода для записываемых потоков, а именно методы записи и завершения. Давайте посмотрим, как их использовать напрямую.
метод записи
// ws.write(chunk, encoding(可选), callback);
// 写入的chunk数据必须是字符串或者buffer
let flag = ws.write('1', 'utf8', () => {}); // 异步的方法 有返回值
console.log(flag); // true
flag = ws.write('22', 'utf8', () => {});
console.log(flag); // false 超过了highWaterMark的3个字节,不能再写了
flag = ws.write('3', 'utf8', () => {});
console.log(flag); // false
// 2.txt -> 写入了 1223
Идентификатор флага указывает не на то, следует ли писать, а на то, следует ли продолжать запись.Истина означает, что запись может продолжаться. Но если он вернет false, он не будет потерян, а также будет записан в файл.
Далее мы представим метод конца.
метод конца
// 可以传入chunk值
ws.end('完毕'); // 当写完后 就不能再写了
// 此时2.txt -> 写入了 1223完毕
После разговора о методах записи и завершения, доступный для записи поток также имеет событие слушателя on, которое может прослушивать событие слива.
Прослушивание событий слива
// drain方法
// 抽干方法 当都写入后触发drain事件
ws.on('drain', () => {
console.log('已经抽干了');
});
Грядет главное событие
- Впереди Рори писал о том, как им пользоваться, и от Апи действительно всех клонило в сон.
- Но все, сейчас самый счастливый момент.Для работы потоковой передачи мы должны не только знать, как ее использовать, но и просто реализовать.
- Только так мы можем удовлетворить нашу огромную жажду знаний и получить новые навыки.Как обычно, мы можем напрямую перейти к коду и глубоко проанализировать его из кода.
- Если вы устали от чтения, сделайте перерыв.Будучи буддийским юношей, это болезненное осознание того, что вам не хватает всего.
Реализовать читаемый поток
сначала каштан
// demo.js
const ReadStream = require('./ReadStream'); // 引入实现的可读流
const rs = new ReadStream('1.txt', {
flags: 'r',
// encoding: 'utf8',
autoClose: true,
highWaterMark: 3,
start: 0,
end: 4
});
rs.on('data', data => {
console.log(data);
rs.pause();
});
rs.on('end', () => {
console.log('end');
});
setTimeout(() => {
rs.resume();
}, 2000);
Впереди высокая энергия, включайте таппинг, если не знаете буферов и событий в узле, не торопитесь. Все в одной лодке, я поделюсь этим с вами в следующей статье, а пока давайте продолжим читать! Держитесь, братья и сестры!
Создайте класс ReadStream
// ReadStream.js
const fs = require('fs');
const EventEmitter = require('events'); // 需要依赖事件发射
// 这里用ES6提供的class写法,大家也一起来看看是怎么写的吧
class ReadStream extends EventEmitter {
constructor(path, options) { // 需要传入path和options配置项
super(); // 继承
this.path = path;
// 参照上面new出的实例,我们开始写
this.flags = options.flags || 'r'; // 文件打开的操作,默认是'r'读取
this.encoding = options.encoding || null; // 读取文件编码格式,null为buffer类型
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024; // 默认是读取64k
this.start = options.start || 0;
this.end = options.end;
this.flowing = null; // null表示非流动模式
// 要建立一个buffer,这个buffer就是一次要读多少内容
// Buffer.alloc(length) 是通过长度来创建buffer,这里每次读取创建highWaterMark个
this.buffer = Buffer.alloc(this.highWaterMark);
this.pos = this.start; // 记录读取的位置
this.open(); // 打开文件,获取fd文件描述符
// 看是否监听了data事件,如果监听了,就变成流动模式
this.on('newListener', (eventName, callback) => {
if (eventName === 'data') { // 相当于用户监听了data事件
this.flowing = true; // 此时监听了data会疯狂的触发
this.read(); // 监听了,就去读,要干脆,别犹豫
}
});
}
}
module.exports = ReadStream; // 导出
Написав здесь, мы создали класс ReadStream, в котором наследуем метод генерации события EventEmitter
Среди них мы написали два метода open и read, что можно понять из буквального смысла.Если наш читаемый поток хочет прочитать файл, то сначала нужно открыть (открыть), а затем мы будем читать содержимое (читать)
Это основной метод отработки читаемых потоков, начнем с открытого метода.
открытый метод
class ReadStream extends EventEmitter {
constructor(path, options) {
// 省略...
}
open() {
// 用法: fs.open(filename,flags,[mode],callback)
fs.open(this.path, this.flags, (err, fd) => { // fd为文件描述符
// 说实在的我们打开文件,主要就是为了获取fd
// fd是个从3开始的数字,每打开一次都会累加,4->5->6...
if (err) {
if (this.autoClose) { // 文件打开报错了,是否自动关闭掉
this.destory(); // 销毁
}
this.emit('error', err); // 发射error事件
return;
}
this.fd = fd; // 如果没有错,保存文件描述符
this.emit('open'); // 发射open事件
});
}
// 这里用到了一个destory销毁方法,我们也直接实现了吧
destory() {
// 先判断有没有fd 有就关闭文件 触发close事件
if (typeof this.fd === 'number') {
// 用法: fs.close(fd,[callback])
fs.close(this.fd, () => {
this.emit('close');
});
return;
}
this.emit('close');
}
}
Начало всего сложное, мы сделали первый шаг, чтобы открыть файл, затем осталось прочитать и приложить настойчивые усилия, чтобы стать королем.
метод чтения
class ReadStream extends EventEmitter {
constructor(path, options) {
// 省略...
}
// 监听data事件的时候,去读取
read() {
console.log(this.fd); // 直接读fd为undefined,因为open事件是异步的,此时还拿不到fd
// 此时文件还没打开
if (typeof this.fd !== 'number') { // 前面说过fd是个数字
// 当文件真正打开的时候,会触发open事件
// 触发事件后再执行read方法,此时fd肯定有了
return this.once('open', () => this.read()); // once方法只会执行一次
}
// 现在有fd了,大声的读出来,不要害羞
// 用法: fs.read(fd, buffer, offset, length, pos, callback((err, bytesRead)))
// length就是一次想读几个, 不能大于buffer长度
// 这里length不能等于highWaterMark,举个🌰
// 文件内容是12345如果按照highWaterMark:3来读,总共读end:4个,每次读3个字节
// 分别是123 45空,我们应该知道一共要读几个,总数-读取位置+1得到下一次要读多少个
// 这里有点绕,大家可以多去试试体会一下
// 我们根据源码起一个同样的名字
let howMuchToRead = this.end ? Math.min((this.end-this.pos+1), this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
// bytesRead为读取到的个数,每次读3个,bytesRead就是3
if (bytesRead > 0) {
this.pos += bytesRead; // 读到了多少个,累加,下次从该位置继续读
let buf = this.buffer.slice(0, bytesRead); // 截取buffer对应的值
// 其实正常情况下,我们只要把buf当成data传过去即可了
// 但是考虑到还有编码的问题,所以有可能不是buffer类型的编码
// 这里需要判断一下是否有encoding
let data = this.encoding ? buf.toString(this.encoding) : buf.toString();
this.emit('data', data); // 发射data事件,并把data传过去
// 如果读取的位置 大于 结束位置 就代表读完了,触发一个end事件
if (this.pos > this.end) {
this.emit('end');
this.destory();
}
// 流动模式继续触发
if (this.flowing) {
this.read();
}
} else { // 如果bytesRead没有值了就说明读完了
this.emit('end'); // 发射end事件,表示文件读完
this.destory(); // 没有价值了,kill
}
});
}
}
Выше приведена основная реализация метода чтения, на самом деле идея несложная, и она будет более упорядоченной, если убрать комментарии. Вы также узнали об использовании читаемых потоков выше, и вы знаете, что для этого есть два метода, а именно pause (пауза) и возобновление (возобновление), тогда мы могли бы также выбрать день и написать его напрямую, это как простая, как отвратительная реализация. , чтобы увидеть, что это не пострадает, ха-ха
методы паузы и возобновления
class ReadStream extends EventEmitter {
constructor(path, options) {
// 省略...
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
}
Когда все закончилось, это так просто, мы реализовали свой собственный читаемый поток, поздравления, поздравления.
реализовать доступный для записи поток
Взгляните на тестовые данные
let WriteStream = require('./WriteStream'); // 引入我们实现的可写流
let ws = new WriteStream('3.txt', {
flags: 'w',
highWaterMark: 3,
autoClose: true,
encoding: 'utf8',
mode: 0o666,
start: 0
});
// ws.write('你d好', 'utf8', () => {});
let i = 9;
function write() {
let flag = true;
while (i >= 0 && flag) {
flag = ws.write(--i + '', 'utf8', () => {});
console.log(flag);
}
}
write();
// drain只有当缓存区充满后 并且被消费后出发
ws.on('drain', () => {
console.log('抽干');
write();
});
Предыдущая часть реализации записываемого потока в основном аналогична читаемому потоку, но записываемый поток имеет событие слива, поэтому он также будет обрабатываться при записи.
Создать доступный для записи поток
let fs = require('fs');
let EventEmitter = require('events'); // 需要事件发射
// 继承事件发射EventEmitter
class WriteStream extends EventEmitter {
constructor(path, options) {
super(); // 继承
this.path = path;
this.highWaterMark = options.highWaterMark || 16 * 1024; // 默认一次写入16k
this.autoClose = options.autoClose || true;
this.encoding = options.encoding || null;
this.mode = options.mode;
this.start = options.start || 0;
this.flags = options.flags || 'w'; // 默认'w'为写入操作
this.buffers = [];
this.writing = false; // 标识 是否正在写入
this.needDrain = false; // 是否满足触发drain事件
this.pos = 0; // 记录写入的位置
this.length = 0;
this.open(); // 首先还是打开文件获取到fd文件描述符
}
}
module.exports = WriteStream;
-
Доступный для записи поток должен иметь буферную область. Когда файл записывается, содержимое должно быть записано в буферную область. В исходном коде это связанный список => Мы можем напрямую использовать [] для достижения этого. это роль this.buffers
-
Кроме того, если вы используете буферы для вычислений, каждый дополнительный элемент необходимо обходить, а производительность слишком высока для поддержания, поэтому используйте this.length для записи размера области буфера.
Далее мы напрямую пишем метод open, чтобы открыть файл и получить файловый дескриптор fd.
открытый метод
class WriteStream extends EventEmitter {
constructor(path, options) {
// 省略...
this.open();
}
open() {
// 用法: fs.open(filename,flags,[mode],callback)
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit('error', err);
// 看一下是否会自动关闭
if (this.autoClose) {
this.destory(); // 销毁
}
return;
}
this.fd = fd;
this.emit('open'); // 触发open事件,表示当前文件打开了
});
}
destory() {
if (typeof this.fd !== 'number') { // 如果不是fd的话直接返回一个close事件
return this.emit('close');
}
fs.close(this.fd, () => {
this.emit('close');
});
}
}
После того как файловый дескриптор fd получен методом open, для потока достигается половина успеха. Давайте воспользуемся победой и используем два метода Хуанлуна, чтобы завершить доступный для записи поток! ! !
методы записи и завершения
class WriteStream extends EventEmitter {
constructor(path, options) {
// 省略...
}
// 用法:ws.write(chunk,[encoding],[callback])
write(chunk, encoding = this.encoding, callback) {
// 通过fs.write()写入时,chunk需要改成buffer类型
// 并且要用我们指定的编码格式去转换
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
// chunk.length就是要写入的长度
this.length += chunk.length;
// 比较是否达到了缓存区的大小
let result = this.length < this.highWaterMark;
this.needDrain = !result; // 是否需要触发drain事件
if (this.writing) {
this.buffers.push({
chunk,
encoding,
callback
});
} else {
this.writing = true;
this._write(chunk, encoding, () => {
callback();
this.clearBuffer();
});
}
return result; // write方法 返回一个布尔值
}
_write(chunk, encoding, callback) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, callback));
}
// fs.write写入文件
// 用法: fs.write(fd, buffer[, offset[, length[, position]]], callback)
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
// this.length记录缓存区大小,写入后length需要再减掉写入的个数
this.length -= bytesWritten;
// this.pos下次写入的位置
this.pos += bytesWritten;
this.writing = false;
callback(); // 清空缓存区内容
});
}
clearBuffer() {
let buf = this.buffers.shift(); // 每次把最先放入缓存区的取出
if (buf) { // 如果有值,接着写
this._write(buf.chunk, buf.encoding, () => {
buf.callback();
this.clearBuffer(); // 每次写完都清空一次缓存区
});
} else { // 缓存区已经空了
if (this.needDrain) { // 是否需要触发drain 需要就发射drain事件
this.needDrain = false;
this.emit('drain');
}
}
}
end() {
if (this.autoClose) {
this.emit('end');
this.destory();
}
}
}
- Вышеизложенное завершает реализацию записываемого потока, у вас могут быть некоторые сомнения, здесь я расскажу об общих сомнениях.
- Сначала объясняется условное суждение в методе записи.
- если условие
- Если он пишет, сначала поместите содержимое в буфер, то есть this.buffers.
- Сохраните объект в массиве, соответствующем чанк, кодировке, обратному вызову
- Это упрощает получение соответствующего содержимого в кеше при очистке кеша.
- еще условие
- Специально используется для записи содержимого в файл
- После каждой записи содержимое буферов (буферной области) необходимо очищать
- Событие слива запускается, когда массив буферов пуст.
- если условие
- Решение typeof в методе _write должно объяснять
- Определить, есть ли файловый дескриптор fd, только при успешном открытии файла будет fd
- Поэтому, если нет, вам нужно вызвать событие открытия, получить fd и затем вызвать метод _write.
- Конечный метод проще
- Чтобы определить, будет ли он закрыт автоматически, запустите конечное событие и уничтожьте его.
напиши в конце
Наконец-то все сделал.На самом деле, если честно, эти штуки на основе API все еще очень скучны, и все отвергают. Но я все же настоял на том, чтобы это записать, а также хотел, чтобы все прочувствовали процесс того, как мастера осознавали и думали вместе со мной.
Спасибо за просмотр, это не ангел со сломанными крыльями, который может упорствовать!