Модель функционального программирования Spring CloudStream

Spring Cloud

Адрес домашней страницы

задний план

После версии SpringCloud 3.1@EnableBinding,@Outputи другие аннотации Streamapi отмечены как устаревшие

/**
 * Indicates that an output binding target will be created by the framework.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @author Artem Bilan
 *
 * @deprecated as of 3.1 in favor of functional programming model
 */

@Qualifier
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE,
		ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Deprecated
public @interface Output {

	/**
	 * Specify the binding target name; used as a bean name for binding target and as a
	 * destination name by default.
	 * @return the binding target name
	 */
	String value() default "";

}
/**
 * Enables the binding of targets annotated with {@link Input} and {@link Output} to a
 * broker, according to the list of interfaces passed as value to the annotation.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @author David Turanski
 * @author Soby Chacko
 *
 * @deprecated as of 3.1 in favor of functional programming model
 */
@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({ BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class })
@EnableIntegration
@Deprecated
public @interface EnableBinding {

	/**
	 * A list of interfaces having methods annotated with {@link Input} and/or
	 * {@link Output} to indicate binding targets.
	 * @return list of interfaces
	 */
	Class<?>[] value() default {};

}

Статьи по Теме

Producing and Consuming Messages

@EnableBinding @deprecated as of 3.1 in favor of functional programming model

The type EnableBinding is deprecated

Поэтому всем API, использующим императивное программирование, официально рекомендуется использовать более простые API функционального программирования при обновлении до новой версии.

Полный пример кода

FJiayang/spring-cloud-stream-rabbit-example

Способ обновления

императивная модель программирования

В качестве примера возьмем простое сообщение с несколькими темами, оригинальный метод программирования выглядит следующим образом.

режиссер

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

/**
 * @author F嘉阳
 * @date 2018-10-08 17:57
 */
@RestController
@EnableBinding(MySource.class)
public class Producer {

    @Autowired
    private MySource channel;

    @RequestMapping("/send")
    public String send() {
        channel.output().send(MessageBuilder.withPayload(new Date()).build());
        return "success";
    }
}

/**
 * @author F嘉阳
 * @date 2018-10-08 18:01
 */
public interface MySource {
    String OUTPUT = "output";

    @Output(MySource.OUTPUT)
    MessageChannel output();
}

конфигурационный файл

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin
  cloud:
    stream:
      bindings:
        output:
          destination: my-test-channel
server:
  port: 8082

потребитель

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

/**
 * @author F嘉阳
 * @date 2018-10-08 18:10
 */
@EnableBinding(MySink.class)
public class Consumer {
    @StreamListener(MySink.INPUT)
    public void receive(Message<String> message) {
        System.out.println("接收到MQ消息:" + message.getPayload());
    }
}

/**
 * @author F嘉阳
 * @date 2018-10-08 18:07
 */
public interface MySink {
    String INPUT = "input";

    @Input(MySink.INPUT)
    SubscribableChannel input();
}

конфигурационный файл

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin
  cloud:
    stream:
      bindings:
        input:
          destination: my-test-channel
server:
  port: 8081

модель функционального программирования

Чтобы перейти с императивного на функциональное программирование, сначала необходимо обновить версию SpringCloud до версии 3.1 или выше, а затем выполнить обновление до последней версии 2020.0.1 здесь.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>top.fjy8018</groupId>
    <artifactId>cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>cloud-stream</name>
    <description>Demo project for Spring Boot</description>

    <modules>
        <module>producer</module>
        <module>consumer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <spring-cloud.version>2020.0.1</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

В настоящее время в Китае относительно мало четких руководств по обновлению, в настоящее время мы можем ссылаться только на официальные документы и материалы.

Programming Model/

Модель 1 — очередь сообщений, управляемая системным временем

Описание официального документа

Suppliers (Sources)

Function and Consumer are pretty straightforward when it comes to how their invocation is triggered. They are triggered based on data (events) sent to the destination they are bound to. In other words, they are classic event-driven components.

However, Supplier is in its own category when it comes to triggering. Since it is, by definition, the source (the origin) of the data, it does not subscribe to any in-bound destination and, therefore, has to be triggered by some other mechanism(s). There is also a question of Supplier implementation, which could be imperative or reactive and which directly relates to the triggering of such suppliers.

Consider the following sample:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

The preceding Supplier bean produces a string whenever its get() method is invoked. However, who invokes this method and how often? The framework provides a default polling mechanism (answering the question of "Who?") that will trigger the invocation of the supplier and by default it will do so every second (answering the question of "How often?"). In other words, the above configuration produces a single message every second and each message is sent to an output destination that is exposed by the binder.

Общая идея заключается в том, что если используется модель функционального программирования по умолчанию, сообщение будет отправляться потребителю каждую секунду, а параметры синхронизации отправки могут быть настроены.

После апгрейда по документу выглядит следующим образом

режиссер
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    public Supplier<Date> source1() {
        return () -> new Date();
    }
}

Функциональное программирование не требует определения очереди прослушивания в коде, ее нужно только прописать в конфигурационном файле в соответствии с соглашением.

Описание официального документа

Functional binding names

В отличие от явного именования, требуемого поддержкой на основе аннотаций (унаследованной версии), которая использовалась в предыдущих версиях spring-cloud-stream, модель функционального программирования по умолчанию использует простое соглашение, когда речь идет об именах привязок, что значительно упрощает настройку приложения. первый пример:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

In the preceding example we have an application with a single function which acts as message handler. As a Function it has an input and output. The naming convention used to name input and output bindings is as follows:

  • input - <functionName> + -in- + <index>
  • output - <functionName> + -out- + <index>

The in and out corresponds to the type of binding (such as input or output). The index- это индекс привязки ввода или вывода. Он всегда равен 0 для типичной функции с одним вводом/выводом, поэтому он актуален только дляFunctions with multiple input and output arguments.

So if for example you would want to map the input of this function to a remote destination (e.g., topic, queue etc) called "my-topic" you would do so with the following property:

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

Note how uppercase-in-0 is used as a segment in property name. The same goes for uppercase-out-0.

конфигурационный файл

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin

  cloud:
    stream:
      bindings:
        source1-out-0:
          destination: test1
    function:
      definition: source1

server:
  port: 8083

Видно, что общая конфигурация сильно упрощена, в основном для规约大于配置определяется в порядкеfunctionРасположение конфигурации находится вstreamнастроенный пир

потребитель
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public Consumer<Date> sink1() {
        return System.out::println;
    }
}

Файл конфигурации тот же

spring:
  rabbitmq:
    host: 192.168.163.128
    username: cms
    password: cms-mq-admin

  cloud:
    stream:
      bindings:
        sink1-in-0:
          destination: test1
    function:
      definition: sink1


server:
  port: 8081
результат операции

Производитель отправляет сообщение каждую секунду

1613880363469

Модель 2 -StreamBridge

Однако в реальном производстве это больше связано с бизнес-сценариями, поэтому этот режим нельзя использовать для настройки.StreamBridgeмодель

Знакомство с официальными документами

Sending arbitrary data to an output (e.g. Foreign event-driven sources)

There are cases where the actual source of data may be coming from the external (foreign) system that is not a binder. For example, the source of the data may be a classic REST endpoint. How do we bridge such source with the functional mechanism used by spring-cloud-stream?

Spring Cloud Stream предоставляет два механизма, поэтому давайте рассмотрим их более подробно.

Здесь для обоих примеров мы будем использовать стандартный метод конечной точки MVC, называемыйdelegateToSupplier bound to the root web context, delegating incoming requests to stream via two different mechanisms - imperative (via StreamBridge) and reactive (via EmitterProcessor).

Суть в том, что фактический источник данных может управляться внешними событиями, например, через интерфейс Rest.

Официальный образец

Using StreamBridge
@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream-out-0", body);
	}
}

Here we autowire a StreamBridge bean which allows us to send data to an output binding effectively bridging non-stream application with spring-cloud-stream. Note that preceding example does not have any source functions defined (e.g., Supplier bean) leaving the framework with no trigger to create source bindings, which would be typical for cases where configuration contains function beans. So to trigger the creation of source binding we use spring.cloud.stream.source property where you can declare the name of your sources. The provided name will be used as a trigger to create a source binding. So in the preceding example the name of the output binding will be toStream-out-0 which is consistent with the binding naming convention used by functions (see Binding and Binding names). You can use ; to signify multiple sources (e.g., --spring.cloud.stream.source=foo;bar)

Обратитесь к официальной документации после преобразования

режиссер

Здесь напрямую публикуется режим потребления нескольких тем.В реальном производстве большинство из них также подписываются на несколько тем.

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

/**
 * @author F嘉阳
 * @date 2018-10-08 17:57
 */
@RestController
public class Producer {

    @Autowired
    private StreamBridge streamBridge;

    @RequestMapping("/send1")
    public String send1() {
        streamBridge.send("source1-out-0", new Date());
        return "success1";
    }

    @RequestMapping("/send2")
    public String send2() {
        streamBridge.send("source2-out-0", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        return "success2";
    }

}

конфигурационный файл

spring:
  rabbitmq:
    host: 192.168.133.128
    username: dev-user
    password: devpassword

  cloud:
    stream:
      bindings:
        source1-out-0:
          destination: test2
        source2-out-0:
          destination: test3
    function:
      definition: source1;source2


server:
  port: 8083
потребитель
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public Consumer<Date> sink1() {
        return System.out::println;
    }

    @Bean
    public Consumer<String> sink2() {
        return System.out::println;
    }
}

конфигурационный файл

spring:
  rabbitmq:
    host: 192.168.133.128
    username: dev-user
    password: devpassword

  cloud:
    stream:
      bindings:
        sink1-in-0:
          destination: test2
        sink2-in-0:
          destination: test3
    function:
      definition: sink1;sink2

server:
  port: 8081
результат операции

1613881049845

вызывать/send1

1613881012552

вызывать/send2

1613881021809

Видимые сообщения маршрутизируются правильно

Суммировать

В целом функциональное программирование более лаконично, чем императивное, и в сочетании с тем, что протокол Spring Cloud больше, чем конфигурация, это позволяет значительно сократить объем кода конфигурации, что является тенденцией будущего развития.