Написание WordCount в Flink Введение

Flink

Word Count — это привет, мир фреймворков и платформ для обработки данных. Функционал программы очень простой, то есть сколько там английских слов. Сегодня мы собираемся сделать потоковую версию Word Count, используя Flink 1.10. В этой потоковой версии Word Count строки будут передаваться из ввода во Flink, и мы будем наблюдать за статистикой символов во время обработки.

условие! условие!

На официальном веб-сайте Flink мы видим, что определение Flink, данное сообществом Flink, — это Apache Flink® — Stateful Computations over Data Streams, механизм вычислений с отслеживанием состояния поверх потоков данных. Перед официальным началом строительства нам нужно иметь очень предварительное и очень предварительное представление о статусе. На официальном сайте Flink много профессионального о государствеобъяснять, Сегодня я расскажу о некоторых простых понятиях в этой статье.Заинтересованные друзья могут прочитать официальное объяснение Flink.

Во Flink задачи описываются с помощью направленного ациклического графа, а узлы каждого графа на самом деле являются операцией или оператором (Operator), и эти функции и операторы будут обрабатываться, когда Flink обрабатывает один элемент и событие. официальный веб-сайт использует запомнить), поэтому мы можем сказать, что операторы и функции Flink имеют состояние. Состояние очень важно во Flink, и позже я расскажу о состоянии Flink отдельно.

Что такое поток

В процессе обучения программированию мы столкнемся с различными потоками, файловыми потоками, сетевыми потоками, потоковым API Java и так далее. В мире Flink данные могут формировать потоки событий, записи журналов, данные датчиков, поведение пользователей и т. д. — все это может формировать потоки данных.официальный сайт ФлинкаПоток делится на две категории: ограниченный поток и неограниченный поток.

Flink 官网的解释图

Проще говоря, неограниченный поток имеет начальную точку, но не конечную точку, и данные поступают непрерывно, поэтому нам также необходимо непрерывно обрабатывать данные. Ограниченный поток имеет конечную точку потока, и данные могут быть обработаны после поступления всех данных, поэтому мы можем понимать ограниченный поток как пакет. В отличие от настоящей пакетной обработки, Flink поддерживает состояние, и данные в каждом состоянии могут меняться по мере поступления данных. Стоит отметить, что поскольку неограниченный поток будет поступать непрерывно, мы обычно используем характеристики времени в качестве порядка обработки неограниченного потока.

Создайте проект Maven

Сначала создайте проект Maven и добавьте следующие ключевые зависимости:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.10.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.15</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Написать задание по подсчету слов

После завершения Maven нам нужно написать код задачи для Word Count. Как и обычная программа на Java, Flink найдет и выполнит логику обработки, определенную в основном методе. Прежде всего, нам нужно использовать метод getExecutionEnvironment() в первом предложении основного метода, чтобы получить потоковую среду, необходимую для запуска программы.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Затем нам нужно зарегистрировать источник данных в среде потоковой обработки для непрерывного чтения строк:

DataStream<String> source = env.addSource(new WordCountSource()).name("SteveJobs-Word");

После получения источника данных нам необходимо обработать данные. Первый заключается в использовании flatMap с инструментом сегментации слов для сегментации входной строки для получения нескольких кортежей из двух элементов. Первым элементом этих биграмм является результирующее слово, а вторым элементом является 1. Почему 1, потому что будем группировать по словам, каждое слово будет иметь отдельныйусловие, в каждом состоянии хранится информация об обрабатываемых данных. В этом примере flink суммирует вторые элементы всех кортежей в состоянии каждого слова. Поскольку второй элемент равен всем 1, видно, что результат суммирования равен количеству слов.

DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
                .keyBy(0)
                .sum(1)
                .name("Process-Word-Count");

Во Flink все данные нужно выводить в Sink. Sink может выводить данные в базу данных, в Elasticsearch, в HDFS или в Kafka MQ. Поэтому нам нужно указать место вывода результатов вычислений Flink. Как и в этом примере, мы будем выводить данные в

proc.addSink(new WordCountSink())
    .name("Word-Count-Sink");

Задачи Flink должны быть упакованы локально и загружены на сервер, прежде чем их можно будет запускать в кластере Flink. Поэтому нам нужно указать имя задачи в конце основного метода и вызвать метод выполнения среды для запуска задачи.

env.execute("Word Count");

Наконец, мы можем написать WordCountJob:

public class WordCountJob  {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.addSource(new WordCountSource())
                .name("SteveJobs-Word");

        DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
                .keyBy(0)
                .sum(1)
                .name("Process-Word-Count");

        proc.addSink(new WordCountSink())
                .name("Word-Count-Sink");

        env.execute("Word Count");
    }

}

Источник и приемник данных

В этом проекте мы сначала проводим эксперимент с ограниченным потоком. Во Flink Source и Sink являются операторами ввода и вывода данных, то есть данные проходят черезSourceввод оператора, черезSinkвывод оператора. Согласно тому, что было только что введено, ограниченный поток по существу относится к пакетной обработке, а обрабатываемые данные ограничены. Итак, в нашей части источника входных данных мы реализуем простой источник данных итератора (JavaDoc), переходим непосредственно к коду:

public class WordCountSource extends FromIteratorFunction<String> implements Serializable {

    private static final long serialVersionUID = 0L;

    public WordCountSource() {
        super(new WordIterator());
    }

    private static class WordIterator implements Iterator<String>,Serializable{

        private static final long serialVersionUID = 4L;

        private int index = 0;
        private int length = -1;

        private WordIterator(){
            length = StaticWordData.STEVE_JOBS_WORD.length;
        }

        @Override
        public boolean hasNext() {
            return (index < length);
        }

        @Override
        public String next() {
            return StaticWordData.STEVE_JOBS_WORD[index ++];
        }
    }

}

В качестве вывода Sink мы ссылаемся на метод написания официальных пошаговых руководств по коду Flink и выводим его непосредственно в журнал. Для конкретного письма, пожалуйста, обратитесь к Sink'sJavaDoc, следующее мое письмо:

public class WordCountSink implements SinkFunction<Tuple2<String,Integer>> {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        logger.info("{ Word: \""+ value.f0 + "\", Cnt:" + value.f1 +"}");
    }
}

контрольная работа

После написания кода используйте инструкции Maven для упаковки:

$ mvn clean package

В предыдущем обновлении я рассказал, как создать автономную версию Flink.портал. Мы запускаем автономную версию Flink, затем выбираем Submit New Job, нажимаем Add new, чтобы загрузить только что упакованный пакет через веб-версию.

上传好的效果图

Мы можем нажать «Показать план», чтобы увидеть график выполнения, который мы определили в программе.

执行图

Нажмите «Отправить», чтобы отправить задачу во Flink и выполнить ее. Мы можем выбрать и просмотреть журнал диспетчера задач, выполняющего задачу в данный момент в диспетчере задач. Вы можете увидеть лог, который мы выводим в Sink:

Log

Как видите, количество слов продолжает расти по мере ввода потоковых данных.

Многословные слова в каждом обновлении

Я предоставляю исходный код для этого эксперимента, пожалуйста, загрузите его на мой Github:ousheobin/flink-word-count.

Вы можете играть с ним по-разному, например, внедрить неограниченный источник данных или вывести результаты подсчета слов в Kafka MQ.