Сервис Go-Kit Micro: регистрация и обнаружение сервисов

Микросервисы
Сервис Go-Kit Micro: регистрация и обнаружение сервисов

В архитектуре микросервисов первоначальный единый сервис разбивается на несколько микросервисов и развертывается независимо, и клиент не может знать конкретное местоположение сервиса; количество сервисов слишком велико, поддерживается так много адресов сервисов, и операция и обслуживающий персонал не может работать эффективно.

Поэтому в архитектуру микросервиса вводится реестр служб, чтобы принимать и поддерживать адресную информацию каждой службы. Клиент или шлюз могут запрашивать целевой адрес службы через центр регистрации, динамически реализовывать доступ к службе и реализовывать здесь балансировку нагрузки службы.

Для регистрации и обнаружения сервисов go-kit по умолчанию поддерживает общие реестры consul, zookeeper, etcd и eureka.

Обзор

Эта статья будет основана на консуле и использует «режим обнаружения клиентов» для проведения практических занятий.Основные моменты заключаются в следующем:

  • Консул развернут с использованием образа докера (Progium / Consul).
  • Арифметические услуги регистрируются у консула под названием арифметика;
  • Напишите службу обнаружения, чтобы запрашивать экземпляры службы через консул и выполнять балансировку нагрузки на основе RoundRibbon.

Идея, используемая в примере программы в этой статье, такова: арифметический сервис регистрируется в консуле, а остальные части остаются неизменными; обнаруживается, что сервис выставляет http-интерфейс, и после принятия запроса (содержимое полученного запроса сохраняется в теле и передается в json), согласно go-kit. Механизм динамически запрашивает экземпляр арифметической службы, вызывает интерфейс арифметической службы и возвращает содержимое ответа. Как показано ниже:

начать консул

  1. Исправлятьdocker/docker-compose.yml, как показано ниже (разделы Prometheus и Grafana пока закомментированы).
version: '2'

services:
  consul:
    image: progrium/consul:latest
    ports:
      - 8400:8400
      - 8500:8500
      - 8600:53/udp
    hostname: consulserver
    command: -server -bootstrap -ui-dir /ui
  1. Начните докера. Переключитесь в каталог проекта терминала, выполните следующую команду:
sudo docker-compose -f docker/docker-compose.yml up
  1. Доступ через браузерhttp://localhost:8500, появится следующий интерфейс, что означает, что запуск прошел успешно.

регистрация службы

Шаг 1: Подготовка кода

Этот пример основан наarithmetic_monitor_demoКод переписан. Сначала скопируйте каталог и переименуйте его вarithmetic_consul_demo; Создайте два каталога с именамиregister,discover; оригиналgoфайл кода перемещен вregisterсодержание. Результат показан ниже:

Кроме того, вам необходимо скачать зависимые сторонние библиотекиuuidиhashicorp/consul

go get github.com/pborman/uuid
go get github.com/hashicorp/consul

Шаг 2: Реализуйте метод регистрации

новыйregister/register.go,Добавить кRegisterметод для реализации логики регистрации в консуле. Этот метод получает 5 параметров, а именно ip и порт консула реестра, локальный ip и порт арифметического сервиса и инструмент логирования.

Для создания объекта регистрации необходимо использоватьhashicorp/consul, посмотрите на код, чтобы увидеть, что его метод определен следующим образом:

func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar

такRegisterВ процессе реализации есть три основных шага: создание объекта клиента консула, создание информации о конфигурации параметров для проверки работоспособности службы арифметики с помощью консула, создание информации о конфигурации службы для службы арифметики для регистрации в консуле. код показывает, как показано ниже:

func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) {

	// 创建Consul客户端连接
	var client consul.Client
	{
		consulCfg := api.DefaultConfig()
		consulCfg.Address = consulHost + ":" + consulPort
		consulClient, err := api.NewClient(consulCfg)
		if err != nil {
			logger.Log("create consul client error:", err)
			os.Exit(1)
		}

		client = consul.NewClient(consulClient)
	}

	// 设置Consul对服务健康检查的参数
	check := api.AgentServiceCheck{
		HTTP:     "http://" + svcHost + ":" + svcPort + "/health",
		Interval: "10s",
		Timeout:  "1s",
		Notes:    "Consul check service health status.",
	}

	port, _ := strconv.Atoi(svcPort)

	//设置微服务想Consul的注册信息
	reg := api.AgentServiceRegistration{
		ID:      "arithmetic" + uuid.New(),
		Name:    "arithmetic",
		Address: svcHost,
		Port:    port,
		Tags:    []string{"arithmetic", "raysonxin"},
		Check:   &check,
	}

	// 执行注册
	registar = consul.NewRegistrar(client, &reg, logger)
	return
}

Шаг 3. Внедрите интерфейс проверки работоспособности.

Зависит отStep-2Видно, что консул будет периодически запрашивать арифметический сервис/heathИспользуется для проверки работоспособности службы, поэтому начнем сservice,endpoint,transportДобавьте соответствующую реализацию в .

  1. в интерфейсеServiceНовый метод интерфейса вHealthCheck, и, в свою очередьArithmeticService,loggingMiddleware,metricMiddlewareдобавить реализацию.
// service接口
// Service Define a service interface
type Service interface {

	//省略之前的其他方法

	// HealthCheck check service health status
	HealthCheck() bool
}

// ArithmeticService实现HealthCheck
// HealthCheck implement Service method
// 用于检查服务的健康状态,这里仅仅返回true。
func (s ArithmeticService) HealthCheck() bool {
	return true
}

// loggingMiddleware实现HealthCheck
func (mw loggingMiddleware) HealthCheck() (result bool) {
	defer func(begin time.Time) {
		mw.logger.Log(
			"function", "HealthChcek",
			"result", result,
			"took", time.Since(begin),
		)
	}(time.Now())
	result = mw.Service.HealthCheck()
	return
}

// metricMiddleware实现HealthCheck
func (mw metricMiddleware) HealthCheck() (result bool) {

	defer func(begin time.Time) {
		lvs := []string{"method", "HealthCheck"}
		mw.requestCount.With(lvs...).Add(1)
		mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
	}(time.Now())

	result = mw.Service.HealthCheck()
	return
}
  1. существуетendpoints.goНовая структура в:ArithmeticEndpoints. В предыдущем примере используется только одна конечная точка, поэтому я использую структуру напрямую.endpoint.Endpoint. Он определяется следующим образом:
// ArithmeticEndpoint define endpoint
type ArithmeticEndpoints struct {
	ArithmeticEndpoint  endpoint.Endpoint
	HealthCheckEndpoint endpoint.Endpoint
}
  1. Запрос на создание объектов ответа проверки работоспособности, а также соответствующихendpoint.Endpointспособ упаковки. код показывает, как показано ниже:
// HealthRequest 健康检查请求结构
type HealthRequest struct{}

// HealthResponse 健康检查响应结构
type HealthResponse struct {
	Status bool `json:"status"`
}

// MakeHealthCheckEndpoint 创建健康检查Endpoint
func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		status := svc.HealthCheck()
		return HealthResponse{status}, nil
	}
}
  1. существуетtransports.goДобавлен интерфейс проверки работоспособности в/health.
// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler {
	r := mux.NewRouter()

	//省略原有/calculate/{type}/{a}/{b}代码

	// create health check handler
	r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
		endpoints.HealthCheckEndpoint,
		decodeHealthCheckRequest,
		encodeArithmeticResponse,
		options...,
	))

	return r
}

Шаг 4: Изменить main.go

следующий вmain.goДобавьте вызывающий код, связанный с проверкой работоспособности и регистрацией службы, в , чтобы описанная выше логика изменения вступила в силу.

  1. медицинское обследование.
//创建健康检查的Endpoint,未增加限流
healthEndpoint := MakeHealthCheckEndpoint(svc)

//把算术运算Endpoint和健康检查Endpoint封装至ArithmeticEndpoints
endpts := ArithmeticEndpoints{
	ArithmeticEndpoint:  endpoint,
	HealthCheckEndpoint: healthEndpoint,
}

//创建http.Handler
r := MakeHttpHandler(ctx, endpts, logger)
  1. Регистрация службы. Подготовьте переменные среды, необходимые службе, создайте объект регистрации, зарегистрируйтесь в консуле до запуска службы и отмените регистрацию в консуле после выхода из службы. Ниже размещена только часть кода, а полный код можно получить на github.
// 定义环境变量
var (
	consulHost  = flag.String("consul.host", "", "consul ip address")
	consulPort  = flag.String("consul.port", "", "consul port")
	serviceHost = flag.String("service.host", "", "service ip address")
	servicePort = flag.String("service.port", "", "service port")
)
// parse
flag.Parse()

// ...

//创建注册对象
registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)

go func() {
	fmt.Println("Http Server start at port:" + *servicePort)
    //启动前执行注册
	registar.Register()
	handler := r
	errChan <- http.ListenAndServe(":"+*servicePort, handler)
}()

go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	errChan <- fmt.Errorf("%s", <-c)
}()

error := <-errChan
//服务退出,取消注册
registar.Deregister()
fmt.Println(error)

Шаг 5: Скомпилируйте и запустите

Откройте терминал и перейдите в каталог проекта. воплощать в жизньgo build ./registerПосле успешной компиляции введите следующую команду для запуска службы арифметики (службы регистрации):

./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000

После успешного запуска обновите сноваconsul-uiИнтерфейс, см. Следующий интерфейс, арифметическая служба успешно зарегистрирована в консул.

При этом на терминале, где работает служба регистрации, также можно увидеть обычный вызов консула./healthЖурнал выводит информацию об интерфейсе:

обнаружение службы

discoverСлужба, которую необходимо выполнить: в интерфейсе REST/calculateЧтобы предоставить услуги API внешнему миру, клиент использует метод HTTP POST для отправки данных json для выполнения запроса; запрашивает экземпляр службы, который был зарегистрирован в консуле в конечной точке; затем выбирает соответствующий экземпляр службы для пересылки запроса на это; после завершения запроса запросите ответ от исходного клиента.

Проверьте исходный код go-kit, чтобы увидеть,kit/sd/EndpointerПредоставляется набор механизмов обнаружения служб, а его определение и интерфейс создания следующие:

// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer

Из комментариев к коду мы можем узнать: Endpointer обнаруживает информацию о событиях системы, прослушивая службу, и создает конечные точки службы по запросу через фабрику (Endpoint).

Итак, нам нужно пройтиEndpointerреализовать функцию обнаружения сервисов. В режиме микросервиса может быть несколько экземпляров одного и того же сервиса, поэтому выбор экземпляра необходимо выполнять через механизм балансировки нагрузки, здесь используется набор инструментов go-kit.kit/sd/lbКомпонент (этот компонент реализует RoundRibbon и имеет функцию Retry).

Шаг 1: Создайте фабрику

существуетdiscoverСоздайте файл go в каталогеfactory.go,выполнитьsd.FactoryЛогика состоит в том, чтобы преобразовать экземпляр службы в конечную точку и реализовать вызывающий процесс для целевой службы в конечной точке. Здесь непосредственно инкапсулирована служба арифметических операций, код выглядит следующим образом:

func arithmeticFactory(_ context.Context, method, path string) sd.Factory {
	return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
		if !strings.HasPrefix(instance, "http") {
			instance = "http://" + instance
		}

		tgt, err := url.Parse(instance)
		if err != nil {
			return nil, nil, err
		}
		tgt.Path = path

		var (
			enc kithttp.EncodeRequestFunc
			dec kithttp.DecodeResponseFunc
		)
		enc, dec = encodeArithmeticRequest, decodeArithmeticReponse

		return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
	}
}

func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error {
	arithReq := request.(ArithmeticRequest)
	p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B)
	req.URL.Path += p
	return nil
}

func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) {
	var response ArithmeticResponse
	var s map[string]interface{}

	if respCode := resp.StatusCode; respCode >= 400 {
		if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
			return nil, err
		}
		return nil, errors.New(s["error"].(string) + "\n")
	}

	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return nil, err
	}
	return response, nil
}

Шаг 2. Создайте конечную точку.

создать файл godiscover/enpoints.go. Согласно приведенному выше анализу, конечная точка осуществляет мониторинг системы обнаружения служб, реализует выбор экземпляра и, наконец, возвращает исполняемый файл.endpoint.Endpoint. Процесс реализации описан ниже в соответствии с комментариями кода:

// MakeDiscoverEndpoint 使用consul.Client创建服务发现Endpoint
// 为了方便这里默认了一些参数
func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
	serviceName := "arithmetic"
	tags := []string{"arithmetic", "raysonxin"}
	passingOnly := true
	duration := 500 * time.Millisecond

	//基于consul客户端、服务名称、服务标签等信息,
	// 创建consul的连接实例,
	// 可实时查询服务实例的状态信息
	instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly)

	//针对calculate接口创建sd.Factory
	factory := arithmeticFactory(ctx, "POST", "calculate")

	//使用consul连接实例(发现服务系统)、factory创建sd.Factory
	endpointer := sd.NewEndpointer(instancer, factory, logger)

	//创建RoundRibbon负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	//为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint
	retry := lb.Retry(1, duration, balancer)

	return retry
}

Шаг 3. Создайте транспорт

создать файл godiscover/transports.go. пройти черезmux/RouterПредоставление интерфейса REST для службы обнаружения с использованием метода POST/calculate, Как и в случае с арифметическими услугами, где требуетсяendpoint.Endpoint,DecodeRequestFunc,EncodeResponseFunc. Для удобства я напрямую скопировал структуру запроса и ответа и методы кодирования и декодирования в арифметическом сервисе. Код выглядит следующим образом:

func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler {
	r := mux.NewRouter()

	r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer(
		endpoint,
		decodeDiscoverRequest,
		encodeDiscoverResponse,
	))

	return r
}

// 省略实体结构和编解码方法

Шаг 4: Напишите основной метод

Следующим шагом является объединение вышеуказанной логики в методе main, а затем запуск службы обнаружения, где порт прослушивания — 9001. Относительно просто, вставьте код напрямую:

func main() {

	// 创建环境变量
	var (
		consulHost = flag.String("consul.host", "", "consul server ip address")
		consulPort = flag.String("consul.port", "", "consul server port")
	)
	flag.Parse()

	//创建日志组件
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	//创建consul客户端对象
	var client consul.Client
	{
		consulConfig := api.DefaultConfig()

		consulConfig.Address = "http://" + *consulHost + ":" + *consulPort
		consulClient, err := api.NewClient(consulConfig)

		if err != nil {
			logger.Log("err", err)
			os.Exit(1)
		}
		client = consul.NewClient(consulClient)
	}

	ctx := context.Background()

	//创建Endpoint
	discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger)

	//创建传输层
	r := MakeHttpHandler(discoverEndpoint)

	errc := make(chan error)
	go func() {
		c := make(chan os.Signal)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errc <- fmt.Errorf("%s", <-c)
	}()

	//开始监听
	go func() {
		logger.Log("transport", "HTTP", "addr", "9001")
		errc <- http.ListenAndServe(":9001", r)
	}()

	// 开始运行,等待结束
	logger.Log("exit", <-errc)
}

Шаг 5: Скомпилируйте и запустите

Переключитесь на терминал, чтобыdiscoverкаталог, выполнитьgo buildЗавершите компиляцию, затем запустите службу обнаружения следующей командой (укажите адрес службы реестра):

./discover -consul.host localhost -consul.port 8500

запросить тест

использовать запрос почтальонаhttp://localhost:9001/calculate,существуетbodyВведите информацию о запросе и завершите тест. Как показано ниже:

Суммировать

В этой статье в качестве реестра используется консул, а функции регистрации и обнаружения службы go-kit демонстрируются на примерах. Поскольку мое понимание этой части недостаточно глубоко, в процессе написания кода и этой статьи я изучал метод проектирования компонентов обнаружения go-kit и стремился ясно объяснить с помощью кода и текста. Мой уровень ограничен, если есть какие-то ошибки или неуместности прошу критиковать и исправлять меня.

См. пример кода этой статьиarithmetic_consul_demo.

Эта статья была впервые опубликована в моей публичной учетной записи WeChat [Xi Yiang Bar], пожалуйста, отсканируйте код, чтобы следовать!