предисловие
Недавно я использовал Egg в качестве базовой среды для разработки проектов, и мне было любопытно, как осуществляется управление его моделью с несколькими процессами, поэтому я кое-что узнал и, кстати, записал. Если в статье есть какие-либо ошибки, пожалуйста, слегка распылите
Зачем нужна многопроцессорность
С развитием технологий сегодняшние серверы в основном多核cpu
. Тем не менее, узел является单进程单线程
Язык (однопоточный для разработчиков, не очень). мы все знаем,cpu的调度单位是线程
, и исходя из характеристик Node, мы можем использовать только один процессор одновременно. Это не только крайне малоиспользуемо, но и неприемлемо для отказоустойчивости (крах всей программы, когда что-то пойдет не так). Итак, узел имеетclusterчтобы помочь нам максимально эффективно использовать ресурсы сервера.
cluster工作原理
Всем рекомендую посмотреть принцип работы кластераэта статья, вот краткое содержание:
- Порт прослушивания дочернего процесса будет
hack
вне, но объединены мастером内部TCP监听
, поэтому несколько дочерних процессов, прослушивающих один и тот же порт, не сообщат об ошибках. -
请求统一经过master的内部TCP
, в логике обработки запросов TCP будет выбран рабочий процесс для发送一个newconn内部消息,随消息发送客户端句柄
. (Здесь можно выбрать два способа. Первый — метод циклического перебора по умолчанию на всех платформах, кроме Windows, то есть основной процесс отвечает за прослушивание порта, получение новых соединений и последующее распределение соединения на рабочий процесс по кругу.В дистрибутиве некоторые встроенные приемы предотвращают перегрузку задачи рабочего процесса.Второй заключается в том, что основной процесс создает прослушивающий сокет и отправляет его заинтересованному рабочему процессу, а рабочий процесс отвечает за прямой прием подключений.) - После того, как рабочий процесс получит дескриптор,
创建客户端实例(net.socket)执行具体的业务逻辑
, затем вернуться.
Как показано на рисунке:
многопроцессная модель
Первый взглядОфициальная документация яйцамодель процесса
+--------+ +-------+
| Master |<-------->| Agent |
+--------+ +-------+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
Типы | количество процессов | эффект | стабильность | Следует ли запускать бизнес-код |
---|---|---|---|---|
Master | 1 | Управление процессами, пересылка сообщений между процессами | очень высоко | нет |
Agent | 1 | Работа в фоновом режиме (клиент с длительным подключением) | высоко | небольшое количество |
Worker | Обычно количество ядер процессора | Выполнить бизнес-код | в общем | да |
грубо использоватьMaster
В качестве основного потока начнитеAgent
Помощь в секретарском процессеWorker
Займитесь некоторыми общественными делами (журналы и т. д.), начнитеWorker
Процессы выполняют реальный бизнес-код.
Реализация нескольких процессов
Код, связанный с процессом
Первый изMaster
Начнем, здесь мы временно думаем, что Мастер — это топовый процесс (на самом деле там другойparent
процесс, который будет рассмотрен позже).
/**
* start egg app
* @method Egg#startCluster
* @param {Object} options {@link Master}
* @param {Function} callback start success callback
*/
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
начать сMaster的构造函数
Смотреть
constructor(options) {
super();
// 初始化参数
this.options = parseOptions(options);
// worker进程的管理类 详情见 Manager及Messenger篇
this.workerManager = new Manager();
// messenger类, 详情见 Manager及Messenger篇
this.messenger = new Messenger(this);
// 设置一个ready事件 详情见get-ready npm包
ready.mixin(this);
// 是否为生产环境
this.isProduction = isProduction();
this.agentWorkerIndex = 0;
// 是否关闭
this.closed = false;
...
接下来看的是ready的回调函数及注册的各类事件:
this.ready(() => {
// 将开始状态设置为true
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s (%sms)%s',
frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
// 发送egg-ready至各个进程并触发相关事件
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
this.workerManager.startCheck();
});
// 注册各类事件
this.on('agent-exit', this.onAgentExit.bind(this));
this.on('agent-start', this.onAgentStart.bind(this));
...
// 检查端口并 Fork一个Agent
detectPort((err, port) => {
...
this.forkAgentWorker();
}
});
}
Подводя итог, можно увидеть, что конструктор Мастера в основном初始化和注册各类相应的事件
, последний запускforkAgentWorker
можно увидеть код клавиши этой функции:
const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// 通过child_process执行一个Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
продолжатьagent_worker.js
Глядя выше,agent_worker
создать экземплярagent
объект,agent_worker.js
Есть ключевой код:
agent.ready(() => {
agent.removeListener('error', startErrorHandler); // 清除错误监听的事件
process.send({ action: 'agent-start', to: 'master' }); // 向master发送一个agent-start的动作
});
можно увидеть,agent_worker.js
код вmaster
Сообщение отправлено, действиеagent-start
, вернуться кMaster
, вы можете видеть, что он зарегистрировал два события, а именноonce的forkAppWorkers和 on的onAgentStart
this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));
Первый взглядonAgentStart
Функция, эта функция относительно проста, это передача некоторой информации:
onAgentStart() {
this.agentWorker.status = 'started';
// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
}
this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
}
this.messenger.send({ action: 'agent-start', to: 'app' });
this.logger.info('[master] agent_worker#%s:%s started (%sms)',
this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}
затем выполнитforkAppWorkers
функции, которая в основном основана наcforkМешокfork
Соответствующий рабочий процесс и зарегистрируйте серию связанных событий прослушивания,
...
cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don't refork in local env
refork: this.isProduction,
});
...
// 触发app-start事件
cluster.on('listening', (worker, address) => {
this.messenger.send({
action: 'app-start',
data: { workerPid: worker.process.pid, address },
to: 'master',
from: 'app',
});
});
можно увидетьforkAppWorkers
функция слушаетListening
событие, вызоветmaster
Вверхapp-start
мероприятие.
this.on('app-start', this.onAppStart.bind(this));
...
// master ready回调触发
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}
// ready回调 发送egg-ready状态到各个进程
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
总结下:
- Master.constructor: сначала выполните конструктор Мастера, есть функция обнаружения, которую нужно выполнить.
- Detect: Detect => forkAgentWorker()
- forkAgentWorker: получить процесс агента и инициировать событие запуска агента для мастера.
- Выполнить функцию onAgentStart, выполнить функцию forkAppWorker (один раз)
- onAgentStart => отправлять все виды информации, forkAppWorker => запускать событие запуска приложения мастеру
- Событие запуска приложения запускает метод onAppStart()
- onAppStart => set ready(true) => выполнить готовую функцию обратного вызова
- Ready() = > Отправить яйцо готово каждому процессу и вызвать соответствующие события, выполнить функцию startCheck()
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
демон процесса
Согласно официальной документации, демон процесса в основном зависит отgracefulа такжеegg-clusterэти две библиотеки.
未捕获异常
- Закройте все TCP-серверы аномального рабочего процесса (быстро отключите существующие соединения и больше не принимайте новые соединения), отключите канал IPC с Мастером и больше не принимайте новые запросы пользователей.
- Мастер немедленно разветвляет новый рабочий процесс, гарантируя, что общее количество «воркеров» в сети останется неизменным.
- Обработчик исключений ждет некоторое время и завершает работу после обработки принятых запросов.
+---------+ +---------+
| Worker | | Master |
+---------+ +----+----+
| uncaughtException |
+------------+ |
| | | +---------+
| <----------+ | | Worker |
| | +----+----+
| disconnect | fork a new worker |
+-------------------------> + ---------------------> |
| wait... | |
| exit | |
+-------------------------> | |
| | |
die | |
| |
| |
Как видно из исполняемого файла приложения,app
фактически унаследовано отApplicationКласс, этот класс называетсяgraceful()
.
onServer(server) {
......
graceful({
server: [ server ],
error: (err, throwErrorCount) => {
......
},
});
......
}
Продолжай читатьgraceful
, вы можете видеть, что он захватываетprocess.on('uncaughtException')
событие и закрыть его в функции обратного вызоваTCP
подключиться, закрыть свой процесс, отключиться отmaster
изIPC
ряд.
process.on('uncaughtException', function (err) {
......
// 对http连接设置 Connection: close响应头
servers.forEach(function (server) {
if (server instanceof http.Server) {
server.on('request', function (req, res) {
// Let http server set `Connection: close` header, and close the current request socket.
req.shouldKeepAlive = false;
res.shouldKeepAlive = false;
if (!res._header) {
res.setHeader('Connection', 'close');
}
});
}
});
// 设置一个定时函数关闭子进程, 并退出本身进程
// make sure we close down within `killTimeout` seconds
var killtimer = setTimeout(function () {
console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
if (process.env.NODE_ENV !== 'test') {
// kill children by SIGKILL before exit
killChildren(function() {
// 退出本身进程
process.exit(1);
});
}
}, killTimeout);
// But don't keep the process open just for that!
// If there is no more io waitting, just let process exit normally.
if (typeof killtimer.unref === 'function') {
// only worked on node 0.10+
killtimer.unref();
}
var worker = options.worker || cluster.worker;
// cluster mode
if (worker) {
try {
// 关闭TCP连接
for (var i = 0; i < servers.length; i++) {
var server = servers[i];
server.close();
}
} catch (er1) {
......
}
try {
// 关闭ICP通道
worker.disconnect();
} catch (er2) {
......
}
}
});
хорошо, закрытоIPC
После канала продолжаем смотретьcfork
файл, т.е. упомянутый вышеfork worker
Пакет, который следит за дочерним процессомdisconnect
событие, он будет судить, следует ли перезапустить в соответствии с условиямиfork
новый дочерний процесс
cluster.on('disconnect', function (worker) {
......
// 存起该pid
disconnects[worker.process.pid] = utility.logDate();
if (allow()) {
// fork一个新的子进程
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
});
Вообще говоря, в это время он будет продолжать ждать некоторое время, а затем выполнять функцию синхронизации, упомянутую выше, то есть退出进程
.
OOM、系统异常
об этом系统异常
, а иногда и в дочернем процессе不能捕获到
, мы можем обработать его только в мастере, т.е.cfork
Мешок.
cluster.on('exit', function (worker, code, signal) {
// 是程序异常的话, 会通过上面提到的uncatughException重新fork一个子进程, 所以这里就不需要了
var isExpected = !!disconnects[worker.process.pid];
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
// 是master杀死的子进程, 无需fork
if (worker.disableRefork) {
// worker is killed by master
return;
}
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
cluster.emit('unexpectedExit', worker, code, signal);
});
Межпроцессное взаимодействие (IPC)
Различное межпроцессное взаимодействие было упомянуто выше.Осторожно, вы могли обнаружить, что канал IPC кластера существует только между мастером и рабочим/агентом, а процессы рабочего и агента не имеют друг друга. Так что же делать, если вы хотите общаться между работниками? Да, вперед через Мастера.
广播消息: agent => all workers
+--------+ +-------+
| Master |<---------| Agent |
+--------+ +-------+
/ | \
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
指定接收方: one worker => another worker
+--------+ +-------+
| Master |----------| Agent |
+--------+ +-------+
^ |
send to / |
worker 2 / |
/ |
/ v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
существуетmaster
, видно, что когдаagent和app被fork时
, будет прослушивать их информацию и преобразовывать информацию в объект:
agentWorker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'agent';
this.messenger.send(msg);
});
worker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'app';
this.messenger.send(msg);
});
Вы можете видеть, что последний звонокmessenger.send
, а messenger.send — это根据from和to来决定将信息发送到哪里
send(data) {
if (!data.from) {
data.from = 'master';
}
......
// app -> master
// agent -> master
if (data.to === 'master') {
debug('%s -> master, data: %j', data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}
// master -> parent
// app -> parent
// agent -> parent
if (data.to === 'parent') {
debug('%s -> parent, data: %j', data.from, data);
this.sendToParent(data);
return;
}
// parent -> master -> app
// agent -> master -> app
if (data.to === 'app') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAppWorker(data);
return;
}
// parent -> master -> agent
// app -> master -> agent,可能不指定 to
if (data.to === 'agent') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
master
основывается непосредственно наaction
Информацияemit
Соответствующее зарегистрированное событие
sendToMaster(data) {
this.master.emit(data.action, data.data);
}
Агент и рабочий проходят черезsendmessage
package, который на самом деле вызывает метод, подобный следующему
// 将信息传给子进程
agent.send(data)
worker.send(data)
Наконец, базовый класс, который наследуют и агент, и приложение.EggApplication
выше, называетсяMessenger
class, конструктор внутри класса выглядит следующим образом:
constructor() {
super();
......
this._onMessage = this._onMessage.bind(this);
process.on('message', this._onMessage);
}
_onMessage(message) {
if (message && is.string(message.action)) {
// 和master一样根据action信息emit对应的注册事件
this.emit(message.action, message.data);
}
}
总结一下:
Идея состоит в том, чтобы использовать механизм событий и канал IPC для обеспечения связи между различными процессами.
разное
В процессе обучения я столкнулся с функцией timeout.unref(), рекомендую вам обратиться к этой функции.6 этаж ответ на этот вопрос
Суммировать
На самом деле очень сложно переключиться с фронтенд-мышления на бэкэнд-мышление, кроме того, реализация управления процессами в Egg действительно мощная, поэтому я потратил много времени на размышления о различных API и идеях.
Ссылки и цитаты
Многопроцессная модель и межпроцессное взаимодействие
Анализ исходного кода яйца