Spring boot объединяет Kafka+Storm

Spring Boot задняя часть Kafka модульный тест

Пожалуйста, указывайте автора и источник при перепечатке

предисловие

Из-за бизнес-требований необходимо интегрировать Strom и kafka в проект весенней загрузки и реализовать другие сервисы для вывода журналов в темы подписки kafka.Storm обрабатывает тему в режиме реального времени для завершения мониторинга данных и другой статистики данных, но есть Есть несколько онлайн-уроков.Сегодня я хочу написать, как интегрировать storm + kafka в spring boot, кстати, позвольте мне рассказать о ямах, с которыми я столкнулся.

Используйте инструменты и конфигурацию среды

​ 1. версия java jdk-1.8

​ 2. Инструмент компиляции использует IDEA-2017

​ 3. maven как управление проектами

​ 4.пружинный ботинок-1.5.8.ВЫПУСК

Отражение спроса

1. Зачем вам нужно интегрироваться в spring boot

Чтобы использовать весеннюю загрузку для унифицированного управления различными микросервисами и одновременно избегать нескольких разрозненных конфигураций.

2. Конкретные идеи и причины интеграции

Используйте spring boot для унифицированного управления bean-компонентами, требуемыми kafka, storm, redis и т. д., собирайте журналы из других сервисов в Kafka, отправляйте журналы в storm в режиме реального времени и выполняйте соответствующие операции обработки в strom Bolt.

возникшие проблемы

​ 1. Нет связанной интеграции storm с использованием весенней загрузки.

​ 2. Я не знаю, как вызвать отправку Topolgy в режиме запуска spring boot

​ 3. Проблема numbis not client localhost при отправке топологии

​ 4. Экземпляры bean-компонентов не могут быть получены через аннотации в Storm Bolt для соответствующих операций.

Решения

Перед интеграцией нам необходимо знать способ запуска и конфигурацию соответствующей загрузки spring (если вы читаете эту статью, то по умолчанию вы уже знаете и используете storm, kafka и spring boot)

  1. В Интернете есть несколько примеров интеграции storm с Spring Boot, но, поскольку есть соответствующие требования, нам все равно нужно интегрировать.

    Сначала импортируйте необходимый пакет jar:

    <dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>0.10.1.1</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
    			<exclusions>
    				<exclusion>
    					<artifactId>zookeeper</artifactId>
    					<groupId>org.apache.zookeeper</groupId>
    				</exclusion>
    				<exclusion>
    					<artifactId>spring-boot-actuator</artifactId>
    					<groupId>org.springframework.boot</groupId>
    				</exclusion>
    				<exclusion>
    					<artifactId>kafka-clients</artifactId>
    					<groupId>org.apache.kafka</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    			<exclusions>
    				<exclusion>
    					<artifactId>kafka-clients</artifactId>
    					<groupId>org.apache.kafka</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    
    
     	<dependency>
     		<groupId>org.springframework.data</groupId>
     		<artifactId>spring-data-hadoop</artifactId>
     		<version>2.5.0.RELEASE</version>
     		<exclusions>
     			<exclusion>
     				<groupId>org.slf4j</groupId>
     				<artifactId>slf4j-log4j12</artifactId>
     			</exclusion>
     			<exclusion>
     				<artifactId>commons-logging</artifactId>
     				<groupId>commons-logging</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>netty</artifactId>
     				<groupId>io.netty</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-core-asl</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>curator-client</artifactId>
     				<groupId>org.apache.curator</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jettison</artifactId>
     				<groupId>org.codehaus.jettison</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-mapper-asl</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-jaxrs</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>snappy-java</artifactId>
     				<groupId>org.xerial.snappy</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-xc</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>guava</artifactId>
     				<groupId>com.google.guava</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>hadoop-mapreduce-client-core</artifactId>
     				<groupId>org.apache.hadoop</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>zookeeper</artifactId>
     				<groupId>org.apache.zookeeper</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>servlet-api</artifactId>
     				<groupId>javax.servlet</groupId>
     			</exclusion>
    
     		</exclusions>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.zookeeper</groupId>
     		<artifactId>zookeeper</artifactId>
     		<version>3.4.10</version>
     		<exclusions>
     			<exclusion>
     				<artifactId>slf4j-log4j12</artifactId>
     				<groupId>org.slf4j</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.hbase</groupId>
     		<artifactId>hbase-client</artifactId>
     		<version>1.2.4</version>
     		<exclusions>
     			<exclusion>
     				<artifactId>log4j</artifactId>
     				<groupId>log4j</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>zookeeper</artifactId>
     				<groupId>org.apache.zookeeper</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>netty</artifactId>
     				<groupId>io.netty</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>hadoop-common</artifactId>
     				<groupId>org.apache.hadoop</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>guava</artifactId>
     				<groupId>com.google.guava</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>hadoop-annotations</artifactId>
     				<groupId>org.apache.hadoop</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>hadoop-yarn-common</artifactId>
     				<groupId>org.apache.hadoop</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>slf4j-log4j12</artifactId>
     				<groupId>org.slf4j</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.hadoop</groupId>
     		<artifactId>hadoop-common</artifactId>
     		<version>2.7.3</version>
     		<exclusions>
     			<exclusion>
     				<artifactId>commons-logging</artifactId>
     				<groupId>commons-logging</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>curator-client</artifactId>
     				<groupId>org.apache.curator</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-mapper-asl</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>jackson-core-asl</artifactId>
     				<groupId>org.codehaus.jackson</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>log4j</artifactId>
     				<groupId>log4j</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>snappy-java</artifactId>
     				<groupId>org.xerial.snappy</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>zookeeper</artifactId>
     				<groupId>org.apache.zookeeper</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>guava</artifactId>
     				<groupId>com.google.guava</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>hadoop-auth</artifactId>
     				<groupId>org.apache.hadoop</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>commons-lang</artifactId>
     				<groupId>commons-lang</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>slf4j-log4j12</artifactId>
     				<groupId>org.slf4j</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>servlet-api</artifactId>
     				<groupId>javax.servlet</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
     	<dependency>
     		<groupId>org.apache.hadoop</groupId>
     		<artifactId>hadoop-mapreduce-examples</artifactId>
     		<version>2.7.3</version>
     		<exclusions>
     			<exclusion>
     				<artifactId>commons-logging</artifactId>
     				<groupId>commons-logging</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>netty</artifactId>
     				<groupId>io.netty</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>guava</artifactId>
     				<groupId>com.google.guava</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>log4j</artifactId>
     				<groupId>log4j</groupId>
     			</exclusion>
     			<exclusion>
     				<artifactId>servlet-api</artifactId>
     				<groupId>javax.servlet</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
    
     	<!--storm-->
     	<dependency>
     		<groupId>org.apache.storm</groupId>
     		<artifactId>storm-core</artifactId>
     		<version>${storm.version}</version>
     		<scope>${provided.scope}</scope>
     		<exclusions>
     			<exclusion>
     				<groupId>org.apache.logging.log4j</groupId>
     				<artifactId>log4j-slf4j-impl</artifactId>
     			</exclusion>
     			<exclusion>
     				<artifactId>servlet-api</artifactId>
     				<groupId>javax.servlet</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
    
     	<dependency>
     		<groupId>org.apache.storm</groupId>
     		<artifactId>storm-kafka</artifactId>
     		<version>1.1.1</version>
     		<exclusions>
     			<exclusion>
     				<artifactId>kafka-clients</artifactId>
     				<groupId>org.apache.kafka</groupId>
     			</exclusion>
     		</exclusions>
     	</dependency>
    
其中去除jar包是因为需要相与项目构建依赖有多重依赖问题,storm版本为1.1.0  spring boot相关依赖为




```java
<!-- spring boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

ps: пакет jar maven предназначен только для нужд проекта, а не для самых упорядоченных, только для справки.

Структура проекта:

config — хранить различные файлы конфигурации среды

java-configХранить и создавать классы реализации, связанные с весенней загрузкой Другие, такие как имя сборки

Когда мы начнем весеннюю загрузку, мы найдем

  1. На самом деле, до начала интеграции я меньше знал о Storm, и я не имел с ним контакта в начале, позже я обнаружил, что нет соответствующего способа запуска функции отправки Topolgy после интеграции в spring boot и запускаем весеннюю загрузку, так это тоже наводило на мысли о запуске весны.После загрузки все закончилось, но подождав полчаса, ничего не произошло....-. - Выяснил только, что функция отправки триггера не реализована.

Мои мысли по решению этой проблемы: Запустите spring boot-> создать тему для прослушивания kafka, а затем запустите Topolgy, чтобы завершить запуск, но такая проблема, как kafka, прослушивающая эту тему, будет многократно запускать Topolgy, что явно не то, что нам нужно Метод времени, это спаситель для меня. , Итак, теперь идея срабатывания Тополги становится:

Запустить весеннюю загрузку -> выполнить метод триггера -> выполнить соответствующее условие триггера

Метод сборки такой:

/**
 * @author Leezer
 * @date  2017/12/28
 * spring加载完后自动自动提交Topology
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

    private static String BROKERZKSTR;
    private static String TOPIC;
    private static String HOST;
    private static String PORT;
    public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                    @Value("${zookeeper.host}") String host,
                    @Value("${zookeeper.port}") String port,
                    @Value("${kafka.default-topic}") String topic
    ){
        BROKERZKSTR = brokerZkstr;
        HOST= host;
        TOPIC= topic;
        PORT= port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            //实例化topologyBuilder类。
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
            BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
            // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Collections.singletonList(HOST);
            spoutConfig.zkPort = Integer.parseInt(PORT);
            //从Kafka最新输出日志读取
            spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
            topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
            Config config = new Config();
            config.setDebug(false);
            /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程,如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
            */
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Примечание:

  1. При запуске проекта, поскольку для запуска используется встроенный кот, может появиться следующая ошибка
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
	at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at java.util.concurrent.FutureTask.run?$capture(FutureTask.java:266) [?:1.8.0_144]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

Это связано с тем, что существует соответствующий импортированный пакет jar, который вводит версию servlet-api ниже, чем встроенная версия, все, что нам нужно сделать, это открыть зависимость maven и удалить ее.

<exclusion>
   <artifactId>servlet-api</artifactId>
   <groupId>javax.servlet</groupId>
</exclusion>

Затем перезапустите его.

  1. В процессе запуска он также может сообщать:

    org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90
    

    Я долго думал над этим вопросом, и обнаружил, что объяснения в Интернете все неверны из-за проблемы с конфигурацией storm, но мой storm развернут на сервере, связанной конфигурации нет, и он должен идти на сервер прочитать соответствующую конфигурацию, но результат не очень. Наконец, я попробовал несколько методов и обнаружил, что они неверны.Я обнаружил здесь, что при построении кластера storm предоставил соответствующий локальный кластер.

    LocalCluster cluster = new LocalCluster();
    

    Для локального тестирования, если вы тестируете локально, используйте его для тестирования развертывания.Если вы развертываете на сервере, вам нужно поставить:

    cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
    //修正为:
    StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
    

    отправлять задачи;

    Вышеупомянутое решает проблемы 1-3, как указано выше.

    Вопрос 4: В болте используется соответствующий экземпляр компонента Я обнаружил, что не могу получить экземпляр после добавления его в Spring с помощью @Component: Я предполагаю, что когда мы создадим и отправим Topolgy, он будет в:

    topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");
    

    Выполнить болт, связанный:

     @Override
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
            StormLauncher stormLauncher = StormLauncher.getStormLauncher();
            dataRepositorys =(AlarmDataRepositorys)  		       stormLauncher.getBean("alarmdataRepositorys");
        }
    

    И болт не инстанцируется, в результате разные резьбы и пружина не достанется.(я тут не особо разбираюсь, если кто знает, можете поделиться волной)

    Смысл использования spring boot в том, чтобы получить эти сложные объекты.Эта проблема беспокоила меня давно.Я наконец подумал,что мы можем получить экземпляр через контекст getbean.Я не знаю, возможно ли это,и тогда я начал определять:

    Например, мне нужно использовать сервис в болте:

    /**
     * @author Leezer
     * @date 2017/12/27
     * 存储操作失败时间
     **/
    @Service("alarmdataRepositorys")
    public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
        private static final String ERRO = "erro";
    
        /**
         * @param type 类型
         * @param key key值
         * @return 错误次数
         **/
        @Override
        public String getErrNumFromRedis(String type,String key) {
            if(type==null || key == null){
                return null;
            }else {
                ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
                return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));
            }
    
        }
    
    
        /**
         * @param type 错误类型
         * @param key key值
         * @param value 存储值
         **/
        @Override
        public void setErrNumToRedis(String type, String key,String value) {
            try {
                ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
                valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
            }catch (Exception e){
                logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key为%s存入redis失败",key));
            }
        }
    
    
    

    Здесь я указываю имя bean-компонента, а затем, когда болт выполняется, prepare: используйте метод getbean для получения соответствующего bean-компонента для завершения соответствующей операции.

Затем тема подписки kafka отправляется на мой болт для соответствующей обработки.Метод getbean здесь определяется в функции запуска bootmain:

@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service","storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
@ImportResource(locations = {
		"classpath:/configs/spring-hadoop.xml",
		"classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {

	//设置 安全线程launcher实例
	private volatile static StormLauncher stormLauncher;
	//设置上下文
	private ApplicationContext context;

	public static void main(String[] args) {
     
		SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
     // application.web(false).run(args);该方式是spring boot不以web形式启动
		application.run(args);
		StormLauncher s = new StormLauncher();
		s.setApplicationContext(application.context());
		setStormLauncher(s);
	}

	private static void setStormLauncher(StormLauncher stormLauncher) {
		StormLauncher.stormLauncher = stormLauncher;
	}
	public static StormLauncher getStormLauncher() {
		return stormLauncher;
	}

	@Override
	protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
		return application.sources(StormLauncher.class);
	}


	/**
	 * 获取上下文
	 *
	 * @return the application context
	 */
	public ApplicationContext getApplicationContext() {
		return context;
	}

	/**
	 * 设置上下文.
	 *
	 * @param appContext 上下文
	 */
	private void setApplicationContext(ApplicationContext appContext) {
		this.context = appContext;
	}

	/**
	 * 通过自定义name获取 实例 Bean.
	 *
	 * @param name the name
	 * @return the bean
	 */
	public Object getBean(String name) {
		return context.getBean(name);
	}

	/**
	 * 通过class获取Bean.
	 *
	 * @param <T>   the type parameter
	 * @param clazz the clazz
	 * @return the bean
	 */
	public <T> T getBean(Class<T> clazz) {
		return context.getBean(clazz);
	}

	/**
	 * 通过name,以及Clazz返回指定的Bean
	 *
	 * @param <T>   the type parameter
	 * @param name  the name
	 * @param clazz the clazz
	 * @return the bean
	 */
	public <T> T getBean(String name, Class<T> clazz) {
		return context.getBean(name, clazz);
	}

На этом интеграция storm и kafka в spring boot закончилась, я выложу соответствующие kafka и другие конфигурации на github

Кстати, для kafkaclient тоже есть пит:

Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.

Проект сообщит о проблеме с клиентом kafka. Это связано с тем, что в storm-kafka kafka использует версию 0.8, а NetworkSend — версию выше 0.9. Интеграция здесь должна соответствовать соответствующей версии kafka, которую вы интегрируете.

Хоть интеграция и относительно простая, ссылок относительно мало.Кроме того, я только начал связываться со Штормом, поэтому думаю много, и тоже запишу сюда.

адрес проекта -github

использованная литература:springboot-storm-integration