предисловие
Причина написания этой статьи связана с предыдущей статьей оKafka
ненормальное потребление, то для решения проблемы пришлось использовать временное решение.
Подводя итог, в конечном счете, это вызвано незнакомством с Кафкой, плюс потребности ежедневной работы, а потом я потратил некоторое время на его просмотр.Kafka
сопутствующие документы.
Когда использовать MQ
говоря оKafka
Я должен упомянуть MQ, который является своего рода очередью сообщений. В качестве базового промежуточного программного обеспечения он широко используется в интернет-проектах.
Технология естественным образом создается для решения определенной задачи, как правило, в следующих сценариях:
- Требуется взаимодействие между процессами: системе B нужны выходные данные системы A в качестве входного параметра.
- Когда выходные возможности системы А намного превышают вычислительные возможности системы Б.
В первом случае есть два варианта:
- использовать
RPC
Удаленный вызов, А напрямую звонит Б. - использовать
MQ
, A публикует сообщениеMQ
,B подписывается на сообщение.
Когда наше требование: A вызывает B для ответа в реальном времени и заботится о результате ответа в реальном времени, используйтеRPC
, в этом случае вы должны использовать синхронный вызов.
И наоборот, когда нас не волнует результат выполнения после вызова, а выполнение вызываемого объекта может занимать очень много времени, эта ситуация очень подходит для использованияMQ
для достижения цели асинхронного вызова.
Например, общий сценарий входа может быть вызван только синхронно, потому что этот процесс требует результатов ответа в реальном времени, и невозможно ждать несколько дополнительных секунд после того, как пользователь нажмет на вход, чтобы исключить сетевые причины.
но аналогично случаю, когда для входа пользователя требуются бонусные баллы, затем используйтеMQ
было бы лучше, потому что вход в систему не имеет значения для очков, просто отправьте сообщение наMQ
, Подписка на услуги по обработке баллов может быть обработана, что также может решить лавинный эффект, вызванный сбоем системы баллов.
MQ
Еще одна основная функция —Пиковое ограничение тока, это, скорее всего, сделает систему B недоступной, если запрос вызывается непосредственно в системе B в сценарии с большим трафиком. Этот сценарий очень подходит для размещения запросов вMQ
, можно не только использоватьMQ
Пиковое сглаживание также обеспечивает максимальную доступность системы.
Знакомство с Кафкой
Это обсуждение сосредоточено наKafka
.
Проще говоряKafka
Это распределенная система обмена сообщениями, поддерживающая горизонтальное расширение и высокую пропускную способность.
Kafka
Общие знания о:
-
Topic
: взаимодействие между производителями и потребителями вращается вокругTopic
Вообще говоря, он дифференцируется по бизнесу и создается после переговоров между производителем и потребителем. -
Partition
(раздел): даTopic
состав из следующих, обычноTopic
Под ним находится один или несколько разделов.После того, как сообщение будет создано, оно будет загружено в каждый раздел по определенному алгоритму, поэтому раздел такжеKafka
ключ к производительности. Когда вы обнаружите, что производительность невысока, вы можете рассмотреть возможность добавления нового раздела.
Структурная схема выглядит следующим образом:
СоздайтеTopic
Kafka
На официальном сайте установки есть очень подробное объяснение. Вот некоторые распространенные операции в ежедневной разработке, такие как созданиеTopic
:
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic `test`
создал три разделаtest
тема.
использовать
sh bin/kafka-topics.sh --list --zookeeper localhost:2181
могу перечислить всеTopic
.
Кафка продюсер
использоватьkafka
предоставлено официальнымJava API
Для создания сообщений реализация кодирования чаще используется в реальных условиях:
/** Kafka生产者
* @author crossoverJie
*/
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
/**
* 消费配置文件
*/
private static String consumerProPath;
public static void main(String[] args) throws IOException {
// set up the producer
consumerProPath = System.getProperty("product_path");
KafkaProducer<String, String> producer = null;
try {
FileInputStream inputStream = new FileInputStream(new File(consumerProPath));
Properties properties = new Properties();
properties.load(inputStream);
producer = new KafkaProducer<String, String>(properties);
} catch (IOException e) {
LOGGER.error("load config error", e);
}
try {
// send lots of messages
for (int i=0 ;i<100 ; i++){
producer.send(new ProducerRecord<String, String>(
"topic_optimization", i+"", i+""));
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
} finally {
producer.close();
}
}
}
Сообщение может быть отправлено со следующими параметрами запуска:
-Dproduct_path=/xxx/producer.properties
и файл конфигурации производителя:
#集群地址,可以多个
bootstrap.servers=10.19.13.51:9094
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true
Конкретные инструкции по настройке можно найти здесь:Kafka.apache.org/0100/doc UME…
Процесс очень прост, на самом деле, некоторыеAPI
вызов.
После отправки сообщения вы можете просмотреть ситуацию в очереди с помощью следующей команды:
sh kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --group group1
один из них
lag
количество сообщений в очереди.
Кафка потребитель
С производителями потребители также незаменимы.Здесь мы сначала сосредоточимся на однопоточном потреблении:
/**
* Function:kafka官方消费
*
* @author crossoverJie
* Date: 2017/10/19 01:11
* @since JDK 1.8
*/
public class KafkaOfficialConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOfficialConsumer.class);
/**
* 日志文件地址
*/
private static String logPath;
/**
* 主题名称
*/
private static String topic;
/**
* 消费配置文件
*/
private static String consumerProPath ;
/**
* 初始化参数校验
* @return
*/
private static boolean initCheck() {
topic = System.getProperty("topic") ;
logPath = System.getProperty("log_path") ;
consumerProPath = System.getProperty("consumer_pro_path") ;
if (StringUtil.isEmpty(topic) || logPath.isEmpty()) {
LOGGER.error("system property topic ,consumer_pro_path, log_path is required !");
return true;
}
return false;
}
/**
* 初始化kafka配置
* @return
*/
private static KafkaConsumer<String, String> initKafkaConsumer() {
KafkaConsumer<String, String> consumer = null;
try {
FileInputStream inputStream = new FileInputStream(new File(consumerProPath)) ;
Properties properties = new Properties();
properties.load(inputStream);
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
} catch (IOException e) {
LOGGER.error("加载consumer.props文件出错", e);
}
return consumer;
}
public static void main(String[] args) {
if (initCheck()){
return;
}
int totalCount = 0 ;
long totalMin = 0L ;
int count = 0;
KafkaConsumer<String, String> consumer = initKafkaConsumer();
long startTime = System.currentTimeMillis() ;
//消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() <= 0){
continue ;
}
LOGGER.debug("本次获取:"+records.count());
count += records.count() ;
long endTime = System.currentTimeMillis() ;
LOGGER.debug("count=" +count) ;
if (count >= 10000 ){
totalCount += count ;
LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
totalMin += (endTime-startTime) ;
startTime = System.currentTimeMillis() ;
count = 0 ;
}
LOGGER.debug("end totalCount={},min={}",totalCount,totalMin);
/*for (ConsumerRecord<String, String> record : records) {
record.value() ;
JsonNode msg = null;
try {
msg = mapper.readTree(record.value());
} catch (IOException e) {
LOGGER.error("消费消息出错", e);
}
LOGGER.info("kafka receive = "+msg.toString());
}*/
}
}
}
Со следующими параметрами запуска:
-Dlog_path=/log/consumer.log -Dtopic=test -Dconsumer_pro_path=consumer.properties
Метод опроса используется для получения сообщений, а данные в процессе потребления записываются.
Конфигурация, принятая потребителями:
bootstrap.servers=192.168.1.2:9094
group.id=group1
# 自动提交
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
Для простоты я использую автоматическую фиксациюoffset
.
механизм хранения сообщений
говоря оoffset
Необходимо рассказать о механизме хранения сообщений Кафки.
Kafka
Сообщения не будут удалены немедленно, поскольку они используются. Все сообщения будут сохранены в файле журнала и настроены со временем истечения срока действия. Когда время истечет, данные с истекшим сроком действия будут автоматически удалены.И ему все равно, были ли израсходованы содержащиеся в нем данные.
Из-за этого механизма должен быть флаг, указывающий, какие данные были использованы,offset(偏移量)
Это роль, она похожа на указатель на определенные данные, при потребленииoffset
Он будет двигаться вперед линейно, так что сообщение может быть использовано произвольно, пока мы модифицируемoffset
можно использовать значение.
Еще один момент, на который стоит обратить внимание в процессе потребления:
Только один потребитель может потреблять в одной и той же группе потребителей (group.id равен), что действительно заставит многих людей ступить на яму в начале.
Многопоточное потребление
Естественно, его относительно просто реализовать для однопоточного потребления, но эффективность также сильно снижается.
С этой целью я провел тест, и результаты использования 120009 фрагментов данных с использованием предыдущего одиночного потока следующие:
В общей сложности это заняло 12450 мс.
Так как же переключиться на многопоточное потребление?
мы можем использоватьpartition
Функция разделения системы может улучшить способность потребления.Когда она однопоточная, это означает, что один поток должен потреблять все данные во всех разделах.Если он заменен многопоточностью, один поток может потреблять только один раздел , поэтому эффективность, естественно, улучшится, поэтому ThreadscoreSize<=partition
.
Первый взгляд на вход:
public class ConsumerThreadMain {
private static String brokerList = "localhost:9094";
private static String groupId = "group1";
private static String topic = "test";
/**
* 线程数量
*/
private static int threadNum = 3;
public static void main(String[] args) {
ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
один из нихConsumerGroup
Добрый:
public class ConsumerGroup {
private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);
/**
* 线程池
*/
private ExecutorService threadPool;
private List<ConsumerCallable> consumers ;
public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("consumer-pool-%d").build();
threadPool = new ThreadPoolExecutor(threadNum, threadNum,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
consumers = new ArrayList<ConsumerCallable>(threadNum);
for (int i = 0; i < threadNum; i++) {
ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
/**
* 执行任务
*/
public void execute() {
long startTime = System.currentTimeMillis() ;
for (ConsumerCallable runnable : consumers) {
Future<ConsumerFuture> future = threadPool.submit(runnable) ;
}
if (threadPool.isShutdown()){
long endTime = System.currentTimeMillis() ;
LOGGER.info("main thread use {} Millis" ,endTime -startTime) ;
}
threadPool.shutdown();
}
}
Окончательная реальная логика выполненияConsumerCallable
:
public class ConsumerCallable implements Callable<ConsumerFuture> {
private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);
private AtomicInteger totalCount = new AtomicInteger() ;
private AtomicLong totalTime = new AtomicLong() ;
private AtomicInteger count = new AtomicInteger() ;
/**
* 每个线程维护KafkaConsumer实例
*/
private final KafkaConsumer<String, String> consumer;
public ConsumerCallable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
//自动提交位移
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public ConsumerFuture call() throws Exception {
boolean flag = true;
int failPollTimes = 0 ;
long startTime = System.currentTimeMillis() ;
while (flag) {
// 使用200ms作为获取超时时间
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() <= 0){
failPollTimes ++ ;
if (failPollTimes >= 20){
LOGGER.debug("达到{}次数,退出 ",failPollTimes);
flag = false ;
}
continue ;
}
//获取到之后则清零
failPollTimes = 0 ;
LOGGER.debug("本次获取:"+records.count());
count.addAndGet(records.count()) ;
totalCount.addAndGet(count.get()) ;
long endTime = System.currentTimeMillis() ;
if (count.get() >= 10000 ){
LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
totalTime.addAndGet(endTime-startTime) ;
startTime = System.currentTimeMillis() ;
count = new AtomicInteger();
}
LOGGER.debug("end totalCount={},min={}",totalCount,totalTime);
/*for (ConsumerRecord<String, String> record : records) {
// 简单地打印消息
LOGGER.debug(record.value() + " consumed " + record.partition() +
" message with offset: " + record.offset());
}*/
}
ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(),totalTime.get()) ;
return consumerFuture ;
}
}
Проверьте логику:
Фактически, три экземпляра потребителя инициализируются для использования трех потоков. К нему добавляются некоторые статистические данные, и окончательный результат потребления 120 009 фрагментов данных выглядит следующим образом.
Поскольку он работает параллельно, можно увидеть, что потребление 120 009 фрагментов данных может увеличиться примерно на 2 секунды, и эффект будет более очевидным, когда данные будут увеличены на более высокий порядок.
Но это также имеет некоторые недостатки:
- Гибкость невысокая, и ее нельзя настроить адаптивно при изменении количества разделов.
- Логика потребления и логика обработки находятся в одном потоке, если логика обработки сложнее, это повлияет на эффективность и связь будет выше. Конечно, эта логика обработки может быть отправлена в другую программу через внутреннюю очередь для обработки.
Суммировать
Kafka
Есть еще очки знаний.Kafka
Использование далеко от этого. Я продолжу делиться некоторымиKafka
мониторинг и т.д.
адрес проекта:GitHub.com/crossover J я…
личный блог:crossoverjie.top.