Node.js — многопроцессорная модель Alibaba Egg и межпроцессное взаимодействие

Node.js Egg.js

предисловие

Недавно я использовал Egg в качестве базовой среды для разработки проектов, и мне было любопытно, как осуществляется управление его моделью с несколькими процессами, поэтому я кое-что узнал и, кстати, записал. Если в статье есть какие-либо ошибки, пожалуйста, слегка распылите

Зачем нужна многопроцессорность

С развитием технологий сегодняшние серверы в основном多核cpu. Тем не менее, узел является单进程单线程Язык (однопоточный для разработчиков, не очень). мы все знаем,cpu的调度单位是线程, и исходя из характеристик Node, мы можем использовать только один процессор одновременно. Это не только крайне малоиспользуемо, но и неприемлемо для отказоустойчивости (крах всей программы, когда что-то пойдет не так). Итак, узел имеетclusterчтобы помочь нам максимально эффективно использовать ресурсы сервера.

cluster工作原理
Всем рекомендую посмотреть принцип работы кластераэта статья, вот краткое содержание:

  1. Порт прослушивания дочернего процесса будетhackвне, но объединены мастером内部TCP监听, поэтому несколько дочерних процессов, прослушивающих один и тот же порт, не сообщат об ошибках.
  2. 请求统一经过master的内部TCP, в логике обработки запросов TCP будет выбран рабочий процесс для发送一个newconn内部消息,随消息发送客户端句柄. (Здесь можно выбрать два способа. Первый — метод циклического перебора по умолчанию на всех платформах, кроме Windows, то есть основной процесс отвечает за прослушивание порта, получение новых соединений и последующее распределение соединения на рабочий процесс по кругу.В дистрибутиве некоторые встроенные приемы предотвращают перегрузку задачи рабочего процесса.Второй заключается в том, что основной процесс создает прослушивающий сокет и отправляет его заинтересованному рабочему процессу, а рабочий процесс отвечает за прямой прием подключений.)
  3. После того, как рабочий процесс получит дескриптор,创建客户端实例(net.socket)执行具体的业务逻辑, затем вернуться.

Как показано на рисунке:

Ссылка на рисунокпроисхождение

многопроцессная модель

Первый взглядОфициальная документация яйцамодель процесса

                +--------+          +-------+
                | Master |<-------->| Agent |
                +--------+          +-------+
                ^   ^    ^
               /    |     \
             /      |       \
           /        |         \
         v          v          v
+----------+   +----------+   +----------+
| Worker 1 |   | Worker 2 |   | Worker 3 |
+----------+   +----------+   +----------+
Типы количество процессов эффект стабильность Следует ли запускать бизнес-код
Master 1 Управление процессами, пересылка сообщений между процессами очень высоко нет
Agent 1 Работа в фоновом режиме (клиент с длительным подключением) высоко небольшое количество
Worker Обычно количество ядер процессора Выполнить бизнес-код в общем да

грубо использоватьMasterВ качестве основного потока начнитеAgentПомощь в секретарском процессеWorkerЗаймитесь некоторыми общественными делами (журналы и т. д.), начнитеWorkerПроцессы выполняют реальный бизнес-код.

Реализация нескольких процессов

Код, связанный с процессом

Первый изMasterНачнем, здесь мы временно думаем, что Мастер — это топовый процесс (на самом деле там другойparentпроцесс, который будет рассмотрен позже).

/**
 * start egg app
 * @method Egg#startCluster
 * @param {Object} options {@link Master}
 * @param {Function} callback start success callback
 */
exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

начать сMaster的构造函数Смотреть

constructor(options) {
  super();
  // 初始化参数
  this.options = parseOptions(options);
  // worker进程的管理类 详情见 Manager及Messenger篇
  this.workerManager = new Manager();
  // messenger类, 详情见 Manager及Messenger篇
  this.messenger = new Messenger(this);
  // 设置一个ready事件 详情见get-ready npm包
  ready.mixin(this);
  // 是否为生产环境
  this.isProduction = isProduction();
  this.agentWorkerIndex = 0;
  // 是否关闭
  this.closed = false;
  ...

  接下来看的是ready的回调函数及注册的各类事件:
  this.ready(() => {
    // 将开始状态设置为true
    this.isStarted = true;
    const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
    this.logger.info('[master] %s started on %s (%sms)%s',
    frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

    // 发送egg-ready至各个进程并触发相关事件
    const action = 'egg-ready';
    this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
    this.messenger.send({ action, to: 'app', data: this.options });
    this.messenger.send({ action, to: 'agent', data: this.options });
    // start check agent and worker status
    this.workerManager.startCheck();
    });
    // 注册各类事件
    this.on('agent-exit', this.onAgentExit.bind(this));
    this.on('agent-start', this.onAgentStart.bind(this));
    ...
    // 检查端口并 Fork一个Agent
    detectPort((err, port) => {
      ... 
      this.forkAgentWorker();
    }
  });
}

Подводя итог, можно увидеть, что конструктор Мастера в основном初始化和注册各类相应的事件, последний запускforkAgentWorkerможно увидеть код клавиши этой функции:

const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// 通过child_process执行一个Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);

продолжатьagent_worker.jsГлядя выше,agent_workerсоздать экземплярagentобъект,agent_worker.jsЕсть ключевой код:

agent.ready(() => {
  agent.removeListener('error', startErrorHandler); // 清除错误监听的事件
  process.send({ action: 'agent-start', to: 'master' }); // 向master发送一个agent-start的动作
});

можно увидеть,agent_worker.jsкод вmasterСообщение отправлено, действиеagent-start, вернуться кMaster, вы можете видеть, что он зарегистрировал два события, а именноonce的forkAppWorkers和 on的onAgentStart

this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));

Первый взглядonAgentStartФункция, эта функция относительно проста, это передача некоторой информации:

onAgentStart() {
    this.agentWorker.status = 'started';

    // Send egg-ready when agent is started after launched
    if (this.isAllAppWorkerStarted) {
      this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
    }

    this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
    // should send current worker pids when agent restart
    if (this.isStarted) {
      this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
    }

    this.messenger.send({ action: 'agent-start', to: 'app' });
    this.logger.info('[master] agent_worker#%s:%s started (%sms)',
      this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
  }

затем выполнитforkAppWorkersфункции, которая в основном основана наcforkМешокforkСоответствующий рабочий процесс и зарегистрируйте серию связанных событий прослушивания,

...
cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don't refork in local env
  refork: this.isProduction,
});
...
// 触发app-start事件
cluster.on('listening', (worker, address) => {
  this.messenger.send({
    action: 'app-start',
    data: { workerPid: worker.process.pid, address },
    to: 'master',
    from: 'app',
  });
});

можно увидетьforkAppWorkersфункция слушаетListeningсобытие, вызоветmasterВверхapp-startмероприятие.

this.on('app-start', this.onAppStart.bind(this));

...
// master ready回调触发
if (this.options.sticky) {
  this.startMasterSocketServer(err => {
    if (err) return this.ready(err);
      this.ready(true);
  });
} else {
  this.ready(true);
}

// ready回调 发送egg-ready状态到各个进程
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });

// start check agent and worker status
if (this.isProduction) {
  this.workerManager.startCheck();
}

总结下:

  1. Master.constructor: сначала выполните конструктор Мастера, есть функция обнаружения, которую нужно выполнить.
  2. Detect: Detect => forkAgentWorker()
  3. forkAgentWorker: получить процесс агента и инициировать событие запуска агента для мастера.
  4. Выполнить функцию onAgentStart, выполнить функцию forkAppWorker (один раз)
  5. onAgentStart => отправлять все виды информации, forkAppWorker => запускать событие запуска приложения мастеру
  6. Событие запуска приложения запускает метод onAppStart()
  7. onAppStart => set ready(true) => выполнить готовую функцию обратного вызова
  8. Ready() = > Отправить яйцо готово каждому процессу и вызвать соответствующие события, выполнить функцию startCheck()
+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|

демон процесса

Согласно официальной документации, демон процесса в основном зависит отgracefulа такжеegg-clusterэти две библиотеки.

未捕获异常

  1. Закройте все TCP-серверы аномального рабочего процесса (быстро отключите существующие соединения и больше не принимайте новые соединения), отключите канал IPC с Мастером и больше не принимайте новые запросы пользователей.
  2. Мастер немедленно разветвляет новый рабочий процесс, гарантируя, что общее количество «воркеров» в сети останется неизменным.
  3. Обработчик исключений ждет некоторое время и завершает работу после обработки принятых запросов.
+---------+                 +---------+
|  Worker |                 |  Master |
+---------+                 +----+----+
     | uncaughtException         |
     +------------+              |
     |            |              |                   +---------+
     | <----------+              |                   |  Worker |
     |                           |                   +----+----+
     |        disconnect         |   fork a new worker    |
     +-------------------------> + ---------------------> |
     |         wait...           |                        |
     |          exit             |                        |
     +-------------------------> |                        |
     |                           |                        |
    die                          |                        |
                                 |                        |
                                 |                        |

Как видно из исполняемого файла приложения,appфактически унаследовано отApplicationКласс, этот класс называетсяgraceful().

onServer(server) {
    ......
    graceful({
      server: [ server ],
      error: (err, throwErrorCount) => {
        ......
      },
    });
    ......
  }

Продолжай читатьgraceful, вы можете видеть, что он захватываетprocess.on('uncaughtException')событие и закрыть его в функции обратного вызоваTCPподключиться, закрыть свой процесс, отключиться отmasterизIPCряд.

process.on('uncaughtException', function (err) {
    ......
    // 对http连接设置 Connection: close响应头
    servers.forEach(function (server) {
      if (server instanceof http.Server) {
        server.on('request', function (req, res) {
          // Let http server set `Connection: close` header, and close the current request socket.
          req.shouldKeepAlive = false;
          res.shouldKeepAlive = false;
          if (!res._header) {
            res.setHeader('Connection', 'close');
          }
        });
      }
    });

    // 设置一个定时函数关闭子进程, 并退出本身进程
    // make sure we close down within `killTimeout` seconds
    var killtimer = setTimeout(function () {
      console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
      if (process.env.NODE_ENV !== 'test') {
        // kill children by SIGKILL before exit
        killChildren(function() {
          // 退出本身进程
          process.exit(1);
        });
      }
    }, killTimeout);

    // But don't keep the process open just for that!
    // If there is no more io waitting, just let process exit normally.
    if (typeof killtimer.unref === 'function') {
      // only worked on node 0.10+
      killtimer.unref();
    }

    var worker = options.worker || cluster.worker;

    // cluster mode
    if (worker) {
      try {
        // 关闭TCP连接
        for (var i = 0; i < servers.length; i++) {
          var server = servers[i];
          server.close();
        }
      } catch (er1) {
        ......
      }

      try {
        // 关闭ICP通道
        worker.disconnect();
      } catch (er2) {
        ......
      }
    }
  });

хорошо, закрытоIPCПосле канала продолжаем смотретьcforkфайл, т.е. упомянутый вышеfork workerПакет, который следит за дочерним процессомdisconnectсобытие, он будет судить, следует ли перезапустить в соответствии с условиямиforkновый дочерний процесс

cluster.on('disconnect', function (worker) {
    ......
    // 存起该pid
    disconnects[worker.process.pid] = utility.logDate();
    if (allow()) {
      // fork一个新的子进程
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
  });

Вообще говоря, в это время он будет продолжать ждать некоторое время, а затем выполнять функцию синхронизации, упомянутую выше, то есть退出进程.

OOM、系统异常об этом系统异常, а иногда и в дочернем процессе不能捕获到, мы можем обработать его только в мастере, т.е.cforkМешок.

cluster.on('exit', function (worker, code, signal) {
    // 是程序异常的话, 会通过上面提到的uncatughException重新fork一个子进程, 所以这里就不需要了
    var isExpected = !!disconnects[worker.process.pid];
    if (isExpected) {
      delete disconnects[worker.process.pid];
      // worker disconnect first, exit expected
      return;
    }
    // 是master杀死的子进程, 无需fork
    if (worker.disableRefork) {
      // worker is killed by master
      return;
    }

    if (allow()) {
      newWorker = forkWorker(worker._clusterSettings);
      newWorker._clusterSettings = worker._clusterSettings;
    } else {
      ......
    }
    cluster.emit('unexpectedExit', worker, code, signal);
  });

Межпроцессное взаимодействие (IPC)

Различное межпроцессное взаимодействие было упомянуто выше.Осторожно, вы могли обнаружить, что канал IPC кластера существует только между мастером и рабочим/агентом, а процессы рабочего и агента не имеют друг друга. Так что же делать, если вы хотите общаться между работниками? Да, вперед через Мастера.

广播消息: agent => all workers
                  +--------+          +-------+
                  | Master |<---------| Agent |
                  +--------+          +-------+
                 /    |     \
                /     |      \
               /      |       \
              /       |        \
             v        v         v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

指定接收方: one worker => another worker
                  +--------+          +-------+
                  | Master |----------| Agent |
                  +--------+          +-------+
                 ^    |
     send to    /     |
    worker 2   /      |
              /       |
             /        v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

существуетmaster, видно, что когдаagent和app被fork时, будет прослушивать их информацию и преобразовывать информацию в объект:

agentWorker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'agent';
  this.messenger.send(msg);
});

worker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'app';
  this.messenger.send(msg);
});

Вы можете видеть, что последний звонокmessenger.send, а messenger.send — это根据from和to来决定将信息发送到哪里

send(data) {
    if (!data.from) {
      data.from = 'master';
    }
    ......

    // app -> master
    // agent -> master
    if (data.to === 'master') {
      debug('%s -> master, data: %j', data.from, data);
      // app/agent to master
      this.sendToMaster(data);
      return;
    }

    // master -> parent
    // app -> parent
    // agent -> parent
    if (data.to === 'parent') {
      debug('%s -> parent, data: %j', data.from, data);
      this.sendToParent(data);
      return;
    }

    // parent -> master -> app
    // agent -> master -> app
    if (data.to === 'app') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAppWorker(data);
      return;
    }

    // parent -> master -> agent
    // app -> master -> agent,可能不指定 to
    if (data.to === 'agent') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAgentWorker(data);
      return;
    }
  }

masterосновывается непосредственно наactionИнформацияemitСоответствующее зарегистрированное событие

sendToMaster(data) {
  this.master.emit(data.action, data.data);
}

Агент и рабочий проходят черезsendmessagepackage, который на самом деле вызывает метод, подобный следующему

 // 将信息传给子进程
 agent.send(data)
 worker.send(data)

Наконец, базовый класс, который наследуют и агент, и приложение.EggApplicationвыше, называетсяMessengerclass, конструктор внутри класса выглядит следующим образом:

constructor() {
    super();
    ......
    this._onMessage = this._onMessage.bind(this);
    process.on('message', this._onMessage);
  }

_onMessage(message) {
    if (message && is.string(message.action)) {
      // 和master一样根据action信息emit对应的注册事件  
      this.emit(message.action, message.data);
    }
  }

总结一下:
Идея состоит в том, чтобы использовать механизм событий и канал IPC для обеспечения связи между различными процессами.

разное

В процессе обучения я столкнулся с функцией timeout.unref(), рекомендую вам обратиться к этой функции.6 этаж ответ на этот вопрос

Суммировать

На самом деле очень сложно переключиться с фронтенд-мышления на бэкэнд-мышление, кроме того, реализация управления процессами в Egg действительно мощная, поэтому я потратил много времени на размышления о различных API и идеях.

Ссылки и цитаты

Многопроцессная модель и межпроцессное взаимодействие
Анализ исходного кода яйца