Начало работы с основами Flink

Flink

предисловие

Я просмотрел то, что написал Спарк, и я чувствую, что еслиВ этой статье вы познакомитесь с основными понятиями Spark RDD.Если разобраться досконально, то особо расширять по этой штуке особо нечего, поэтому прыгнул в яму и сразу Флинк.Если есть что надо дополнить, добавлю позже.

Spark Streaming — это именноМикропакетная обработкаПодход псевдореального времени, но Flink обработает его, если он действительно придет, и нам нужно вручную управлять проблемой смещения, когда Spark Streaming и Kafka интегрированы, Во Flink это автоматически поможет нам управлять.

Да и операторы Флинка намного богаче Spark Streaming.Флинк это тренд будущего, по крайней мере я так думаю

Flink возник из исследовательского проекта Stratosphere, целью которого является создание платформы для анализа больших данных нового поколения, и 16 апреля 2014 года стал инкубационным проектом Apache.

Для программ реального времени мы фактически фокусируемся на следующих трех моментах, а именно на вводе данных, обработке данных и выводе данных. Картинка ниже взята с официального сайта Flink версии 1.9.


Эта картина очень четко разделена.Есть два источника данных для ввода предыдущих данных.Один принес события в реальном времени, который относится к направлению в реальном времени, в то время как нижняя база данных, файловая система и тип СХД — это оффлайн бизнес

В обработке данных написано, что наш Flink можно развернуть в K8s, Yarn, Mesos и т.д. Последним выводом может быть Application, смысл приложения, а может быть Event log, типа Kafka, или это может дать Вернуть вышеупомянутый компонент постоянства.

Apache Flink — это платформа и механизм распределенной обработки для вычислений с отслеживанием состояния на неограниченных и ограниченных потоках данных. Flink работает во всех распространенных кластерных средах и может выполнять вычисления со скоростью памяти и в любом масштабе.

Флинк считает, что обработка данных — это потоковая обработка, а данные можно разделить на два типа: ограниченные и неограниченные.

  1. Неограниченный поток определяет начало потока, но не определяет его конец. Они бесконечно генерируют данные.Неограниченные потоки данных должны обрабатываться непрерывно, то есть данные необходимо обрабатывать сразу после их приема. Мы не можем ждать, пока все данные поступят для обработки, потому что ввод бесконечен и ввод не будет завершен в любой момент. Обработка неограниченных данных часто требует, чтобы события принимались в определенном порядке, например в порядке их возникновения, чтобы можно было сделать вывод о полноте результатов.

  2. Ограниченный поток имеет начало определенного потока и конец определенного потока. Ограниченные потоки могут быть вычислены после приема всех данных. Все данные в ограниченном потоке можно упорядочить, поэтому упорядоченный прием не требуется.Обработку ограниченного потока часто называют пакетной обработкой.

В самом начале у Spark были только RDD.Потом Spark учился на наборе Flink.Flink начал с реального времени, потому что он не мог обрабатывать Spark, когда он запускался в автономном режиме. Так что они на самом деле очень любят друг друга.

1.1.2 Развертывание в нескольких сценариях

Apache Flink Для выполнения приложения требуются вычислительные ресурсы. Flink интегрируется со всеми распространенными менеджерами ресурсов кластера, такими как Hadoop YARN, Apache Mesos и Kubernetes, но также может работать как автономный кластер.

Flink предназначен для хорошей работы с каждым из вышеперечисленных диспетчеров ресурсов, что достигается за счет шаблона развертывания, специфичного для диспетчера ресурсов. Flink может взаимодействовать способом, совместимым с текущим менеджером ресурсов.

При развертывании приложения Flink Flink автоматически определяет необходимые ресурсы на основе параллелизма, настроенного приложением, и запрашивает эти ресурсы у диспетчера ресурсов. В случае сбоя Flink заменяет отказавший контейнер, запрашивая новые ресурсы. Вся связь для отправки или управления приложением осуществляется через вызовы REST, что упрощает интеграцию Flink с различными средами.

1.1.3 Применение данных в различных масштабах

Flink предназначен для запуска потоковых приложений с отслеживанием состояния в любом масштабе. Таким образом, приложение распараллеливается на потенциально тысячи задач, которые распределяются по кластеру и выполняются одновременно. Таким образом, приложение может использовать бесконечные ресурсы процессора, памяти, диска и сетевого ввода-вывода.

И Flink позволяет легко поддерживать очень большое состояние приложения. Его алгоритм асинхронного и инкрементного контрольно-пропускного пункта имеет минимальное влияние на задержку обработки, гарантируя ровно-после согласованности государственной последовательности.
Флинк может это сделать

每天处理数万亿的事件
可以维护几TB大小的状态
可以部署上千个节点的集群

1.1.4 Полное использование производительности памяти

Программы Flink с отслеживанием состояния оптимизированы для локального доступа к состоянию. Состояние задачи всегда хранится в памяти, и если размер состояния превышает доступную память, оно сохраняется в структуре данных на диске, к которой можно эффективно обращаться. Задачи выполняют все вычисления, обращаясь к локальному состоянию (обычно в памяти), что приводит к очень низкой задержке обработки. Flink обеспечивает однократную согласованность состояния в сценариях сбоя, периодически и асинхронно сохраняя локальное состояние.

Мы считаем в режиме реального времени каждыйкаждую 1 секундустатистикаКоличество вхождений слова за последние 2 секунды, рекомендуется использовать IDEA для разработки

1.3.1 Добавление зависимостей POM

<properties>
    <flink.version>1.9.0</flink.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    </dependency>
</dependencies>

1.3.2 Разработка Java-кода

Здесь мы используем Java для разработки, на самом деле можно использовать и Scala, но на самом деле большинство предприятий используют Java.

При вводе кода будьте осторожны, чтобы не импортировать Scala при импорте пакетов.

public class WindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Flink提供的获取传递的参数的工具类
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");

        //步骤一:获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //步骤二:获取数据源
        DataStream<String> dataStream = env.socketTextStream(hostname, port);
        //步骤三:执行逻辑操作
        DataStream<WordCount> wordAndOneStream = dataStream.flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String line, Collector<WordCount> out) {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new WordCount(word, 1L));
                }
            }
        });

        // 类比于Spark Streaming中的window算子
        DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//每隔1秒计算最近2秒
                .sum("count");
        //步骤四:结果打印
        resultStream.print();
        //步骤五:任务启动
        env.execute("WindowWordCountJava");
    }

    public static class WordCount{
        public String word;
        public long count;
        //记得要有这个空构造方法
        public WordCount(){

        }
        public WordCount(String word,long count){
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

Скала-версия

Также необходимо добавить зависимости Flink Scala, зависимости разработки Scala и плагины компиляции.

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>

1.9.0 2.11.8

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- scala插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- maven 插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Scala только сейчас переписывает логику

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * 滑动窗口
  * 每隔1秒钟统计最近2秒内的数据,打印到控制台。
  */
object WindowWordCountScala {
  def main(args: Array[String]): Unit = {
    //获取参数
    val hostname = ParameterTool.fromArgs(args).get("hostname")
    val port = ParameterTool.fromArgs(args).getInt("port")
    //TODO 导入隐式转换
    import org.apache.flink.api.scala._
    //步骤一:获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //步骤二:获取数据源
    val textStream = env.socketTextStream(hostname,port)
    //步骤三:数据处理
    val wordCountStream = textStream.flatMap(line => line.split(","))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum(1)
    //步骤四:数据结果处理
    wordCountStream.print()
    //步骤六:启动程序
    env.execute("WindowWordCountScala")
  }
}

1.4 установка локального режима

  1. Установите jdk, настройте JAVA_HOME, рекомендуется использовать jdk1.8 или выше

  2. Адрес загрузки установочного пакета: http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

  3. Загрузите инсталляционный пакет прямо на сервер

    Разархивируйте установочный пакет: tar -zxvf flink-1.9.1-bin-scala_2.11.tgz

    Создать ярлык: ln -s flink-1.9.1-bin-scala_2.11.tgz flink

    Настройте переменную среды FLINK_HOEM.

  4. запустить службу

    В локальном режиме вам не нужно настраивать какие-либо элементы конфигурации, просто запустите сервер напрямую

    cd $FLIKE_HOME

    ./bin/start-cluster.sh запустить службу

    ./bin/stop-cluster.sh остановить службу

  5. Просмотр веб-страницы localhost:8081

1.5 Установка в автономном режиме

1.5.1 Кластерное планирование

Имя процессора JobManager TaskManager
hadoop01 да
hadoop02 да
hadoop03 да

1.5.2 Зависимости

Выше jdk1.8 настройте JAVA_HOME

Без пароля между хостами

flink-1.9.1-bin-scala_2.11.tgz

1.5.3 Этапы установки

  1. Изменить conf/flink-conf.yaml

    jobmanager.rpc.address: hadoop01

  2. Изменить конфиг/рабы

    hadoop02
    hadoop03

  3. копировать на другие узлы

    scp -rq /usr/local/flink-1.9.1 hadoop02:/usr/local
    scp -rq /usr/local/flink-1.9.1 hadoop03:/usr/local

  4. Начните с узла hadoop01 (JobMananger).

    start-cluster.sh

  5. Посетите http://hadoop01:8081

1.5.4 Параметры, которые необходимо учитывать в автономном режиме

  1. jobmanager.heap.mb --- объем памяти, доступный узлу менеджера заданий

  2. taskmanager.heap.mb --- объем памяти, доступный для узлов диспетчера задач

  3. taskmanager.numberOfTaskSlots — количество ЦП, доступных на машину.

  4. parallelism.default — параллелизм задач по умолчанию

  5. taskmanager.tmp.dirs --- каталог временного хранения данных для диспетчера задач

1.6 Flink при установке режима Yarn

Есть два способа намотать пряжу:

1.6.1 Первый способ

Запустите кластер Flink в YARN, а затем мы отправляем задачи в кластер Flink.Если кластер Flink не будет остановлен, ресурсы не будут освобождены, что приведет к их трате.

1.6.2 Второй способ

Каждый раз при отправке задачи на пряже запускается небольшой кластер flink (рекомендуется), а ресурсы автоматически освобождаются после запуска задачи.

1.7 Подача задания в разных режимах

1.7.1 yarn-session.sh (развертывание ресурсов) + запуск flink (отправка задач)

Этот метод в основном не используется, поэтому я не буду его объяснять.

启动一个一直运行的flink集群

/bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]

把任务附着到一个已存在的flink yarn session

•./bin/yarn-session.sh -id application_1463870264508_0029

•执行任务

•./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt 

停止任务 【web界面或者命令行执行cancel命令】

1.7.2 flink run -m yarn-cluster (развернуть ресурсы + отправить задачи)

启动集群,执行任务

./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

Если вы знаете, что yjm — это Yarn JobManager, ytm — это Yarn taskManager, а значения других связанных параметров можно найти с помощью поиска Baidu.

Options for yarn-cluster mode:
 -d,--detached                        以独立模式运行任务
 -m,--jobmanager <arg>                连接 JobManager(主)的地址。
                                      使用此标志连接一个不同的 JobManager 在配置中指定的
 -yD <property=value>                 使用给定属性的值
 -yd,--yarndetached                   以独立模式运行任务(过期的;用 non-YARN 选项代替)
 -yh,--yarnhelp                       Yarn session CLI 的帮助信息
 -yid,--yarnapplicationId <arg>       用来运行 YARN Session 的 ID
 -yj,--yarnjar <arg>                  Flink jar 文件的路径
 -yjm,--yarnjobManagerMemory <arg>    JobManager 容器的内存可选单元(默认值: MB)
 -yn,--yarncontainer <arg>            分配 YARN 容器的数量(=TaskManager 的数量)
 -ynm,--yarnname <arg>                给应用程序一个自定义的名字显示在 YARN 上
 -yq,--yarnquery                      显示 YARN 的可用资源(内存,队列)
 -yqu,--yarnqueue <arg>               指定 YARN 队列
 -ys,--yarnslots <arg>                每个 TaskManager 的槽位数量
 -yst,--yarnstreaming                 以流式处理方式启动 Flink
 -yt,--yarnship <arg>                 在指定目录中传输文件
                                      (t for transfer)
 -ytm,--yarntaskManagerMemory <arg>   每个 TaskManager 容器的内存可选单元(默认值: MB)
 -yz,--yarnzookeeperNamespace <arg>   用来创建高可用模式的 Zookeeper 的子路径的命名空间。
 -ynl,--yarnnodeLabel <arg>           指定 YARN 应用程序  YARN 节点标签

Примечание: клиентская сторона должна быть установленаYARN_CONF_DIRилиHADOOP_CONF_DIRилиHADOOP_HOMEПеременная окружения, через которую можно прочитать информацию о конфигурации YARN и HDFS, иначе запуск не удастся

Finally

Следующая статья начинает знакомить с операторами