Серия Flink (2) — построение среды разработки Flink

задняя часть Flink

1. Установите плагин Scala

Flink предоставляет API на основе языка Java и языка Scala соответственно.Если вы хотите использовать язык Scala для разработки программ Flink, вы можете установить подключаемый модуль Scala в IDEA, чтобы предоставлять подсказки по синтаксису, подсветку кода и другие функции. Откройте ИДЕЮ, нажмитеFile => settings => pluginsОткройте страницу установки плагина, найдите плагин Scala и установите его.После завершения установки перезапустите IDEA, чтобы изменения вступили в силу.

https://github.com/heibaiying

Два, инициализация проекта Flink

2.1 Сборка с официальными скриптами

Flink официально поддерживает использование Maven и Gradle для создания проектов Flink на основе языка Java; он поддерживает использование SBT и Maven для создания проектов Flink на основе языка Scala. Здесь мы берем Maven в качестве примера, потому что он может поддерживать создание проектов на языке Java и языке Scala одновременно. Следует отметить, что Flink 1.9 поддерживает только Maven 3.0.4 и выше.После установки Maven проект можно собрать двумя способами:

1. Стройте прямо на Maven Archetype

Непосредственно используйте следующий оператор mvn для сборки, а затем введите groupId , ArtiftId и имя пакета по очереди в соответствии с подсказками интерактивной информации и дождитесь завершения инициализации:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

Примечание. Если вы хотите создать проект на основе языка Scala, вам нужно всего лишь заменить flink-quickstart-java на flink-quickstart-scala, и последует то же самое.

2. Используйте официальные сценарии для быстрого построения

Чтобы удобнее инициализировать проект, предусмотрен официальный скрипт быстрого сборки, который можно назвать непосредственно по следующей команде:

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

Этот метод фактически инициализируется выполнением команды maven archetype.Содержимое сценария выглядит следующим образом:

PACKAGE=quickstart

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=${1:-1.8.0} \
  -DgroupId=org.myorg.quickstart \
  -DartifactId=$PACKAGE	\
  -Dversion=0.1 \
  -Dpackage=org.myorg.quickstart \
  -DinteractiveMode=false

Видно, что по сравнению с первым методом этот метод просто напрямую указывает groupId, ArtiftId, версию и другую информацию.

2.2 Создание с помощью IDEA

Если вы используете IDEA в качестве инструмента разработки, вы можете напрямую выбрать Maven Flink Archetype на странице создания проекта, чтобы инициализировать проект:

https://github.com/heibaiying

Если в вашей ИДЕЕ нет вышеуказанного Архетипа, вы можете нажать в правом верхнем углуADD ARCHETYPE, чтобы добавить, заполните необходимую информацию в свою очередь, эти сведения могут быть получены из вышеуказанногоarchetype:generateполученное в заявлении. нажмитеOKПосле сохранения Архетип всегда будет существовать в вашей IDEA, и каждый раз, когда вы создаете проект, вам нужно только напрямую выбрать Архетип:

https://github.com/heibaiying

Выберите Архетип Flink и нажмитеNEXTКнопка, все последующие шаги такие же, как и в обычных проектах Maven.

3. Структура проекта

3.1 Структура проекта

Автоматически сгенерированная структура проекта после создания выглядит следующим образом:

https://github.com/heibaiying

Среди них BatchJob — пример кода пакетной обработки, исходный код выглядит следующим образом:

import org.apache.flink.api.scala._

object BatchJob {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
      ....
    env.execute("Flink Batch Scala API Skeleton")
  }
}

getExecutionEnvironment представляет среду выполнения для получения пакетной обработки.Если он работает локально, он получит локальную среду выполнения, если он работает в кластере, он получит среду выполнения кластера. Если вы хотите получить среду выполнения потоковой обработки, вам нужно только установитьExecutionEnvironmentзаменитьStreamExecutionEnvironment, соответствующий пример кода находится в StreamingJob:

import org.apache.flink.streaming.api.scala._

object StreamingJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
      ...
    env.execute("Flink Streaming Scala API Skeleton")
  }
}

Обратите внимание, что для проектов потоковой обработкиenv.execute()Этот код обязателен, иначе обработчик потока не будет выполнен, но он необязателен для пакетных проектов.

3.2 Основные зависимости

Проекты, созданные на основе скелета Maven, в основном обеспечивают следующие основные зависимости:flink-scalaИспользуется для поддержки разработки пакетных программ;flink-streaming-scalaИспользуется для поддержки разработки обработчиков потоков;scala-libraryИспользуется для предоставления библиотеки классов, требуемой языком Scala. Если язык Java был выбран при создании с использованием скелета Maven, по умолчанию предоставляетсяflink-javaиflink-streaming-javaполагаться.

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>

Вышеуказанные зависимости требуют особого вниманияscopeВсе теги помечены как предоставленные, что означает, что ни одна из этих зависимостей не будет включена в окончательный пакет JAR. Поскольку эти зависимости уже предоставлены в установочном пакете Flink, расположенном в его каталоге lib с именемflink-dist_*.jar, который содержит все основные классы и зависимости Flink:

https://github.com/heibaiying

scopeТег, помеченный как предоставленный, вызовет исключение ClassNotFoundException при запуске проекта в IDEA. По этой причине при создании проекта с IDEA также автоматически генерируются следующие конфигурации профилей:

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>

        <activation>
            <property>
                <name>idea.version</name>
            </property>
        </activation>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>

где идентификаторadd-dependencies-for-IDEAВ профиле все основные зависимости помечены как компилируемые.В настоящее время вы можете запустить проект Flink непосредственно в IDEA без изменения кода, просто проверьте профиль в панели Maven IDEA:

https://github.com/heibaiying

Четыре, случай статистики частоты слов

После того, как проект создан, вы можете написать простой пример статистики частоты слов, чтобы попытаться запустить проект Flink.В следующем примере используется язык Scala, чтобы представить примеры программирования программы потоковой обработки и пакетной программы:

4.1 Пример партии

import org.apache.flink.api.scala._

object WordCountBatch {

  def main(args: Array[String]): Unit = {
    val benv = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = benv.readTextFile("D:\\wordcount.txt")
    dataSet.flatMap { _.toLowerCase.split(",")}
            .filter (_.nonEmpty)
            .map { (_, 1) }
            .groupBy(0)
            .sum(1)
            .print()
  }
}

вwordcount.txtСодержание следующее:

a,a,a,a,a
b,b,b
c,c
d,d

На этом компьютере не нужно настраивать какую-либо другую среду Flink, просто запустите метод Main напрямую.Результаты следующие:

https://github.com/heibaiying

4.2 Пример потоковой обработки

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreaming {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
    dataStream.flatMap { line => line.toLowerCase.split(",") }
              .filter(_.nonEmpty)
              .map { word => (word, 1) }
              .keyBy(0)
              .timeWindow(Time.seconds(3))
              .sum(1)
              .print()
    senv.execute("Streaming WordCount")
  }
}

Здесь содержимое в указанном номере порта используется в качестве примера, используйте следующую команду для открытия службы порта:

nc -lk 9999

После ввода тестовых данных можно наблюдать обработку потоковой обработки программой.

5. Использование оболочки Scala

Для ежедневных демонстрационных проектов, если вы не хотите часто запускать IDEA для наблюдения за результатами тестов, вы можете напрямую использовать Scala Shell для запуска таких программ, как Spark, что является более интуитивно понятным и экономит время для ежедневного обучения. Адрес загрузки установочного пакета Flink выглядит следующим образом:

https://flink.apache.org/downloads.html

Большинство версий Flink предоставляют установочные пакеты для Scala 2.11 и Scala 2.12 для загрузки:

https://github.com/heibaiying

После завершения загрузки разархивируйте его.Оболочка Scala находится в каталоге bin каталога установки.Вы можете запустить его в локальном режиме напрямую с помощью следующей команды:

./start-scala-shell.sh local

После запуска командной строки она предоставила среду выполнения пакетной (benv и btenv) и потоковой обработки (senv и stenv), и вы можете напрямую запустить программу Scala Flink. Пример выглядит следующим образом:

https://github.com/heibaiying

Наконец, объясняется распространенное исключение: здесь я использую FLINK версии 1.9.1, которая при запуске выдает следующее. Согласно официальному описанию, все версии установочного пакета Scala 2.12 не поддерживают Scala Shell, поэтому, если вы хотите использовать Scala Shell, вы можете выбрать только версию установочного пакета SCALA 2.11.

[root@hadoop001 bin]# ./start-scala-shell.sh local
错误: 找不到或无法加载主类 org.apache.flink.api.scala.FlinkShell

Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным