Управление процессами Node.js

Node.js внешний интерфейс
Управление процессами Node.js

Как мы все знаем, Node основан на V8, а в V8 JavaScript работает в одном потоке.Единственный поток здесь не означает, что при запуске Node существует только один поток, но что исполняемый код JavaScript находится в одном потоке. другие потоки, такие как поток ввода-вывода, выполняющий асинхронные операции ввода-вывода. Преимущество этой однопоточной модели заключается в том, что переключение контекста не будет выполняться часто во время процесса планирования системы, что улучшает использование одноядерных процессоров.

Но у этого подхода есть недостаток, то есть мы не можем воспользоваться многоядерной производительностью ЦП сервера, а процесс Node может использовать только один ЦП. А в однопоточном режиме при сбое кода падает вся программа. Обычное решение — использовать кластерный модуль Node черезmaster-workerрежим включает несколько экземпляров процесса. Далее мы подробно опишем, как Node использует многопроцессорную модель для использования многоядерных процессоров, а также конкретный принцип работы встроенного кластерного модуля.

Как создать дочерний процесс

узел обеспечиваетchild_processМодуль используется для создания дочерних процессов.В этом модуле есть четыре метода для создания дочерних процессов.

const { spawn, exec, execFile, fork } = require('child_process')

spawn(command[, args][, options])

exec(command[, options][, callback])

execFile(file[, args][, options][, callback])

fork(modulePath[, args][, options])

spawn

Прежде всего, давайте познакомимся с методом spawn.Ниже приведен официальный пример документации Node.

const { spawn } = require('child_process');
const child = spawn('ls', ['-lh', '/home']);

child.on('close', (code) => {
  console.log(`子进程退出码:${code}`);
});

const { stdin, stdout, stderr } = child

stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

stderr.on('data', (data) => {
  console.log(`stderr: ${data}`);
});

Дочерний процесс, созданный spawn, наследуется от EventEmitter, поэтому на нем могут выполняться события (discount,error,close,message) мониторинг. В то же время подпроцесс имеет три потока ввода и вывода: stdin, stdout и stderr, через эти три потока можно в режиме реального времени получать ввод и вывод, а также информацию об ошибках подпроцесса.

Окончательная реализация этого метода основана на libuv, который здесь обсуждаться не будет, если вам интересно, вы можете его проверить.исходный код.

// 调用libuv的api,初始化一个进程
int err = uv_spawn(env->event_loop(), &wrap->process_, &options);

exec/execFile

Причина, по которой они объединены, заключается в том, что последним вызовом exec является метод execFile, а исходный код находится вздесь. Единственное отличие состоит в том, что вызов в execnormalizeExecArgsпо умолчанию для атрибута оболочки opts будет установлено значение true.

exports.exec = function exec(/* command , options, callback */) {
  const opts = normalizeExecArgs.apply(null, arguments);
  return exports.execFile(opts.file, opts.options, opts.callback);
};

function normalizeExecArgs(command, options, callback) {
  options = { ...options };
  options.shell = typeof options.shell === 'string' ? options.shell : true;
  return { options };
}

В execFile последний вызовspawnметод.

exports.execFile = function execFile(file /* , args, options, callback */) {
  let args = [];
  let callback;
  let options;
  var child = spawn(file, args, {
    // ... some options
  });
  
  return child;
}

exec преобразует входной и выходной потоки spawn в строку, которая по умолчанию использует кодировку UTF-8, а затем передает ее в функцию обратного вызова.Использование метода обратного вызова более знакомо в узле и проще в работе, чем потоки, поэтому мы можем используйте метод exec для выполнения некоторыхshellкоманду, а затем получить возвращаемое значение в обратном вызове. Следует отметить, что буфер здесь имеет максимальную площадь буфера, при превышении которой он будет уничтожен напрямую, это можно настроить через атрибут maxBuffer (по умолчанию: 200*1024).

const { exec } = require('child_process');
exec('ls -lh /home', (error, stdout, stderr) => {
  console.log(`stdout: ${stdout}`);
  console.log(`stderr: ${stderr}`);
});

fork

Наконец, fork вызывает spawn для создания дочернего процесса, но fork — это особый случай spawn, который используется для порождения нового процесса Node.js, который будет генерировать новый экземпляр V8, поэтому вам нужно указать файл js при выполнении метод вилки.

exports.fork = function fork(modulePath /* , args, options */) {
  // ...
  
  options.shell = false;

  return spawn(options.execPath, args, options);
};

После того, как дочерний процесс будет создан с помощью fork, родительский и дочерний процессы будут напрямую создавать канал IPC (межпроцессное взаимодействие), который удобен для прямой связи между родительским и дочерним процессами и используется на уровне js.process.send(message)а такжеprocess.on('message', msg => {})общаться. На нижнем уровне существует множество способов реализации межпроцессного взаимодействия, межпроцессное взаимодействие Node реализовано на основе libuv, а методы реализации в разных операционных системах несовместимы. В системе *unix это реализовано с помощью Unix Domain Socket, а в Windows — с помощью именованного канала.

Общие методы межпроцессного взаимодействия: очереди сообщений, разделяемая память, каналы, семафоры, сокеты.

Ниже приведен пример взаимодействия процессов родитель-потомок.

parent.js

const path = require('path')
const { fork } = require('child_process')

const child = fork(path.join(__dirname, 'child.js'))

child.on('message', msg => {
    console.log('message from child', msg)
});

child.send('hello child, I\'m master')

child.js

process.on('message', msg => {
  console.log('message from master:', msg)
});
let counter = 0
setInterval(() => {
  process.send({
    child: true,
    counter: counter++
  })
}, 1000);

резюме

На самом деле видно, что все эти методы являются повторным использованием метода spawn, а затем метод spawn вызывает libuv внизу для управления процессом, как показано на рисунке ниже.

Используйте вилку для достиженияmaster-workerМодель

Давайте сначала посмотрим, если мыchild.jsЧто происходит, когда вы запускаете службу http в .

// master.js
const { fork } = require('child_process')

for (let i = 0; i < 2; i++) {
  const child = fork('./child.js')
}

// child.js
const http = require('http')
http.createServer((req, res) => {
  res.end('Hello World\n');
}).listen(8000)

              +--------------+
              |              |
              |    master    |
              |              |
     +--------+--------------+- -- -- -
     |                                 |
     |                          Error: listen EADDRINUSE
     |                                 |
     |
+----v----+                      +-----v---+
|         |                      |         |
| worker1 |                      | worker2 |
|         |                      |         |
+---------+                      +---------+
   8000                            8000

Мы разветвляем два дочерних процесса, поскольку два дочерних процесса одновременно прослушивают порт, Node напрямую выдает исключение (Error: listen EADDRINUSE), как показано на фиг. Таким образом, мы можем использовать режим прокси для одновременного прослушивания нескольких портов, позволить главному процессу прослушивать порт 80 и получать запросы, а затем распределять запросы по различным службам, а главный процесс может выполнять правильную балансировку нагрузки.

              +--------------+
              |              |
              |    master    |
              |     80     |
     +--------+--------------+---------+
     |                                 |
     |                                 |
     |                                 |
     |                                 |
+----v----+                      +-----v---+
|         |                      |         |
| worker1 |                      | worker2 |
|         |                      |         |
+---------+                      +---------+
   8000                            8001

Но это вызовет еще одну проблему. Режим прокси потребляет много файловых дескрипторов (максимальный предел файловых дескрипторов по умолчанию в системе Linux составляет 1024). В системе Windows файловый дескриптор называется дескриптором. Мы также можем вызывать файловые дескрипторы. в линуксе как ручками. Когда пользователь обращается, он сначала подключается к главному процессу, который использует дескриптор, а затем прокси-сервер главного процесса для рабочего процесса использует другой дескриптор, поэтому этот метод является пустой тратой системных ресурсов. Чтобы решить эту проблему, межпроцессное взаимодействие Node может отправлять дескрипторы, экономя системные ресурсы.

Дескриптор — это особый вид интеллектуального указателя. Дескрипторы используются, когда приложение хочет обратиться к блокам памяти или объектам, управляемым другими системами (такими как базы данных, операционные системы).

Мы можем запустить службу tcp в главном процессе, а затем отправить дескриптор службы дочернему процессу через IPC, и дочерний процесс будет отслеживать событие подключения службы.Конкретный код выглядит следующим образом:

// master.js
var { fork } = require('child_process')
var server = require('net').createServer()
server.on('connection', function(socket) {
  socket.end('handled by master') // 响应来自master
})
server.listen(3000, function() {
  console.log('master listening on: ', 3000)
})
for (var i = 0; i < 2; i++) {
  var child = fork('./child.js')
  child.send('server', server) // 发送句柄给worker
  console.log('worker create, pid is ', child.pid)
}

// child.js
process.on('message', function (msg, handler) {
  if (msg !== 'server') {
    return
  }
  // 获取到句柄后,进行请求的监听
  handler.on('connection', function(socket) {
    socket.end('handled by worker, pid is ' + process.pid)  
  })
})

启动服务

Ниже мы проходимcurl5 последовательных запросов на обслуживание.

for varible1 in {1..5}
do
  curl "localhost:3000"
done

请求服务

Видно, что ответ на запрос может быть родительским процессом или разными дочерними процессами.Несколько процессов отслеживают события подключения, отвечающие на одну и ту же службу.Тот, кто вытеснит его первым, ответит. Вот очень распространенное событие в сетевом программировании Linux: когда несколько процессов одновременно прослушивают события сетевого подключения, когда поступает новое подключение, эти процессы пробуждаются одновременно, что называется «шоковым роем». Это приводит к тому, что как только событие поступает, все процессы одновременно реагируют на событие, и в конце концов только один процесс может успешно обработать событие, а остальные процессы снова засыпают после неудачной попытки обработать событие, что приводит к пустая трата системных ресурсов.

ps: В системе Windows последний определенный подпроцесс всегда вытесняет дескриптор, что может быть связано с механизмом реализации libuv, и босс всегда может указать конкретные причины.

У всех точно не будет такой проблемы В этот раз мы вспомнилиnginxхорошо, здесьесть статьяВ нем объясняется, как nginx решает "шокированную группу". Использование обратного прокси nginx может эффективно решить эту проблему. В конце концов, nginx очень хорошо справляется с такого рода проблемами.

http { 
  upstream node { 
      server 127.0.0.1:8000; 
      server 127.0.0.1:8001; 
      server 127.0.0.1:8002; 
      server 127.0.0.1:8003;
      keepalive 64;
  } 
  server { 
       listen 80; 
       server_name shenfq.com; 
       location / { 
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $http_host;
            proxy_set_header X-Nginx-Proxy true;
            proxy_set_header Connection "";
            proxy_pass http://node; # 这里要和最上面upstream后的应用名一致,可以自定义
       } 
  }
}

резюме

Если мы используем Node для реализации многопроцессорной модели изначально, возникают проблемы того или иного рода.Хотя мы, наконец, используем nginx для достижения этой цели, если мы используем nginx, нам нужно поддерживать другой набор конфигурации nginx, Служба узла не работает, а nginx не знает об этом и все равно будет перенаправлять запрос на этот порт.

кластерный модуль

Помимо использования nginx в качестве обратного прокси-сервера, сам узел также предоставляетclusterМодуль для балансировки нагрузки нескольких процессов в среде с многоядерным процессором. Модуль кластера создает дочерние процессы, по существу, через child_procee.fork С помощью этого модуля легко создавать серверы дочерних процессов, которые используют один и тот же порт.

Руководство по началу работы

С этим модулем вы почувствуете, как легко реализовать одномашинный кластер Node. Давайте посмотрим на официальный пример: многопроцессорный сервис Node реализован всего в дюжине строк кода и имеет собственную балансировку нагрузки.

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) { // 判断是否为主进程
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工作进程。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else { // 子进程进行服务器创建
  // 工作进程可以共享任何 TCP 连接。
  // 在本例子中,共享的是一个 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

Анализ исходного кода модуля кластера

Сначала посмотрите на код, авторisMasterЧтобы судить, является ли это основным процессом, если основной процесс выполняет операцию ветвления, дочерний процесс создает сервер. Когда кластер выполняет операцию ветвления, выполняется текущий файл.cluster.forkнаконец позвонилchild_process.fork, а первый параметрprocess.argv.slice(2), после дочернего процесса fork будет отслеживаться его событие internalMessage, о котором будет сказано позже, конкретный код выглядит следующим образом:

const { fork } = require('child_process');

cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 监听子进程的消息
  worker.process.on('internalMessage', internal(worker, onmessage));
  // ...
};
// 配置master进程
cluster.setupMaster = function(options) {
  cluster.settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };
};

// 创建子进程
function createWorkerProcess(id, env) {
  return fork(cluster.settings.exec, cluster.settings.args, {
    // some options
  });
}

проблема с прослушиванием порта дочернего процесса

这里会有一个问题,子进程全部都在监听同一个端口,我们之前已经试验过,服务监听同一个端口会出现端口占用的问题,那么cluster模块如何保证端口不冲突的呢? чек об оплатеисходный кодУстановлено, что createServer модуля http наследуется от модуля net.

util.inherits(Server, net.Server);

В модуле net метод listen вызывает метод listenInCluster, а listenInCluster определяет, является ли он в данный момент главным процессом.

lib/net.js

Server.prototype.listen = function(...args) {

  // ...
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    // 如果listen方法只传入了端口号,最后会走到这里
    listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
    return this;
  }
  // ...
};

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster) {
    // 如果是主进程则启动一个服务
    // 但是主进程没有调用过listen方法,所以没有走这里一步
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }
  
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };
 
  // 子进程获取主进程服务的句柄
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  
  function listenOnMasterHandle(err, handle) {
    server._handle = handle; // 重写handle,对listen方法进行了hack
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

Глядя на приведенный выше код, вы можете понять, что реальный способ запустить службуserver._listen2. существует_listen2метод, последний вызов_handleметод прослушивания ниже.

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  // ...
  this._handle.onconnection = onconnection;
  var err = this._handle.listen(backlog || 511);
  // ...
}

Server.prototype._listen2 = setupListenHandle;  // legacy alias

Такcluster._getServerЧто именно делает метод?

Найдите его исходный код, сначала отправьте сообщение главному процессу, тип сообщенияqueryServer.

// child.js
cluster._getServer = function(obj, options, cb) {
  // ...
  
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };
  
  // 发送消息到master进程,消息类型为 queryServer
  send(message, (reply, handle) => {
    rr(reply, indexesKey, cb);              // Round-robin.
  });
  // ...
};

Метод rr здесь, для вышеупомянутого_handle.listenПосле взлома не работает прослушивание всех дочерних процессов.

function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) { // listen方法直接返回0,不再进行端口监听
    return 0;
  }

  function close() {
    send({ act: 'close', key });
  }

  function getsockname(out) {
    return 0;
  }
  
  const handle = { close, listen, ref: noop, unref: noop };
  
  handles.set(key, handle); // 根据key将工作进程的 handle 进行缓存
  cb(0, handle);
}

// 这里的cb回调就是前面_getServer方法传入的。 参考之前net模块的listen方法
function listenOnMasterHandle(err, handle) {
  server._handle = handle; // 重写handle,对listen方法进行了hack
  // 该方法调用后,会对handle绑定一个 onconnection 方法,最后会进行调用
  server._listen2(address, port, addressType, backlog, fd, flags);
}

Основной процесс взаимодействует с дочерним процессом

Так где именно контролируется порт?

Как упоминалось ранее, при разветвлении дочернего процесса событие internalMessage отслеживается для дочернего процесса.

worker.process.on('internalMessage', internal(worker, onmessage));

Дочерний процесс отправляет сообщение главному процессу, обычно используяprocess.sendметод, будет контролироватьсяmessageсобытие получено. Это связано с тем, что отправленное сообщение указываетcmd: 'NODE_CLUSTER'до тех пор, пока поле CMD начинается сNODE_в начале, чтобы сообщение считалось внутренней связью и было получено событием internalMessage.

// child.js
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

// utils.js
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}

После того, как главный процесс получает сообщение, он начинает выполнять различные методы в зависимости от типа действия, где действиеqueryServer. Метод queryServer создаст ключ, если ключ (правило в основном адрес + порт + файловый дескриптор) ранее не существовал, тоRoundRobinHandleСоздается экземпляр конструктора, в конструкторе RoundRobinHandle запускается служба TCP, и отслеживается ранее указанный порт.

// master.js
const handles = new Map();

function onmessage(message, handle) {
  const worker = this;
  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  // other act logic
}
function queryServer(worker, message) {
  // ...
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  var handle = handles.get(key);
  // 如果之前没有对该key进行实例化,则进行实例化
  if (handle === undefined) {
    let address = message.address;
    // const RoundRobinHandle = require('internal/cluster/round_robin_handle');
    var constructor = RoundRobinHandle;

    handle = new constructor(key,
                             address,
                             message.port,
                             message.addressType,
                             message.fd,
                             message.flags);
    handles.set(key, handle);
  }
  // ...
}

// internal/cluster/round_robin_handle
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.server = net.createServer(assert.fail);
  // 这里启动一个TCP服务器
  this.server.listen({ port, host });
  
  // TCP服务器启动时的事件
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
  });
  // ...
}

Вы можете видеть, что после запуска службы TCP сразуconnectionСобытие отслеживается, и вызывается метод распределения RoundRobinHandle.

// RoundRobinHandle
this.handle.onconnection = (err, handle) => this.distribute(err, handle);

// distribute 对工作进程进行分发
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle); // 存入TCP服务的句柄
  const worker = this.free.shift(); // 取出第一个工作进程

  if (worker)
    this.handoff(worker); // 切换到工作进程
};

RoundRobinHandle.prototype.handoff = function(worker) {
  const handle = this.handles.shift(); // 获取TCP服务句柄
  
  if (handle === undefined) {
    this.free.push(worker);  // 将该工作进程重新放入队列中
    return;
  }
  
  const message = { act: 'newconn', key: this.key };

  // 向工作进程发送一个类型为 newconn 的消息以及TCP服务的句柄
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // 工作进程不能正常运行,启动下一个

    this.handoff(worker);
  });
};

Внутренние сообщения также отслеживаются в дочернем процессе.cluster/child.jsСреди них естьcluster._setupWorkerметод, этот метод будет отслеживать внутренние сообщения, метод вlib/internal/bootstrap/node.jsЭтот файл вызывается модулем C++ каждый раз, когда запускается команда node.

Связь

function startup() {
  // ...
  startExecution();
}
function startExecution() {
  // ...
  prepareUserCodeExecution();
}
function prepareUserCodeExecution() {
  if (process.argv[1] && process.env.NODE_UNIQUE_ID) {
    const cluster = NativeModule.require('cluster');
    cluster._setupWorker();
    delete process.env.NODE_UNIQUE_ID;
  }
}

startup()

Давайте посмотрим, что делает метод _setupWorker.

cluster._setupWorker = function() {
  // ...
  process.on('internalMessage', internal(worker, onmessage));

  function onmessage(message, handle) {
    // 如果act为 newconn 调用onconnection方法
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      _disconnect.call(worker, true);
  }
};

function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle); // 调用net中的onconnection方法
}

Наконец, после того как подпроцесс получает дескриптор клиента, он вызывает onconnection модуля net для создания экземпляра Socket, что согласуется с логикой других HTTP-запросов и не будет подробно обсуждаться.

На этом этапе реализована логика модуля кластера, и здесь представлены точки знаний, связанные с управлением процессами Node.js.

Ссылка на ссылку