Golang реализует высокопроизводительное промежуточное ПО обратного вызова сообщений с 500 строками кода Golang.

задняя часть API RabbitMQ дизайн

Внедрить высокопроизводительное промежуточное ПО обратного вызова сообщений с 500 строками кода Golang.

В этой статье описывается, как реализовать промежуточное программное обеспечение обратного вызова сообщений.Благодаря идеям программирования конвейеров и сопрограмм golang, благодаря продуманному дизайну, всего около 500 строк кода могут обеспечить высокую производительность, плавное завершение работы, автоматическое повторное подключение и другие функции.Весь код имеет также был открыт вgithub/fishtrip/watchman.

проблема

С ростом сложности бизнеса и увеличением количества сервисов после разделения сервисов необходимо внедрение асинхронных очередей сообщений. Когда служб немного, например, на ранней стадии бизнеса, часто это относительно большое отдельное приложение или небольшое количество служб.Очередь сообщений (позже написанная как MQ, очередь сообщений) используется следующим образом:

  • Отправитель, непосредственно подключенный к MQ, отправляет сообщения в соответствии с бизнес-требованиями;
  • Потребительский конец через фоновый процесс подключается к MQ через длительное соединение, затем потребляйте сообщение в режиме реального времени, затем выполните соответствующую обработку;

Условно говоря, сторона отправителя относительно проста, а сторона потребителя более сложна и требует обработки большего количества логики. Например, кроссовки, используемые в настоящее время нашей компанией, должны иметь дело со следующей логикой:

  1. Потребителю требуется длительное соединение и требуется независимый процесс для потребления сообщений в режиме реального времени (некоторые языки могут быть независимым потоком);
  2. После обработки сообщения вам необходимо загрузить бизнес-инфраструктуру (например, Sneakers необходимо присоединиться к среде Rails для выполнения бизнес-кода) и вызвать соответствующий код для обработки сообщения;
  3. Когда MQ не может подключиться, ему необходимо автоматически переподключиться, и приложение также должно иметь возможность корректно перезапуститься, чтобы не потерять сообщения.
  4. Сообщение о потреблении, скорее всего, не будет обработано. В настоящее время требуется относительно безопасный и надежный механизм, гарантирующий, что сообщение не может быть потеряно. В то же время также требуется возможность повторной попытки отправить сообщение после период времени.После многократных повторных попыток сообщение нуждается в дальнейшей обработке;

Архитектура системы в настоящее время в целом выглядит следующим образом:

С увеличением количества сервисов развертывание такого фонового процесса для каждого сервиса, которому необходимо получать сообщения, явно не является экологически безопасным:

  1. Каждая служба добавляет процесс, что увеличивает стоимость развертывания и эксплуатации;
  2. Для управления очередями (создание, уничтожение, связывание) и механизма повторной попытки сообщения, если каждая служба отвечает сама за себя, легко привести к несовместимости стандартов;
  3. Если разные сервисы находятся на разных языках и фреймворках, каждый язык приходится реализовывать заново, что приведет к трате большого количества ресурсов на разработку;

Есть ли способ лучше?

Общий подход заключается в создании глобального высокопроизводительного промежуточного программного обеспечения обратного вызова сообщений, которое отвечает за управление очередями, отправку и получение сообщений, повторные попытки и обработку ошибок, чтобы каждой службе больше не приходилось учитывать такие сообщения, как потеря сообщения, потеря сообщения , а также обработка ошибок.Повторные попытки и другие проблемы в основном решили вышеуказанные недостатки. Какие функции должен иметь этот центр обратного вызова сообщений?

  1. Унифицированное управление всеми сообщениями и создание монитора очередей MQ;
  2. Когда сообщение получено, промежуточное ПО вызывает адрес обратного вызова соответствующей службы, поскольку центр обратного вызова отвечает за все службы, промежуточное ПО должно быть высокопроизводительным и высококонкурентным;
  3. Промежуточное программное обеспечение должно иметь функцию повторного попытки сообщения, и сообщение не должно быть потеряно при повторении сообщения;
  4. Промежуточное программное обеспечение должно иметь основные функции, такие как «переподключение» и «изящное отключение», чтобы гарантировать, что никакие сообщения не теряются;

На данный момент структура следующая:

Таким образом, работа каждой службы становится намного легче. Цель этой статьи — реализовать версию промежуточного программного обеспечения обратного вызова сообщений, доступную в производственной среде. Конечно, наша первая версия callback-центра не требует слишком много функций со следующими ограничениями:

  1. Весь процесс повтора требует встроенной поддержки RabbitMQ, поэтому на данный момент поддерживается только RabbitMQ;
  2. В настоящее время поддерживает только метод обратного вызова HTTP;

Основной спрос, как добиться беспорядок промежуточного программного обеспечения обратного вызова?

Решения

Выбор языка разработки

Как «язык разработки системного уровня» Golang очень подходит для разработки такого промежуточного программного обеспечения. Встроенный механизм горутины/канала очень легко обеспечивает высокий уровень параллелизма. Как новичок в Golang, этот проект не сложен, он очень подходит для практики и дальнейшего обучения.

надежность сообщения

Как насчет повторных попыток и обработки ошибок? Мы позаимствовали его метод из реализации Sneakers.Используя встроенный механизм RabbitMQ, то есть через механизм x-dead-letter, мы можем гарантировать, что сообщение может быть высоконадежным при повторной попытке.Подробности см. к тому, что я писал некоторое время назад.эта статья. Простое изложение идей в тексте:

  1. Когда сообщение обрабатывается нормально, просто подтвердите сообщение напрямую;
  2. Если при обработке сообщения возникла ошибка и требуется повторная попытка, отклоните сообщение, и сообщение попадет в отдельную очередь повторных попыток;
  3. Очередь повторных попыток настроена с тайм-аутом ttl, и когда тайм-аут достигнут, сообщение попадет в очередь повторных запросов Exchange (концепция RabbitMQ, используемая для маршрутизации сообщений);
  4. Сообщение снова войдет в рабочую очередь, ожидая повторной попытки в следующий раз;
  5. Если количество повторных попыток сообщения превышает определенное значение, сообщение попадет в очередь ошибок для дальнейшей обработки;

Есть два места, где используется механизм Dead-Letter Rabbitmq:

  1. Когда сообщение отклонено, оно поступает в очередь для обмена недоставленными сообщениями, т. е. в очередь повторных попыток;
  2. Когда сообщение очереди повторяется, когда истекает время ожидания (очередь установлена ​​с длительностью ttl-expires), сообщение попадает в обмен недоставленными сообщениями очереди, то есть повторно входит в рабочую очередь.

С помощью этого механизма можно гарантировать, что сообщение не будет потеряно во время обработки сообщения, независимо от того, нормально это или ошибочно. Подробнее об этом можно прочитать в статье выше.

достичь высокой параллелизма

Для ПО промежуточного слоя требования к производительности относительно высоки, а производительность также включает два аспекта: низкая задержка и высокий уровень параллелизма. В этом сценарии мы не можем решить проблему низкой задержки, поскольку задержка после обратного вызова сообщения определяется другими бизнес-службами. Поэтому мы больше стремимся к высокому параллелизму.

Как добиться высокой параллелизма? Во-первых, это выбор языка разработки.Такой низкоуровневый мидлвар очень подходит для реализации на Golang.Почему? Поскольку основная логика центра обратного вызова заключается в непрерывном обратном вызове каждой службы, а промежуточное ПО времени задержки каждой службы не может контролироваться, поэтому, если вы хотите добиться высокого параллелизма, лучше всего использовать механизм асинхронных событий. С помощью встроенного канала Golang он может не только получить производительность, близкую к асинхронным событиям, но и сделать всю разработку простой и эффективной, что является более подходящим выбором.

А конкретная реализация? На самом деле, для callback-центра он примерно делится на следующие этапы:

  1. Получить сообщения: подключиться к очереди сообщений (в настоящее время нам нужно только поддерживать RabbitMQ), потреблять сообщения;
  2. Бизнес-интерфейс обратного вызова: после использования сообщения, в соответствии с информацией о конфигурации, различным очередям может потребоваться вызвать разные адреса обратного вызова, чтобы начать вызов бизнес-интерфейса (в настоящее время нам нужна только поддержка протокола HTTP);
  3. Обработайте сообщение в соответствии с результатом обратного вызова: если вызов бизнес-интерфейса прошел успешно, просто подтвердите сообщение напрямую, если вызов не удался, отклоните сообщение, если превышено максимальное количество повторных попыток, войдите в логику обработки ошибок;
  4. Логика обработки ошибок: подтвердить исходное сообщение и отправить сообщение в очередь ошибок, ожидая дальнейшей обработки (может быть сигнал тревоги, а затем обработка вручную);

Через такую ​​"сущность", как сообщение, все вышеперечисленные процессы можно соединить последовательно, очень похоже на конвейер? иpipelineШаблон дизайна является очень рекомендуемым способом для Голанга для достижения высокого параллелизма. Каждый из вышеуказанных шагов можно рассматривать как группу Goroutines, и они общаются через трубопроводы, поэтому нет конкуренции ресурсов, что значительно снижает расходы на разработку.

И каждый из вышеперечисленных шагов может обеспечить высокий уровень параллелизма за счет разработки различных моделей Goroutine:

  1. Получение сообщений: требует длительного подключения к RabbitMQ. Лучшая реализация — иметь отдельный набор сопрограмм для каждой очереди, чтобы прием сообщений между очередями не мешал друг другу. не будет несвоевременной обработки сообщений;
  2. Обратный вызов бизнес-интерфейса: Каждое сообщение будет вызывать бизнес-интерфейс, но время обработки бизнес-интерфейса прозрачно для промежуточного программного обеспечения. Так что лучшая модель здесь — одна сопрограмма на сообщение. Если появится более медленный интерфейс, внутренний механизм планирования горутины не повлияет на пропускную способность системы, а горутина может поддерживать миллионы параллелизма, поэтому этот режим является наиболее подходящим.
  3. Обработайте сообщение в соответствии с результатом обратного вызова: этот шаг в основном предназначен для подключения к RabbitMQ и отправки сообщений подтверждения/отклонения. По умолчанию мы считаем, что RabbitMQ надежен, и здесь для обработки можно использовать тот же набор сопрограмм.
  4. Логика обработки ошибок: количество сообщений здесь должно быть значительно уменьшено, потому что сюда будут поступать сообщения, которые не удалось выполнить несколько раз (больше, чем количество повторных попыток). Мы также можем использовать тот же набор сопрограмм для обработки.

В предыдущих четырех шагах мы использовали три модели дизайна сопрограмм, чтобы уточнить картинку выше, она выглядит так.

выполнить

При описанном выше процессе проектирования код не является сложным и может быть условно разделен на несколько частей: реализация управления конфигурацией, основной процесс, объект сообщения, логика повторных попыток и корректное завершение работы. Подробный код находится на github:fishtrip/watchman

Управление конфигурацией

В этой части управления конфигурацией то, что мы реализовали в этой версии, относительно простое, а именно чтение файла конфигурации yml. Файл конфигурации в основном содержит три части информации:

  • Определение очереди сообщений. Вызов интерфейса RabbitMQ для создания связанных очередей (очередей повторных попыток, очередей ошибок и т. д.) в соответствии с конфигурацией очереди сообщений;
  • Настройка адреса обратного вызова. Для разных очередей сообщений требуются разные адреса обратного вызова;
  • другая конфигурация. Такие как количество повторных попыток, тайм-ауты и т. Д.
# config/queues.example.yml
projects:
  - name: test
    queues_default:
      notify_base: "http://localhost:8080"
      notify_timeout: 5
      retry_times: 40
      retry_duration: 300
      binding_exchange: fishtrip
    queues:
      - queue_name: "order_processor"
        notify_path: "/orders/notify" 
        routing_key:
          - "order.state.created"
          - "house.state.#"

Мы можем использовать пакеты YAML.V2, которые могут быть легко проанализированы в файле конфигурации YAML для структуры, например, определение очереди, структура реализована следующим образом:

// config.go 28-38

type QueueConfig struct {
    QueueName       string   `yaml:"queue_name"`
    RoutingKey      []string `yaml:"routing_key"`
    NotifyPath      string   `yaml:"notify_path"`
    NotifyTimeout   int      `yaml:"notify_timeout"`
    RetryTimes      int      `yaml:"retry_times"`
    RetryDuration   int      `yaml:"retry_duration"`
    BindingExchange string   `yaml:"binding_exchange"`

    project *ProjectConfig
}

Потребность в указателе ProjectConfig выше, в основном, для того, чтобы облегчить чтение конфигурации проекта, поэтому при загрузке точки необходимо проектировать очередь.

// config.go
func loadQueuesConfig(configFileName string, allQueues []*QueueConfig) []*QueueConfig {
    // ......
    projects := projectsConfig.Projects
    for i, project := range projects {
        log.Printf("find project: %s", project.Name)


        // 这里不能写作  queue := project.Queues
        queues := projects[i].Queues

        for j, queue := range queues {
            log.Printf("find queue: %v", queue)

            // 这里不能写作  queues[j].project = &queue 
            queues[j].project = &projects[i]
            allQueues = append(allQueues, &queues[j])
        }
    }
   // .......
}

В приведенном выше коде есть место, подверженное ошибкам, то есть переменная очереди не может быть использована напрямую при установке указателя внутри цикла for, потому что полученная в это время переменная очереди является копией данных, а не исходными данными .

Кроме того, большая часть логики в config.go написана объектно-ориентированным образом, например:

// config.go
func (qc QueueConfig) ErrorQueueName() string {
    return fmt.Sprintf("%s-error", qc.QueueName)
}
func (qc QueueConfig) WorkerExchangeName() string {
    if qc.BindingExchange == "" {
        return qc.project.QueuesDefaultConfig.BindingExchange
    }
    return qc.BindingExchange
}

Таким образом, можно написать более чистый и удобный для сопровождения код.

инкапсуляция объекта сообщения

Во всей программе данные, переданные в канале, являются все объекты сообщения. Через эту инкапсуляцию объекта очень удобно передавать данные между различными типами Goroutines.

Класс сообщения определяется следующим образом:

type Message struct {
    queueConfig    QueueConfig // 消息来自于哪个队列
    amqpDelivery   *amqp.Delivery // RabbitMQ 的消息封装
    notifyResponse NotifyResponse // 消息回调结果
}

Мы инкапсулируем собственные сообщения в RabbitMQ, а также информацию об очереди и результаты обратного вызова, и таким образом передаем объекты Message между каналами. В то же время Message инкапсулирует множество методов для удобного вызова другими сопрограммами.

// Message 相关方法
func (m Message) CurrentMessageRetries() int {}
func (m *Message) Notify(client *http.Client) *Message {}
func (m Message) IsMaxRetry() bool {}
func (m Message) IsNotifySuccess() bool {}
func (m Message) Ack() error {}
func (m Message) Reject() error {}
func (m Message) Republish(out chan<- Message) error {}
func (m Message) CloneAndPublish(channel *amqp.Channel) error {}

Обратите внимание на принимающий объект вышеуказанного метода, принимающий объект с указателем означает, что значение объекта будет изменено.

основной процесс

Основной процесс — это то, о чем мы говорили выше, через конвейерный режим весь процесс сообщения соединяется последовательно. Основной код здесь:

// main.go
<-resendMessage(ackMessage(workMessage(receiveMessage(allQueues, done))))

Каждая из вышеперечисленных функций получает одно и то же определение конвейера, поэтому их можно использовать в тандеме. На самом деле реализация каждой функции не сильно отличается, и разные модели сопрограмм могут требовать разных методов написания.

Вот полученная формулировка, и подробные заметки. Очередная очередь восстановления для каждого сообщения, генерируемая N COROUTINE, а затем записывает все прочитанные сообщения.

// main.go
func receiveMessage(queues []*QueueConfig, done <-chan struct{}) <-chan Message {

    // 创建一个管道,这个管道会作为函数的返回值
    out := make(chan Message, ChannelBufferLength)

    // WaitGroup 用于同步,这里来控制协程是否结束
    var wg sync.WaitGroup

    // 入参是队列配置,这个见下文传入的值
    receiver := func(qc QueueConfig) {
        defer wg.Done()

    // RECONNECT 标记用于跳出循环来重新连接 RabbitMQ
    RECONNECT:
        for {
            _, channel, err := setupChannel()
            if err != nil {
                PanicOnError(err)
            }

            // 消费消息
            msgs, err := channel.Consume(
                qc.WorkerQueueName(), // queue
                "",                   // consumer
                false,                // auto-ack
                false,                // exclusive
                false,                // no-local
                false,                // no-wait
                nil,                  // args
            )
            PanicOnError(err)

            for {
                select {
                case msg, ok := <-msgs:
                    if !ok {
                        log.Printf("receiver: channel is closed, maybe lost connection")
                        time.Sleep(5 * time.Second)
                        continue RECONNECT
                    }

                    // 这里生成消息的 UUID,用来跟踪整个消息流,稍后会解释
                    msg.MessageId = fmt.Sprintf("%s", uuid.NewV4())
                    message := Message{qc, &msg, 0}

                    // 这里把消息写到出管道
                    out <- message

                    message.Printf("receiver: received msg")
                case <-done:

                    // 当主协程收到 done 信号的时候,自己也退出
                    log.Printf("receiver: received a done signal")
                    return
                }
            }
        }
    }

    for _, queue := range queues {
        wg.Add(ReceiverNum)
        for i := 0; i < ReceiverNum; i++ {

            // 每个队列生成 N 个协程共同消费
            go receiver(*queue)
        }
    }

    // 控制协程,当所有的消费协程退出时,出口管道也需要关闭,通知下游协程
    go func() {
        wg.Wait()
        log.Printf("all receiver is done, closing channel")
        close(out)
    }()

    return out
}

Есть несколько ключевых точек для заметок.

  1. Каждая функция представляет собой подобную структуру, набор рабочих сопрограмм и сопрограмм, когда все рабочие сопрограммы выходят, канал выхода закрывается и уведомляются нижестоящие сопрограммы. Учтите, что в голанге для использования пайпов его нужно закрывать со стороны записи, иначе легко зависнуть.
  2. Мы записываем уникальный uuid в каждое сообщение, и этот uuid используется в журнале для отслеживания всего потока информации.
  3. Из-за возможных сетевых условий мы должны принимать решения.Если соединение не установлено, спите напрямую в течение определенного периода времени, а затем снова подключайтесь.
  4. done Этот конвейер управляется основной сопрограммой и в основном используется для корректного завершения работы. Функция изящного завершения работы заключается в том, чтобы гарантировать, что никакие сообщения не будут потеряны при обновлении конфигурации и основной программы (сопрограмма не завершится, пока сообщение действительно не будет завершено, и вся программа завершится).

Суммировать

Благодаря эффективной способности экспрессии Golang, стабильное Message Callback Middleware реализуется примерно 500 линиям кода, а со следующими функциями:

  • высокая производительность.在 macbook pro 15 上简单测试,每个队列的处理能力可以轻松达到 3000 message/second 以上,多个队列也可以做到线性的增加性能,整体应用达到几万每秒很轻松。同时,得益于 golang 的协程设计,如果下游出现了慢调用,那么也不会影响并发。
  • Закройте изящно. Отслеживая сигнал, можно корректно закрыть всю программу без потери сообщений, что удобно при изменении конфигурации и перезапуске программы. Это очень важно в производственной среде.
  • Автоматическое повторное подключение. Когда службе RabbitMQ не удается подключиться, приложение может автоматически переподключиться.
Конечно,Наша командаЯ все еще новичок в golang, и я не провел слишком много модульных тестов и тестов производительности.Следующим шагом может быть продолжение оптимизации, улучшение тестовой работы и оптимизация управления конфигурацией.Добро пожаловать вgithubИсточник толпы.