Связь между микросервисами через RabbitMQ

задняя часть база данных Микросервисы RabbitMQ

Связь между микросервисами через RabbitMQ

Микросервисы не зависят друг от друга. В отличие от одного проекта каждый модуль может взаимодействовать напрямую через вызовы методов. Общий метод связи независимых сервисов заключается в использованииHTTP协议,rpc协议Или используйте промежуточное программное обеспечение сообщений, напримерRabbitMQ``KafkaЖдать

в этой статьеСоздание микросервисов с Golang и MongoDBРеализовано приложение микросервиса, а в статье реализована прямая связь каждого сервиса, который используетсяHTTPформе, как проходит услугаRabbitMQДля обмена сообщениями теперь нам нужно реализовать функцию, которая представляет собой интерфейс для пользователей, чтобы бронировать билеты в кино, для чего требуется сервис.User Service(port 8000)Он взаимодействует со службой **Служба бронирования (порт 8003)**. После того, как пользователь сделает бронирование, информация о бронировании записывается вbookingв базе данных

УстановитьRabbitMQ

УстановитьRabbitMQнеобходимо установить передErlangсреды, затем загрузите и установитеRabbitMQ, Пожалуйста, выберите соответствующую версию. После завершения установки RabbitMQ запускается в фоновом режиме как служба в Windows.RabbitMQКак использовать интерфейс, пожалуйста, обратитесь к официальному сайтуруководство, существуют реализации на различных основных языках, которые мы используем,Goверсия, пожалуйста, загрузите соответствующий интерфейс реализацииgo get github.com/streadway/amqp

правильноRabbitMQСделать простую инкапсуляцию интерфейса

  • определить интерфейс

messaging/message.go

type IMessageClient interface {
	ConnectToBroker(connectionStr string) error
	PublishToQueue(data []byte, queueName string) error
	SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
	Close()
}

type MessageClient struct {
	conn *amqp.Connection
}
  • интерфейс подключения
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
	if connectionStr == "" {
		panic("the connection str mustnt be null")
	}
	var err error
	m.conn, err = amqp.Dial(connectionStr)
	return err
}
  • опубликовать интерфейс сообщения
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
	if m.conn == nil {
		panic("before publish you must connect the RabbitMQ first")
	}

	ch, err := m.conn.Channel()
	defer ch.Close()
	failOnError(err, "Failed to open a channel")

	q, err := ch.QueueDeclare(
		queueName,
		false,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to declare a queue")

	err = ch.Publish(
		"",
		q.Name,
		false,
		false,
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		},
	)
	failOnError(err, "Failed to publish a message")

	return nil
}
  • Интерфейс сообщения о подписке
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
	ch, err := m.conn.Channel()
	//defer ch.Close()
	failOnError(err, "Failed to open a channel")

	q, err := ch.QueueDeclare(
		queueName,
		false,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	failOnError(err, "Failed to register a consumer")
	go consumeLoop(msgs, handlerFunc)
	return nil
}

включить связь

существуетUser Serviceопределить новыйPOSTинтерфейс/user/{name}/booking, чтобы реализовать функцию бронирования пользователя, после бронирования, черезRabbitMQопубликовать сообщениеBooking Service,Booking ServiceПосле получения сообщения делаем соответствующую обработку (записываем в базу)

User Service

  • инициализацияMessageClient

users/controllers/user.go

var client messaging.IMessageClient

func init() {
	client = &messaging.MessageClient{}
	err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("connect to rabbitmq error", err)
	}
}
  • Добавляйте новые маршруты и реализации

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {
	params := mux.Vars(r)
	user_name := params["name"]
	defer r.Body.Close()

	var bookings models.Booking
	body, _ := ioutil.ReadAll(r.Body)
	err := json.Unmarshal(body, &bookings)
	if err != nil {
		fmt.Println("the format body error ", err)
	}
	fmt.Println("user name:", user_name, bookings)
	go notifyMsg(body)
}
  • Публикация сообщений с помощью сопрограммы
func notifyMsg(body []byte) {
	err := client.PublishToQueue(body, "new_booking")
	if err != nil {
		fmt.Println("Failed to publis message", err)
	}
}

Booking Service

  • Инициализировать MessageClient
var client messaging.IMessageClient

func initMessage() {
	client = &messaging.MessageClient{}
	err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ", err)
	}

	err = client.SubscribeToQueue("new_booking", getBooking)
	if err != nil {
		fmt.Println("Failed to comsuer the msg", err)
	}
}

начать до веб-службы

func main() {

	initMessage()

	r := routes.NewRouter()
	http.ListenAndServe(":8003", r)

}
  • Обработка сообщений после получения
func getBooking(delivery amqp.Delivery) {

  var booking models.Booking
	json.Unmarshal(delivery.Body, &booking)
  booking.Id = bson.NewObjectId().Hex()
	dao.Insert("Booking", "BookModel", booking)
	fmt.Println("the booking msg", booking)
}

Аутентификация, нужно начатьUser ServiceиBooking Service
использоватьPostmanотправить соответствующие данные

post 127.0.0.1:8000/user/kevin_woo/booking

{
	"name":"kevin_woo",
	"books":[
		{
			"date":"20180727",
			"movies":["5b4c45d49d5e3e33c4a5b97a"]
		},
		{
			"date":"20180810",
			"movies":["5b4c45ea9d5e3e33c4a5b97b"]
		}
	]
}

Вы можете видеть, что в базе данных уже есть новая информация о бронировании.

Объяснение, данные, которые я отправляю здесь, являются структурой в базе данных бронирования. Фактическая ситуация требует инкапсуляции данных. При отправке данных данные не проверяются. В реальном процессе разработки все данные должны быть соответствующим образом проверены.Здесь мы в основном рассматриваем процесс обработки доставки сообщений RabbitMQ.

Исходный код Гитхаб