фреймворк для микросервисов nodejs

Node.js JavaScript RabbitMQ MongoDB PM2 koa Bluebird.js

предисловие

Seneca — это набор инструментов микросервиса nodejs, который позволяет легко создавать и постоянно обновлять системы. Следующее поможет вам один за другим понять введение и практику связанных технологий.

Вставьте сюда твердую широкую. Пацан сделал простую интеграцию, а потом влепилфреймворк для расширения --- облегченный фреймворк для микросервисов nodejs, Заинтересованные студенты взгляните, добро пожаловать, и если у вас есть какие-либо вопросы или проблемы с кодом, оставьте сообщение под сообщением в блоге.

окружающая обстановка

  • Основная среда
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
  • Микросервисная инженерия
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"

FEATURES

  • Сопоставление шаблонов для межсервисных вызовов: немного отличается от обнаружения сервисов SpringCloud (протокол http, режим IP + PORT), он использует более гибкий принцип сопоставления шаблонов (модуль Patrun) для выполнения вызовов между микросервисами.
  • Доступ к koa2 для предоставления RESTFUL API стороне C
  • Плагины: больше гибкости для написания небольших и микро-модулей многократного использования.
  • встроенный вывод журнала seneca
  • Сравнение сторонних библиотек журналов winston (необязательно), bunyan, log4js
  • Очередь сообщений RabbitMQ
  • PM2: развертывание службы узла (кластер служб), управление и мониторинг
  • PM2: Автоматическое развертывание
  • PM2 интегрирует докер
  • Отслеживание запросов (реконструкция потока пользовательских запросов)
  • Разобраться с базовой логикой регистрации и обнаружения сервиса Консул
  • Узел интеграции с фреймворком-консул
  • постоянное хранилище mongodb
  • Объединение промежуточного программного обеспечения службы маршрутизации seneca и consul (может поддерживать маршрутизацию нескольких кластеров служб с одинаковыми именами, отличающихся по версии?)
  • Поддерживает потоковую обработку (загрузка/выгрузка файлов и т. д.)
  • автоматическое развертывание jenkins
  • балансировка нагрузки nginx
  • Решение для непрерывной интеграции
  • кеш Redis
  • prisma предоставляет интерфейс GraphQL

Сопоставление с образцом (модуль Patrun)

index.js(accout-server/src/index.js)

const seneca = require('seneca')()

seneca.use('cmd:login', (msg, done) => {
	const { username, pass } = msg
	if (username === 'asd' && pass === '123') {
		return done(null, { code: 1000 })
	}
	return done(null, { code: 2100 })
})

const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: 'seneca' })

act({
	cmd: 'login',
	username: 'asd',
	pass: '123'
}).then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

после казни

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}

seneca.addдобавьте шаблон действия в экземпляр Seneca, он имеет три параметра:

  1. pattern: Шаблон сопоставления сообщений, объект или строка формата для JSON в Seneca.
  2. sub_pattern: дополнительный режим, более низкий приоритет, чем основной режим (опционально)
  3. action: Функция действия при успешном совпадении

seneca.actМетод, который выполняет успешно сопоставленное действие в экземпляре Seneca, также имеет два параметра:

  1. msg: JSON-сообщение
  2. sub_pattern: подсообщение с более низким приоритетом, чем основное сообщение (необязательно)
  3. response: используется для получения результатов вызова службы

seneca.useметод, добавить плагин в экземпляр Seneca, у него два параметра: (принцип работы плагина здесь несколько отличается от промежуточного ПО)

  1. func: метод выполнения плагина
  2. options: параметры, требуемые плагином (необязательно)

Ядро заключается в использованииJSONобъект для сопоставления с образцом. Этот объект JSON содержит не только характеристики, необходимые микрослужбе для вызова другой микрослужбы, но и параметры. Подобно обнаружению микросервисов Java, он заменяет ip+port шаблоном.На данный момент шаблон может полностью реализовать функцию обнаружения сервисов, но еще предстоит изучить, является ли он более гибким.

Примечания

  • Шаблоны между микросервисами должны различаться по дизайну

Запустить первый микросервис

index.js(config-server/src/index.js)

const seneca = require('seneca')()
const config = {
SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: '服务端正常响应'
}}

seneca.add('$target$:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)

После запуска этого скрипта вы можете ввести в браузереhttp://localhost:10011/act?cmd=configИнициировать запрос на получение информации о глобальной конфигурации

OR

const seneca = require('seneca')()
const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client(10011)
act('?target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

Внутренний: несколько микросервисов вызывают друг друга (ключ)

noname-server

const seneca = require('seneca')()
seneca.add('?target:account-server', (msg, done) => {
	done(null, { seneca: '666' })
})
seneca.listen(10015)

config-сервер (то же, что и выше)

call

const seneca = require('seneca')()
const Promise = require('blurebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client({
	port: '10011',
	pin: '?target:account-server'
})
seneca.client({
	port: '10015',
	pin: '?target:noname-server'
})

act('?target:account-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

act('?target:noname-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

Внешний: предоставление услуг REST (ключ)

интегрированный коа

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// 初始化用户模块
seneca.use(userModule.init)

// 初始化seneca-web插件,并适配koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// 将routes导出给koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)

пользовательский модуль

const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

const db = {
  users: [{
    id: 1,
    name: '甲'
  }, {
    id: 2,
    name: '乙'
  }, {
    id: 3,
    name: '丙'
  }]
}

function user(options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })
  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })
  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })
  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}

vscode-restclient (плагин restclient для vscode, используемый для инициирования запросов RESTFUL)

### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "测试添加用户"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "测试修改用户信息"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1

встроенный вывод журнала seneca

Конфигурацию можно передать в конструкторе, а атрибут журнала может управлять уровнем журнала.

Пример 1: передать строку

require('seneca')({
	// quiet silent any all print standard test
	log: 'all'
})

Пример 2: Передача объектов

require('seneca')({
	log: {
		// none debug+ info+ warn+
		level: 'debug+'
	},
	// 设置为true时,seneca日志功能会encapsulate senecaId,senecaTag,actId等字段后输出(一般为两字符)
	short: true
})

Код в Примере 2 рекомендуется, поскольку уровень журнала, выводимый подключаемым модулем seneca-web-adapter-koa2, является отладочным, что хорошо для ведения журнала доступа к веб-интерфейсу.

Модуль журнала Winston

портал

Logger.js

const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({label: 'microservices'}),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [ new transports.Console() ]
})

// highest to lowest
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger

формат вывода журнала

2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客户端的调用请求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message

Служба очереди сообщений RabbitMQ

1. Одна задача, один потребитель, модель производителя-потребителя

producer.js

// 创建一个amqp对等体
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'

    // 为方式RabbitMQ退出或者崩溃时重启后丢失队列信息,这里配置durable:true(同时在消费者脚本中也要配置durable:true)后,
    ch.assertQueue(q, { durable: true })
    // 这里配置persistent:true,通过阅读官方文档,我理解为当程序重启后,会断点续传之前未send完成的数据消息。(但此功能并不可靠,因为不会为所有消息执行同步IO,会缓存在cache并在某个恰当时机write到disk)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close(); process.exit(0)
    }, 100)
  })
})
// 创建一个amqp对等体
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'

    // 为方式RabbitMQ退出或者崩溃时重启后丢失队列信息,这里配置durable:true(同时在消费者脚本中也要定义durable:true)后,
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // noAck配置(默认为false)表明consumer是否需要在处理完后反馈ack给producer,如果设置为true,则RabbitMQ服务如果将任务send至此consumer后不关心任务实际处理结果,send任务后直接标记已完成;否则,RabbiMQ得到ack反馈后才标记为已完成,如果一直未收到ack默认会一直等待ack然后标记,另外如果接收到nack或者该consumer进程退出则继续dispatcher任务
  })
})

Процесс проверки

  • Выполните rabbitmqctl list_queues, чтобы просмотреть текущую очередь.
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
  • узел производителя.js (процесс выполнения RabbitMQ заключается в том, чтобы сначала создать анонимный обмен, указанную очередь, а затем привязать очередь к анонимному обмену)

  • rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1
  • node consumer.js
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0

Точка знаний

  • Режим производитель-потребитель (сообщения производителя обрабатываются только одним потребителем одновременно)
  • Механизм ACK (механизм подтверждения Rabbitmq)
  • Создайте очередь {durable:true} и отправьте сообщение в очередь {persistent:true} (сообщения сохраняются персистентно, но не полностью гарантированно, например, когда сообщение не записывается из кеша на диск и происходит сбой программы, он пропадет)
  • Круговая рассылка (справедливое распределение)
  • Обработка управления окном (предварительная выборка для управления окном отправки)
  • Механизм асинхронной многозадачности (например, декомпозиция больших задач, разделяй и властвуй)
  • Весь процесс потока сообщений (процесс производителя -> анонимный обмен -> через привязку -> указание очереди -> процесс-потребитель)

2. Однозадачная многопользовательская модель публикации/подписки (модель полного сообщения)

publisher.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'

    // ex为exchange名称(唯一)
    // 模式为fanout
    // 不对消息持久化存储
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 第二个参数为指定某一个binding,如为空则由RabbitMQ随机指定
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })

  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})

subscriber.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'

    // ex -> exchange是发布/订阅消息的载体,
    // fanout -> 分发消息的模式,fanout,direct,topic,headers
    // durable设置为false降低一些可靠性,提高性能,因为不需要磁盘IO持久化存储消息,另外
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 使用匿名(也就是RabbitMQ自动生成随机名的queue)队列
    // exclusive设置为true,即可以当其寄生的connection被close的时候自动deleted
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // 绑定队列到某个exchange载体(监听某个exchange的消息)
      // 第三个入参为binding key
      ch.bindQueue(q.queue, ex, '')
      // 消费即订阅某个exchange的消息并设置处理句柄
      // 因为发布/订阅消息的模式就是非可靠性,只有当订阅者订阅才能收到相关的消息而且发布者不关心该消息的订阅者是谁以及处理结果如何,所以这里noAck会置为true
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

Процесс проверки

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(Очистить очереди, обмены, привязки, использованные в предыдущем тесте)

node subscriber.js

[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []

node publisher.js tasks.........

[x] Send tasks......... // publiser.js

[x] tasks......... // subscriber.js

Точка знаний

  • Режим публикации/подписки (издатель отправляет сообщения подписчикам в форме «один ко многим»)
  • noAck (в этом режиме рекомендуется механизм non-Ack, потому что издателям часто не нужно, как подписчики обрабатывают сообщения и их результаты)
  • durable: false (в этом режиме рекомендуется, чтобы постоянное хранение данных не требовалось по вышеуказанным причинам)
  • Рабочий режим обмена (т. е. тип маршрутизации, разветвление, прямой, топик, заголовки и т. д., которые будут объяснены в следующем разделе)
  • Весь процесс потока сообщений (процесс-издатель -> указанный обмен -> через привязку и режим работы -> одна или несколько анонимных очередей, то есть процессы-подписчики)

3. Direct Routing

exchange.js

module.exports = {
  name: 'ex1',
  type: 'direct',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

direct-routing.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {

    ch.assertExchange(ex.name, ex.type, ex.options)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      // 声明一个非匿名queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {

          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

Процесс проверки

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(Очистить очереди, обмены, привязки, использованные в предыдущем тесте)

node direct-routing.js rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers	headers
ex1	direct
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.topic	topic
	direct
amq.direct	direct
amq.match	headers

node subscriber.js rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue	0
error-queue	0
info-queue	0
warning-queue	0

Listing bindings for vhost /...
	exchange	error-queue	queue	error-queue	[]
	exchange	info-queue	queue	info-queue	[]
	exchange	severity-queue	queue	severity-queue	[]
	exchange	warning-queue	queue	warning-queue	[]
ex1	exchange	error-queue	queue	error	[]
ex1	exchange	info-queue	queue	info	[]
ex1	exchange	severity-queue	queue	severity	[]
ex1	exchange	warning-queue	queue	warning	[]

node publisher.js

 [x] info: 'info logs...'
 [x] error: 'error logs...'
 [x] severity: 'severity logs...'
 [x] warning: 'warning logs...'

Точка знаний

  • Ключ маршрутизации, используемый для маршрутизации сообщений в прямом рабочем режиме обмена.
  • Всякий раз, когда assertQueue, очередь будет привязана к анонимному обмену с именем очереди в качестве ключа маршрутизации.
  • Может использоваться для обработки логов на разных уровнях логов

4. Topic Routing

exchange.js

module.exports = {
  name: 'ex2',
  type: 'topic',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

topic-routing.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']

    console.log(' [*] Waiting for logs. To exit press CTRL+C');
    keys.forEach(key => {
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)

        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'

    ch.publish(exchangeConfig.name, key, Buffer.from(msg))

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

Процесс проверки

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(Очистить очереди, обмены, привязки, использованные в предыдущем тесте)

node topic-routing.js

Listing exchanges for vhost / ...
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.headers	headers
amq.match	headers
ex2	topic
	direct
amq.topic	topic
amq.direct	direct

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
  • node publisher.js "account-server.info" "тест пользовательского сервиса"
  • узел publisher.js "config-server.info" "тест службы конфигурации"
  • узел publisher.js «config-server.error» «Ошибка настройки службы»
[x] account-server.info:'用户服务测试'
[x] config-server.info:'配置服务测试'
[x] config-server.error:'配置服务出错'

Точка знаний

  • Максимальная длина ключа 255 байт.
  • #может соответствовать 0 или более словам,*Может соответствовать ровно 1 слову

5. RPC

rpc_server.js

const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'

    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)

    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('接收到rpc客户端的调用请求')
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('不匹配的调用请求')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''

  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: '成功',
      data: {
        uid: uid,
        name: '小强',
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: '传参格式错误'
    }
  }

  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

rpc_client.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'

    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log(`接收到回调的消息啦!`)
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })

    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})

Процесс проверки

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue	0

node rpc_client.js

CLI печать rpc_client

接收到回调的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小强","sex":1}}

CLI печать rpc_server

接收到rpc客户端的调用请求
{ uid: 2 }

PM2: развертывание службы узла (кластер служб), управление и мониторинг

официальный сайт пм2

запускать

pm2 start app.js

  • -w --watch: Прислушивайтесь к изменениям каталога и автоматически перезапускайте приложение, если есть изменения.
  • --ignore-file: файлы, которые следует игнорировать при прослушивании изменений каталога. Такие какpm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
  • -n --name: Установите имя приложения, которое можно использовать для различения приложений.
  • -i --instances: Установите количество экземпляров приложения, 0 соответствует максимальному
  • -f --force: Принудительный запуск приложения, часто используется, когда одно и то же приложение запущено.
  • -o --output <path>: Путь к стандартному выходному файлу журнала.
  • -e --error <path>: Путь к файлу журнала вывода ошибок
  • --env <path>: настроить переменные среды

Такие какpm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

В кластерном режиме, то есть при -i max, файл журнала будет автоматически дополнен -${index}, чтобы гарантировать, что он не повторяется.

Другие простые и распространенные команды

pm2 stop app_name|app_id pm2 restart app_name|app_id pm2 delete app_name|app_id pm2 show app_name|app_id OR pm2 describe app_name|app_id pm2 list pm2 monit pm2 logs app_name|app_id --lines <n> --err

Graceful Stop

pm2 stop app_name|app_id

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

Когда процесс завершится, программа перехватитSIGINTТаким образом, сигнал выполняет process.exit() до того, как процесс будет уничтожен, чтобы отключить соединение с базой данных и т. д., а затем выполняет process.exit() для корректного выхода из процесса. (Если процесс не завершился через 1,6 с, продолжайте отправлятьSIGKILLсигнал для принудительного завершения процесса)

Process File

ecosystem.config.js

const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // 不合并日志输出,用于集群服务
  merge_logs: false,
  // 常用于启动应用时异常,超时时间限制
  listen_timeout: 5000,
  // 进程SIGINT命令时间限制,即进程必须在监听到SIGINT信号后必须在以下设置时间结束进程
  kill_timeout: 2000,
  // 当启动异常后不尝试重启,运维人员尝试找原因后重试
  autorestart: false,
  // 不允许以相同脚本启动进程
  force: false,
  // 在Keymetrics dashboard中执行pull/upgrade操作后执行的命令队列
  post_update: ['npm install'],
  // 监听文件变化
  watch: false,
  // 忽略监听文件变化
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig({ name = '', script = '', error_file = '', out_file = '', exec_mode = 'fork', instances = 1, args = "" }) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file|| `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),

    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}

pm2 start ecosystem.config.js

Руководство по предотвращению ям: Рекомендуется называть файл processFile в формате *.config.js. В противном случае вы будете нести ответственность за последствия.

монитор

пожалуйста, переместитеapp.keymetrics.io

PM2: Автоматическое развертывание

ssh готов

  1. ssh-keygen -t rsa -C 'qingf deployment' -b 4096
  2. При наличии нескольких ключей и нескольких пользователей рекомендуется настроить файл ~/.ssh/config, формат примерно такой
// 用不同用户对不同远程主机发起ssh请求时指定私钥
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  // 设置为no可去掉首次登陆(y/n)的选择
  StrictHostKeyChecking no
// 别名用法
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no
  1. Скопируйте открытый ключ на удаленный (обычно сервер развертывания) соответствующий пользовательский каталог, например, в файл /home/deploy/.ssh/authorized_keys (права доступа к файлу authorized_keys установлены на 600).

настроить экосистему.config.js

Добавьте атрибут развертывания на одном уровне, что и вышеуказанные приложения, следующим образом

deploy: {
    production: {
        'user': 'deploy',
        'host': 'qingf.me',
        'ref': 'remotes/origin/master',
        'repo': 'https://github.com/Cecil0o0/account-server.git',
        'path': '/home/deploy/apps/account-server',
        // 生命周期钩子,在ssh到远端之后setup操作之前执行
        'pre-setup': '',
        // 生命周期钩子,在初始化设置即git pull之后执行
        'post-setup': 'ls -la',
        // 生命周期钩子,在远端git fetch origin之前执行
        'pre-setup': '',
        // 生命周期钩子,在远端git修改HEAD指针到指定ref之后执行
        'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
        // 以下这个环境变量将注入到所有app中
        "env"  : {
          "NODE_ENV": "test"
        }
    }
}

Совет: сначала очистите рабочий каталог Git!

Если вы не понимаете или у вас есть вопросы, пожалуйста, обратитесь к Демо

Затем последовательно выполните следующие две команды** (обратите внимание на путь к файлу конфигурации)**

  1. pm2 deploy deploy/ecosystem.config.js production setup
  2. pm2 deploy deploy/ecosystem.config.js production

другие команды

pm2 deploy <configuration_file>

  Commands:
    setup                run remote setup commands
    update               update deploy to the latest release
    revert [n]           revert to [n]th last deployment or 1
    curr[ent]            output current release commit
    prev[ious]           output previous release commit
    exec|run <cmd>       execute the given <cmd>
    list                 list previous deploy commits
    [ref]                deploy to [ref], the "ref" setting, or latest tag

Рекомендуемый набор инструментов оболочки

oh my zsh

отслеживание запросов

как?

  • Используйте значение seneca.fixedargs['tx$'] в качестве идентификатора трассировки в seneca.add и seneca.act для идентификации процесса запроса. Кроме того, встроенная система журналов Seneca распечатает это значение.

сомневаться?

Как встроенная система журналов seneca выполняет индивидуальную печать журналов?

Напоминание: начните с обычного HTTP-запроса, потому что после тестирования, если микросервис инициирует действие самостоятельно, его значение seneca.fixedargs['tx$'] будет другим.

Регистрация и открытие службы консул

ConsulЭто инструмент регистрации и обнаружения службы распределенного кластера, который имеет расширенные функции, такие как проверка работоспособности, иерархическое хранилище KV и несколько центров обработки данных.

Установить

  • При желании используйте предварительно скомпилированныйИнсталляционный пакет
  • Вы также можете клонировать исходный код, а затем скомпилировать и установить

Основное использование

  • Быстро запустите прокси-сервер сервисного режима в режиме разработки и откройте веб-интерфейс, чтобы посетить http://localhost:8500.

consul agent -dev -ui

  • Написать файл определения службы
{
  "service": {
	// 服务名,稍后用于query服务
    "name": "account-server",
	// 服务标签
    "tags": ["account-server"],
	// 服务元信息
    "meta": {
      "meta": "for my service"
    },
	// 服务端口
    "port": 3333,
	// 不允许标签覆盖
    "enable_tag_override": false,
	// 脚本检测做health checks 与-enable-script-checks=true配合使用,有脚本模式、TCP模式、HTTP模式、TTL模式
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}
  • Служба сервера учетных записей, определяемая запросом

curl http://localhost:8500/v1/catalog/service/account-server

[
    {
        "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
        "Node": "haojiechen.local",
        "Address": "127.0.0.1",
        "Datacenter": "dc1",
        "TaggedAddresses": {
            "lan": "127.0.0.1",
            "wan": "127.0.0.1"
        },
        "NodeMeta": {
            "consul-network-segment": ""
        },
        "ServiceID": "account-server",
        "ServiceName": "account-server",
        "ServiceTags": [
            "account-server"
        ],
        "ServiceAddress": "",
        "ServiceMeta": {
            "meta": "for my service"
        },
        "ServicePort": 3333,
        "ServiceEnableTagOverride": false,
        "CreateIndex": 6,
        "ModifyIndex": 6
    }
]

Использование на уровне производства (распределенный кластер)

Узел запускает прокси-сервер в режиме сервера следующим образом.

consul agent -server -bootstrap-expect=1 \
	-data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Просмотр членов кластера

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>

Другой узел запускает прокси-сервер клиентского режима следующим образом.

consul agent \
	-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Просмотр членов кластера

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

Присоединяйтесь к кластеру

consul join 139.129.5.228 consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

Интеграция узла-консула

config.js

// 服务注册与发现
// https://github.com/silas/node-consul#catalog-node-services
  'serverR&D': {
    consulServer: {
      type: 'consul',
      host: '127.0.0.1',
      port: 8500,
      secure: false,
      ca: [],
      defaults: {
        token: ''
      },
      promisify: true
    },
    bizService: {
      name: 'defaultName',
      id: 'defaultId',
      address: '127.0.0.1',
      port: 1000,
      tags: [],
      meta: {
        version: '',
        description: '注册集群'
      },
      check: {
        http: '',
        // check间隔时间(ex: 15s)
        interval: '10s',
        // check超时时间(ex: 10s)
        timeout: '2s',
        // 处于临界状态后自动注销服务的超时时间
        deregistercriticalserviceafter: '30s',
        // 初始化状态值为成功
        status: 'passing',
        // 备注
        notes: '{"version":"111","microservice-port":1115}'
      }
    }
  }

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description 微服务注册方法
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')

// 注册服务

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.?version) throw new Error('meta.?version is invalid!')
  if (!bizService.meta.?microservicePort) throw new Error('meta.?microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 检查主机+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注册集群服务
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务已注册`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

проверять

После обеспечения того, чтобы услуги консула и MongoDB существуют во время выполнения, клонировать складDemo, перейдите в корневой каталог проекта и запустите node src.

Узел интеграции с фреймворком-консул

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description 微服务注册方法
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// 注册服务方法定义

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.?version) throw new Error('meta.?version is invalid!')
  if (!bizService.meta.?microservicePort) throw new Error('meta.?microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 检查主机+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注册集群服务
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务注册成功`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

account-server/src/index.js

const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// 注册服务
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      ?version: version,
      ?microservicePort: microservicePort
    }
  }
})

Постоянное хранилище MongoDB

  • Фреймворк использует mongoose в качестве mongoClient, конечно, вы также можете выбрать собственный mongoClient для nodejs.

Пользовательский модуль перед преобразованием не будет размещать код, если вы ленивы, пожалуйста, проверьте детали.Demo

Промежуточное программное обеспечение службы маршрутизации, объединяющее Seneca и Consul

microRouting.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description 微服务内部路由中间件,暂不支持自定义路由匹配策略
 */

'use strict'

const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description 同步consul服务中心的所有可用服务以及对应check并组装成对象以方便取值
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = '未发现可用服务'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule(senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let ?addr = service.Address
    let ?microservicePort = ''
    let ?version = ''
    try {
      let base = JSON.parse(service.check.Notes)
      ?microservicePort = base.?microservicePort
      ?version = base.?version
    } catch (e) {
      logger.warn(`服务名为${serviceName}。该服务check.Notes为非标准JSON格式,程序已忽略。请检查服务注册方式(请确保调用ServerRegister的register来注册服务)`)
    }

    if (IPV4_REGEX.test(?addr) && isNumber(?microservicePort)) {
      if (service.check.Status === 'passing') {
        senecaInstance.client({
          host: ?addr,
          port: ?microservicePort,
          pin: {
            ?version,
            ?target: name
          }
        })
      } else {
        logger.warn(`${?target}@${?version || '无'}服务处于critical,因此无法使用`)
      }
    } else {
      logger.warn(`主机(${?addr})或微服务端口号(${?microservicePort})有误,请检查`)
    }
  })
}


function startTimeInterval() {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting(consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting

После обеспечения времени выполнения consul и mongodb, пожалуйста, объедините эти дваconfig-server,account-serverДемо для тестирования.

[Продолжение следует....]