Начало работы с Kafka (3): как работают продюсеры Sarama

Kafka

Резюме

В этой статье я будуSaramaДавайте начнем с того, как создавать синхронных и асинхронных производителей, а затем я познакомлю вас с параметрами производителя и их использованием.

Затем я начну с кода, создающего производителя, и буду работать над кодом, пока сообщение не будет отправлено и не будет получен ответ.

Этот процесс фактически соответствует различным уровням кафки, упомянутым в статье выше.

Как использовать

1.1 Введение

научиться пользоватьсяSaramaПрежде чем создать сообщение, позвольте мне немного представить его.

SaramaСуществует два типа производителей: синхронные производители и асинхронные производители.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

Общий смысл официальной документации заключается в том, что асинхронные производители используютchannelполучать (производственный успех или неудача) сообщения, а также передаватьchannelдля отправки сообщения, которое обычно является наиболее эффективным. И синхронный производитель должен блокировать до полученияacks. Но это тоже приносит две проблемы, одна в том, что производительность становится хуже, а надежность зависит от параметровacksгарантировать.

1.2 Асинхронная отправка

Тогда мы идем прямо кSaramaКак отправлять асинхронные сообщения.

Давайте сначала создадим минимальный асинхронный производитель, исключив все ненужные настройки.

Обратите внимание, что для облегчения чтения я удаляю обработку ошибок и заменяю ее пропущенным номером.

func main() {
	config := sarama.NewConfig()
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	...

	producer, err := sarama.NewAsyncProducerFromClient(client)
	...
	defer producer.Close()

	topic := "topic-test"

	for i := 0; i <= 100; i++ {
		text := fmt.Sprintf("message %08d", i)
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Key:   nil,
			Value: sarama.StringEncoder(text)}
	}
}

Как можно заметить,SaramaЧтобы отправить сообщение, сначала создайтеconfig, подробнее здесьconfigО содержании будет сказано позже.

Тогда согласно этомуconfig,а такжеbrokerадрес для создания клиента производителя.

Затем создайте объект-производитель по клиенту (на самом деле использование объектов здесь недостаточно строгое, но я думаю, что с этим пониманием проблем нет).

Наконец, вы можете использовать этот объект производителя для отправки сообщений.

При построении сообщения я также опустил другие параметры, а оставил только два самых важных и необходимых параметра: тему и содержание сообщения.

На этом процесс отправки сообщений простым асинхронным производителем закончен.

1.3 Синхронная передача

После прочтения асинхронной отправки у вас может возникнуть много вопросов типа «зачем вы это делаете».

Давайте сначала рассмотрим синхронную отправку, а затем сравним:

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	...

	producer, err := sarama.NewSyncProducerFromClient(client)
	...
	defer producer.Close()

	topic := "topic-test"

	for i := 0; i <= 10; i++ {
		text := fmt.Sprintf("message %08d", i)
		partition, offset, err := producer.SendMessage(
			&sarama.ProducerMessage{
				Topic: topic,
				Key:   nil,
				Value: sarama.StringEncoder(text)})
		...
		log.Println("send message success, partition = ", partition, " offset = ", offset)
	}
}

Видно, что процесс синхронной отправки и асинхронной отправки очень похож.

Разница в том, что синхронные производители отправляют сообщения, используя неchannel,а такжеSendMessageМетод имеет три возвращаемых значения, которым было отправлено сообщение.partition, в которомoffset, здесьerror.

То есть только после того, как сообщение будет успешно отправлено и записаноbroker, будет возвращаемое значение.

2. Конфигурация

2.1 Конфигурация по умолчанию

Давайте посмотрим на эту строку в исходном коде:

config := sarama.NewConfig()

можно увидетьSaramaКонфигурация по умолчанию была возвращена:

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
	c := &Config{}

  c.Producer.MaxMessageBytes = 1000000
	c.Producer.RequiredAcks = WaitForLocal
	c.Producer.Timeout = 10 * time.Second
	...
}

2.2 Дополнительная конфигурация

Давайте посмотримConfigЭта структура, элементы конфигурации которой могут быть настроены пользователем.

Поскольку он слишком длинный, ограниченный пространством и знаниями автора, он не может быть объяснен один за другим в этой статье, поэтому для объяснения в этой статье будут выбраны только некоторые конфигурации, связанные с производителем.

Но будь то клиент Golang или клиент Java, не имеет значения, вам просто нужно знать, какие параметры имеют отношение к скорости производства вашего производителя, надежности сообщений и т. д.

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
	Admin struct {
		...
	}
	
	Net struct {
		...
	}
	
	Metadata struct {
		...
	}
	
	Producer struct {
		...
	}
	
	Consumer struct {
		...
	}
	
	ClientID string
	
	...
}

Мы можем видеть, что оSaramaКонфигурация разделена на множество частей, давайте посмотримProducerэтой части.

2.3 Важные параметры производителя

Здесь я намерен представить некоторые параметры производителя, которые я лично считаю более важными.

  • MaxMessageBytes int
    

Этот параметр влияет на максимальное количество байтов в сообщении, по умолчанию 1000000. Но учтите, что этот параметр должен быть меньше, чем у брокера вmessage.max.bytes.

  • RequiredAcks RequiredAcks
    

Этот параметр влияет на то, сколько брокеров сообщение должно быть записано перед возвратом. Значение может быть 0, 1 и -1, что соответствует возврату без ожидания подтверждения от брокера, возврату после подтверждения от лидера раздела и возврату после подтверждения от всех реплик раздела.

  • Partitioner PartitionerConstructor
    

Это разделитель.SaramaПо умолчанию предоставляется несколько разделителей.Если они не указаны, используется разделитель Hash по умолчанию.

  • Retry
    

Этот параметр представляет количество повторных попыток и время повторной попытки, в основном при некоторых повторных ошибках.

  • Flush
    

Он используется для установки сообщения, которое будет упаковано и отправлено.Проще говоря, каждый раз, когда сообщение отправляется брокеру, оно не отправляется сообщением при создании сообщения, а упаковывается и отправляется после накопления сообщений. в некоторой степени. Таким образом, он содержит два параметра. Во-первых, это количество сообщений, которые инициируют отправку пакета, а во-вторых, это накопленный размер сообщения перед отправкой.

2.4 Идемпотентные производители

Прежде чем говорить об идемпотентных производителях, давайте взглянем на еще один важный параметр в производителях:

  • MaxOpenRequests int
    

Этот параметр представляет максимально допустимую одновременную отправку без получения подтверждений.batchколичество.

  • Idempotent bool
    

Для идемпотентных производителей, когда для этого элемента установлено значениеtrueКогда производитель гарантирует, что создаваемые сообщения должны быть упорядочены и точны один раз.

Зачем нужна эта опция?

Если для параметра MaxOpenRequests установлено значение больше 1, это означает, что разрешена отправка нескольких запросов, но ответ не получен. Предполагая, что количество повторных попыток в это время также установлено больше 1, когда два запроса отправляются одновременно, если первый запрос отправляется брокеру, запись брокера завершается ошибкой, но второй запрос записывается успешно. , то клиенту будет повторно отправлен запрос на первое сообщение, что приведет к нарушению порядка в это время.

Другой пример: когда первый запрос возвращает подтверждение, клиент не получает его по сетевым причинам, поэтому клиент повторно отправляет его, что приведет к повторению сообщения.

Следовательно, идемпотентный производитель должен гарантировать, что сообщения, отправляемые брокеру, упорядочены и не повторяются.

Порядок сообщений можно гарантировать, установив для параметра MaxOpenRequests значение 1. В настоящее время каждое сообщение должно получать подтверждение перед отправкой следующего, поэтому оно должно быть в порядке, но нельзя гарантировать, что оно не будет повторяться.

А когда для MaxOpenRequests установлено значение 1, пропускная способность невелика.

Обратите внимание, что при запуске идемпотентного производителя количество повторных попыток должно быть больше 0, а подтверждение должно быть все.

В Java-клиенте разрешены MaxOpenRequests меньше или равные 5.

Но когдаSaramaесть одиночень странное местоЯ не изучал его, давайте взглянем на эту часть кода:

if c.Producer.Idempotent {
		if !c.Version.IsAtLeast(V0_11_0_0) {
			return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
		}
		if c.Producer.Retry.Max == 0 {
			return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
		}
		if c.Producer.RequiredAcks != WaitForAll {
			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
		}
		if c.Net.MaxOpenRequests > 1 {
			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
		}
	}

Первый пункт в этой части — номер версии, без проблем, второй и третий пункты —Retryа такжеAcks, нет проблем. Проблема кроется в четвертом пункте, здесьMaxOpenRequestsпараметр, я думаю, он должен быть эквивалентен тому, что в Java-клиентеMAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, согласно конфигурации в Java-клиенте этот параметр должен быть меньше или равен 5, что может гарантировать идемпотентность, но здесь он должен быть установлен в 1.

Проверил проблему Сарамы, разработчик поднял эту проблему, но автор пока не планирует ее решать.

3 broker

В этом разделе я представлю уровень кодаSaramaВесь процесс отправки сообщения производителем. Но поскольку кода много, я опущу часть контента, включая обработку ошибок, повторные попытки и т. д.

это оченьважный, и его нельзя опускать. Однако из-за ограниченного места я могу представить только основноеотправлять сообщениясодержание этого раздела.

Прежде чем я опубликую код, я кратко расскажу об идее этого кода. Позже я добавлю несколько комментариев к коду, чтобы объяснить его более подробно.

Тогда давайте начнем!

producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)

Все начинается с этой строки.

Давайте зайдем и посмотрим.

На самом деле здесь всего две части, с первой поадреса такженастроитьЧтобы построитьclient.

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  
  // 构建client
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
  
  // 构建AsyncProducer
	return newAsyncProducer(client)
}

3.1 Создание клиента

при созданииClientпроцесс, сначала создайтеclientструктура.

Нас не интересуют внутренние параметры, и мы объясним их при использовании.

Затем после создания обновите метаданные и запустите сопрограмму для обновления в фоновом режиме.

func NewClient(addrs []string, conf *Config) (Client, error) {
	...
  // 构建一个client
  client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}
  // 把用户输入的broker地址作为“种子broker”增加到seedBrokers中
  // 随后客户端会根据已有的broker地址,自动刷新元数据,以获取更多的broker地址
  // 所以称之为种子
  random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}
	...
  // 启动协程在后台刷新元数据
  go withRecover(client.backgroundMetadataUpdater)
  return client, nil
}
  

3.2 Обновление метаданных

Схема обновления метаданных в фоновом режиме на самом деле очень проста.ticker, метаданные обновляются вовремя, покаclientзакрытие.

Здесь мы сначала упомянем, о каких метаданных мы говорим.

Вы можете просто понять, что он включает в себя всеbrokerадрес (потому чтоbroker могут быть добавлены или удалены), и что включеноtopic,ЭтиtopicкакойpartitionЖдать.

func (client *client) backgroundMetadataUpdater() {
  
  // 按照配置的时间更新元数据
  ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()
  
  // 循环获取channel,判断是执行更新操作还是终止
  for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

Затем мы продолжаем видетьclient.refreshMetadata()Этот метод, этот метод заключается в том, чтобы судить о необходимости обновлениякакие темыметаданные илиВсе темыметаданные.

Затем мы продолжаем.

Здесь также не задействована конкретная операция обновления. мы видимtryRefreshMetadataПараметры этого метода могут быть известны, здесь мы устанавливаем тему, которая должна обновить метаданные, количество повторных попыток и период ожидания.

func (client *client) RefreshMetadata(topics ...string) error {
  deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
  // 设置参数
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

И, наконец, пришелtryRefreshMetadataСюда.

В этом методе для создания запроса на получение метаданных выбирается существующий брокер.

После получения ответа, если ошибок нет, эти метаданные используются для обновления клиента.

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
  ...
  
  broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
    ...
    		req := &MetadataRequest{
          Topics: topics, 
          // 是否允许创建不存在的主题
          AllowAutoTopicCreation: allowAutoTopicCreation
        }
    response, err := broker.GetMetadata(req)
    switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
      // 对元数据进行更新
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err)
			}
			return err
		case ...
      ...
    }
  }

Затем мы переходим к просмотру, когда клиент получитresponseПосле этого как обновить.

Во-первых, сохраните локальныйbrokerобновить.

Тогда даtopicобновить, и этоtopicнижеpartition.

func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
  ...
  // 假设返回了新的broker id,那么保存这些新的broker,这意味着增加了broker、或者下线的broker重新上线了
  // 如果返回的id我们已经保存了,但是地址变化了,那么更新地址
  // 如果本地保存的一些id没有返回,说明这些broker下线了,那么删除他们
  client.updateBroker(data.Brokers)
  
  // 然后对topic也进行元数据的更新
  // 主要是更新topic以及topic对应的partition
  for _, topic := range data.Topics {
    ...
    // 更新每个topic以及对应的partition
    client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			...
		}
  }

На этом обновление наших метаданных закончено.

Давайте поговорим о том, как брокер устанавливает соединение перед обновлением метаданных и как брокер отправляет и получает запрос.

3.3 Установить соединение с Брокером

давай вернемсяtryRefreshMetadataв этом методе.

В этом методе есть строка кода:

broker := client.any()

Давайте зайдем и посмотрим.

В этом методе, еслиseedBrokersЕсли он существует, откройте его, в противном случае откройте другого брокера.

Обратите внимание, что здесь упоминаетсядругие брокеры, который можно получить при обновлении метаданных. Это связано с вышеизложенным.

func (client *client) any() *Broker {
	...
	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// 不保证一定是按顺序的
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

Тогда давайте посмотримOpenчто делает метод.

Openметод асинхронно создаетtcpсоединение, затем создает буфер размеромMaxOpenRequestsизchannel.

Это называетсяresponsesизchannel, для получения отbrokerсообщение отправлено обратно.

по фактуbroker, используется для отправки и получения сообщенийchannelнастроены на этот размер.

MaxOpenRequestsВы можете понять этот параметр как клиент Javamax.in.flight.requests.per.connection.

Затем запускается другая сопрограмма для приема сообщений.

func (b *Broker) Open(conf *Config) error {
  if conf == nil {
		conf = NewConfig()
	}
  ...
  go withRecover(func() {
    ...
    dialer := conf.getDialer()
		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
    
    ...
    b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
    ...
    go withRecover(b.responseReceiver)
  })

3.4 Получение ответа от Брокера

Давайте посмотримresponseReceiverкак это работает.

В самом деле, легко понять, что когдаbrokerполучилresponseПри , сначала анализируется заголовок сообщения, а затем анализируется содержимое сообщения. и напишите это вresponseизpacketsсередина.

func (b *Broker) responseReceiver() {
  for response := range b.responses {
    
    ...
    // 先根据Header的版本读取对应长度的Header
    var headerLength = getHeaderLength(response.headerVersion)
		header := make([]byte, headerLength)
		bytesReadHeader, err := b.readFull(header)
    decodedHeader := responseHeader{}
		err = versionedDecode(header, &decodedHeader, response.headerVersion)
    
    ...
    // 解析具体的内容
    buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
		bytesReadBody, err := b.readFull(buf)
    
    // 省略了一些错误处理,总之,如果发生了错误,就把错误信息写进 response.errors 中
    response.packets <- buf
  }
}

На самом деле логика кода этой части получения ответа проста для понимания, т.е. когдаresponseэтоchannelКогда есть сообщение, прочитайте его, а затем запишите прочитанное содержимое вresponseсередина.

Тогда у вас может возникнуть вопрос, когда вы поедете вresponseэтоchannelА как насчет отправки сообщения?

Несложно догадаться, что когда мы отправляем сообщениеbroker, следует уведомитьreceiver, готовы получать новости.

В таком случае давайте перейдем к разделу, где мы только что обновили метаданные, и посмотримsaramaкак отправляется сообщение.

3.5 Отправка и получение сообщений

Вернемся к этой строке кода:

response, err := broker.GetMetadata(req)

Мы вошли прямо внутрь и обнаружили, что структура, которая принимает возвращаемую информацию, была построена здесь, а затем вызванаsendAndReceiveметод.

func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
	response := new(MetadataResponse)

	err := b.sendAndReceive(request, response)

	if err != nil {
		return nil, err
	}

	return response, nil
}

Продолжаем вниз.

Здесь мы видим, что первый вызовsendметод, который затем возвращаетpromise. и когда на это пишется сообщениеpromiseкогда будет получен результат.

И вспомним, что мы были вreceiver, чтобы получитьresponseнаписано наpacketsнапишите результат ошибки вerrorsНу тут то же самое, да?

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	responseHeaderVersion := int16(-1)
	if res != nil {
		responseHeaderVersion = res.headerVersion()
	}

	promise, err := b.send(req, res != nil, responseHeaderVersion)
	if err != nil {
		return err
	}

	if promise == nil {
		return nil
	}

  // 这里的promise,是上面send方法返回的
	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}

Имея в виду эту идею, давайте посмотримsendчто делает метод.

Это место очень важно, и я также думаю,Saramaспециально разработанныйумнаяместо.

В методе send отправляемое сообщение отправляется брокеру синхронно через tcp-соединение с брокером.

Затем создайте канал типа responsePromise, а затем поместите структуру прямо в канал. Затем вспомните, что в методе responseReceiver мы продолжаем потреблять полученный ответ.

В это время в responseReceiver, после получения responsePromise, переданного методом send, он прочитает данные через conn, а затем запишет данные в пакеты этого responsePromise, либо запишет информацию об ошибках в errors.

В это время снова посмотрите на метод send, который возвращает указатель на этот responsePromise. Поэтому метод sendAndReceive ожидает записи в канал пакетов или ошибок в этом responsePromise. Когда responseReceiver получает ответ и записывает данные, пакеты или ошибки записываются в сообщение.

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  
  ...
  // 将请求的内容封装进 request ,然后发送到Broker中
  // 注意一下这里的 b.write(buf) 
  // 里面做了 b.conn.Write(buf) 这件事情
  req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.conf.MetricRegistry)
  bytes, err := b.write(buf)
  
  ...
  // 如果我们的response为nil,也就是说当不需要response的时候,是不会放进inflight发送队列的
  if !promiseResponse {
		// Record request latency without the response
		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
		return nil, nil
	}
  
  // 构建一个接收响应的 channel ,返回这个channel的指针
  // 这个 channel 内部包含了两个 channel,一个用来接收响应,一个用来接收错误 
  promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
	b.responses <- promise

  // 这里返回指针特别的关键,是把消息的发送跟消息的接收联系在一起了
	return &promise, nil
}

Давайте используем картинку, чтобы проиллюстрировать описанный выше процесс отправки и получения:

Это немного более запутанно, но такжеSaramaОсновное содержание отправки и получения сообщений, надеюсь, мое объяснение поможет вам понять :)

4 AsyncProcuder

В предыдущем разделе мы проанализировали весь процесс построения клиента, а при построении клиента для обновления метаданных мы также объяснили, как sarama отправляет и получает сообщения.

В этом разделе я собираюсь объяснить, как AsyncProcuder отправляет сообщения.

Из-за предвосхищения предыдущего раздела содержание этого раздела должно быть легче для понимания.

мы начинаем сnewAsyncProducer(client)Эта линия начинается.

Давай сначала поговоримinput:make(chan *ProducerMessage), это про отправку нашего сообщения. обратите внимание на этоchannelне буферизован.

То есть, когда мы отправляем сообщениеinputПосередине отправитель в это время будет заблокирован, а это значит, что последующие операции блокировать нельзя, иначе это повлияет на эффективность отправки сообщения.

Тогда нам не нужны другие поля, и мы упомянем их позже.

func newAsyncProducer(client Client) (AsyncProducer, error) {
  ...
  p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}
  
  go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
}

4.1 dispatcher

Давайте посмотрим на следующий запуск сопрограммыgo withRecover(p.dispatcher).

В этом методе сначала создана карта с темой в качестве ключа, и значение этой карты является невосстанавливаемым каналом.

Здесь мы можем легко предположить, что когда сообщение отправляется через ввод, сообщение отправляется диспетчеру и назначается каждой теме.

Обратите внимание, что в это время канал все еще не буферизован, поэтому мы можем предположить, что следующая операция все еще не блокируется.

func (p *asyncProducer) dispatcher() {
  handlers := make(map[string]chan<- *ProducerMessage)
  ...
  for msg := range p.input {
    ...
    // 拦截器
    for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}
    
    ...
    // 找到这个Topic对应的Handler
    handler := handlers[msg.Topic]
		if handler == nil {
      // 如果此时还不存在这个Topic对应的Handler,那么创建一个
      // 虽然说他叫Handler,但他其实是一个无缓冲的
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}
		// 然后把这条消息写进这个Handler中
		handler <- msg
  }
}

тогда давайтеhandler = p.newTopicProducer(msg.Topic)эта строка кода.

Здесь создается размер буфераChannelBufferSizeКанал, в котором хранятся сообщения, отправленные в эту тему.

Затем создается топик-производитель, в это время вы можете думать, что сообщение было доставлено топику-производителю каждой темы.

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

4.2 topicDispatch

Тогда давайте посмотримgo withRecover(tp.dispatch)эта строка кода.

Он также запускает сопрограмму для обработки сообщения.

То есть на данный момент для каждой темы есть сопрограмма для обработки сообщения.

В этом методе dispatch(), когда сообщение также получено, он находит канал раздела, в котором находится сообщение, и затем записывает в него сообщение.

func (tp *topicProducer) dispatch() {
  for msg := range tp.input {
    ...
    
    // 同样是找到这条消息所在的分区对应的channel,然后把消息丢进去
    handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
  }
}

4.3 PartitionDispatch

мы входимtp.parent.newPartitionProducer(msg.Topic, msg.Partition)Здесь, чтобы увидеть.

Вы можете обнаружить, что PartitionProducer очень похож на TopicProducer.

Фактически они представляют собой распространение сообщения от производителя к топику и разделу.

Обратите внимание, что размер буфера канала здесь также равен ChannelBufferSize.

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

4.4 partitionProducer

На этом этапе давайте посмотрим, как обрабатывается сообщение, когда оно достигает канала, в котором расположен каждый раздел.

На самом деле на этом шаге в основном выполняется обработка ошибок и тому подобное, а затем передается сообщение в BrokerProducer.

Можно понять, что этот шаг является преобразованием из уровня бизнес-логики в уровень сетевого ввода-вывода.До этого нас интересует только то, в какой раздел отправляется сообщение.После этого нам нужно найти адрес брокера, где раздел находится и использовать ранее установленный адрес TCP-соединения для отправки этого сообщения.

func (pp *partitionProducer) dispatch() {
  
  // 找到这个主题和分区的leader所在的broker
  pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
  // 如果此时找到了这个leader
  if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) 
    // 发送一条消息来表示同步
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}
  ...// 各种异常情况
  
  // 然后把消息丢进brokerProducer中
  pp.brokerProducer.input <- msg
}

4.5 brokerProducer

На данный момент это, вероятно, последний шаг во всем процессе отправки.

Давайте посмотримpp.parent.getBrokerProducer(pp.leader)Содержимое этой строки кода.

На самом деле найтиasyncProducerсерединаbrokerProducer, если он не существует, создайте его.

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

Тогда давайте посмотримbrokerProducerКак он был создан.

Глядя на вторую сопрограмму, запущенную в этом методе, мы можем предположитьbridgeэтоchannelПосле получения сообщения он упакует полученное сообщение в запрос, а затем вызоветProduceметод.

И запишите адрес указателя возвращаемого результата в ответ.

Затем создайте BrokerProducerResponse и запишите его в ответы.

func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

посмотрим еще разbroker.Produce(request)эта строка кода.

Знакомы с ним, мы говорили о нем в клиентском разделе.sendAndReceiveметод.

И мы можем обнаружить, что если мы установим потребность в Acks, он вернет ответ, если он не установлен, то после отправки сообщения оно будет проигнорировано.

На этом этапе, после получения ответа и заполнения содержимого ответа, возвращается содержимое ответа.

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	var (
		response *ProduceResponse
		err      error
	)

	if request.RequiredAcks == NoResponse {
		err = b.sendAndReceive(request, nil)
	} else {
		response = new(ProduceResponse)
		err = b.sendAndReceive(request, response)
	}

	if err != nil {
		return nil, err
	}

	return response, nil
}

Слишком далеко,SaramaПредставлен контент, связанный с производителем.

напиши в конце

Прошло много времени с тех пор, как я написал это.

Основная причина в том, что автор слишком занят в этот период, и не в полной мере сбалансировал текущую учебу, работу и жизнь, в результате чего на учебу ежедневно уходит не так много времени, а эффективность невысока.

Кроме того, я не мог найти его в Интернете.SaramaСвязанный анализ — это все вызовы API. Поскольку автор только начал изучать Kafka, чтобы лучше понять каждый параметр производителя, я решил изучить клиент производителя.

Однако, поскольку способность автора читать исходный код действительно ограничена, в этом процессе могут возникнуть некоторые недоразумения. Поэтому, когда вы обнаружите некоторые несоответствия, пожалуйста, не стесняйтесь советовать, спасибо!

Еще раз спасибо за то, что вы здесь!