Я никогда не понимал концепцию Stream в Node.js. Недавно я подробно изучил ее и попытался разъяснить в статье. Адрес кода в статье:GitHub.com/Мари Скрабс/нет…
Что такое поток?
Поток — это абстрактная структура данных. Так же, как массивы или строки, потоки представляют собой наборы данных.
Отличие в том, что поток может выводить за раз небольшой объем данных, и их не нужно хранить в памяти.
Например, сделать http-запрос на серверrequest/response
Объект — поток.
Графический поток
поток похож на поток воды, но по умолчанию без воды. stream.write может делать воду в потоке воды, то есть записывать данные.
Верхний левый угол — это сегмент, производящий данные, называемый источником. Внизу находится сток (пул), который получает данные. Маленькие точки, идущие сверху вниз, представляют собой небольшие данные, записываемые каждый раз, называемые фрагментами.
Зачем нужен Стрим
У студентов могут возникнуть такие вопросы.Разве роль потоков не заключается только в передаче данных?Вы также можете выполнять операции чтения и записи без потоковых узлов.
Да, но способ чтения и записи состоит в том, чтобы прочитать все содержимое файла в память, а затем записать в файл, с небольшими файлами не большая проблема.
Но когда дело доходит до больших файлов, это действительно невыносимо.
Потоки могут разбивать файловые ресурсы на мелкие части и передавать их по частям, ресурсы передаются подобно водному потоку, снижая нагрузку на сервер.
Экземпляр потока
Это описание может не убедить всех. Давайте проведем эксперимент, чтобы увидеть, нужно ли использовать потоки при чтении и записи больших файлов.
Сначала создайте большой файл:
Создавайте большие файлы с помощью Stream
Сначала мы создаем доступный для записи поток и записываем в файл несколько раз. Не забудьте закрыть поток в конце, вы получите большой файл.
// 引入文件模块
const fs = require('fs');
const stream = fs.createWriteStream('./../big_file.txt');
for (let i = 0; i < 1000000; i++) {
stream.write(`这是第${i}行内容\n`);
}
stream.end()
console.log('done')
использовать файл чтения
Давайте сначала воспользуемся fs.readFile, чтобы прочитать содержимое файла и посмотреть, что произойдет.
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
fs.readFile('./../big_file.txt', (error, data) => {
if (error) throw error
response.end(data)
console.log('done')
})
})
server.listen(8889)
console.log(8889)
когда мы посещаемhttp://localhost:8889
, сервер асинхронно прочитает этот большой файл.
Все выглядит нормально, ничего страшного.
Однако, когда мы посмотрели на память Node.js с помощью диспетчера задач, она составила около 130 МБ.
Сервер принимает 1 запрос и занимает 130 Мб, если принимает 10 запросов, то занимает 1G. Потребление памяти сервером очень велико.
Как решить эту проблему? Используйте поток.
Использовать поток
Давайте попробуем переписать приведенный выше пример с помощью Stream.
Создайте читаемый потокcreateReadStream
,
файл сноваstream
иresponse stream
через трубыpipe
связанный.
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
})
server.listen(8888)
Давайте еще раз посмотрим на объем памяти node.js, который в основном не превышает 30 МБ.
Поскольку каждый раз передается только небольшой фрагмент данных, он не займет много памяти.
трубка
Два потока могут быть соединены конвейером, при этом конец потока 1 соединяется с началом потока 2.Пока в потоке 1 есть данные, они будут передаваться в поток 2.
Например, приведенный выше код:
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
stream
Это файловый поток, а следующий поток — это наш ответ http-потока.
Изначально эти два потока не связаны между собой, и теперь мы хотим передать данные файлового потока в http-поток.
Это очень просто, просто используйте трубу для соединения.
Общий код
stream1.pipe(stream2)
- stream1 — это поток, излучающий данные, читаемый поток.
- stream2 — это поток, в который записываются данные, доступный для записи поток.
цепная операция
Он может проходить через трубопровод без ограничений, одинакового потока данных.
Есть два способа написать:
a.pipe(b).pipe(c)
// 等价于
a.pipe(b)
b.pipe(c)
Принцип конвейера
Конвейер также можно рассматривать как инкапсуляцию двух событий.
- Слушайте событие данных, и когда в потоке 1 есть данные, они будут помещены в поток 2.
- Прослушайте конечное событие, когда поток1 остановится, остановите поток2
stream1.on('data', (chunk) => {
stream2.write(chunk)
})
stream1.on('end', () => {
stream2.end()
})
Прототип цепочки объектов Stream
Знание цепочки прототипов Stream облегчает запоминание API Stream.
fs.createReadStream(path)
еслиs = fs.createReadStream(path)
,Такs
Иерархия объектов:
- собственное имущество, по
fs.ReadStream
построенный конструктором - прототип:
stream.Readable.prototype
- Вторичный прототип:
stream.Stream.prototype
- Прототип уровня 3:
events.EventEmitter.prototype
Это прототип, от которого наследуются все потоки. - Уровень 4 Прототип:
Object.prototype
То есть прототип, от которого наследуются все объекты.
События и методы, поддерживаемые Stream
Зная цепочку прототипов, давайте взглянем на события и методы, поддерживаемые Stream.
Наверное, хорошо иметь впечатление, и я проверю его, когда буду использовать.
Readable Stream | Writable Stream | |
---|---|---|
событие | data, end, error,close,readable | слить (завершено на этот раз), закончить (завершить всю запись), ошибка, закрыть, перенаправить, разобрать |
метод | pipe() unpipe() read()... | write() destroy() ... |
Категория потока
Есть четыре категории
название | Функции |
---|---|
Readable | удобочитаемый |
Writable | доступный для записи |
Duplex | Чтение и запись (двунаправленный) |
Transform | Читать и писать (изменить) |
И Readable, и Writable являются односторонними, а два других — двусторонними.
Читаемый, записываемый и понятный, в чем разница между двумя другими?
Дуплекс может читать и писать, но содержание чтения и записи не зависит друг от друга, и пересечения нет.А Transform должен писать и читать сам по себе.
Например, Babel преобразует es6 в, мы пишем es6 слева и читаем es5 справа. Как автомойка, черные машины заезжают, а белые уезжают.
Читаемый поток Читаемый поток
Статическая пауза и течение
Доступные для чтения потоки имеют два состояния: приостановлено и течет.
Читаемый поток можно рассматривать как производителя контента.Когда контент не отправляется, это статическое состояние, а когда контент возобновляет отправку, это динамическое состояние.
- Доступные для чтения потоки по умолчанию приостановлены.
- После добавления прослушивателя событий данных он становится текущим состоянием.
- Удалить прослушиватель событий данных, состояние паузы.
- pause() может поставить его на паузу.
- возобновление () может превратить его в поток.
const http = require('http')
const fs = require('fs')
const server = http.createServer()
server,on('request', (request, response) => {
// 默认处于 paused 态
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
stream.pause(); // 暂停
setTimeout(() => {
// 恢复
stream.resume()
}, 3000)
})
server.listen(8888);
Writable Stream
слив истощает событие
Это значит, что можно добавить немного воды, то есть можно продолжить запись данных.
мы называемstream.write(chunk)
может получиться ложным.
false означает, что вы пишете слишком быстро, накапливая данные.
В это время мы уже не можем писать, надо следить за сливом.
Мы можем продолжать писать, пока не сработает событие слива.
Это немного сложно понять, просто глядя на них, вы можете видетьПримеры на официальном сайте:
const fs = require('fs');
// 将 data 写入文件 1000000 次
function writeOneMillionTimes(writer, data) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// 最后一次写入
writer.write(data);
} else {
// 在这里判断是不是可以继续写
// ok 为 false 的意思是你写太快了,数据积压
ok = writer.write(data);
if (ok === false) {
console.log('不能再写了')
}
}
} while (i > 0 && ok);
if (i > 0) {
// 干涸了,可以继续写入
writer.once('drain', () => {
console.log('干涸了')
write()
});
}
}
}
const write = fs.createWriteStream('./../big_file.txt')
writeOneMillionTimes(write, 'hello world')
закончить мероприятие
вызовstream.end()
После этого и после того, как данные буфера были переданы базовой системе, запускается событие завершения.
Когда мы записываем данные в файл, они не сохраняются непосредственно на жестком диске, а сначала помещаются в буфер. Когда данные достигают определенного размера, они будут записаны на жесткий диск.
Создайте свой собственный поток
Давайте посмотрим, как создать свой собственный поток.
По очереди объясняются четыре типа потоков.
Writable
const {Writable} = require('stream')
const outStream = new Writable({
// 如果别人调用,我们做什么
write(chunk, encoding, callback) {
console.log(chunk.toString())
// 进入下一个流程
callback()
}
})
process.stdin.pipe(outStream);
Сохраните файл как writable.js и запустите его с помощью node. Вы получите тот же результат независимо от того, что вы вводите.
Readable
Читать все данные сразу
const {Readable} = require('stream')
const inStream = new Readable()
inStream.push('hello world') // 写入数据
inStream.push('hello node')
inStream.push(null) // 没有数据了
// 将这个可读流,导入到可写流 process.stdout。
inStream.pipe(process.stdout)
Сначала поместите все данные в inStream, а затем используйте канал для импорта в доступный для записи поток.process.stdout
середина.
Таким образом, когда мы запускаем файл с узлом, мы можем прочитать все данные из inStream и распечатать их.
Этот подход очень прост, но не эффективен.
Лучший способ — отправлять по запросу, и мы читаем данные только тогда, когда они нужны пользователю.
Вызов чтения даст данные только один раз
Этот вид записи данных предоставляется по запросу, и другая сторона звонитread
, мы дадим данные только один раз.
Например, в следующем примере мы однаждыpush
Символ, начинающийся с кода символа 65 (для A).
Когда пользователь читает, он будет продолжать стрелятьread
,мы будемpush
больше персонажей.
когда все персонажиpush
сделано, мыpush null
, остановите поток.
const {Readable} = require('stream')
const inStream = new Readable({
read(size) {
const char = String.fromCharCode(this.currentCharCode++)
this.push(char);
console.log(`推了${char}`)
// 这个时候停止
if (this.currentCharCode > 90) { // Z
this.push(null)
}
}
})
inStream.currentCharCode = 65 // A
inStream.pipe(process.stdout)
Duplex Stream
После чтения потока для чтения и потока для записиDuplex Stream
Гораздо проще.
Одновременноwrite
иread
Метод в порядке.
const {Duplex} = require('stream')
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++))
if (this.currentCharCode > 90) {
this.pull(null)
}
}
})
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform Stream
Для Transform Stream мы просто реализуем метод преобразования, который сочетает в себе методы чтения и записи.
Вот простой пример преобразования, который напечатает любой введенный вами символ в верхнем регистре:
const {Transform} = require('stream')
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
// 1. 读数据 chunk.toString()
// 2. 写数据 this.push(xxx)
this.push(chunk.toString().toUpperCase())
callback();
}
})
// 监听用户输入,调用 upperCaseTr
// 转化完成后,输出
process.stdin.pipe(upperCaseTr)
.pipe(process.stdout)
Transform Stream, встроенный в Node.js
Например, часто упоминаемая в интервью схема оптимизации: сжатие gzip.
Это можно сделать в 4 строчках кода в Node.js.
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write(".")) // 打出进度条
.pipe(fs.createWriteStream(file + ".gz"))
Потоки повсюду в Node.js
Readable Stream | Writeable Stream |
---|---|
Клиент ответа HTTP | Клиент HTTP-запроса |
Сервер HTTP-запросов | Сервер ответов HTTP |
fs read stream | fs write stream |
zlib stream | zlib stream |
TCP sockets | TCP sockets |
child process stdout & stderr | child process stdin |
process.stdin | process.stdout,process.stderr |
... | ... |
проблема с отставанием данных
В Stream есть еще одна очень важная проблема: отставание данных.
Если слишком много данных заблокировано, как это решить.
На официальном сайте Node.js есть специальная статья, объясняющая, как ее решить: когда вы сталкиваетесь с проблемой, вы можете перевернуть ее и посмотреть.
Я не буду вдаваться в подробности здесь,Адрес здесь
Суммировать
Подведем итог тому, что мы уже узнали об объекте Stream.
-
Зачем использовать поток?
- Потому что при чтении и записи больших файлов это может эффективно снизить нагрузку на память.
-
Трубы Труба — важная концепция в Stream, которая может соединять потоки.
-
Все потоковые объекты наследуют EventEmitter.
-
Поток разделен на четыре категории
- Читаемый Читаемый, имеет два состояния: приостановлено, течет.
- Writable Writable, два важных события: слив, финиш.
- Чтение и запись (двунаправленный) Дуплекс
- Чтение и запись (изменение) Преобразование
-
Наконец-то узнайте, как создавать четыре типа потоков и потоков в Node.js.