Коротко о весеннем облачном потоке и кафке

Spring Cloud

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

Дикий перевод: spring cloud stream - это человек, который намеревается объединить гарем промежуточного программного обеспечения для обмена сообщениями. Он гибкий и имеет за собой резервную пружину. Он может сделать восемнадцать видов оружия (режим подписки на сообщения, группа потребителей, разделы с отслеживанием состояния и т. д.) Donggong Niangniang kafka и Xigong Niangniang rabbitMQ.

Gossip Party: Сегодня давайте посмотрим на связь весеннего облачного потока и кафки, RabbitMQ позволит ей остаться в холодном дворце.

1. Первая дама во дворце: кафка

Apache Kafka® is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Дикий перевод: Lao Niang — это платформа для потоковой обработки, которая может выполнять много работы:

  • Может обрабатывать сообщения публикации/подписки
  • Сохраняйте сообщения стабильно
  • Просто смирись с этим, это очень быстро

Подводя итог в одном предложении, это быстро, стабильно и точно.

Операция kafka очень проста, начиная сздесьЗагрузите, затем сначала запустите zookeeper. Zookeeper также включен в последний пакет загрузки kafka, который можно использовать напрямую. После запуска zookeeper вам необходимо настроить ip и порт zookeeper в конфигурационном файле kafka config/server.properties.

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

Затем запустите команду в каталоге bin, чтобы запустить kafka.

bin/kafka-server-start.sh -daemon config/server.properties

2. личный менеджер кафки, kafka-manager

Хоть кафка и запущена, но нам все равно нужен менеджер, чтобы сообщать о ситуации, если надо в ней разбираться.здесь. Жаль, что есть только скачивание исходников, а работающей версии нет. Вам нужно скомпилировать самому. Эта скорость компиляции довольно низкая. Я предоставлю скомпилированную версию для всех здесь. Нажмитездесь.

kafka-manager также должен настроить отношения с kafka в файле conf/application.conf, но конфигурация не сама kafka, а zookeeper, смонтированный kafka.

kafka-manager.zkhosts="localhost:2181"

Затем запустите bin/kafka-manager (kafka-manager.bat также можно запустить в среде Windows)

Тут есть подвох.Если запускать под windows, то может не запуститься и выдать сообщение, что строка ввода слишком длинная.

Это связано с тем, что каталог слишком длинный, а имя каталога kafak-manager-2.0.0.2 можно сократить для нормальной работы.

После запуска заполните адрес и порт zookeeper для Cluster Zookeeper Host через Add Cluster. Версия Kafka Version должна совпадать с используемой версией kafka, иначе содержимое kafka может быть не видно.

Затем мы можем увидеть брокера, тему, потребителей, разделы и другую информацию о kafka.

3. Император прибывает, весенний облачный поток

Отправная точка всего по-прежнему находится на start.spring.io.

Этот темный интерфейс — то, что весна делает для Хэллоуина. С нами связаны две зависимости справа, которые соответствуют этим двум зависимостям в pom.xml

<dependencies>
        <dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
		</dependency>
        <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-test-support</artifactId>
			<scope>test</scope>
		</dependency>
</dependencies>
<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

Но этого недостаточно, если вы запустите его напрямую, вам будет предложено

Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka

Также необходимо добавить пакет зависимостей

        <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>

4. Отправить сообщение, biubiubiu

После того, как структура проекта Spring Cloud Stream настроена, нам нужно разделить ее на две части: одна часть — это часть, которая отправляет сообщения, а другая — место для получения сообщений. Давайте сначала посмотрим на часть отправки сообщения, первая — это файл конфигурации, application.yml

spring:
  cloud:
    stream:
      default-binder: kafka #默认的绑定器,
      kafka: #如果用的是rabbitMQ这里填 rabbit
        binder:
          brokers: #Kafka的消息中间件服务器地址
          - localhost:9092
      bindings:
        output: #通道名称
          binder: kafka
          destination: test1 #消息发往的目的地,对应topic
          group: output-group-1 #对应kafka的group
          content-type: text/plain #消息的格式

Обратите внимание, что вывод здесь означает, что сообщение опубликовано, что соответствует последующему сообщению о подписке. Имя этого вывода — это имя канала сообщений, которое можно настроить, что будет рассмотрено позже.

Затем нам нужно создать издатель

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {
	private Source mySource;

	public Producer(Source mySource) {
		super();
		this.mySource = mySource;
	}

	public Source getMysource() {
		return mySource;
	}

	public void setMysource(Source mysource) {
		mySource = mySource;
	}
}

@EnableBinding Буквально понимает, что это канал привязки. Имя связанного канала — это вывод выше. Soure.class предоставляется Spring, что указывает на то, что это привязываемый канал публикации, и его имя канала выводится, а вывод — в приложении. yml соответствует

Исходный код хорошо виден

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

	/**
	 * Name of the output channel.
	 */
	String OUTPUT = "output";

	/**
	 * @return output channel
	 */
	@Output(Source.OUTPUT)
	MessageChannel output();

}

Если нам нужно определить наш собственный канал, мы можем сами написать класс, например следующий: имя канала изменено на my-out

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  
public interface MySource {
    String INPUT = "my-in";
    String OUTPUT = "my-out";
    @Input(INPUT)
    SubscribableChannel myInput();
    @Output(OUTPUT)
    MessageChannel myOutput();
}

В этом случае application.yml будет изменен

        my-out:
          binder: kafka
          destination: mytest #消息发往的目的地,对应topic
          group: output-group-2 #对应kafka的group
          content-type: text/plain #消息的格式

@EnableBinding класса Product.class также нужно изменить, чтобы соответствовать, я написал еще один MyProducer

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {
	private MySource mySource;

	public MyProducer(MySource mySource) {
		super();
		this.mySource = mySource;
	}

	public MySource getMysource() {
		return mySource;
	}

	public void setMysource(MySource mysource) {
		mySource = mySource;
	}
}

Таким образом, часть публикации сообщения написана, давайте напишем контроллер для отправки сообщения

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {
	@Autowired
	private Producer producer;
	@Autowired
	private MyProducer myProducer;

	


// get the String message via HTTP, publish it to broker using spring cloud stream
	@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
	public String publishMessageString(@RequestBody String payload) {
// send message to channel output
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}
	@RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
	public String publishMyMessageString(@RequestBody String payload) {
// send message to channel myoutput
		myProducer.getMysource().myOutput().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}
}

Это очень просто, просто позвоните производителю, чтобы отправить строку, я использую почтальона, чтобы инициировать это действие.

Сообщение отправлено, как мы получим сообщение? Посмотрите вниз.

5. Получай новости, приходи и приходи

Точно так же мы используем предыдущий фреймворк проекта spring cloud stream как часть получения сообщений, первым является файл application.yml

server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
         binder: kafka
         destination: test1
         content-type: text/plain
         group: input-group-1
        my-in:
         binder: kafka
         destination: mytest
         content-type: text/plain
         group: input-group-2

Основное внимание уделяется вводу и входу, что соответствует предыдущему выходу и выводу один к одному.

Класс Source по умолчанию соответствует классу Sink, который официально предоставляется Код выглядит следующим образом.

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

	/**
	 * Input channel name.
	 */
	String INPUT = "input";

	/**
	 * @return input channel.
	 */
	@Input(Sink.INPUT)
	SubscribableChannel input();

}

Вызывающий его класс Consumer используется для получения сообщений, код выглядит следующим образом

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

	@StreamListener(target = Sink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}

И код нашего пользовательского класса канала MySink и MyConsumer выглядит следующим образом:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
	String INPUT = "my-in";
    @Input(INPUT)
    SubscribableChannel myInput();
}
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {
	private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

	@StreamListener(target = MySink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}

Это нормально Когда мы используем postman для отправки сообщения выше, мы можем увидеть его прямо в журнале здесь.

2019-10-29 18:42:39.455  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer        : recieved a string message : 你瞅啥
2019-10-29 18:43:17.017  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer          : recieved a string message : 你瞅啥

6. Заходим в kafka-manager и смотрим

Назначение, которое мы определили в application.yml, — это тема kafka, которую можно увидеть в списке тем kafka-manager.

И потребитель, который получает сообщение, также может видеть

Это любовь императора и королевы весеннего облачного ручья и кафки, но как может их политический брак быть таким простым, о сложном мы поговорим позже, так что следите за обновлениями, езжайте обратно во дворец (дикий перевод: The Возвращение короля)

адрес исходного кода