Kafka Study Notes (2): Предварительное изучение Кафки

Java задняя часть Kafka Scala

Прочитав предыдущую статью, я считаю, что у всех есть предварительное представление о системе сообщений и общей композиции Кафки. Лучший способ чему-то научиться - это использовать это. Сегодня давайте взглянем на Кафку и завершим нашу дебютную работу. .

Путешествие сообщений в Кафке

Хотя нам нужно действовать шаг за шагом, после того, как мы получим общее представление о предмете, это будет полезно для нашего понимания и изучения его, поэтому мы можем сначала взглянуть на опыт сообщения с момента его отправки. конец, когда он получен отправителем сообщений Что?

На приведенном выше рисунке кратко показан весь поток сообщений в Kafka (при условии, что вся система Kafka развернута, а соответствующий Topic, раздел и другие детали будут рассмотрены отдельно позже):

  • 1. Производитель сообщения публикует сообщение в определенной теме и распределяет его по определенному разделу по определенному алгоритму или случайным образом;
  • 2. В соответствии с фактическими потребностями, следует ли реализовать логику обработки сообщений;
  • 3. При необходимости реализовать определенную логику и опубликовать результат в выходной теме;
  • 4. Потребители подписываются на соответствующие темы в соответствии со своими потребностями и потребляют сообщения;

В целом, процесс относительно ясен и прост.Давайте попрактикуемся со мной в основных операциях Kafka и, наконец, реализуем небольшую демонстрацию подсчета слов.

Основная операция

Следующий код и соответствующие тесты прошли тест в следующей среде: Mac OS + JDK1.8, система Linux также должна работать, студенты, интересующиеся Windows, могут перейти на официальный сайт, чтобы загрузить соответствующую версию для соответствующую тестовую практику.

Скачать Кафку

Студенты системы Mac могут использовать brew для установки:

brew install kafka

Изучающие систему Linux могут загрузить исходный код с официального сайта и распаковать его или напрямую выполнить следующие команды:

cd 
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1

запускать

Kafka использует Zookeeper для хранения информации о кластере, поэтому здесь нам нужно сначала запустить Zookeeper, а затем мы узнаем больше о взаимосвязи между Kafka и Zookeeper.

bin/zookeeper-server-start.sh config/zookeeper.properties

Затем мы запускаем узел Kafka Server:

bin/kafka-server-start.sh config/server.properties

В это время система Kafka была запущена.

Создать тему

После того, как все будет готово, мы должны приступить к чрезвычайно важному шагу, который заключается в создании Топика.Тема является ядром всего потока системы.Кроме того, сам Топик также содержит множество сложных параметров, таких как количество факторов репликации, количество разделов и т.д., здесь Для простоты поставим соответствующие параметры равными 1, что удобно всем для проверки:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test

Конкретное значение параметров:

Атрибуты Функции
--create Создайте тему от имени
--zookeeper информация о кластере zookeeper
--replication-factor репликатор
--partitions Информация о разделе
--topic Название темы

На данный момент мы создали тему под названием kakfa-test.

Отправить сообщение в тему

После того, как у нас есть тема, мы можем отправить ей сообщение:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test

Затем мы вводим несколько сообщений в консоль:

this is my first test kafka
so good

На этот раз вышла новость на тему какфа-теста.

Получить сообщение из темы

Теперь, когда есть сообщение по теме, теперь вы можете получить сообщение из него и использовать:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning

На данный момент мы видим в консоли:

this is my first test kafka
so good

До сих пор мы тестировали самую простую демонстрацию Kafka. Надеюсь, вы сможете попробовать ее сами. Несмотря на то, что она очень проста, она познакомит вас со всем процессом Kafka.

WordCount

Давайте воспользуемся некоторыми из приведенных выше основных операций для реализации простой программы WordCount, которая имеет следующие функции:

  • 1. Поддерживает непрерывный ввод фраз, то есть производители продолжают генерировать сообщения;
  • 2. Программа автоматически получает исходные данные из входной темы, после чего обрабатывает их, и публикует результат обработки в теме подсчета;
  • 3. Потребители могут получить соответствующие результаты WordCount из темы подсчета;

1. Запустить кафку

Как и в случае с запуском выше, просто следуйте ему.

2. Создайте входную тему

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1

3. Введите сообщение в тему

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input

4. Логика потоковой обработки

Эта часть контента является ядром всего примера. Эта часть кода имеет версии Java 8+ и Scala. Лично я считаю, что потоковая обработка более краткая и понятная по функциональному синтаксису. Следуя функциональному мышлению, я больше не хочу писать синтаксис анонимного внутреннего класса Java.

Давайте сначала посмотрим на версию Java 8:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
        Pattern pattern = Pattern.compile("\\W+");
        source
           .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
           .toStream()
           .to("kafka-word-count-output");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Вы удивлены, что можете написать такой лаконичный код на java, поэтому, если есть применимые сценарии, рекомендуется попробовать написать java-код с функциональным мышлением.

Давайте снова посмотрим на версию Scala:


object WordCount {
  def main(args: Array[String]) {
    val props: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val source: KStream[String, String] = builder.stream("kafka-word-count-input")
    source
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
    val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
    streams.start()
  }
}

Можно обнаружить, что код, написанный в функциональном стиле Java 8, уже очень похож на Scala.

5. Запустите логику обработки

У многих учащихся на компьютерах не установлена ​​программа sbt, поэтому здесь демонстрируется версия Java, созданная с помощью Maven.Пожалуйста, обратитесь к конкретным шагам реализации.Нажмите здесь kafka-word-countописание выше.

6. Запустите потребительский процесс

Наконец, мы запускаем процесс-потребитель и вводим несколько слов в производителя, например:

Наконец, мы можем увидеть следующий вывод в процессе-потребителе:

bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092  --property print.key=true

kafka-word-count-output

Суммировать

Эта статья в основном объясняет основной рабочий процесс и некоторые основные операции Кафки, но это необходимый шаг для нас, чтобы узнать что-то. Только имея прочную основу, мы можем понять это более глубоко и понять, почему он разработан таким образом. также столкнулся с большим количеством проблем в этом процессе, поэтому я все еще надеюсь, что каждый сможет практиковать его самостоятельно и в конечном итоге получит больше.