предисловие
RPC (удаленный вызов процедур), переводится как «удаленный вызов процедур», представляет собой эффективный механизм связи между службами или узлами в распределенной системе. С помощью RPC узел (или клиент) может легко вызывать удаленные (или серверные) методы или службы точно так же, как и локальные вызовы. Многие существующие платформы RPC требуют раскрытия адреса сервера, то есть должны быть известны IP-адрес и порт RPC сервера. В этой статье будет представлен метод связи RPC, который не требует раскрытия IP-адресов и портов. Этот метод основан на отложенной очереди, реализованной операциями Redis BRPOP/BLPOP, и асинхронном механизме goroutine coroutine в Golang.Вся структура очень проста и понятна, а также эффективна, стабильна и безопасна. Этот метод был применен кCrawlabСреди узлов связи он стал для каждого узла основным способом передачи информации в режиме реального времени. Ниже мы начнем с PubSub, раннего решения Crawlab для связи узлов, представим проблемы и решения, с которыми столкнулись в то время, а затем о том, как перейти на текущее решение RPC и как оно работает в Crawlab.
Решение на основе PubSub
PubSub
Ранний Crawlab был основан на PubSub Redis, модели публикации-подписки. Эта схема в основном используется для односторонней связи «один ко многим» в Redis. Его использование очень простое:
- Подписчики пользуются
SUBSCRIBE channel1 channel2 ...
подписаться на один или несколько каналов; - Издатель использует
PUBLISH channelx message
опубликовать сообщение для подписчиков канала.
РедисPubSub
Может использоваться как широковещательный режим, то есть одному издателю соответствует несколько подписчиков. В Crawlab у нас есть только один подписчик, соответствующий одному издателю (главный узел -> рабочий узел:nodes:<node_id>
) или один подписчик соответствует нескольким издателям (рабочий узел -> главный узел:nodes:master>
).这是为了方便双向通信。
Ниже приведена схематическая диаграмма принципа связи узла.
Каждый узел будет передавать RedisPubSub
функцию общения друг с другом.
так называемыйPubSub
, которая представляет собой просто модель публикации-подписки. Подписчик (Subscriber) подписывается на канал в Redis, и любой другой узел может выступать в роли издателя (Publisher) для публикации (Publish) сообщений на канале.
Коммуникационная архитектура
В Crawlab главный узел подписываетсяnodes:master
канал, если другим узлам необходимо отправлять сообщения главному узлу, им нужно только отправитьnodes:master
Просто опубликуйте сообщение. Точно так же каждый рабочий узел подпишется на собственный канал.nodes:<node_id>
(node_id
— это идентификатор узла в MongoDB, который является идентификатором объекта MongoDB. Если вам нужно отправить сообщение на рабочий узел, вам нужно только опубликовать сообщение на этом канале.
Простой процесс сетевого запроса выглядит следующим образом:
- Клиент (интерфейсное приложение) отправляет запрос на главный узел (API);
- Мастер-узел через Redis
PubSub
из<nodes:<node_id>
Канал публикует сообщения на соответствующие рабочие узлы; - После того, как рабочий узел получил сообщение, он выполняет некоторые операции и передает соответствующее сообщение через
<nodes:master>
Канал публикуется на главном узле; - После того, как главный узел получает сообщение, он возвращает сообщение клиенту.
Не вся связь узла является двунаправленной, то есть главный узел будет связываться с рабочим узлом только в одностороннем порядке, и рабочий узел не будет возвращать ответ главному узлу, так называемая односторонняя связь. Ниже приведены типы связи Crawlab.
chan
иgoroutine
Если вы читаете исходный код Crawlab, вы обнаружите, что там многоchan
синтаксис, который является функцией параллелизма Golang.
chan
Представленный как канал, он делится на небуферизованные и буферизованные каналы в Golang, Мы используем небуферизованные каналы для блокировки сопрограмм, только когдаchan
полученный сигнал (chan <- "some signal"
), блокировка будет снята, и сопрограмма перейдет к следующему шагу). В режиме запрос-ответ, если это двусторонняя связь, мастер-узел сгенерирует небуферизованный канал, чтобы заблокировать запрос после получения запроса.После получения сообщения от рабочего узла, он присвоит значение небуферизованному каналу. , блокировать и освобождать, а также возвращать ответ клиенту.
go
Команда получитgoroutine
(сопрограмма) для завершения параллелизма, сотрудничестваchan
, сопрограмма может быть приостановлена с использованием небуферизованного канала в ожидании сигнала для выполнения следующей операции.
Схема на основе очереди с задержкой
Проблемы со схемой PubSub
Шаблон проектирования подписки-публикации сообщений в PubSub является эффективным способом реализации связи между узлами, но у него есть две проблемы:
- Данные PubSub мгновенны и будут потеряны, когда Redis выйдет из строя;
- Написание службы связи на основе PubSub потребует использования
goroutine
иchannel
, что увеличивает сложность разработки и снижает ремонтопригодность.
Среди них вторая проблема является более сложной. Если мы хотим добавить больше функций, нам нужно написать много асинхронного кода, что увеличит связь между системными модулями, что приведет к плохой масштабируемости и болезненному чтению кода.
Поэтому для решения этой проблемы мы используем службу RPC, основанную на очереди отложенных сообщений Redis.
Архитектура очереди с задержкой
На следующем рисунке представлена схема реализации RPC на основе архитектуры отложенной очереди.
У каждого узла есть клиент (Client) и сервер (Server). Клиент используется для отправки сообщения целевому узлу (Target Node) и получения возвращенного сообщения, а сервер используется для приема и обработки сообщения исходного узла (Source Node) и возврата сообщения клиенту узла. исходный узел.
Весь процесс связи RPC выглядит следующим образом:
- Клиент исходного узла проходит
LPUSH
Push-сообщения в Redisnodes:<node_id>
и выполнитьBRPOP nodes:<node_id>:<msg_id>
Заблокируйте и прослушайте эту очередь сообщений; - Сервер целевого узла проходит
BRPOP
Слушалnodes:<node_id>
, после получения сообщения передатьMethod
Поле выполняет соответствующую программу; - После выполнения целевого узла сервер проходит
LPUSH
Push-сообщения в Redisnodes:<node_id>:<msg_id>
середина; - Поскольку клиент исходного узла прослушивал
nodes:<node_id>:<msg_id>
Очередь сообщений, узел назначения, когда сервер нажимает сообщения в очередь, узел-источник немедленно получает ответное сообщение, чтобы выполнить последующую обработку.
Таким образом, процесс связи всего узла завершается через Redis. Преимущество этого заключается в том, что вам не нужно раскрывать IP-адрес и порт HTTP, и вам нужно знать только идентификатор узла для завершения связи RPC.
Код RPC, разработанный таким образом, легче понять и поддерживать. Каждый раз, когда вам нужно расширить новый класс связи, просто наследуйтеrpc.Service
класс, реализацияClientHandle
(метод обработки на стороне клиента) иServerHandle
(Метод обработки на стороне сервера) подходит.
Скажи больше здесьBRPOP
.它将移出并获取消息队列的最后一个元素, 如果消息队列没有元素会阻塞队列直到等待超时或发现可弹出元素为止。 Следовательно,BRPOP
По сравнению с опросом или другими методами, команды позволяют избежать непрерывных запросов к Redis и не тратить впустую сетевые и вычислительные ресурсы.
Если вы не знакомы с рабочими командами Redis, вы можете обратиться к буклету Nuggets.«Redis Deep Adventures: основные принципы и практика применения», эта брошюра подробно знакомит с принципами и инженерными методами Redis и очень удобна для применения Redis в реальной разработке.
Кодекс Практика
После разговора о таком большом количестве теоретических знаний нам все же нужно взглянуть на код. Учителя часто учат нас: «Говорить дешево. Покажите мне код».
Поскольку бэкэнд Crawlab разработан в Голан, понимание следующего кода требует некоторых базовых знаний о Голанге.
структура данных сообщения
Сначала нам нужно определить структуру данных для передачи сообщений. код показывает, как показано ниже.
package entity
type RpcMessage struct {
Id string `json:"id"` // 消息ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 节点ID
Params map[string]string `json:"params"` // 参数
Timeout int `json:"timeout"` // 超时
Result string `json:"result"` // 结果
Error string `json:"error"` // 错误
}
Здесь мы определяем такие поля, как идентификатор сообщения, метод, идентификатор узла, параметры и т. д. Идентификатор сообщения представляет собой UUID, который обеспечивает уникальность идентификатора сообщения.
базовый интерфейс
Во-первых, мы определяем абстрактный базовый интерфейс, который удобно наследовать реальным модулям бизнес-логики. Логика обработки сервера находится вServerHandle
в, вернутьсяentity
внутреннийRpcMessage
И логика клиентаClientHandle
середина.
// RPC服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
Общий метод клиента
Когда мы вызываем общий метод клиента, нам нужно реализовать две логики:
- Отправьте сообщение: сгенерируйте идентификатор сообщения, сериализуйте сообщение в JSON и поместите LPUSH в очередь сообщений Redis;
- Задержите получение возвращенного сообщения через BRPOP и верните его вызывающему абоненту.
Ниже приведен реализованный код.
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
msg.Id = uuid.NewV4().String()
// 发送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有错误,返回错误
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
обработка на стороне сервера
Логика серверной обработки следующая, общая логика такая:
- В цикле получить сообщение, соответствующее узлу, через BRPOP;
- Когда сообщение получено, генерируется горутина для асинхронной обработки сообщения;
- продолжайте ждать.
ты сможешьInitRpcService
Вышеуказанная логика видно в этом методе. Частный методhandleMsg
Реализована логика сериализации, вызова сервисных методов RPC на стороне сервера и отправки возвращаемых сообщений. Если вам нужно расширить тип метода RPC, в методе фабричного классаGetService
Просто добавьте это.
// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端RPC服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}
пример метода вызова
Узлы Crawlab часто нуждаются в установке некоторых сторонних зависимостей для сканера, таких как pymongo, запросы и т. д. Среди них нам также необходимо знать, установлена ли определенная зависимость на определенном узле, что требует межсерверной связи, то есть двусторонней связи в распределенной сети. И реализуется эта логика через RPC. Главный узел инициирует вызов RPC к целевому узлу, целевой узел запускает вызываемый метод, возвращает текущий результат, то есть установленный список зависимостей, клиенту, а клиент возвращает его вызывающей стороне.
Следующий код реализует метод RPC для получения установленных зависимостей на целевом узле.
// 获取已安装依赖服务
// 继承Service基础类
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
// 服务端处理方法
// 重载ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
// 客户端处理方法
// 重载ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
позвонить
Напишите RPC Server и метод обработки клиента, вы можете легко написать логику вызовов. Ниже приведен способ призыва к получению различного представления. СначалаGetService
Перед получением заводского классаGetInstalledDepsService
, а затем вызовите его метод обработки на стороне клиентаClientHandle
, и вернуть результат. Это похоже на локальный вызов метода. Разве это не просто?
// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
Эпилог
В этой статье в основном представлен метод связи RPC, основанный на очереди задержки Redis, который является очень безопасным методом без раскрытия IP-адресов или портов каждого узла или службы. Более того, этот способ использовался в Голанге вCrawlabДвусторонняя связь реализована в Golang, особенно в горутине, которая естественным образом поддерживает асинхронность в Golang, что упрощает реализацию этого метода. На самом деле, этот метод теоретически очень эффективен и может поддерживать высокую скорость одновременной передачи данных.
Но когдаCrawlabС реализацией все еще есть некоторые проблемы, то есть она не ограничивает количество одновременных серверных обработок. Поэтому, если будет передано слишком много сообщений, ресурсы сервера будут заполнены, что приведет к снижению скорости обработки и даже к простою. Метод восстановления заключается в ограничении количества одновременно работающих серверов. Кроме того, по причинам ограниченного времени у автора не было времени проверить фактическую эффективность передачи этого RPC-сообщения, отказоустойчивость также не является членом. Так что в целом есть много возможностей для улучшения и оптимизации.
Тем не менее такой подход являетсяCrawlabЭтого достаточно для удаленной связи с низким уровнем параллелизма, при реальном использовании проблем не возникает, и он очень стабилен. Разработчикам, которым требуется конфиденциальность и которые не хотят раскрывать адресную информацию, мы также рекомендуем попробовать этот метод в практических приложениях.