Go Microservices: обмен сообщениями на основе RabbitMQ и AMQP

Микросервисы Go Docker RabbitMQ

Приветствую всех вОблако Tencent + сообщество, получить больше крупной технической практики Tencent по галантерее ~

Эта статья взята изОблако + Сообщество Бюро переводов,Зависит отTnecesocкомпилировать.

вводить

Микросервисы — это разделение бизнес-области приложения на отдельные сценарии с четко определенными областями и запуск этих сценариев в отдельных процессах, так что любые постоянные отношения между границами должны основываться на возможной согласованности, а не на ACID-подобных транзакциях или ограничениях внешнего ключа. Многие из этих концепций исходят изДизайн, управляемый доменом(DDD) или вдохновлено им. Но DDD — это тема, для освещения которой потребовалась бы целая серия блогов, поэтому я не буду упоминать ее здесь.

В контексте нашей серии блогов Go Microservices и архитектуры микрослужб одним из способов достижения слабой связи между службами является введение механизма передачи сообщений для служб, которым не нужно следовать строгим обменам сообщениями типа запрос/ответ или аналогичным механизмам связи. Здесь следует подчеркнуть, что внедрение механизма передачи сообщений — это лишь одна из многих стратегий, которые можно использовать для достижения слабой связи между службами.

Как мы делаем в нашей серии блоговГлава 8Как видите, в Spring Cloud сервер конфигурации Spring Cloud использует RabbitMQ в качестве зависимости времени выполнения, поэтому RabbitMQ должен быть хорошим брокером сообщений.

Что касается этой главы этой серии блогов, наша «служба учетных записей» помещает сообщение в обмен RabbitMQ при чтении определенного объекта учетной записи. Эти новости будут обрабатываться совершенно новым микросервисом, о котором мы напишем в этом блоге. Мы также поместим некоторый код языка Go в «общую» библиотеку, чтобы обеспечить повторное использование кода в сценариях с несколькими микросервисами.

Помните картинку системы из первой части? Вот как это выглядит после завершения частей этой главы:

img

Нам предстоит проделать большую работу, прежде чем эта часть будет завершена, но мы справимся.

исходный код

В этой части будет много нового кода, и мы вряд ли сможем уместить его весь в пост в блоге. Чтобы получить полный исходный код, загрузите его с помощью команды git clone и переключитесь на ветку главы 9:

git checkout P9

Отправить сообщение

Мы реализуем простой вариант использования моделирования: когда некоторые учетные записи «VIP» считываются в «службе учетных записей», мы хотим запустить службу «VIP-предложение». Эта служба генерирует «предложение» для владельца счета при определенных обстоятельствах. В правильно спроектированной доменной модели объект учетных записей и объект VIP-предложения представляют собой два отдельных домена, которые должны знать друг о друге как можно меньше.

img

Эта учетная запись должнаникогдаПрямой доступ к хранилищу (т.е. предложениям) VIP-сервиса. В этом случае мы должны передать сообщение «vip-сервису» на RabbitMQ и полностью делегировать бизнес-логику и постоянное хранилище «vip-сервису».

мы будем использоватьAMQP протокол для всех коммуникаций,AMQP Этот протокол представляет собой протокол прикладного уровня в качестве стандарта ISO, и передача сообщений, которую он реализует, может обеспечить функциональную совместимость системы. Здесь мы продолжаем настройки, которые мы использовали в главе 8, когда мы имели дело с обновлениями конфигурации, выбравstreadway / amqp Это библиотека AMQP.

Давайте вспомним взаимосвязь между биржами и издателями, потребителями и очередями в AMQP:

img

Издатель опубликует сообщение в точке обмена, которая отправит копию сообщения в очередь (или распределит по нескольким очередям) в соответствии с определенными правилами маршрутизации или информацией о привязке, зарегистрированной у соответствующего потребителя. Для этого Кораэтот ответЕсть хорошее объяснение.

код, связанный с обменом сообщениями

Поскольку мы хотим использовать новый и существующий код для загрузки необходимой нам конфигурации из файлов конфигурации Spring Cloud внутри нашей существующей службы учетных записей и новой службы vip, мы создадим здесь нашу первую общую библиотеку.

Сначала создайте новую общую папку в /goblog для хранения повторно используемого контента:

mkdir -p common/messaging
mkdir -p common/config

Мы помещаем весь код, связанный с AMQP, в папку обмена сообщениями, а файлы конфигурации — в папку config. Вы также можете скопировать содержимое /goblog/accountservice/config в /goblog/common/config — имейте в виду, что для этого нам потребуется обновить предыдущий оператор импорта, который импортировал код конфигурации из службы учетных записей. Давайте взглянемполный исходный кодПосмотрите, что написано в этом разделе.

Код, связанный с обменом сообщениями, инкапсулирован в файл, определяющий интерфейсы и фактические реализации, которые приложения используют для подключения, публикации и подписки на сообщения. На самом деле используемый нами streadway/amqp уже предоставляет много шаблонного кода, необходимого для реализации обмена сообщениями AMQP, поэтому не будем вдаваться в подробности этой части.

Создайте новый файл *.*go в /goblog/common/messaging:messagingclient.go.

Посмотрим, что должно быть в основном:

// 定义用来连接、发布消息、消费消息的接口
type IMessagingClient interface {
        ConnectToBroker(connectionString string)
        Publish(msg []byte, exchangeName string, exchangeType string) error
        PublishOnQueue(msg []byte, queueName string) error
        Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
        SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
        Close()
}

Приведенный выше код определяет интерфейс сообщений, который мы используем. Это то, что будет обрабатывать наша «служба учетных записей» и «служба VIP» при обмене сообщениями, абстрагируя большую часть сложности системы. Обратите внимание, что я выбрал два варианта «Производить» и «Потреблять» для использования с темами подписки/публикации и шаблонами прямого/очередного обмена сообщениями.

Далее мы определим структуру, которая будет содержать указатели наamqp.Connectionуказатель, и мы добавим необходимые методы, чтобы он (неявно, Go всегда им был) реализовывал интерфейс, который мы только что объявили.

// 接口实现,封装了一个指向 amqp.Connection 的指针
type MessagingClient struct {
        conn *amqp.Connection
}

Реализация интерфейса довольно подробная, здесь приведены только две из них —ConnectToBroker()иPublishToQueue():

func (m *MessagingClient) ConnectToBroker(connectionString string) {
        if connectionString == "" {
                panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
        }
        var err error
        m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
        if err != nil {
                panic("Failed to connect to AMQP compatible broker at: " + connectionString)
        }
}

Вот как мы получаем указатель соединения (как вamqp.Dial) Методы. Если мы потеряем файл конфигурации или не сможем подключиться к ретранслятору, микросервис выдаст исключение паники и заставит координатора контейнера воссоздать новый экземпляр. Переданный здесь параметр connectionString показан в следующем примере:

amqp://guest:guest@rabbitmq:5672/

Обратите внимание, что брокер rabbitmq здесь работает в сервисном режиме Docker Swarm.

PublishOnQueue()Функция очень длинная - она ​​такая же, как и официальнаяпример бродягиОн более-менее другой, ведь здесь некоторые его параметры упрощены. Чтобы опубликовать сообщение в именованной очереди, нам нужно передать следующие параметры:

  • body - существует в виде массива байтов. Может быть JSON, XML или какой-нибудь бинарный файл.
  • queueName — имя очереди, в которую нужно отправить сообщение.

Дополнительные сведения о переключателях см.Документация по RabbitMQ.

func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
        if m.conn == nil {
                panic("Tried to send message before connection was initialized. Don't do that.")
        }
        ch, err := m.conn.Channel()      // 从 connection 里获得一个 channel 对象
        defer ch.Close()
        // 提供一些参数声明一个队列,若相应的队列不存在,那就创建一个
        queue, err := ch.QueueDeclare(
                queueName, // 队列名
                false, // 是否持久存在
                false, // 是否在不用时就删掉
                false, // 是否排外
                false, // 是否无等待
                nil, // 其他参数
        )
        // 往队列发布消息
        err = ch.Publish(
                "", // 目标为默认的交换器
                queue.Name, // 路由关键字,例如队列名
                false, // 必须发布
                false, // 立即发布
                amqp.Publishing{
                        ContentType: "application/json",
                        Body:        body, // JSON 正文, 以 byte[] 形式给出
                })
        fmt.Printf("A message was sent to queue %v: %v", queueName, body)
        return err
}

Здесь немного больше кода шаблона, но его не должно быть сложно понять. Этот код объявит очередь (создаст ее, если она не существует), а затем опубликует в ней наше сообщение в виде массива байтов.

Код для публикации сообщения в именованном обмене более сложен, так как для него требуется фрагмент кода шаблона, чтобы объявить обмен и очередь и связать их вместе. вот копияполный исходный кодПример.

Далее, поскольку фактическим пользователем нашего «MessageClient» будет /goblog/accountservice/service/handlers.go , мы добавим к нему еще одно поле и жестко закодируем его, когда запрашиваемая учетная запись имеет идентификатор «10000». Проверка «является VIP» Метод в программе отправляет сообщение:

var DBClient dbclient.IBoltClient
var MessagingClient messaging.IMessagingClient     // NEW
func GetAccount(w http.ResponseWriter, r *http.Request) {
     ...

Потом:

    ...
    notifyVIP(account)   // 并行地发送 VIP 消息
    // 若有这样的 account, 那就把它弄成一个 JSON, 然后附上首部和其他内容来打包
    data, _ := json.Marshal(account)
    writeJsonResponse(w, http.StatusOK, data)
}
// 如果这个 account 是我们硬编码进来的 account, 那就开个协程来发送消息
func notifyVIP(account model.Account) {
        if account.Id == "10000" {
                go func(account model.Account) {
                        vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()}
                        data, _ := json.Marshal(vipNotification)
                        err := MessagingClient.PublishOnQueue(data, "vipQueue")
                        if err != nil {
                                fmt.Println(err.Error())
                        }
                }(account)
        }
}

Просто воспользуйтесь этой возможностью, чтобы показать встроенную анонимную функцию, используемую при вызове новой сопрограммы (горутины), т.е.goключевые слова. Мы не можем заблокировать «основную» сопрограмму только для того, чтобы выполнить обработчик HTTP для отправки сообщения, поэтому сейчас самое время добавить немного параллелизма.

main.go также необходимо обновить, чтобы соединение AMQ можно было инициализировать при запуске с загрузкой информации о конфигурации и вводом ее в Viper.

// 在 main 方法里面调用这个函数
func initializeMessaging() {
if !viper.IsSet("amqp_server_url") {
panic("No 'amqp_server_url' set in configuration, cannot start")
}
service.MessagingClient = &messaging.MessagingClient{}
service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
}

Этот абзац не имеет особого смысла — мы вызываем его, создавая пустую структуру обмена сообщениями и используя значения свойств, полученные от Viper.ConnectToBrokerполучитьservice.MessagingClientпример. Если наша конфигурация неbroker_url, то кидать паническое исключение, ведь программа не может запуститься при невозможности подключения к ретранслятору.

Обновить конфигурацию

В части 8 мы имеемamqp_broker_urlСвойство добавляется в наш файл конфигурации .yml, так что этот шаг фактически выполнен.

broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_   
broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_

Обратите внимание, что мы заполнили «тестовый» файл конфигурации именем службы Swarm «rabbitmq», а не IP-адресом локальной сети Swarm, как видно с моего компьютера. (Ваш фактический IP-адрес должен отличаться, но 192.168.99.100 кажется стандартным при запуске Docker Toolbox).

Мы не рекомендуем заполнять открытый текст имени пользователя и пароля в файле конфигурации. В реальной среде использования мы обычно можем использовать встроенные возможности шифрования на сервере Spring Cloud Config, которые мы видели в части 8.

модульный тест

Конечно, мы должны хотя бы написать модульный тест, чтобы гарантировать, что handlers.goGetAccountФункция пытается отправить сообщение, когда кто-то запрашивает очень конкретную учетную запись с номером «10000».

Для этого нам нужно реализовать mock в handlers_test.go.IMessagingClientТакже есть новый тест-кейс. Начнем с моделирования. Здесь мы будем использовать сторонние инструментыmockery генерироватьIMessagingClientИмитационная реализация интерфейса (не забудьте сначала установить GOPATH, когда оболочка запускает следующую команду):

> go get github.com/vektra/mockery/.../
> cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging 
> ./$GOPATH/bin/mockery -all -output .
  Generating mock for: IMessagingClient

Теперь у вас есть фиктивный файл реализации IMessagingClient.go в текущей папке. Мне не понравилось ни название этого файла, ни верблюжий регистр, поэтому давайте переименуем его, чтобы было более очевидно, что это фиктивная реализация, и будем следовать имени файла из этой серии блогов. Обычный стиль:

 mv IMessagingClient.go mockmessagingclient.go

Возможно, нам потребуется немного изменить оператор импорта в сгенерированном файле и удалить некоторые псевдонимы. Кроме этого, мы воспользуемся подходом «черного ящика» к этой фиктивной реализации — просто предполагая, что это сработает, когда мы начнем тестирование.

Также можно взглянуть на реализацию сгенерированной здесь симуляции.исходный код, что очень похоже на то, что мы писали вручную в главе 4.

Добавьте новый тестовый пример в handlers_test.go:

// 声明一个模仿类来让测试更有可读性
var anyString = mock.AnythingOfType("string")
var anyByteArray = mock.AnythingOfType("[]uint8")  // == []byte
func TestNotificationIsSentForVIPAccount(t *testing.T) {
        // 配置 DBClient 的模拟实现
        mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil)
        DBClient = mockRepo
        mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil)
        MessagingClient = mockMessagingClient
        Convey("Given a HTTP req for a VIP account", t, func() {
                req := httptest.NewRequest("GET", "/accounts/10000", nil)
                resp := httptest.NewRecorder()
                Convey("When the request is handled by the Router", func() {
                        NewRouter().ServeHTTP(resp, req)
                        Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() {
                                So(resp.Code, ShouldEqual, 200)
                                time.Sleep(time.Millisecond * 10)    // Sleep since the Assert below occurs in goroutine
                                So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue)
                        })
        })})
}

Подробности написаны в примечаниях. Здесь тоже мне неудобно искусственно ставить 10 мс сна перед утверждением постсостояния numberOfCalls, но так как макет вызывается в сопрограмме, отдельной от «основного потока», нам нужно дать ему немного зависнуть. Некоторое время ожидания основной поток, чтобы сделать некоторую работу. Мы надеемся на лучший идиоматический способ модульного тестирования сопрограмм и каналов.

Я признаю, что процесс использования этого стиля тестирования более многословен, чем использование Mockito при написании модульных тестов для приложения Java. Тем не менее, я думаю, что это довольно читабельно и просто написать.

Затем запустите тест и убедитесь, что он проходит:

go test ./...

бегать

Первое, что нужно сделать, это запустить скрипт springcloud.sh для обновления сервера конфигурации. Затем запустите copyall.sh и подождите несколько секунд, пока он завершит обновление нашей «службы учетной записи». Затем мы используем curl, чтобы получить нашу «специальную» учетную запись.

> curl http://$ManagerIP:6767/accounts/10000
{"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}

Если все пойдет хорошо, мы сможем открыть консоль управления RabbitMQ. Затем посмотрим, получим ли мы сообщение в очереди с именем vipQueue:

open http://192.168.99.100:15672/#/queues

img

В самом низу изображения выше мы видим, что «vipQueue» имеет 1 сообщение. Давайте снова вызовем функцию «Получить сообщение» в консоли управления RabbitMQ, и мы должны увидеть это сообщение:

img

Написание потребителей на Go — «вип-сервис»

Наконец пришло время написать новый микросервис с нуля, который мы будем использовать, чтобы показать, как использовать обмен сообщениями RabbitMQ. Мы будем использовать то, что узнали в этой серии, в том числе:

  • HTTP-сервер
  • мониторинг производительности
  • Централизованная конфигурация
  • Повторное использование кода механизма обмена сообщениями

если вы выполнитеgit checkout P9, тогда вы должны увидеть «vipservice» в папке root/goblog.

Я не буду рассматривать здесь каждую строку кода, в конце концов, в нем есть некоторые части, которые дублируют «accountservice». Мы собираемся сосредоточиться на том, «как использовать» сообщение, которое мы только что отправили. Несколько замечаний:

  • На данный момент в config-repo добавлены два новых файла .yml. Это vipservice-dev.yml и vipservice-test.yml*. *
  • copyall.sh также был обновлен для создания и развертывания «accountservice» и нашего недавно написанного «vipservice».

использовать сообщение

Мы будем использовать /goblog/common/messaging иSubscribeToQueueкод в функции, например:

SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error

Для этого предоставляем такие параметры:

  • Имя очереди (например, "vip_queue")
  • имя потребителя
  • Функция обратного вызова, которая вызывается при получении сообщения из очереди ответов — как мы это делали в главе 8 при использовании обновлений конфигурации.

Привяжите нашу функцию обратного вызова к очередиSubscribeToQueueО реализации функции говорить особо нечего. Это егоисходный код, и посмотрите, если вам нужно.

Далее давайте быстро взглянем на main.go VIP-сервиса, чтобы увидеть, как мы настроили эти вещи:

var messagingClient messaging.IMessagingConsumer
func main() {
fmt.Println("Starting " + appName + "...")
config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch"))
initializeMessaging()
// 确保在服务存在的时候关掉连接
handleSigterm(func() {
if messagingClient != nil {
messagingClient.Close()
}
})
service.StartWebServer(viper.GetString("server_port"))
}
// 在收到 "vipQueue" 发来的消息时会调用的回调函数
func onMessage(delivery amqp.Delivery) {
fmt.Printf("Got a message: %v\n", string(delivery.Body))
}
func initializeMessaging() {
        if !viper.IsSet("amqp_server_url") {
            panic("No 'broker_url' set in configuration, cannot start")
        }
        messagingClient = &messaging.MessagingClient{}
        messagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
        // Call the subscribe method with queue name and callback function
        err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage)
        failOnError(err, "Could not start subscribe to vip_queue")
        err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
        failOnError(err, "Could not start subscribe to " + viper.GetString("config_event_bus") + " topic")
}

Знакомо, да? Мы также, вероятно, повторим метод настройки и запуска микросервисов, который мы добавим в последующих главах. Это тоже часть основ.

этоonMessageФункция просто регистрирует тело любого полученного нами «VIP» сообщения. Если бы мы реализовали больше смоделированных вариантов использования, нам пришлось бы ввести некоторую причудливую логику, чтобы определить, имеет ли владелец учетной записи право на лечение «супер-крутая покупка всех наших вещей (tm)» и, возможно, также «VIP-предложение». Запись записей в базу данных. Если вам интересно, вы можете реализовать эту логику и отправить запрос на извлечение.

И последнее, но не менее важное: этот код. С помощью этого кода мы можем убить экземпляр службы, нажав Ctrl + C, или мы также можем дождаться, пока Docker Swarm уничтожит экземпляр службы.

   func handleSigterm(handleExit func()) {
           c := make(chan os.Signal, 1)
           signal.Notify(c, os.Interrupt)
           signal.Notify(c, syscall.SIGTERM)
           go func() {
                   <-c
                   handleExit()
                   os.Exit(1)
           }()
   }

Этот код не более читабелен, чем другие, все, что он делает, это регистрирует канал "c" какos.Interruptиsyscall.SIGTERM, и будет блокировать сообщения на "c", пока не будет получен какой-либо сигнал. Это позволяет нам быть уверенными, что как только экземпляр микросервиса будет уничтожен,handleExit()будет вызвана функция. Если вы все еще не уверены, вы можете протестировать его с помощью Ctrl + C или масштабирования Docker Swarm.killКоманды хороши, ноkill -9Просто не. Поэтому без необходимости лучше не использоватьkill -9прекратить действие чего-либо.

handleExit()функция вызовет нас вIMessageConsumerобъявлено на интерфейсеClose()функция, обеспечивающая корректное закрытие соединения AMQP.

Развернуть и запустить

здесьcopyall.sh Скрипт обновлен. Если вы выполнили описанные выше шаги и обеспечили согласованность с веткой P9 на Github, все должно работать. После развертывания выполнение docker service ls должно вывести что-то вроде этого:

> docker service ls
ID            NAME            REPLICAS  IMAGE                        
kpb1j3mus3tn  accountservice  1/1       someprefix/accountservice                                                                            
n9xr7wm86do1  configserver    1/1       someprefix/configserver                                                                              
r6bhneq2u89c  rabbitmq        1/1       someprefix/rabbitmq                                                                                  
sy4t9cbf4upl  vipservice      1/1       someprefix/vipservice                                                                                
u1qcvxm2iqlr  viz             1/1       manomarks/visualizer:latest

или вы также можете использоватьdvizzСервисный рендерер Docker Swarm для просмотра статуса:

img

Проверить журналы

Поскольку функция журналов службы докеров была отмечена как экспериментальная функция в версии 1.13.0, нам приходится просматривать журналы «vipservice» по старинке.

Сначала выполните docker ps, чтобы узнать ИДЕНТИФИКАТОР КОНТЕЙНЕРА:

> docker ps
CONTAINER ID        IMAGE                                                                                       
a39e6eca83b3        someprefix/vipservice:latest           
b66584ae73ba        someprefix/accountservice:latest        
d0074e1553c7        someprefix/configserver:latest

Запишите ИДЕНТИФИКАТОР КОНТЕЙНЕРА vipservice и выполните команду docker logs -f, чтобы проверить его журналы:

> docker logs -f a39e6eca83b3
Starting vipservice...
2017/06/06 19:27:22 Declaring Queue ()
2017/06/06 19:27:22 declared Exchange, declaring Queue ()
2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
Starting HTTP service at 6868

Откройте другое окно командной строки и закрутите наш специальный объект учетной записи.

> curl http://$ManagerIP:6767/accounts/10000

Если все прошло хорошо, мы должны увидеть сообщения из очереди ответов в журнале исходного окна.

Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}

рабочая очередь

Шаблон распределения работы между экземплярами службы используетрабочая очередьКонцепция чего-либо. Каждое «vip-сообщение» должно обрабатываться экземпляром «vipservice».

img

Итак, давайте посмотрим, что произойдет, когда мы масштабируем «vipservice» на два экземпляра с помощью команды масштабирования службы docker:

> docker service scale vipservice=2

Новый экземпляр «vipservice» должен быть развернут в течение нескольких секунд.

Поскольку в AMQP мы используем метод прямой доставки/очереди, мы должны увидеть циклический сценарий распределения. Затем используйте curl, чтобы вызвать поиск четырех учетных записей VIP:

> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000

Снова проверьте наши исходные журналы «vipservice»:

 > docker logs -f a39e6eca83b3
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}

Как мы и ожидали, мы видим, что первый экземпляр обрабатывает два из четырех сообщений. Если мы также делаем журналы докеров для другого экземпляра «vipservice», мы также должны увидеть там два сообщения.

тестовый потребитель

На самом деле, я так и не нашел хорошего способа модульного тестирования потребителей AMQP, не тратя много времени на издевательство над библиотекой AMQP. существуетmessagingclient_test.goВ нем подготовлен тест для проверки ожидания абонентом входящих сообщений и цикла обработки. Но ничего стоящего упоминания.

Чтобы более полно протестировать механизм обмена сообщениями, я могу рассмотреть тему интеграционного тестирования в следующем посте в блоге. Используйте Docker Remote API или Docker Compose для тестов на ходу. Тест запустит некоторые сервисы, такие как RabbitMQ, который позволяет нам отправлять и получать сообщения в тестовом коде.

След и производительность

На этот раз мы не будем проводить тестирование производительности. Достаточно беглого взгляда на использование памяти после отправки и получения некоторых сообщений:

CONTAINER                                    CPU %               MEM USAGE / LIMIT
   vipservice.1.tt47bgnmhef82ajyd9s5hvzs1       0.00%               1.859MiB / 1.955GiB
   accountservice.1.w3l6okdqbqnqz62tg618szsoj   0.00%               3.434MiB / 1.955GiB
   rabbitmq.1.i2ixydimyleow0yivaw39xbom         0.51%               129.9MiB / 1.955GiB

Выше показано использование памяти после обработки нескольких запросов. Новый «vipservice» не такой сложный, как «accountservice», поэтому после запуска он должен использовать меньше памяти.

резюме

это, наверное, эторядСамая длинная часть! Мы выполняем это в этой главе:

  • Более глубокий взгляд на протоколы RabbitMQ и AMQP.
  • Добавлен новый "випсервис".
  • Извлеченный код, связанный с обменом сообщениями (и конфигурацией), в повторно используемые подпроекты.
  • Публикация/подписка на сообщения на основе протокола AMQP.
  • Генерируйте фиктивный код с насмешкой.

вопросы и ответы

Микросервисная архитектура: как достигается обмен данными между сервисами?

Связанное Чтение

Связь между микросервисами

Протокол RabbitMQ и AMQP

Создание микросервисов с помощью Akka HTTP: подход CDC


Эта статья была разрешена автором для публикации сообщества Tencent Cloud +, исходная ссылка: https://cloud.tencent.com/developer/article/1149121?fromSource=waitui.

Приветствую всех вОблако Tencent + сообществоИли обратите внимание на общедоступную учетную запись WeChat сообщества Yunjia (QcloudCommunity) и как можно скорее получите больше массовой технической практики галантерейных товаров ~

Категории