Word Count — это привет, мир фреймворков и платформ для обработки данных. Функционал программы очень простой, то есть сколько там английских слов. Сегодня мы собираемся сделать потоковую версию Word Count, используя Flink 1.10. В этой потоковой версии Word Count строки будут передаваться из ввода во Flink, и мы будем наблюдать за статистикой символов во время обработки.
условие! условие!
На официальном веб-сайте Flink мы видим, что определение Flink, данное сообществом Flink, — это Apache Flink® — Stateful Computations over Data Streams, механизм вычислений с отслеживанием состояния поверх потоков данных. Перед официальным началом строительства нам нужно иметь очень предварительное и очень предварительное представление о статусе. На официальном сайте Flink много профессионального о государствеобъяснять, Сегодня я расскажу о некоторых простых понятиях в этой статье.Заинтересованные друзья могут прочитать официальное объяснение Flink.
Во Flink задачи описываются с помощью направленного ациклического графа, а узлы каждого графа на самом деле являются операцией или оператором (Operator), и эти функции и операторы будут обрабатываться, когда Flink обрабатывает один элемент и событие. официальный веб-сайт использует запомнить), поэтому мы можем сказать, что операторы и функции Flink имеют состояние. Состояние очень важно во Flink, и позже я расскажу о состоянии Flink отдельно.
Что такое поток
В процессе обучения программированию мы столкнемся с различными потоками, файловыми потоками, сетевыми потоками, потоковым API Java и так далее. В мире Flink данные могут формировать потоки событий, записи журналов, данные датчиков, поведение пользователей и т. д. — все это может формировать потоки данных.официальный сайт ФлинкаПоток делится на две категории: ограниченный поток и неограниченный поток.
Проще говоря, неограниченный поток имеет начальную точку, но не конечную точку, и данные поступают непрерывно, поэтому нам также необходимо непрерывно обрабатывать данные. Ограниченный поток имеет конечную точку потока, и данные могут быть обработаны после поступления всех данных, поэтому мы можем понимать ограниченный поток как пакет. В отличие от настоящей пакетной обработки, Flink поддерживает состояние, и данные в каждом состоянии могут меняться по мере поступления данных. Стоит отметить, что поскольку неограниченный поток будет поступать непрерывно, мы обычно используем характеристики времени в качестве порядка обработки неограниченного потока.
Создайте проект Maven
Сначала создайте проект Maven и добавьте следующие ключевые зависимости:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.15</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Написать задание по подсчету слов
После завершения Maven нам нужно написать код задачи для Word Count. Как и обычная программа на Java, Flink найдет и выполнит логику обработки, определенную в основном методе. Прежде всего, нам нужно использовать метод getExecutionEnvironment() в первом предложении основного метода, чтобы получить потоковую среду, необходимую для запуска программы.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Затем нам нужно зарегистрировать источник данных в среде потоковой обработки для непрерывного чтения строк:
DataStream<String> source = env.addSource(new WordCountSource()).name("SteveJobs-Word");
После получения источника данных нам необходимо обработать данные. Первый заключается в использовании flatMap с инструментом сегментации слов для сегментации входной строки для получения нескольких кортежей из двух элементов. Первым элементом этих биграмм является результирующее слово, а вторым элементом является 1. Почему 1, потому что будем группировать по словам, каждое слово будет иметь отдельныйусловие, в каждом состоянии хранится информация об обрабатываемых данных. В этом примере flink суммирует вторые элементы всех кортежей в состоянии каждого слова. Поскольку второй элемент равен всем 1, видно, что результат суммирования равен количеству слов.
DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
.keyBy(0)
.sum(1)
.name("Process-Word-Count");
Во Flink все данные нужно выводить в Sink. Sink может выводить данные в базу данных, в Elasticsearch, в HDFS или в Kafka MQ. Поэтому нам нужно указать место вывода результатов вычислений Flink. Как и в этом примере, мы будем выводить данные в
proc.addSink(new WordCountSink())
.name("Word-Count-Sink");
Задачи Flink должны быть упакованы локально и загружены на сервер, прежде чем их можно будет запускать в кластере Flink. Поэтому нам нужно указать имя задачи в конце основного метода и вызвать метод выполнения среды для запуска задачи.
env.execute("Word Count");
Наконец, мы можем написать WordCountJob:
public class WordCountJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.addSource(new WordCountSource())
.name("SteveJobs-Word");
DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
.keyBy(0)
.sum(1)
.name("Process-Word-Count");
proc.addSink(new WordCountSink())
.name("Word-Count-Sink");
env.execute("Word Count");
}
}
Источник и приемник данных
В этом проекте мы сначала проводим эксперимент с ограниченным потоком. Во Flink Source и Sink являются операторами ввода и вывода данных, то есть данные проходят черезSourceввод оператора, черезSinkвывод оператора. Согласно тому, что было только что введено, ограниченный поток по существу относится к пакетной обработке, а обрабатываемые данные ограничены. Итак, в нашей части источника входных данных мы реализуем простой источник данных итератора (JavaDoc), переходим непосредственно к коду:
public class WordCountSource extends FromIteratorFunction<String> implements Serializable {
private static final long serialVersionUID = 0L;
public WordCountSource() {
super(new WordIterator());
}
private static class WordIterator implements Iterator<String>,Serializable{
private static final long serialVersionUID = 4L;
private int index = 0;
private int length = -1;
private WordIterator(){
length = StaticWordData.STEVE_JOBS_WORD.length;
}
@Override
public boolean hasNext() {
return (index < length);
}
@Override
public String next() {
return StaticWordData.STEVE_JOBS_WORD[index ++];
}
}
}
В качестве вывода Sink мы ссылаемся на метод написания официальных пошаговых руководств по коду Flink и выводим его непосредственно в журнал. Для конкретного письма, пожалуйста, обратитесь к Sink'sJavaDoc, следующее мое письмо:
public class WordCountSink implements SinkFunction<Tuple2<String,Integer>> {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
logger.info("{ Word: \""+ value.f0 + "\", Cnt:" + value.f1 +"}");
}
}
контрольная работа
После написания кода используйте инструкции Maven для упаковки:
$ mvn clean package
В предыдущем обновлении я рассказал, как создать автономную версию Flink.портал. Мы запускаем автономную версию Flink, затем выбираем Submit New Job, нажимаем Add new, чтобы загрузить только что упакованный пакет через веб-версию.
Мы можем нажать «Показать план», чтобы увидеть график выполнения, который мы определили в программе.
Нажмите «Отправить», чтобы отправить задачу во Flink и выполнить ее. Мы можем выбрать и просмотреть журнал диспетчера задач, выполняющего задачу в данный момент в диспетчере задач. Вы можете увидеть лог, который мы выводим в Sink:
Как видите, количество слов продолжает расти по мере ввода потоковых данных.
Многословные слова в каждом обновлении
Я предоставляю исходный код для этого эксперимента, пожалуйста, загрузите его на мой Github:ousheobin/flink-word-count.
Вы можете играть с ним по-разному, например, внедрить неограниченный источник данных или вывести результаты подсчета слов в Kafka MQ.