Связь между микросервисами через RabbitMQ
Микросервисы не зависят друг от друга. В отличие от одного проекта каждый модуль может взаимодействовать напрямую через вызовы методов. Общий метод связи независимых сервисов заключается в использованииHTTP协议
,rpc协议
Или используйте промежуточное программное обеспечение сообщений, напримерRabbitMQ``Kafka
Ждать
в этой статьеСоздание микросервисов с Golang и MongoDBРеализовано приложение микросервиса, а в статье реализована прямая связь каждого сервиса, который используетсяHTTP
форме, как проходит услугаRabbitMQ
Для обмена сообщениями теперь нам нужно реализовать функцию, которая представляет собой интерфейс для пользователей, чтобы бронировать билеты в кино, для чего требуется сервис.User Service(port 8000)Он взаимодействует со службой **Служба бронирования (порт 8003)**. После того, как пользователь сделает бронирование, информация о бронировании записывается вbookingв базе данных
УстановитьRabbitMQ
УстановитьRabbitMQ
необходимо установить передErlangсреды, затем загрузите и установитеRabbitMQ, Пожалуйста, выберите соответствующую версию. После завершения установки RabbitMQ запускается в фоновом режиме как служба в Windows.RabbitMQ
Как использовать интерфейс, пожалуйста, обратитесь к официальному сайтуруководство, существуют реализации на различных основных языках, которые мы используем,Go
версия, пожалуйста, загрузите соответствующий интерфейс реализацииgo get github.com/streadway/amqp
правильноRabbitMQ
Сделать простую инкапсуляцию интерфейса
- определить интерфейс
messaging/message.go
type IMessageClient interface {
ConnectToBroker(connectionStr string) error
PublishToQueue(data []byte, queueName string) error
SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
Close()
}
type MessageClient struct {
conn *amqp.Connection
}
- интерфейс подключения
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
if connectionStr == "" {
panic("the connection str mustnt be null")
}
var err error
m.conn, err = amqp.Dial(connectionStr)
return err
}
- опубликовать интерфейс сообщения
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
if m.conn == nil {
panic("before publish you must connect the RabbitMQ first")
}
ch, err := m.conn.Channel()
defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
failOnError(err, "Failed to publish a message")
return nil
}
- Интерфейс сообщения о подписке
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
ch, err := m.conn.Channel()
//defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
go consumeLoop(msgs, handlerFunc)
return nil
}
включить связь
существуетUser Serviceопределить новыйPOST
интерфейс/user/{name}/booking
, чтобы реализовать функцию бронирования пользователя, после бронирования, черезRabbitMQ
опубликовать сообщениеBooking Service,Booking ServiceПосле получения сообщения делаем соответствующую обработку (записываем в базу)
User Service
- инициализация
MessageClient
users/controllers/user.go
var client messaging.IMessageClient
func init() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("connect to rabbitmq error", err)
}
}
- Добавляйте новые маршруты и реализации
routes.go
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
users/controllers/user.go
func NewBooking(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
user_name := params["name"]
defer r.Body.Close()
var bookings models.Booking
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &bookings)
if err != nil {
fmt.Println("the format body error ", err)
}
fmt.Println("user name:", user_name, bookings)
go notifyMsg(body)
}
- Публикация сообщений с помощью сопрограммы
func notifyMsg(body []byte) {
err := client.PublishToQueue(body, "new_booking")
if err != nil {
fmt.Println("Failed to publis message", err)
}
}
Booking Service
- Инициализировать MessageClient
var client messaging.IMessageClient
func initMessage() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ", err)
}
err = client.SubscribeToQueue("new_booking", getBooking)
if err != nil {
fmt.Println("Failed to comsuer the msg", err)
}
}
начать до веб-службы
func main() {
initMessage()
r := routes.NewRouter()
http.ListenAndServe(":8003", r)
}
- Обработка сообщений после получения
func getBooking(delivery amqp.Delivery) {
var booking models.Booking
json.Unmarshal(delivery.Body, &booking)
booking.Id = bson.NewObjectId().Hex()
dao.Insert("Booking", "BookModel", booking)
fmt.Println("the booking msg", booking)
}
Аутентификация, нужно начатьUser ServiceиBooking Service
использоватьPostman
отправить соответствующие данные
post 127.0.0.1:8000/user/kevin_woo/booking
{
"name":"kevin_woo",
"books":[
{
"date":"20180727",
"movies":["5b4c45d49d5e3e33c4a5b97a"]
},
{
"date":"20180810",
"movies":["5b4c45ea9d5e3e33c4a5b97b"]
}
]
}
Вы можете видеть, что в базе данных уже есть новая информация о бронировании.
Объяснение, данные, которые я отправляю здесь, являются структурой в базе данных бронирования. Фактическая ситуация требует инкапсуляции данных. При отправке данных данные не проверяются. В реальном процессе разработки все данные должны быть соответствующим образом проверены.Здесь мы в основном рассматриваем процесс обработки доставки сообщений RabbitMQ.