Когда использовать потоки
При работе с большими файлами, сжатием, архивированием, мультимедийными файлами и огромными файлами журналов все данные считываются в память, и память быстро расходуется, что может вызвать большие проблемы для программы.
Если эти операции выполняются с подходящим буфером и за раз считывается фиксированная длина, будет использоваться меньше памяти, что является потоковым API.
Классы API, доступные для Stream
- Readable- читаемый поток (например,
fs.createReadStream()
). -
Writable- доступные для записи потоки (например,
fs.createWriteStream()
). -
Duplex- потоки чтения-записи (например,
net.Socket
). -
Transform- Дуплексные потоки, которые могут изменять и преобразовывать данные во время чтения и записи (например,
zlib.createDeflate()
).
1. Используйте встроенную потоковую передачу для реализации статического веб-сервера.
Основные модули Node для файловой системы и сетевых операций, fs и net, оба предоставляют потоковые интерфейсы. Использование потоков для решения проблем с вводом-выводом довольно просто.
Используя основной модуль Node, реализуйте простой статический сервер:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
fs.readFile(__dirname + '/index.html', function(err,data){
if(err){
res.statusCode = 500;
res.end(String(err))
return;
}
res.end(data)
})
})
server.listen(3000)
Хотя приведенный выше код предназначен для неблокирующего readFile, как только прочитанный файл станет очень большим или будет слишком много обращений к файлу, он быстро исчерпает память, поэтому необходимоУлучшено с помощью метода fs.createReadStream.:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
// 数据通过流的方式,从html 文件输出到 http 的请求响应
fs.createReadStream(__dirname + '/index.html').pipe(res);
})
server.listen(3000)
Приведенный выше код предоставляет буфер для отправки клиенту. Если клиентское соединение медленное, сетевой поток отправит сигнал для приостановки ресурса ввода-вывода до тех пор, пока клиент не будет готов принять больше данных.
Реализуйте простой статический файловый сервер, используя потоки:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
let filename = req.url
if(filename === '/'){
filename = '/index.html'
}
fs.createReadStream(__dirname + filename ).pipe(res);
})
server.listen(3000)
Статический сервер, сжатый с помощью gzip
const http = require('http');
const fs = require('fs');
const zlib = require('zlib')
const server = http.createServer(function(req,res){
res.writeHead(200, { 'content-encoding': 'gzip' })
fs.createReadStream(__dirname + '/index.html' )
.pipe(zlib.createGzip())
.pipe(res);
})
server.listen(3000)
2. Читаемый читаемый поток
Stream наследуется от событий, поэтому в событиях есть методы on и emit.
1. События
-
readable--- Генерируется, когда блок данных может быть прочитан из потока.
-
data--- Инициировать это событие при доставке данных (с блоком данных фрагмента в качестве объекта)
-
end--- Запускается, когда заканчивается чтение данных
-
close--- Запускается, когда базовый ресурс (например, файл) закрывается.
-
error--- Срабатывает при ошибке в полученных данных.
2. Метод
-
read([size]) --- Читать данные из потока.Данные могут быть String, Buffer, null (будут в коде ниже), при указании размера read-only ограничивается этим количеством байт
-
setEncoding(encoding)--- Установите кодировку, используемую, когда запрос read() считывает возвращаемую строку
-
pause()--- Приостановить события данных, испускаемые этим объектом
-
resume()--- Возобновить события данных, испускаемые этим объектом
-
pipe(destination,[options]) --- Передайте прочитанный блок данных в место назначения с возможностью записи. Когда передача данных завершена и срабатывает событие «конец», одновременно срабатывает событие «конец» цели (поток с возможностью записи), в результате чего цель становится недоступной для записи.
-
unpipe([destination])---- Отключите этот объект от места назначения Writale.
Примечания по наследованию читаемых потоков:
- Все блоки данных, возвращаемые методом readable.read, добавляются во внутреннюю очередь чтения методом readable.push.
- Все подклассы, наследующие читаемые потоки, должны реализовывать
readable._read()
метод для получения базовых ресурсов данных и может вызываться только внутренними методами объекта Readable и не должен вызываться непосредственно пользовательской программой. существуетreadable._read()
В реализации, только если есть еще данные для чтения, он должен вызыватьсяreadable.push(chunk)
метод добавляет данные во внутреннюю читаемую очередь путемreadable.read
Метод чтения для использования приложения. - Как только экземпляр прослушивает событие данных, возвращаемое значение readable._read() будет потеряно.
Пример: реализация читаемого потока
const { Readable } = require('stream');
const util = require('util');
util.inherits(MyReadStream, Readable)
function MyReadStream(arr){
this.source = arr;
Readable.call(this);
}
MyReadStream.prototype._read = function(){
if(this.source.length){
this.push(this.source[0])
this.source.splice(0,1)
}else{
this.push(null)
}
}
let myStream = new MyReadStream(['php','js','java'])
myStream.on('readable',function(){
let output_buf = myStream.read();
console.log(output_buf,'output') // null
})
myStream.on('data',function(res){
console.log(res.toString(),'data')
})
myStream.on('end',function(){
console.log('end')
})
В приведенном выше коде вreadable
позвонить в случаеread
метод чтения строки и прослушиванияdata
событие для вывода прочитанных данных.
3. Доступный для записи поток
Интерфейс записываемого потока — это абстракция над местом назначения, в которое записываются данные.
1. Метод
write(chunk,[encoding],[callback])--- Запись данных в поток. Фрагмент (блок данных) содержит данные для записи, кодирование указывает кодировку строки, а обратный вызов указывает функцию обратного вызова, которая выполняется после полного обновления данных. write() возвращает true, если запись прошла успешно.
end([chunk],[encoding],[callback]) --- Так же, как и write(), он устанавливает объект Writable в состояние, при котором данные больше не принимаются, и отправляет событие завершения.
2. События
drain -- После того, как вызов write() вернет false, создайте это событие, чтобы уведомить монитор, когда он будет готов начать запись дополнительных данных.
finish -- Запускается, когда end() вызывается для записываемого объекта, поэтому данные сбрасываются и больше данные не принимаются
pipe-- Генерируется, когда метод pipe() вызывается в доступном для чтения потоке, который добавил этот доступный для записи в качестве пункта назначения
unpipe-- Генерируется при вызове метода unpipe() с объектом удаления Writable в качестве места назначения.
Примечания по наследованию доступных для записи потоков:
-
writable.write()
метод записывает данные в поток и вызывается после завершения обработки данныхcallback
. Если возникает ошибка,callback
Не обязательно с этой ошибкой в качестве первого аргумента и вызывается. Чтобы гарантировать надежное обнаружение ошибок записи, вы должны прослушивать'error'
событие.
- Все реализации записываемого потока должны предоставлять
writable._write()
метод для отправки данных в базовый ресурс.
Пример. Реализуйте доступный для записи поток от стандартного ввода к стандартному выводу и оцените, содержит ли входной символ a, сообщите об ошибке и завершите работу.
const { Writable } = require('stream');
const util = require('util');
util.inherits(MyWriteStream, Writable)
function MyWriteStream(options){
Writable.call(this, options);
}
MyWriteStream.prototype._write = function(chunk, encoding, callback){
if(chunk.toString().indexOf('a') > -1){
process.stdout.write("新写入的:"+ chunk)
callback(null)
}else{
callback(new Error('no a'))
}
}
let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)
Примечание: необходимо вызватьcallback
метод, указывающий на успех или неудачу записи. Если возникает ошибка,callback
Первый параметр должен бытьError
объект, параметры успехаnull
.
4. Дуплексный поток -- поток с возможностью чтения и записи.
наследованиеstream.Duplex
дуплексный поток
Пример. Реализуйте программу, которая изменяет цвет содержимого стандартного ввода, а затем печатает его из стандартного вывода.
const { Duplex } = require('stream');
const util = require('util');
util.inherits(MyDuplexStream, Duplex)
function MyDuplexStream(options){
Duplex.call(this, options);
this.wating = false;
}
MyDuplexStream.prototype._write = function(chunk, encoding, callback){
this.wating = false;
// 把数据推动到内部队列
this.push('\u001b[32m' + chunk + '\u001b[39m');
callback()
}
MyDuplexStream.prototype._read = function(chunk, encoding, callback){
if(!this.wating){
// 在等待数据时展示一个提示
this.push('等待输入> ')
this.wating = true;
}
}
let myStream = new MyDuplexStream();
// 获取标准输入,用管道传给双工流,单后返回给标准输出
process.stdin.pipe(myStream).pipe(process.stdout)
Пять, конверсионный поток
Потоки преобразования очень похожи на дуплексные потоки и также реализуют интерфейсы Readable и Writable. Разница в том, что поток преобразования преобразует данные или реализуется с помощью _transform. Этот метод имеет три параметра: блок данных преобразователя, код кодирования, обратный вызов (во многом похожий на _write), который выполняет обратный вызов после завершения преобразования данных, позволяя потоку преобразования анализировать данные асинхронно.
Примеры ожидаются.