Справочная статья:
Источник изображенияВизуальный Китай
Понятие «поток».
Stream — абстрактный интерфейс для обработки потоковых данных в Node.js — официальная документация
Поток — это набор данных, вы можете понимать его как связанный список данных или строку, разница в том, что данные в потоке не доступны сразу, и здесь их можно понимать как поток воды. Вам не нужно помещать все данные в память сразу, вместо этого вы можете использовать уникальную природу потоков для выполнения операций над большими объемами данных и обработки их по частям.
На основе асинхронной обработки данных узлом поток обрабатывает данные, подлежащие передаче, в небольшие фрагменты для непрерывной передачи, что приводит к большему повышению производительности за счет меньшего потребления памяти.
тип "потока"
В Node.js существует четыре основных типа потоков:
-
Readable
-- readable stream Абстракция источника, из которого данные могут быть прочитаны. например.fs.createReadStream()
-
Writable
-- записываемый поток Абстракция, поверх которой могут быть записаны объекты данных. например.fs.createWriteStream()
-
Duplex
-- Двунаправленные потоки (дуплексные потоки) доступны как для чтения, так и для записи. например.not.Socket
-
Transform
-- Поток преобразования (переменный поток) Данные могут быть изменены или преобразованы во время чтения и записи.双向流
. например.zlib.createDeflate()
Все потокиEventEmitter
Экземпляры , они выдают события, которые можно читать и записывать.На этой основе мы можем легко использоватьpipe
методы работают с этими потоками
readableSrc.pipe(writableDest)
В приведенном выше простом примере мы используемreadable stream
вывод какwritable stream
ввод.
Затем подумайте еще раз, если наши вход и выходDuplex
тогда ты всегда сможешьpipe
Спуститесь и реализуйте непрерывные операции, такие как команды Linux.
если вы использовалиgulp
Сжатие и интеграция внешних ресурсов определенно будут впечатляющими
Потоки, встроенные в Node.js
Все данные в следующей таблице являются нативными объектами в Node.js. Эти объекты также являются потоками с возможностью чтения и записи, некоторые из которых являются дуплексными потоками и потоками переменных.
Примечание. HTTP-ответ — это поток, доступный для чтения на стороне клиента, и поток, доступный для записи, на стороне сервера.stdio
поток(stdin
, stdout
, stdout
) в дочернем процессе имеет тип, противоположный типу в родительском процессе, и это то, что облегчает общение родитель-потомок.
Readable Stream | Writable Stream |
---|---|
HTTP-ответ (клиент) | HTTP-запрос (клиент) |
HTTP-запрос (сервер) | HTTP-ответ (сервер) |
fs read streams | fs write streams |
zlib streams | zlib streams |
crypto streams | crypto streams |
TCP sockets | TCP sockets |
child process stdout, stderr | child process stdin |
process.stdin | process.stdout, process.stderr |
Простой пример, чтобы подчеркнуть важность потоков на практике
- Создайте большой файл, используя поток с возможностью записи,
big.file
Напишите 1 миллион строк данных, размер файла около 400 МБ.
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
- Выполните приведенный выше скрипт, чтобы сгенерировать файл. Выполните скрипт ниже. начать отправлять
big.file
Служба узла. использоватьcurl
Подключиться к запущенному сервису узла
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
Когда служба узла запущена и не подключена, использование памяти составляет 8,7 МБ, что является нормальным (ниже).
когда используешьcurl localhost:8000
Подключившись к серверу, вы можете четко увидеть, сколько памяти будет потреблять однократное чтение (ниже)
- Модуль fs Node.js предоставляет
createReadStram
метод, мы можем использовать этот метод для чтения потокаpipe
В ответ на снижение нагрузки на сервер. Код и следующие результаты
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
可以看到,node服务对于内存的压力得到了力度极大的释放,并且在输出速度上依然很快。这里即很直观的体现了此文一开始提到的,node利用流通过极少内存的占用,高效完成了对大文件的操作。最后用一个形象的例子比喻一下上面的操作: A freight worker has a truck of goods that needs to be transported. He can choose to unload all the goods on the truck and move them to the destination together; he can also choose to transport the goods to the destination one by one by using the гусеничный трактор.试想一下,这两种方式操作的效率。
О внутреннем механизме чтения и записи потоков
Вышеприведенный пример — «неуместный», но понятный «краулер» На самом деле реальная работа потока узлов в компьютере не так проста.
проблема производителя-потребителяЭто поможет лучше понять принцип течения
операция чтения
Источник изображения:личный блог усатого братаЧитаемые потоки делятся на два режима
- Текущий режим
- Нетекучий режим Режим паузы (предустановлен, может контролироваться
data
событие или исполнениеresume
изменение метода)
Режим потока можно понимать как три узла на основе метафоры: водяной насос (источник воды, источник данных), ведро (контейнер кэша, память), пункт назначения (куда течет вода).
Поток данных ресурса не будет поступать напрямую к потребителю, а сначала будет оцениваться highWaterMark и помещаться в буферный пул (память). Если highWaterMark превышен, операция push возвращает false. Прошлойresume()
,pause()
клапан к потребителю
операция записи
Источник изображения:личный блог усатого братаПринцип аналогичен потоку чтения.Когда скорость записи достаточно высока, ресурс будет записываться напрямую.Когда скорость записи низкая или запись приостановлена, данные будут кэшироваться в пуле буферов.Когда производитель записывает слишком быстро буферный пул будет освобожден. Когда он будет заполнен, производитель должен быть уведомлен о приостановке производства в это время (например, следующиеwrite
метод возвращаетfalse
), когда пул буферов будет освобожден, Writable Stream отправит сообщение производителюdrain
сообщение, уведомляющее производителя о необходимости начать запись снова. ps: содержимое здесь будет иметь пример кода, когда поток с возможностью записи будет представлен ниже
Использование потоков в операциях FS
Представленный выше общий流的概念
,流的类型
,使用流的优点
, а затем с помощью специального кода разобраться с использованием некоторых потоков в модуле fs.
- Читаемый READABLESTREAM
- Пирентный поток WritableStrestream
Читаемый поток
Создайте читаемый потокfs.createReadStream(path, )
const fs = require('fs);
const rs = fs.createReadStream('text.txt'); // options
/**
fs.createReadStream(path, {
flags: 'r', // 读文件,文件不存在报错,默认'r'
encoding: 'utf-8', // 以什么编码格式读取文件(可以被Buffer接收的任何格式),默认读取buffer
autoClose: true, // 读取后是否自动关闭文件,默认true
highWarterMark: 100, // 每次读取的字节数,默认64k(65536)
start: 0, // 开始读取的位置,默认0
end: 200 // 读取文件的终点索引,默认 Infinity
})
**/
Уведомление:
end
Если установлено значение 100, количество байтов для чтения равно 101, то есть от 0 до 100, включая 100.
потому что по умолчаниюflags
для'r'
,еслиpath
Если указанный файл не существует, будет сообщено об ошибке
прослушать событиеopen
,data
,end
,close
,error
мероприятие
Как упоминалось выше: все потокиEventEmitrer
случай
const fs = require('fs);
const rs = fs.createReadStream('text.txt');
rs.on('open', () => {
console.log('open');
});
rs.on('data', (datas) => {
console.log('file is read', datas);
})
rs.on('close', () => {
console.log('file is closed');
});
rs.on('error', (err) => {
console.log(err);
});
/**
依次输出
open
文件的内容(buffer)
file is closed
**/
Уведомление:
data
Событие может запускаться несколько раз, еслиhighWarterMark
Установите на 3, чтение и запись должны0123456789
изtext.txt
файл, он будет запущен четыре раза, и буферы, соответствующие 012, 345, 678 и 9, будут выведены по очереди.
метод вызоваpause
,resume
, пауза, возобновление
/**
* text.txt文件内容 0123456789
*/
const fs = require('fs');
const rs = fs.createReadStream('text.txt', {
encoding: 'utf-8',
highWaterMark: 3,
});
rs.on('data', (datas) => {
console.log(datas);
rs.pause();
console.log('stream is paused now');
});
rs.on('end', () => {
console.log('stream is end');
clearInterval(interval); // 清除定时器,否则会一直打印stream is resumed now
});
const interval = setInterval(() => {
rs.resume();
console.log('stream is resumed now');
}, 1000);
/**
输出:
012
stream is paused now
stream is resumed now
345
stream is paused now
stream is resumed now
678
stream is paused now
stream is resumed now
9
stream is paused now
stream is end
**/
Уведомление: нечего замечать
Доступный для записи поток
Создать доступный для записи потокfs.createWriteStream(path, )
const fs = require('fs');
fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
flags: 'w', // 默认'w'写入文件,不存在则创建
encoding: 'utf-8'
fd: null, // 文件描述符
mode: 0o666, // 文件操作权限,同438
autoClose: true,
start: 0 // 开始写入位置
highWarterMark: 16384 // !!! 文档没有给出这一设置,默认 16k,文末将验证
});
Уведомление:
параметр опций сcreateReadStream
разные
также можно установитьhighWaterMark
Вариант, официальный документ не дает, размер записи по умолчанию 16k, выполняется на объект потока, доступный для записиwrite
метод, если он превышаетhighWaterMark
, возвращаемое значение станетfalse
метод вызоваwrite
,end
,drain
,finish
- Метод записи имеет возвращаемое значение, return
true
,false
, соответственно, представляя, превышают ли данные, записанные в текущей памяти,highWaterMark
(только что упоминалось выше) - Метод записи является асинхронным и выполняется
write
После этого данные не будут записываться в файл сразу, а будут кэшироваться в памяти, а затем последовательно записываться
/**
* write 方法
* chunk 写入数据的buffer/string
* encoding 编码格式,可选。且chunk为字符串时有用
* callback 写入成功回调函数
**/
ws.write(chunk,[encoding],[callback]);
/**
* end 方法,表明接下来没有数据要被写入
* chunk 写入数据的buffer/string
* encoding 编码格式,可选。且chunk为字符串时有用
* callback 回调函数,如果传入,将作为 finish 事件的回调函数
**/
ws.end(chunk,[encoding],[callback]);
/**
* finish 方法,在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发。
**/
const writer = fs.createWriteStream('2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('结束\n');
writer.on('finish', () => {
console.error('所有的写入已经完成!');
});
drain
метод
const fs = require('fs');
const ws = fs.createWriteStream('2.txt', {
encoding: 'utf-8',
highWaterMark: 3
});
let i = 10;
function write() {
let flag = true;
while(i && flag) {
flag = ws.write('1');
i --;
console.log(flag);
}
}
write();
ws.on('drain', () => {
console.log('drain');
write();
});
Уведомление:
- когда поток
drain
статус, даwrite
вызовы кэшируются (объяснено ниже) и возвращаютсяfalse
. После того, как все кэшированные данные будут удалены (используются операционной системой для вывода),drain
Событие сработает, что означает, что все кэшированные в памяти данные были записаны в файл, после чего выполнение можно продолжитьwrite
записать данные в память- Это рекомендуется, если вы вручную управляете операциями чтения, записи и кэширования.
write
метод возвращает false, вdrain
До того, как событие сработает, лучше не записывать никаких данных, конечно, это требует сотрудничестваcreateWriteStream
изhighWaterMark
параметр, (этот параметр не указан в документации)
pipe
,unpipe
,cork
,uncork
метод
pipe
метод
Среди методов, упомянутых выше,pipe
Несомненно, наиболее используемый в общем сценарии использования потока,pipe
Может решить большинство потребностей, следующий очень простой семантический код:pipe
способ использования,readable
пройти черезpipe
передавать данные вwritable
, Как его имя,管道
readable.pipe(writable)
Основной принцип:
- передача
pipe
метод, уведомить написать - write()
- Потребление медленнее производства, пауза()
- Потребитель завершает потребление, вызывает возобновление() через утечку и продолжает запись.
- вернуть доступный для записи (поскольку канал поддерживает цепочку)
Простой пример:
const from = fs.createReadStream('./1.txt');
const to = fs.createWriteStream('./2.txt');
from.pipe(to);
В приведенных выше примерах все потоки, доступные для чтения, являются входными источниками, а потоки, доступные для записи, — возвращаемыми результатами.duplex
/transform
, то вы можете легко написать цепочку вызовов
// 伪代码
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
unpipe
метод
/**
* dest 当前readable pipe 管道的目标可写流
**/
readable.unpipe(dest)
- Потоки, ранее связанные методом stream.pipe(), могут быть отсоединены.
- если
dest
не указано, все потоки, связанные с readable, будут отсоединены
cork
,uncork
метод
- cork
- Вызов метода writable.cork() принудительно поместит все записанные данные в буфер в памяти. Данные в буфере не будут выводиться до тех пор, пока не будут вызваны методы stream.uncork() или stream.end().
- uncork
- writable.uncork() выведет все данные, буферизованные в памяти после вызова метода stream.cork().
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
Сводка некоторых важных событий и методов о потоках
Readable Stream События и методы читаемых потоков
Event | Functions |
---|---|
data | труба(), нетруба() |
end | читать(), отменить сдвиг() |
error | пауза(), возобновление() |
close | isPaused() |
readable | setEncoding() |
События Writable Stream и методы Writable Stream
Event | Functions |
---|---|
drain | write() |
finish | end() |
error | cork() |
close | uncork() |
pipe/unpipe | setDefaultEncoding() |
Дополнительно: проверьте размер данных записи по умолчанию для записываемого потока.highWaterMark
- Поскольку я не уверен, при создании потока с возможностью записи размер записи по умолчанию может быть равен 16 КБ следующими двумя способами.
- В документации не сказано
fs.createWriteStream()
вариант вhighWaterMark
роли, я много раз упоминал об этом в этой статье, надеюсь, это может углубить впечатление
метод первый:
const fs = require('fs');
let count = 0;
const ws = fs.createWriteStream('testInput.txt');
for (let i = 0; i < 10000; i ++) {
count ++;
let flag = ws.write(i.toString());
if (!flag) { // 返回false即到达了highWaterMark
console.log('写入' + count + '次');
break;
}
}
ws.end(function() {
console.log('文件写入结束,输出的总字节为', ws.bytesWritten);
});
// 输出:
写入4374次
文件写入结束,输出的总字节为 16386
16386 / 1024
// 结果:
16.001953125
Способ второй:
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 10000;
write();
function write() {
let ok = true;
while (i-- > 0 && ok) {
// 写入结束时回调
= writer.write(data, encoding, i === 0 ? callback : null);
}
if (i > 0) {
// 这里提前停下了,'drain' 事件触发后才可以继续写入
console.log('drain', i);
writer.once('drain', write);
}
}
}
const Writable = require('stream').Writable;
const writer = new Writable({
write(chunk, encoding, callback) {
// 比 process.nextTick() 稍慢
setTimeout(() => {
callback && callback();
});
}
});
writeOneMillionTimes(writer, '123456', 'utf8', () => {
console.log('end');
});
// 输出
drain 7268
drain 4536
drain 1804
end
// 计算:
(10000-7268) * 6 / 1024
// 结果:16.0078125
Заключение и объяснение в конце статьи
Суммировать
В этой статье в основном рассматриваются принципы и использование потоков с точки зрения операций с файлами.В приложениях Node вы можете использовать потоки для выполнения многих задач, таких как сетевые запросы, загрузка файлов, инструменты командной строки и многое другое. В приложениях Node.js потоки можно увидеть повсюду, файловые операции, сетевые запросы, процессы и сокеты повсюду. Именно таким образом характеристики потоков могут сделать ваше приложение узла действительно отражающим характеристики «маленького и красивого».
иллюстрировать
Цель статьи для личных заметок.Я тоже новичок в Node.js.Если в статье есть неуместные описания и пояснения прошу меня поправить. Статья основана на статьях многих больших братьев (первый портал), большое спасибо Я продолжу обновлять, когда будет время в будущем, желаю вам легкого пути к узлу🤡