Внедрение инфраструктуры IDL+RPC с нуля

задняя часть сервер GitHub удаленная работа

что такое РПЦ

Давным-давно в автономную эпоху на одном компьютере выполнялось несколько процессов, и между процессами не было связи. Внезапно однажды возникает новое требование.Процесс А должен реализовать функцию рисования.Бывает, что соседний процесс Б уже имеет эту функцию.Ленивый программист С придумал метод: Процесс А настраивает функцию рисования процесса Б. . Так оно и появилосьIPC(Межпроцессное взаимодействие, межпроцессное взаимодействие). Таким образом, программист C радостно пошел завтракать!

Несколько лет спустя, в эпоху Интернета, каждый компьютер взаимосвязан. В это время у работодателей появляются новые требования, и нужно использовать еще не зависший процесс А.tensorflowСмайлики >_IPCреализация, поставитьIPCРасширенный до Интернета, этоRPC(Удаленный вызов процедуры, удаленный вызов процедуры).RPCПо сути, это инструмент, с помощью которого процесс на одном компьютере вызывает процесс на другом компьютере. зрелыйRPCБольшинство решений будут иметь такие механизмы, как регистрация службы, обнаружение службы, переход на более раннюю версию автоматического выключателя и ограничение тока. В настоящее время на рынке существует множество зрелых RPC, таких какFacebookДомаThrift,GoogleДомаgRPC, АлиDubboи муравейSOFA.

язык определения интерфейса

Язык определения интерфейса, сокращенноIDL,Это набор схем кодирования для обеспечения надежной связи между концами. Здесь есть сериализация и десериализация передаваемых данных.Наши часто используемые http-запросы обычно используют json в качестве инструмента сериализации.rpc协议的时候因为要求响应迅速等特点,所以大多数会定义一套序列化协议。 Например:

Protobuf:

// protobuf 版本
syntax = "proto3";
 
package testPackage;
 
service testService {
  // 定义一个ping方法,请求参数集合pingRequest, 响应参数集合pingReply 
  rpc ping (pingRequest) returns (pingReply) {}
}
 
message pingRequest {
  // string 是类型,param是参数名,1是指参数在方法的第1个位置
  string param = 1;
}
 
message pingReply {
  string message = 1;
  string content = 2;
}

Говоря оProtobufЯ должен рассказать о другой работе автора этой библиотекиCap'n protoТеперь заявлено, что производительность является прямым скачкомGoogle Protobuf, переходим непосредственно к официальному сравнению:

Cap'n proto

Хотя я знаю многоProtobufБолее быстрая схема кодирования, но это также удивительно, чтобы достичь этого момента. Почему это так быстро? Документ Cap'n Proto сразу объясняет это, потому чтоCap'n ProtoНет серийного номера и шага десериализации.Cap'n ProtoФормат закодированных данных согласуется с расположением в памяти, поэтому закодированная структура может быть сохранена непосредственно в байтах на жестком диске. Приклейте каштан:

@0xdbb9ad1f14bf0b36;  # unique file ID, generated by `capnp id`

struct Person {
  name @0 :Text;
  birthdate @3 :Date;

  email @1 :Text;
  phones @2 :List(PhoneNumber);

  struct PhoneNumber {
    number @0 :Text;
    type @1 :Type;

    enum Type {
      mobile @0;
      home @1;
      work @2;
    }
  }
}

struct Date {
  year @0 :Int16;
  month @1 :UInt8;
  day @2 :UInt8;
}

Схема кодирования, которую мы хотим здесь настроить, основана наprotobufиCap'n ProtoВ сочетании с аналогичным синтаксисом. Поскольку я предпочитаю главных героев-мужчин в Sword Art Online, я назвал эту библиотеку——Kiritobuf.

Сначала мы определяемkiritoСинтаксис:

# test

service testService {
  method ping (reqMsg, resMsg)
}

struct reqMsg {
  @0 age = Int16;
  @1 name = Text;
}

struct resMsg {
  @0 age = Int16;
  @1 name = Text;
}
  • #начинается с примечания
  • зарезервированные ключевые слова,service,method,struct,
  • {}представляет собой блочную структуру
  • ()Есть два параметра, первый — это структура параметра запроса, а второй — структура возвращаемого значения.
  • @Параметр определяется дескриптором местоположения,0сказал в первую очередь
  • =Левая часть числа — название параметра, правая — тип параметра.

Тип параметра:

  • Boolean: Bool
  • Integers: Int8, Int16, Int32, Int64
  • Unsigned integers: UInt8, UInt16, UInt32, UInt64
  • Floating-point: Float32, Float64
  • Blobs: Text, Data
  • Lists: List(T)

Определив синтаксис и типы параметров, давайте пройдемся по процессу генерации кода с абстрактными отношениями:

ast

получать.kiritoФайл с суффиксом, прочитать все символы, сгенерированные лексическим анализаторомtokenпринадлежитtokenВходящий парсер генерируетAST (抽象语法树).

Сначала мы создаем новыйkirito.jsдокумент:

'use strict';

const fs = require('fs');
const tokenizer = Symbol.for('kirito#tokenizer');
const parser = Symbol.for('kirito#parser');
const transformer = Symbol.for('kirito#transformer');
// 定义词法分析Token类型 
const TYPE = {
  // 保留字,service、struct、method...
  KEYWORD: 'keyword',
  // 变量
  VARIABLE: 'variable',
  // 符号,{ } ( ) ; # @ ,
  SYMBOL: 'symbol',
  // 参数位置,数值表示0、1、2、3...
  INDEX: 'index'
};
// 定义语法分析字段类型
const EXP = {
  // 变量
  VARIABLE: 'Identifier',
  // 结构申明,service、struct、method
  STRUCT_DECLARATIONL: 'StructDeclaration',
  // 变量申明,@
  VAR_DECLARATION: 'VariableDeclaration',
  // 数据类型, Int16、UInt16、Bool、Text...
  TYPE: 'DataType',
};

После определения некоторых необходимых литералов следующим шагом является фаза лексического анализа.

Лексический анализ

Мы разрабатываем лексический анализ, чтобы получитьTokenЭто выглядит так:

[ { type: 'keyword', value: 'service' },
  { type: 'variable', value: 'testService' },
  { type: 'symbol', value: '{' },
  { type: 'keyword', value: 'method' },
  { type: 'variable', value: 'ping' },
  { type: 'symbol', value: '(' },
  { type: 'variable', value: 'reqMsg' },
  { type: 'variable', value: 'resMsg' },
  { type: 'symbol', value: ')' },
  { type: 'symbol', value: '}' },
  { type: 'keyword', value: 'struct' },
  { type: 'variable', value: 'reqMsg' },
  { type: 'symbol', value: '{' },
  { type: 'symbol', value: '@' },
  { type: 'index', value: '1' },
  { type: 'variable', value: 'age' },
  { type: 'symbol', value: '=' },
  { type: 'variable', value: 'Int16' },
  { type: 'symbol', value: ';' },
  { type: 'symbol', value: '@' },
  { type: 'index', value: '2' },
  { type: 'variable', value: 'name' },
  { type: 'symbol', value: '=' },
  { type: 'variable', value: 'Text' },
  { type: 'symbol', value: ';' },
  { type: 'symbol', value: '}' },
  { type: 'keyword', value: 'struct' },
  { type: 'variable', value: 'resMsg' },
  { type: 'symbol', value: '{' },
  { type: 'symbol', value: '@' },
  { type: 'index', value: '1' },
  { type: 'variable', value: 'age' },
  { type: 'symbol', value: '=' },
  { type: 'variable', value: 'Int16' },
  { type: 'symbol', value: ';' },
  { type: 'symbol', value: '@' },
  { type: 'index', value: '2' },
  { type: 'variable', value: 'name' },
  { type: 'symbol', value: '=' },
  { type: 'variable', value: 'Text' },
  { type: 'symbol', value: ';' },
  { type: 'symbol', value: '}' } ]

Этапы лексического анализа:

  • получитьkiritoкодовая строка в соответствии с\nРазделите и объедините в массив A, каждый элемент массива представляет собой строку кода
  • Пройдите массив A и прочитайте каждую строку кода символ за символом
  • Он определен во время чтения правила сопоставления, такого как комментарии, зарезервированные слова, переменные, символы и тому подобное массив
  • Добавьте каждый сопоставленный символ или строку в массив токенов в соответствии с соответствующим типом

код показывает, как показано ниже:

[tokenizer] (input) {
    // 保留关键字
    const KEYWORD = ['service', 'struct', 'method'];
    // 符号
    const SYMBOL = ['{', '}', '(', ')', '=', '@', ';'];
    // 匹配所有空字符
    const WHITESPACE = /\s/;
    // 匹配所有a-z的字符、不限大小写
    const LETTERS = /^[a-z]$/i;
    // 匹配数值
    const NUMBER = /\d/;
    
    // 以换行符分割成数组
    const source = input.split('\n');
    // 最终生成的token数组
    const tokens = [];
    source.some(line => {
      // 声明一个 `current` 变量作为指针
      let current = 0;
      // 是否继续当前循环、移动到下一行,用于忽略注释
      let isContinue = false;
      while (current < line.length) {
        let char = line[current];

        // 匹配任何空字符
        if (WHITESPACE.test(char)) {
          current++;
          continue;
        }

        // 忽略注释
        if (char === '#') {
          isContinue = true;
          break;
        }

        // 匹配a-z|A-Z的字符
        if (LETTERS.test(char)) {
          // 定义一个字符串变量,用来存储连续匹配成功的字符
          let value = '';
          // 匹配字符(变量/保留字)、字符加数字(参数类型)
          while (LETTERS.test(char) || NUMBER.test(char)) {
            // 追加字符
            value += char;
            // 移动指针
            char = line[++current];
          }
          if (KEYWORD.indexOf(value) !== -1) {
            // 匹配保留关键字
            tokens.push({
              type: TYPE.KEYWORD,
              value: value
            });
          } else {
            // 匹配变量名、类型
            tokens.push({
              type: TYPE.VARIABLE,
              value: value
            });
          }
          continue;
        }

        // 匹配符号 { } ( ) = @
        if (SYMBOL.indexOf(char) !== -1) {
          tokens.push({
            type: TYPE.SYMBOL,
            value: char
          });
          // 匹配@ 参数位置符号
          if (char === '@') {
            char = line[++current];
            // 匹配参数位置0-9
            if (NUMBER.test(char)) {
              // 定义参数位置字符串,用来存储连续匹配成功的参数位置
              let index = '';
              // 匹配参数位置0-9
              while (NUMBER.test(char)) {
                // 追加参数位置 `1`+`2`=`12`
                index += char;
                char = line[++current];
              }
              tokens.push({
                type: TYPE.INDEX,
                value: index
              });
            }
            continue;
          }
          current++;
          continue;
        }
        current++;
      }
        
      // 跳过注释
      if (isContinue) return false;
    });
    return tokens;
  }
Разбор

После получения токена вышеуказанного лексического анализа мы можем разобрать токен Нам нужен формат конечного сгенерированного AST следующим образом:

{
  "type": "Program",
  "body": [
    {
      "type": "StructDeclaration",
      "name": "service",
      "value": "testService",
      "params": [
        {
          "type": "StructDeclaration",
          "name": "method",
          "value": "ping",
          "params": [
            {
              "type": "Identifier",
              "value": "reqMsg"
            },
            {
              "type": "Identifier",
              "value": "resMsg"
            }
          ]
        }
      ]
    },
    {
      "type": "StructDeclaration",
      "name": "struct",
      "value": "reqMsg",
      "params": [
        {
          "type": "VariableDeclaration",
          "name": "@",
          "value": "1",
          "params": [
            {
              "type": "Identifier",
              "value": "age"
            },
            {
              "type": "DataType",
              "value": "Int16"
            }
          ]
        },
        {
          "type": "VariableDeclaration",
          "name": "@",
          "value": "2",
          "params": [
            {
              "type": "Identifier",
              "value": "name"
            },
            {
              "type": "DataType",
              "value": "Text"
            }
          ]
        }
      ]
    },
    {
      "type": "StructDeclaration",
      "name": "struct",
      "value": "resMsg",
      "params": [
        {
          "type": "VariableDeclaration",
          "name": "@",
          "value": "1",
          "params": [
            {
              "type": "Identifier",
              "value": "age"
            },
            {
              "type": "DataType",
              "value": "Int16"
            }
          ]
        },
        {
          "type": "VariableDeclaration",
          "name": "@",
          "value": "2",
          "params": [
            {
              "type": "Identifier",
              "value": "name"
            },
            {
              "type": "DataType",
              "value": "Text"
            }
          ]
        }
      ]
    }
  ]
}

Глядя на рисунок выше, мы можем легко получить зависимости и отношения между структурами, параметрами, типами данных и функциями.

  • Пройдите массив токенов, полученный лексическим анализом, и извлеките узлы зависимости между токенами, вызвав функцию анализа.
  • Правила добычи токена определяются внутри функции анализа, такие как:
    1. сервисное зарезервированное слово сервисное имя { функция зарезервированное слово имя функции (входной параметр, возвращаемый параметр) }
    2. структура параметра зарезервированное слово структура имя { положение параметра имя параметра тип данных параметра }
  • Рекурсивно вызовите функцию анализа, чтобы извлечь соответствующие зависимости узлов и добавить узлы в AST.

код показывает, как показано ниже:

[parser] (tokens) {
    // 声明ast对象,作为分析过程中的节点存储器
    const ast = {
      type: 'Program',
      body: []
    };
    // 定义token数组指针变量
    let current = 0;
    
    // 定义函数、用例递归分析节点之间的依赖和存储
    function walk() {
      // 当前指针位置的token节点
      let token = tokens[current];

      // 检查变量、数据类型
      if (token.type === TYPE.VARIABLE) {
        current++;
        return {
          type: EXP.VARIABLE,
          struct: tokens[current].value === '=' ? false : true,
          value: token.value
        };
      }

      // 检查符号
      if (token.type === TYPE.SYMBOL) {
        // 检查@,添加参数位置绑定
        if (token.value === '@') {
          // 移动到下一个token, 通常是个数值,也就是参数位置
          token = tokens[++current];
          // 定义参数节点,用来存储位置、变量名、数据类型
          let node = {
            type: EXP.VAR_DECLARATION,
            name: '@',
            value: token.value,
            params: []
          };
            
          // 移动到下一个token, 准备开始读取参数变量名和数据类型
          token = tokens[++current];
          // 每个参数节点以;符号结束
          // 这个循环中会匹配参数变量名和参数数据类型并把他们添加到当前的参数节点上
          while (token.value !== ';') {
            // 递归匹配参数变量名、数据类型
            node.params.push(walk());
            // 指定当前指针的token
            token = tokens[current];
          }
          // 移动token数组指针
          current++;
          // 返回参数节点
          return node;
        }

        // 检查=,匹配该符号右边的参数数据类型
        if (token.value === '=') {
          // 移动到下一个token
          token = tokens[++current];
          current++;
          return {
            type: EXP.TYPE,
            value: token.value
          };
        }

        current++;
      }

      // 检查保留字
      if (token.type === TYPE.KEYWORD) {
        // 检查service、struct
        if (['struct', 'service'].indexOf(token.value) !== -1) {
          // 缓存保留字
          let keywordName = token.value;
          // 移动到下一个token,通常是结构名
          token = tokens[++current];
          // 定义结构节点,用来储存结构保留字、结构名、结构参数数组
          let node = {
            type: EXP.STRUCT_DECLARATIONL,
            // 保留字
            name: keywordName,
            // 结构名
            value: token.value,
            // 参数数组
            params: []
          };

          // 移动到下一个token
          token = tokens[++current];
          // 匹配符号且是{,准备解析{里的参数
          if (token.type === TYPE.SYMBOL && token.value === '{') {
            // 移动到下一个token
            token = tokens[++current];
            // 等于}是退出参数匹配,完成参数储存
            while (token.value !== '}') {
              // 递归调用分析函数,获取参数数组
              node.params.push(walk());
              // 移动token到当前指针
              token = tokens[current];
            }
            current++;
          }
          // 返回结构节点
          return node;
        }

        if (token.value === 'method') {
          // 检查method,匹配请求函数名
          token = tokens[++current];
          // 定义请求函数节点,用来储存函数入参和返回参数
          let node = {
            type: EXP.STRUCT_DECLARATIONL,
            name: 'method',
            value: token.value,
            params: []
          };
            
          // 移动到下一个token
          token = tokens[++current];
          // 匹配(符号,准备储存入参和返回参数
          if (token.type === TYPE.SYMBOL && token.value === '(') {
            // 移动到入参token
            token = tokens[++current];
            // 等于)时退出匹配,完成函数匹配
            while (token.value !== ')') {
              // 递归调用分析函数
              node.params.push(walk());
              token = tokens[current];
            }
            current++;
          }
          // 返回函数节点
          return node;

        }
      }
      
      // 抛出未匹配到的错误
      throw new TypeError(token.type);
    }

    // 遍历token数组
    while (current < tokens.length) {
      ast.body.push(walk());
    }
    
    // 返回ast
    return ast;
  }
преобразователь

проанализированоASTТогда нам нужно дальшеASTПреобразование проще в эксплуатацииjs对象. Формат следующий:

{ 
    testService: { 
        ping: {
            [Function]
            param: { 
                reqMsg: { 
                    age: 'Int16', 
                    name: 'Text' 
                },
                resMsg: { 
                    age: 'Int16', 
                    name: 'Text' 
                } 
            }
        } 
    } 
}

Благодаря приведенному выше формату нам легче узнать, сколькоservice,serviceСколько там функций и параметры функций.

код показывает, как показано ниже:

// 转换器
  [transformer] (ast) {
    // 定义汇总的service
    const services = {};
    // 定义汇总的struct,用来储存参数结构,以便最后和service合并
    const structs = {};

    // 转换数组
    function traverseArray(array, parent) {
      // 遍历数组
      array.some((child) => {
        // 分治转换单个节点
        traverseNode(child, parent);
      });
    }

    function traverseNode (node, parent) {

      switch (node.type) {
      case 'Program':
        // 根节点
        traverseArray(node.body, parent);
        break;
      case 'StructDeclaration':
        // 匹配service、struct、method类型节点
        if (node.name === 'service') {
          // 定义service的父节点为对象,为了更好的添加属性
          parent[node.value] = {};
          // 调用数组转换函数解析,并把父节点传入以便添加子节点
          traverseArray(node.params, parent[node.value]);
        } else if (node.name === 'method') {
          // 定义一个空函数给method节点
          parent[node.value] = function () {};
          // 在该函数下挂载一个param属性作为函数的参数列表
          parent[node.value].param = {};
          traverseArray(node.params, parent[node.value].param);
        } else if (node.name === 'struct') {
          // 定义struct的父节点为一个对象
          structs[node.value] = {};
          // 解析struct
          traverseArray(node.params, structs[node.value]);
        }
        break;
      case 'Identifier':
        // 定义参数变量
        parent[node.value] = {};
        break;
      case 'VariableDeclaration':
        // 解析参数数组
        traverseArray(node.params, parent);
        break;
      case 'DataType':
        // 参数数据类型
        parent[Object.keys(parent).pop()] = node.value;
        break;
      default:
        // 抛出未匹配到的错误
        throw new TypeError(node.type);
      }
    }

    traverseNode(ast, services);
      
    // 合并service和struct
    const serviceKeys = Object.getOwnPropertyNames(services);
    serviceKeys.some(service => {
      const methodKeys = Object.getOwnPropertyNames(services[service]);
      methodKeys.some(method => {
        Object.keys(services[service][method].param).some(p => {
          if (structs[p] !== null) {
            services[service][method].param[p] = structs[p];
            delete structs[p];
          }
        });
      });
    });

    return services;
  }

Протокол передачи

RPCСуществуют различные протоколы, которые могут бытьjson、xml、http2Что касается текстового протокола такого http1.x, http2.0, таких как двоичные протоколы, более подходящиеRPCпротокол связи прикладного уровня. много зрелыхRPCФреймворки обычно настраивают свои собственные протоколы для удовлетворения различных непредсказуемых потребностей.

НапримерThriftизTBinaryProtocol,TCompactProtocolи так далее, пользователь может самостоятельно выбрать подходящий ему протокол передачи.

Большинство компьютеров имеют байтовую адресацию (кроме байтовой адресации по словам, а также адресации и битовой адресации), здесь мы обсуждаем только байтовую адресацию. Каждая машина из-за разных систем или разных правил не является одним и тем же адресом памяти, закодированным процессором, как правило, двумя типами порядка байтов: прямым порядком байтов и прямым порядком байтов.

Большой кусок: старший байт данных сохраняется по младшему адресу.

Little endian: младший байт данных хранится в старшем адресе

Возьмите каштан:

Например целое число:258, выраженный в шестнадцатеричном виде как0x0102, мы разделяем его на два байта0x01иox02, соответствующий двоичный файл0000 0001и0000 0010. Формат хранения на компьютере с обратным порядком байтов следующий:

big

Little endian наоборот. Чтобы гарантировать, что данные, передаваемые между разными машинами, одинаковы, при разработке протокола связи сначала согласовывается использование одного из них в качестве схемы связи.java虚拟机Используется обратный порядок байтов. На машине мы звоним主机字节序, мы называем его网络字节序.网络字节序是TCP/IPФормат представления данных, указанный вCPUНезависимо от типа, операционной системы и т. д., что обеспечивает правильную интерпретацию данных при передаче между разными хостами. В сетевом порядке байтов используется обратный порядок байтов.

Мы не будем создавать колесо нового протокола прикладного уровня здесь, мы будем использовать его напрямуюMQTTпротокол в качестве нашего протокола прикладного уровня по умолчанию.MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议), который основан на发布/订阅(publish/subscribe) Режим «Легкий» протокол связи, использующий передачу Big-Endian Network Byte заказа, протокол построен наTCP/IPна договоре.

осуществлять общение

Сначала вставьте завершенный процесс вызова кода, в первую очередь на стороне сервера:

'use strict';

const pRPC = require('..');
const path = require('path');
const kiritoProto = './protocol/test.kirito';
const server = new pRPC.Server();
// 解析kirito文件生成js对象
const proto = pRPC.load(path.join(__dirname, kiritoProto));

// 定义client端可以调用的函数
function test(call, cb) {
  cb(null, {age: call.age, name: call.name});
}

// 加载kirito解析出来的对象和函数绑定,这里声明了ping的执行函数test
server.addKiritoService(proto.testService, {ping: test});

server.listen(10003);

сторона клиента:

'use strict';

const pRPC = require('..');
const path = require('path');
const kiritoProto = './protocol/test.kirito';
// 解析kirito文件生成js对象
const proto = pRPC.load(path.join(__dirname, kiritoProto));
// 分配一个client实例绑定kirito解析的对象并连接server
const client =  new pRPC.Client({host: 'localhost', port: 10003}, proto.testService);

// 调用server端的函数
client.ping({age: 23, name: 'ricky 泽阳'}, function (err, result) {
  if (err) {
    throw new Error(err.message);
  }
  console.log(result);
});

Будь то определение функции на стороне сервера или вызов функции на стороне клиента, это относительно простой шаг. Далее мы будем медленно анализировать реализацию конкретной логики.

Вставьте конкретную диаграмму архитектуры потока вызовов:

rpc

Сводка потока вызовов:

  • Клиентская сторона анализирует файл кирито и привязывает службу кирито к объекту клиента.
  • Сторона сервера анализирует файл кирито и привязывает службу и вызывающую функцию кирито к серверному объекту.
  • Клиентская сторона вызывает функцию, определенную в сервисе kirito, регистрирует событие обратного вызова и инициирует запрос MQTT.
  • Сторона сервера получает запрос MQTT, анализирует тело запроса и вызывает соответствующую функцию, чтобы инициировать запрос MQTT на сторону клиента после выполнения.
  • После того, как клиентская сторона получает MQTT-запрос, она анализирует тело и ошибку, вынимает соответствующую функцию обратного вызова из очереди событий обратного вызова и назначает ее для выполнения.

Поговорив о вызывающем процессе, давайте начнем объяснять конкретную реализацию.

server:

// protocol/mqtt.js

'use strict';

const net = require('net');
const debug = require('debug')('polix-rpc:mqtt');
const EventEmitter = require('events').EventEmitter;
const mqttCon = require('mqtt-connection');

// 定义server类,继承EventEmitter是为了更好的将模块解耦
class MQTT extends EventEmitter {

  constructor () {
    super();
    // 是否已经开启服务
    this.inited = false;
    // 函数集合
    this.events = {};
  }

  // 监听端口并开启服务
  listen (port, cb) {
    // 已经初始化了就不用再次init
    if (this.inited) {
      cb && cb(new Error('already inited.', null));
      return;
    }
    // 赋值当前作用域上下文的指针给self对象,用来在非当前作用的函数执行当前作用域的代码
    const self = this;
    // 设置初始化
    this.inited = true;
    // 实例化一个net服务
    this.server = new net.Server();
    this.port = port || 10003;
    // 监听端口
    this.server.listen(this.port);
    debug('MQTT Server is started for port: %d', this.port);
      
    // 监听error事件
    this.server.on('error', (err) => {
      debug('rpc server is error: %j', err.stack);
      self.emit('error', err);
    });
      
    // 监听连接事件
    this.server.on('connection', (stream) => {
      // 实例化mqtt对象
      const socket = mqttCon(stream);
      debug('=========== new connection ===========');
      
      // 监听mqtt服务connect事件
      socket.on('connect', () => {
        debug('connected');
        socket.connack({ returnCode: 0 });
      });

      socket.on('error', (err) => {
        debug('error : %j', err);
        socket.destroy();
      });

      socket.on('close', () => {
        debug('===========     close     ============');
        socket.destroy();
      });


      socket.on('disconnect', () => {
        debug('===========   disconnect   ============');
        socket.destroy();
      });
        
      // 监听mqtt服务publish事件,接收client端请求
      socket.on('publish', (pkg) => {
        // 消费client端的请求
        self.consumers(pkg, socket);
      });
    });
  }
    
  // 消费client端的请求
  consumers (pkg, socket) {
    // 赋值当前作用的指针给self对象
    const self = this;
    // 将client的数据包转成json字符,字节序不同的处理已经在mqtt的底层转换好了
    let content = pkg.payload.toString();
    debug(content);
    content = JSON.parse(content);
    // 定义响应数据包
    const respMsg = {
      msgId: content.msgId
    };
    // 如果请求调用的函数不存在则加上错误消息响应回去client端
    if (this.events[content.method] === null) {
      // 定义调用错误消息
      respMsg.error = {
        message: `not found ${content.method} method`
      };
      // 推送到client端
      self.response(socket, {messageId: pkg.messageId, body: respMsg});
    } else {
      // 如果存在有效的函数则准备调用
      const fn = this.events[content.method].method;
      // 设置调用函数的回调事件,用来处理调用函数完成后的参数返回
      const callback = function (err, result) {
        // 获取调用完后的参数结果
        respMsg.body = result;
        // 推送到client端
        self.response(socket, {messageId: pkg.messageId, body: respMsg});
      };
      // 执行调用参数
      fn.call(fn, content.body, callback);
    }
  }
    
  // 推送调用结果数据包给client端
  response (socket, result) {
    socket.publish({
      topic: 'rpc',
      qos: 1,
      messageId: result.messageId,
      payload: JSON.stringify(result.body)
    });
  }


  // 绑定kirito定义的函数集合
  addEvent (events) {
    const eventKeys = Object.getOwnPropertyNames(events);
    eventKeys.some(event => {
      this.events[event] = {
        method: events[event].method,
        param: events[event].param
      };
    });
  }

}

module.exports.create = function () {
  return new MQTT();
};

Интерфейс протокола определен, и этот уровень добавлен для будущих мультипротоколов. mqtt — это только протокол, используемый по умолчанию:

// protocol.js

'use strict';

const mqtt = require('./protocol/mqtt');

module.exports.create = function (opts = {}) {
  return mqtt.create(opts);
};

Далее идет открытый интерфейс на стороне сервера:

// index.js

'use strict';

const protocol = require('./protocol.js');

class Server {

  constructor () {
    // 实例化协议对象
    this.server = protocol.create();
  }
    
  // 将kirito定义的接口和函数集合绑定
  addKiritoService (service, methods) {
    const serviceKeys = Object.getOwnPropertyNames(service);
    const methodKeys = Object.getOwnPropertyNames(methods);
    const events = {};
    serviceKeys.some(method => {
      let idx = -1;
      if ((idx = methodKeys.indexOf(method)) !== -1) {
        events[method] = {
          method: methods[method],
          param: service[method].param
        };
        methodKeys.splice(idx, 1);
      }
    });
    if (Object.keys(events).length > 0) {
      this.server.addEvent(events);
    }
  }

  listen (port) {
    this.server.listen(port);
  }

}

module.exports = Server;

client:

// protocol/mqtt.js

'use strict';

const net = require('net');
const debug = require('debug')('polix-rpc:mqtt');
const EventEmitter = require('events').EventEmitter;
const mqttCon = require('mqtt-connection');

class MQTT extends EventEmitter {

  constructor (server) {
    super();
    // 获取server端连接信息
    this.host = server.host || 'localhost';
    this.port = server.port || 10003;
    // 是否服务已连接
    this.connected = false;
    // 是否服务已关闭
    this.closed = false;
  }
    
  // 连接server服务
  connect (cb) {
    // 连接了就不用再次执行连接
    if (this.connected) {
      cb && cb (new Error('mqtt rpc has already connected'), null);
      return;
    }

    // 复制当前作用域上下文的指针给self变量
    const self = this;
    // 获取net服务连接流
    const stream = net.createConnection(this.port, this.host);
    // 初始化mqtt服务
    this.socket = mqttCon(stream);
    // 监听conack事件
    this.socket.on('connack', (pkg) => {
      debug('conack: %j', pkg);
    });

    // 监听error事件
    this.socket.on('error', function (err) {
      debug('error: %j', err);
    });


    // 监听publish事件,接收server端调用函数结果的返回数据
    this.socket.on('publish', (pkg) => {
      // 将数据包转成json字符
      const content = pkg.payload.toString();
      debug(content);
      // 将数据转发到MQTT的对象事件上
      this.emit('data', JSON.parse(content));
    });

    // 监听puback事件
    this.socket.on('puback', (pkg) => {
      debug('puback: %j', pkg);
    });

    // 发起连接
    this.socket.connect({
      clientId: 'MQTT_RPC_' + Math.round(new Date().getTime() / 1000)
    }, () => {
      if (self.connected) {
        return;
      }
        
      // 设置已连接
      self.connected = true;

      cb && cb(null, {connected: self.connected});
    });
  }
    
  // 发起调用函数请求
  send (param) {
    this.socket.publish({
      topic: 'rpc',
      qos: 1,
      messageId: 1,
      payload: JSON.stringify(param || {})
    });
  }

  // 关闭连接
  close () {
    if (this.closed) {
      return;
    }
    this.closed = true;
    this.connected = false;
    this.socket.destroy();
  }

}

module.exports.create = function (server) {
  return new MQTT(server || {});
};

Определите интерфейс протокола:

// protocol.js

'use strict';

const mqtt = require('./protocol/mqtt');

module.exports.create = function (opts = {}) {
  return mqtt.create(opts);
};

Наконец, интерфейс, предоставляемый клиентской стороной:

'use strict';

const protocol = require('./protocol.js');
const connect = Symbol.for('connect');
const uuid = require('uuid/v1');

class Client {

  constructor(opts, service) {
    // 声明client实例
    this.client = void(0);
    // 调用协议连接接口
    this[connect](opts, service);
    // 定义回调参数集合
    this.callQueues = {};
  }

  // 连接server
  [connect] (opts, service) {
    // 初始化协议服务
    this.client = protocol.create(opts);
    // 发起连接
    this.client.connect((err) => {
      if (err) {
        throw new Error(err);
      }
    });
      
    // 复制当前作用域的上下文指针给self对象
    const self = this;

    // 监听协议data时间,接收协议转发server端响应的数据
    this.client.on('data', function (result) {
      // 听过msgId取出回调函数
      const fn = self.callQueues[result.msgId];
      // 如果有调用错误信息,则直接回调错误
      if (result.error) {
        return fn.call(fn, result.error, null);
      }
      // 执行回调
      fn.call(fn, null, result.body);
    });
    // 绑定kirito定义的接口参数到协议对象中
    const serviceKeys = Object.getOwnPropertyNames(service);
    serviceKeys.some(method => {
      // 增加client端的函数,对应server端的调用函数
      self[method] = function () {
        // 取出发送的数据
        const reqMsg = arguments[0];
        // 取出回调函数
        const fn = arguments[1];
        const paramKey = Object.getOwnPropertyNames(service[method].param);
        paramKey.some((param) => {
          if (reqMsg[param] === null) {
            throw new Error(`Parameters '${param}' are missing`);
          }
          // todo 类型判断及转换
        });
        // 为每个请求标记
        const msgId = uuid();
        // 注册该请求的回调函数到回调队列中
        self.callQueues[msgId] = fn;
        // 发起调用函数请求
        self.client.send({method, msgId, body: reqMsg});
      };
    });
  }

}

module.exports = Client;

Таким образом создается простая структура IDL+RPC. Это просто для описания принципа RPC и общих методов вызова.Если вы хотите использовать его в разработке на уровне предприятия, вам нужно добавить обнаружение службы, регистрацию, предохранитель службы, переход на более раннюю версию службы и т. д. Если вы заинтересованы, вы можете разветвите его на Github или упомяните об этом PR для улучшения этого фреймворка, если у вас есть какие-либо вопросы, вы также можете поднять Issue, конечно, PR лучше всего :) .

Адрес склада:

RPC: GitHub.com/exception уездный город/рваный…

IDL: GitHub.com/Рик тоже/K IR…


Любые вопросы можно задать на CNode:Супер нет JS.org/topic/5 нет 63 нет…