Практика Nodejs - Потоковая передача

Node.js задняя часть сервер

Когда использовать потоки

При работе с большими файлами, сжатием, архивированием, мультимедийными файлами и огромными файлами журналов все данные считываются в память, и память быстро расходуется, что может вызвать большие проблемы для программы.

Если эти операции выполняются с подходящим буфером и за раз считывается фиксированная длина, будет использоваться меньше памяти, что является потоковым API.

Классы API, доступные для Stream


1. Используйте встроенную потоковую передачу для реализации статического веб-сервера.

Основные модули Node для файловой системы и сетевых операций, fs и net, оба предоставляют потоковые интерфейсы. Использование потоков для решения проблем с вводом-выводом довольно просто.

Используя основной модуль Node, реализуйте простой статический сервер:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	fs.readFile(__dirname + '/index.html', function(err,data){
		if(err){
			res.statusCode = 500;
			res.end(String(err))
			return;
		}
		res.end(data)
	})
})

server.listen(3000)

Хотя приведенный выше код предназначен для неблокирующего readFile, как только прочитанный файл станет очень большим или будет слишком много обращений к файлу, он быстро исчерпает память, поэтому необходимоУлучшено с помощью метода fs.createReadStream.:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
        // 数据通过流的方式,从html 文件输出到 http 的请求响应
	fs.createReadStream(__dirname + '/index.html').pipe(res);
})

server.listen(3000)

Приведенный выше код предоставляет буфер для отправки клиенту. Если клиентское соединение медленное, сетевой поток отправит сигнал для приостановки ресурса ввода-вывода до тех пор, пока клиент не будет готов принять больше данных.

Реализуйте простой статический файловый сервер, используя потоки:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	let filename = req.url
	if(filename === '/'){
	   filename = '/index.html'	
	}
	fs.createReadStream(__dirname + filename ).pipe(res);	
})

server.listen(3000)


Статический сервер, сжатый с помощью gzip

const http = require('http');
const fs = require('fs');
const zlib = require('zlib')

const server = http.createServer(function(req,res){
	res.writeHead(200, { 'content-encoding': 'gzip' })
	fs.createReadStream(__dirname + '/index.html' )
            .pipe(zlib.createGzip())
            .pipe(res);	
})

server.listen(3000)



2. Читаемый читаемый поток

Stream наследуется от событий, поэтому в событиях есть методы on и emit.

1. События

  • readable--- Генерируется, когда блок данных может быть прочитан из потока.
  • data--- Инициировать это событие при доставке данных (с блоком данных фрагмента в качестве объекта)
  • end--- Запускается, когда заканчивается чтение данных
  • close--- Запускается, когда базовый ресурс (например, файл) закрывается.
  • error--- Срабатывает при ошибке в полученных данных.

2. Метод

  • read([size]) --- Читать данные из потока.Данные могут быть String, Buffer, null (будут в коде ниже), при указании размера read-only ограничивается этим количеством байт
  • setEncoding(encoding)--- Установите кодировку, используемую, когда запрос read() считывает возвращаемую строку
  • pause()--- Приостановить события данных, испускаемые этим объектом
  • resume()--- Возобновить события данных, испускаемые этим объектом
  • pipe(destination,[options]) --- Передайте прочитанный блок данных в место назначения с возможностью записи. Когда передача данных завершена и срабатывает событие «конец», одновременно срабатывает событие «конец» цели (поток с возможностью записи), в результате чего цель становится недоступной для записи.
  • unpipe([destination])---- Отключите этот объект от места назначения Writale.

Примечания по наследованию читаемых потоков:

  • Все блоки данных, возвращаемые методом readable.read, добавляются во внутреннюю очередь чтения методом readable.push.
  • Все подклассы, наследующие читаемые потоки, должны реализовыватьreadable._read()метод для получения базовых ресурсов данных и может вызываться только внутренними методами объекта Readable и не должен вызываться непосредственно пользовательской программой. существует readable._read()В реализации, только если есть еще данные для чтения, он должен вызыватьсяreadable.push(chunk) метод добавляет данные во внутреннюю читаемую очередь путемreadable.readМетод чтения для использования приложения.
  • Как только экземпляр прослушивает событие данных, возвращаемое значение readable._read() будет потеряно.

Пример: реализация читаемого потока

const { Readable } = require('stream');
const util = require('util');

util.inherits(MyReadStream, Readable)

function MyReadStream(arr){
	this.source = arr;
	Readable.call(this);
}

MyReadStream.prototype._read = function(){
	if(this.source.length){
		this.push(this.source[0])
		this.source.splice(0,1)
	}else{
		this.push(null)
	}
}

let myStream = new MyReadStream(['php','js','java'])

myStream.on('readable',function(){
	let output_buf  = myStream.read();
	console.log(output_buf,'output')  // null
})

myStream.on('data',function(res){
	console.log(res.toString(),'data')
})
myStream.on('end',function(){
	console.log('end')
})

В приведенном выше коде вreadableпозвонить в случаеreadметод чтения строки и прослушиванияdataсобытие для вывода прочитанных данных.


3. Доступный для записи поток

Интерфейс записываемого потока — это абстракция над местом назначения, в которое записываются данные.

1. Метод

write(chunk,[encoding],[callback])--- Запись данных в поток. Фрагмент (блок данных) содержит данные для записи, кодирование указывает кодировку строки, а обратный вызов указывает функцию обратного вызова, которая выполняется после полного обновления данных. write() возвращает true, если запись прошла успешно.

end([chunk],[encoding],[callback]) --- Так же, как и write(), он устанавливает объект Writable в состояние, при котором данные больше не принимаются, и отправляет событие завершения.

2. События

drain -- После того, как вызов write() вернет false, создайте это событие, чтобы уведомить монитор, когда он будет готов начать запись дополнительных данных.

finish -- Запускается, когда end() вызывается для записываемого объекта, поэтому данные сбрасываются и больше данные не принимаются

pipe-- Генерируется, когда метод pipe() вызывается в доступном для чтения потоке, который добавил этот доступный для записи в качестве пункта назначения

unpipe-- Генерируется при вызове метода unpipe() с объектом удаления Writable в качестве места назначения.


Примечания по наследованию доступных для записи потоков:

  • writable.write()метод записывает данные в поток и вызывается после завершения обработки данныхcallback. Если возникает ошибка,callbackНе обязательно с этой ошибкой в ​​качестве первого аргумента и вызывается. Чтобы гарантировать надежное обнаружение ошибок записи, вы должны прослушивать'error'событие.
  • Все реализации записываемого потока должны предоставлятьwritable._write()метод для отправки данных в базовый ресурс.


Пример. Реализуйте доступный для записи поток от стандартного ввода к стандартному выводу и оцените, содержит ли входной символ a, сообщите об ошибке и завершите работу.

const { Writable } = require('stream');
const util = require('util');

util.inherits(MyWriteStream, Writable)


function MyWriteStream(options){
	Writable.call(this, options);
}

MyWriteStream.prototype._write = function(chunk, encoding, callback){
	if(chunk.toString().indexOf('a') > -1){
		process.stdout.write("新写入的:"+ chunk)
		callback(null)
	}else{
		callback(new Error('no a'))
	}
}

let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)

Примечание: необходимо вызватьcallbackметод, указывающий на успех или неудачу записи. Если возникает ошибка,callbackПервый параметр должен бытьErrorобъект, параметры успехаnull.


4. Дуплексный поток -- поток с возможностью чтения и записи.

наследованиеstream.Duplex дуплексный поток

Пример. Реализуйте программу, которая изменяет цвет содержимого стандартного ввода, а затем печатает его из стандартного вывода.

const { Duplex } = require('stream');
const util = require('util');

util.inherits(MyDuplexStream, Duplex)


function MyDuplexStream(options){
	Duplex.call(this, options);
	this.wating = false;
}

MyDuplexStream.prototype._write = function(chunk, encoding, callback){
	this.wating = false;
        // 把数据推动到内部队列
	this.push('\u001b[32m' +  chunk + '\u001b[39m');
	callback()
}

MyDuplexStream.prototype._read = function(chunk, encoding, callback){
	if(!this.wating){
                // 在等待数据时展示一个提示
		this.push('等待输入> ')
		this.wating = true;
	}
}

let myStream = new MyDuplexStream();

// 获取标准输入,用管道传给双工流,单后返回给标准输出
process.stdin.pipe(myStream).pipe(process.stdout)



Пять, конверсионный поток

Потоки преобразования очень похожи на дуплексные потоки и также реализуют интерфейсы Readable и Writable. Разница в том, что поток преобразования преобразует данные или реализуется с помощью _transform. Этот метод имеет три параметра: блок данных преобразователя, код кодирования, обратный вызов (во многом похожий на _write), который выполняет обратный вызов после завершения преобразования данных, позволяя потоку преобразования анализировать данные асинхронно.

Примеры ожидаются.