SpringBoot объединяет Kafka и Storm

база данных GitHub Spring Kafka

предисловие

В этой статье в основном рассказывается об интеграции kafka и storm в SpringBoot, а также о некоторых проблемах и решениях, возникающих в этом процессе.

Знание кафки и бури

если ты правkafkaиstormЕсли вы знакомы с ним, вы можете сразу пропустить этот раздел! Если вы не знакомы с этим, вы также можете проверить мой предыдущий блог. Некоторые связанные блоги приведены ниже.

Окружающая среда установки kafka и storm

Адрес: http://www.panchengming.com/2018/01/26/pancm70/

Похожие варианты использования слова кафка

Адрес: http://www.panchengming.com/2018/01/28/pancm71/ http://www.panchengming.com/2018/02/08/pancm72/

Похожие варианты использования слова storm

Адрес: http://www.panchengming.com/2018/03/16/pancm75/

SpringBoot объединяет kafka и storm

Зачем использовать SpringBoot для интеграции kafka и storm

В общем, интеграция Storm с kafka может справиться с большинством задач. Но с точки зрения масштабируемости это может быть не очень хорошо. Текущая основная среда микросервисов SpringCloud основана на SpringBoot, поэтому использование SpringBoot для интеграции kafka и storm может быть настроено единообразно, а масштабируемость будет лучше.

Что делать с помощью SpringBoot для интеграции kafka и storm

Вообще говоря, интеграция kafka и storm, использование kafka для передачи данных, а затем использование storm для обработки данных в kafka в режиме реального времени.

Здесь мы делаем это после добавления SpringBoot, но SpringBoot управляет kafka и storm одинаково.

Если это все еще сложно понять, вы можете понять это с помощью следующего простого бизнес-сценария:

В базе большое количество пользовательских данных, и многие из этих пользовательских данных являются ненужными, то есть грязными данными.Нам нужно очистить эти пользовательские данные, а затем повторно сохранить их в базе данных, но требуют реального времени низкая задержка и простота управления.

Так что здесь мы можем использовать SpringBoot+kafka+storm для соответствующей разработки.

подготовка к разработке

Перед разработкой кода нам нужно четко понимать, что разрабатывать. В приведенном выше бизнес-сценарии требуется большой объем данных, но мы здесь просто разрабатываем его, то есть пишем простую демку, в которой можно легко реализовать эти функции, поэтому нам нужно только выполнить следующие условия:

  1. Предоставляет интерфейс для записи пользовательских данных в kafka;
  2. Используйте штормовой носик, чтобы получить данные кафки и отправить их на болт;
  3. Удалить в болт данные пользователей моложе 10 лет и записать в mysql;

Затем согласно вышеуказанным требованиям интегрируем SpringBoot, kafka и storm. Во-первых, требуется соответствующий пакет jar, поэтому зависимости maven следующие:


	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<springboot.version>1.5.9.RELEASE</springboot.version>
		<mybatis-spring-boot>1.2.0</mybatis-spring-boot>
		<mysql-connector>5.1.44</mysql-connector>
		<slf4j.version>1.7.25</slf4j.version>
		<logback.version>1.2.3</logback.version>
		<kafka.version>1.0.0</kafka.version>
		<storm.version>1.2.1</storm.version>
		<fastjson.version>1.2.41</fastjson.version>
		<druid>1.1.8</druid>
	</properties>


	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>${springboot.version}</version>
		</dependency>

		<!-- Spring Boot Mybatis 依赖 -->
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>${mybatis-spring-boot}</version>
		</dependency>

		<!-- MySQL 连接驱动依赖 -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql-connector}</version>
		</dependency>


		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
		</dependency>


		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>${logback.version}</version>
		</dependency>

		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>${logback.version}</version>
		</dependency>


		<!-- kafka -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.12</artifactId>
			<version>${kafka.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
			<scope>provided</scope>
		</dependency>


		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>${kafka.version}</version>
		</dependency>


		<!--storm相关jar -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>${storm.version}</version>
			<!--排除相关依赖 -->
			<exclusions>
				<exclusion>
					<groupId>org.apache.logging.log4j</groupId>
					<artifactId>log4j-slf4j-impl</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.logging.log4j</groupId>
					<artifactId>log4j-1.2-api</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.logging.log4j</groupId>
					<artifactId>log4j-web</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<artifactId>ring-cors</artifactId>
					<groupId>ring-cors</groupId>
				</exclusion>
			</exclusions>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>${storm.version}</version>
		</dependency>


		<!--fastjson 相关jar -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>

		<!-- Druid 数据连接池依赖 -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid</artifactId>
			<version>${druid}</version>
		</dependency>
	</dependencies>

После успешного добавления соответствующих зависимостей давайте добавим сюда соответствующую конфигурацию. существуетapplication.propertiesДобавьте следующую конфигурацию в:

	
	# log
	logging.config=classpath:logback.xml
	
	## mysql
	spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
	spring.datasource.username=root
	spring.datasource.password=123456
	spring.datasource.driverClassName=com.mysql.jdbc.Driver
	
	
	## kafka 
	kafka.servers = 192.169.0.23\:9092,192.169.0.24\:9092,192.169.0.25\:9092  
	kafka.topicName = USER_TOPIC
	kafka.autoCommit = false
	kafka.maxPollRecords = 100
	kafka.groupId = groupA
	kafka.commitRule = earliest

Примечание. Приведенная выше конфигурация является лишь частью, полную конфигурацию можно найти на моем github.

Скрипт базы данных:

-- springBoot2库的脚本

CREATE TABLE `t_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(10) DEFAULT NULL COMMENT '姓名',
  `age` int(2) DEFAULT NULL COMMENT '年龄',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8

Примечание. Поскольку здесь мы просто моделируем бизнес-сценарий, мы просто создаем простую таблицу.

написание кода

Описание: Здесь я объясняю только несколько ключевых классов, а полную ссылку на разработку проекта можно найти в нижней части блога.

Прежде чем использовать SpringBoot для интеграции kafka и storm, мы можем сначала написать соответствующий код kfaka и storm, а затем интегрировать его.

Первый — получение источников данных, то есть использование spout в Storm для вытягивания данных из kafka.

В предыдущем введении к storm я говорил о запущенном процессе storm, где spout — это компонент storm для получения данных, среди которых мы в основном реализуемnextTupleметод, напишите код для получения данных из kafka, вы можете получить данные после начала шторма.

spoutОсновной код класса выглядит следующим образом:

@Override
public void nextTuple() {
	for (;;) {
		try {
			msgList = consumer.poll(100);
			if (null != msgList && !msgList.isEmpty()) {
				String msg = "";
				List<User> list=new ArrayList<User>();
				for (ConsumerRecord<String, String> record : msgList) {
					// 原始数据
					msg = record.value();
					if (null == msg || "".equals(msg.trim())) {
						continue;
					}
					try{
						list.add(JSON.parseObject(msg, User.class));
					}catch(Exception e){
						logger.error("数据格式不符!数据:{}",msg);
						continue;
					}
			     } 
				logger.info("Spout发射的数据:"+list);
				//发送到bolt中
				this.collector.emit(new Values(JSON.toJSONString(list)));
				 consumer.commitAsync();
			}else{
				TimeUnit.SECONDS.sleep(3);
				logger.info("未拉取到数据...");
			}
		} catch (Exception e) {
			logger.error("消息队列处理异常!", e);
			try {
				TimeUnit.SECONDS.sleep(10);
			} catch (InterruptedException e1) {
				logger.error("暂停失败!",e1);
			}
		}
	}
}

Примечание. Если spout не может отправить данные при отправке данных, они будут отправлены повторно!

Вышеупомянутый класс spout в основном передает данные, полученные от kafka, в Bolt, а затем класс Bolt обрабатывает данные, после успешной обработки они записываются в базу данных, а затем выдается ответ sqout, чтобы избежать повторной передачи.

Основной метод класса болта для обработки бизнес-логики:execute, тут же написан наш основной метод реализации. Следует отметить, что здесь используется только один болт, поэтому нет необходимости снова определять поле для пересылки. Класс реализации кода выглядит следующим образом:

@Override
	public void execute(Tuple tuple) {
		String msg=tuple.getStringByField(Constants.FIELD);
		try{
			List<User> listUser =JSON.parseArray(msg,User.class);
			//移除age小于10的数据
			if(listUser!=null&&listUser.size()>0){
				Iterator<User> iterator = listUser.iterator();
				 while (iterator.hasNext()) {
					 User user = iterator.next();
					 if (user.getAge()<10) {
						 logger.warn("Bolt移除的数据:{}",user);
						 iterator.remove();
					 }
				 }
				if(listUser!=null&&listUser.size()>0){
					userService.insertBatch(listUser);
				}
			}
		}catch(Exception e){
			logger.error("Bolt的数据处理失败!数据:{}",msg,e);
		}
	}

Написав носик и болт, напишем основной класс шторма.

Основной класс storm в основном для отправки Topology (топологии).При отправке Topology нужно выставить носик и болт соответственно. Существует два режима работы топологии:

  1. Одним из них является локальный режим, в котором для запуска используется среда имитации jar локального шторма.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf,builder.createTopology());
  1. Другой — удаленный режим, то есть работа в штормовом кластере.
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

Для удобства здесь написаны оба метода, которые управляются параметром args основного метода.TopologyСоответствующие инструкции по настройке очень подробно описаны в комментариях к коду, поэтому я не буду здесь говорить больше. код показывает, как показано ниже:

  public  void runStorm(String[] args) {
	// 定义一个拓扑
	TopologyBuilder builder = new TopologyBuilder();
	// 设置1个Executeor(线程),默认一个
	builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
	// shuffleGrouping:表示是随机分组
	// 设置1个Executeor(线程),和两个task
	builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
	Config conf = new Config();
	//设置一个应答者
	conf.setNumAckers(1);
	//设置一个work
	conf.setNumWorkers(1);
	try {
		// 有参数时,表示向集群提交作业,并把第一个参数当做topology名称
		// 没有参数时,本地提交
		if (args != null && args.length > 0) { 
			logger.info("运行远程模式");
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {
			// 启动本地模式
			logger.info("运行本地模式");
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("TopologyApp", conf, builder.createTopology());
		}
	} catch (Exception e) {
		logger.error("storm启动失败!程序退出!",e);
		System.exit(1);
	}
	logger.info("storm启动成功...");
	}

Что ж, после написания кода, связанного с kafka и storm, давайте интегрироваться со SpringBoot!

Перед интеграцией со SpringBoot нам нужно решить следующие несколько проблем.

1 Как представить топологию storm в программе SpringBoot?

Storm запускается путем отправки Topolgy.Обычно он запускается с помощью основного метода, но метод запуска SpringBoot обычно запускается с помощью основного метода. Так как же это решить?

  • Решение: Напишите топологию шторма в основном классе, запускаемом SpringBoot, и запускайте его при запуске SpringBoot.
  • Экспериментальные результаты: можно начинать вместе (разумеется, что можно). Но затем возникает следующая проблема: классы болтов и носиков не могут использовать пружинные аннотации.

2 Как заставить классы болтов и носиков использовать пружинные аннотации?

  • Решение: Узнав, что классы носика и болта инстанцируются со стороны нимба, затем передаются супервизору через сериализацию, а затем обратно сериализуются, поэтому аннотации использовать нельзя, так что здесь можно изменить идею, так как аннотации использовать нельзя, затем динамический Просто получите компонент Spring.
  • Экспериментальные результаты: После использования метода динамического получения бобов можно успешно запустить storm.

3. Иногда загрузка нормальная, иногда не запускается и динамические бины не получаются?

  • Решение: После решения проблем 1 и 2 иногда возникает проблема 3. Долго искал, т.к. горячее развертывание SpringBoot было добавлено раньше, а после его удаления не появлялось....

Вышеупомянутые три проблемы возникли при интеграции.Решение представляется возможным в настоящее время.Возможно, проблемы могут быть вызваны другими причинами, но после интеграции ни разу не возникало.Другие вопросы. Если вышеуказанные проблемы и решения неуместны, пожалуйста, покритикуйте и исправьте!

После решения вышеуказанных проблем возвращаемся к коду. Среди них запись программы, то есть код основного класса, после интеграции выглядит следующим образом:

@SpringBootApplication
public class Application{

	public static void main(String[] args) {
		// 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
		ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
		GetSpringBean springBean=new GetSpringBean();
		springBean.setApplicationContext(context);
		TopologyApp app = context.getBean(TopologyApp.class);
		app.runStorm(args);
	}
	
}

Код для динамического получения bean-компонентов выглядит следующим образом:

public class GetSpringBean implements ApplicationContextAware{

	private static ApplicationContext context;

	public static Object getBean(String name) {
		return context.getBean(name);
	}

	public static <T> T getBean(Class<T> c) {

		return context.getBean(c);
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext)
			throws BeansException {
		if(applicationContext!=null){
			context = applicationContext;
		}
	}

}

Введение основного кода здесь, в остальном он в основном такой же, как и раньше.

Результаты теста

После успешного запуска программы мы сначала вызываем интерфейс, чтобы добавить несколько кусков данных в kafka.

Добавить запрос:

POST http://localhost:8087/api/user

{"name":"张三","age":20}
{"name":"李四","age":10}
{"name":"王五","age":5}

После успешного добавления мы можем использовать инструмент xshell для просмотра данных в кластере kafka. входить:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**

Затем вы можете увидеть следующий вывод.

Вышеупомянутое также указывает на то, что данные были успешно записаны в kafka. Поскольку это данные из kafka в режиме реального времени, мы также можем просмотреть распечатанный оператор из консоли.

Вывод консоли:

 INFO  com.pancm.storm.spout.KafkaInsertDataSpout - Spout发射的数据:[{"age":5,"name":"王五"}, {"age":10,"name":"李四"}, {"age":20,"name":"张三"}]
 WARN  com.pancm.storm.bolt.InsertBolt - Bolt移除的数据:{"age":5,"name":"王五"}
 INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
 DEBUG com.pancm.dao.UserDao.insertBatch - ==>  Preparing: insert into t_user (name,age) values (?,?) , (?,?) 
 DEBUG com.pancm.dao.UserDao.insertBatch - ==> Parameters: 李四(String), 10(Integer), 张三(String), 20(Integer)
 DEBUG com.pancm.dao.UserDao.insertBatch - <==    Updates: 2
 INFO  com.pancm.service.impl.UserServiceImpl - 批量新增2条数据成功!

Вы можете успешно увидеть процесс обработки и результаты в консоли. Затем мы также можем запросить все данные в базе данных через интерфейс.

Запрос запроса:

GET http://localhost:8087/api/user

Возвращаемый результат:

[{"id":1,"name":"李四","age":10},{"id":2,"name":"张三","age":20}]

Результаты, возвращаемые тестами в приведенном выше коде, очевидно, соответствуют нашим ожиданиям.

Эпилог

Интеграция kafka и storm со SpringBoot на данный момент подошла к концу. В этой статье только кратко представлены эти связанные области применения, а фактическое применение может быть более сложным. Если у вас есть лучшие идеи и предложения, пожалуйста, оставьте сообщение для обсуждения! Я разместил проект интеграции Kafka и Storm с SpringBoot на github.Если вы чувствуете себя хорошо, пожалуйста, дайте звезду. Адрес Gihub: https://github.com/xuwujing/springBoot-study Кстати, для кафки тоже есть проект по интеграции storm, который тоже есть у меня на гитхабе. Адрес: https://github.com/xuwujing/kafka-study

Это конец этой статьи, спасибо за прочтение.

Уведомление об авторских правах: Автор: ничтожество Источник блога сада: http://www.cnblogs.com/xuwujing Источник CSDN: http://blog.csdn.net/qazwsxpcm     Источник личного блога: http://www.panchengming.com Оригинальность непростая, пожалуйста, указывайте источник при перепечатке, спасибо!