В Интернете много информации о внедрении и использовании rabbitmq, поэтому я не буду здесь вдаваться в подробности. См. ссылки в конце статьи.
Вторая инкапсуляция клиента golang для rabbitmq черезподписаться/опубликоватьРежим делает сообщениеполучить и получитьРазделение выполняется для поддержки нескольких типов обработки сообщений, например, для одного и того же сообщения хранение и пересылка выполняются отдельно.
(1) Rabbitmq вызывает конечную инкапсуляцию
1. Определите вызывающий конечный интерфейс
// Defines our interface for connecting and consuming messages.
type RabbitmqClient interface {
ConnectToBroker(connectionString string)
Publish(msg []byte, exchangeName , exchangeType ,bindingKey string) error
PublishOnQueue(msg []byte, queueName string) error
Subscribe(exchangeName , exchangeType , consumerName ,
bindingKey string, handlerFunc func(amqp.Delivery)) error
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
Close()
}
2. Структура диаграммы классов, реализующая интерфейс
Пожалуйста, смотрите конкретную реализациюgithub(автор:eriklupander )
(2) Используйте развязку приемника
Определите получателя, включая функцию обработки сообщения, объявите такие параметры, как обмен привязками и очередь сообщений, цель состоит в том, чтобы определить несколько получателей для асинхронного выполнения различных операций над сообщением.
//定义接受者,使其与客户端解耦
type Receiver struct {
//接收者的各项信息
ExchangeName string
ExchangeType string
QueueName string
BindingKey string
ConsumerName string
Deliveries chan amqp.Delivery
handlerFunc func(msg amqp.Delivery) //定义一个处理方法
}
(3) Инкапсуляция потребителя, объединяющая вызывающего и получателя.
Определите потребителя для реализации режима подписки и публикации, держите клиента rabbitmq и несколько получателей, чтобы несколько получателей могли получать сообщения.
1. Структура диаграммы классов
2. Реализация кода
//定义消费端,消费端持有调用端和接收者
type Consumer struct {
Client RabbitmqClient //一个客户端
Receivers []*Receiver
}
func (c *Consumer)Add(rec ...*Receiver){
//添加接收器
c.Receivers=append(c.Receivers,rec...)
}
//订阅接收器到交换器
func (c *Consumer)Subscribe(){
for _,receiver:=range c.Receivers{
err:=c.Client.Subscribe(receiver.ExchangeName,
receiver.ExchangeType,
receiver.ConsumerName,
receiver.BindingKey,
receiver.handlerFunc)
if err != nil {
log.Printf("Subscribe error: %s %s ",receiver,err)
}
}
}
//订阅接收器到特定队列
func (c *Consumer)SubscribeToQueue(){
for _,receiver:=range c.Receivers{
err:=c.Client.SubscribeToQueue(receiver.QueueName,
receiver.ConsumerName,
receiver.handlerFunc)
if err != nil {
log.Printf("SubscribeToQueue error: %s %s ",receiver,err)
}
}
}
(4) Тест
1. Определите обмен типа темы и используйте ключ маршрутизации для отправки сообщений определенного типа.
func TestConsumer(t *testing.T) {
var receiver *Receiver
var client *MessagingClient
var consumer Consumer
var exname="chat" //交换器命名
var extype=amqp.ExchangeTopic //使用tpoic交换器类型
var routingkey="*.waiwen" //消息路由键,代表发送给waiwen的信息
var queueName="waiwen" //命名一个队列名称
client=&MessagingClient{}
//defer client.Close()
var connectionStr="amqp://admin:admin@192.168.172.2:5672/" //链接
client.ConnectToBroker(connectionStr)
go func() {
var body=[]byte("hello waiwen")
err:=client.Publish(body,exname,extype,"xiaoming.waiwen",queueName)
if err!=nil{
log.Printf("publish msg error : %s \n",err)
}else{
log.Printf("A message was sent: %s \n", body)
}
}()
receiver=&Receiver{
ExchangeType:extype,
ExchangeName:exname,
ConsumerName:"",
QueueName:queueName,
BindingKey:routingkey,
Deliveries:make(chan amqp.Delivery),
handlerFunc: func(msg amqp.Delivery) {
log.Printf("get message from queue: %v \n",string(msg.Body))
},
}
consumer=Consumer{
Client:client,
Receivers:[]*Receiver{},
}
consumer.Add(receiver)
consumer.Subscribe()
select {}
}