1. Установите плагин Scala
Flink предоставляет API на основе языка Java и языка Scala соответственно.Если вы хотите использовать язык Scala для разработки программ Flink, вы можете установить подключаемый модуль Scala в IDEA, чтобы предоставлять подсказки по синтаксису, подсветку кода и другие функции. Откройте ИДЕЮ, нажмитеFile => settings => plugins
Откройте страницу установки плагина, найдите плагин Scala и установите его.После завершения установки перезапустите IDEA, чтобы изменения вступили в силу.
Два, инициализация проекта Flink
2.1 Сборка с официальными скриптами
Flink официально поддерживает использование Maven и Gradle для создания проектов Flink на основе языка Java; он поддерживает использование SBT и Maven для создания проектов Flink на основе языка Scala. Здесь мы берем Maven в качестве примера, потому что он может поддерживать создание проектов на языке Java и языке Scala одновременно. Следует отметить, что Flink 1.9 поддерживает только Maven 3.0.4 и выше.После установки Maven проект можно собрать двумя способами:
1. Стройте прямо на Maven Archetype
Непосредственно используйте следующий оператор mvn для сборки, а затем введите groupId , ArtiftId и имя пакета по очереди в соответствии с подсказками интерактивной информации и дождитесь завершения инициализации:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
Примечание. Если вы хотите создать проект на основе языка Scala, вам нужно всего лишь заменить flink-quickstart-java на flink-quickstart-scala, и последует то же самое.
2. Используйте официальные сценарии для быстрого построения
Чтобы удобнее инициализировать проект, предусмотрен официальный скрипт быстрого сборки, который можно назвать непосредственно по следующей команде:
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
Этот метод фактически инициализируется выполнением команды maven archetype.Содержимое сценария выглядит следующим образом:
PACKAGE=quickstart
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false
Видно, что по сравнению с первым методом этот метод просто напрямую указывает groupId, ArtiftId, версию и другую информацию.
2.2 Создание с помощью IDEA
Если вы используете IDEA в качестве инструмента разработки, вы можете напрямую выбрать Maven Flink Archetype на странице создания проекта, чтобы инициализировать проект:
Если в вашей ИДЕЕ нет вышеуказанного Архетипа, вы можете нажать в правом верхнем углуADD ARCHETYPE
, чтобы добавить, заполните необходимую информацию в свою очередь, эти сведения могут быть получены из вышеуказанногоarchetype:generate
полученное в заявлении. нажмитеOK
После сохранения Архетип всегда будет существовать в вашей IDEA, и каждый раз, когда вы создаете проект, вам нужно только напрямую выбрать Архетип:
Выберите Архетип Flink и нажмитеNEXT
Кнопка, все последующие шаги такие же, как и в обычных проектах Maven.
3. Структура проекта
3.1 Структура проекта
Автоматически сгенерированная структура проекта после создания выглядит следующим образом:
Среди них BatchJob — пример кода пакетной обработки, исходный код выглядит следующим образом:
import org.apache.flink.api.scala._
object BatchJob {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
....
env.execute("Flink Batch Scala API Skeleton")
}
}
getExecutionEnvironment представляет среду выполнения для получения пакетной обработки.Если он работает локально, он получит локальную среду выполнения, если он работает в кластере, он получит среду выполнения кластера. Если вы хотите получить среду выполнения потоковой обработки, вам нужно только установитьExecutionEnvironment
заменитьStreamExecutionEnvironment
, соответствующий пример кода находится в StreamingJob:
import org.apache.flink.streaming.api.scala._
object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
env.execute("Flink Streaming Scala API Skeleton")
}
}
Обратите внимание, что для проектов потоковой обработкиenv.execute()
Этот код обязателен, иначе обработчик потока не будет выполнен, но он необязателен для пакетных проектов.
3.2 Основные зависимости
Проекты, созданные на основе скелета Maven, в основном обеспечивают следующие основные зависимости:flink-scala
Используется для поддержки разработки пакетных программ;flink-streaming-scala
Используется для поддержки разработки обработчиков потоков;scala-library
Используется для предоставления библиотеки классов, требуемой языком Scala. Если язык Java был выбран при создании с использованием скелета Maven, по умолчанию предоставляетсяflink-java
иflink-streaming-java
полагаться.
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
Вышеуказанные зависимости требуют особого вниманияscope
Все теги помечены как предоставленные, что означает, что ни одна из этих зависимостей не будет включена в окончательный пакет JAR. Поскольку эти зависимости уже предоставлены в установочном пакете Flink, расположенном в его каталоге lib с именемflink-dist_*.jar
, который содержит все основные классы и зависимости Flink:
scope
Тег, помеченный как предоставленный, вызовет исключение ClassNotFoundException при запуске проекта в IDEA. По этой причине при создании проекта с IDEA также автоматически генерируются следующие конфигурации профилей:
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
где идентификаторadd-dependencies-for-IDEA
В профиле все основные зависимости помечены как компилируемые.В настоящее время вы можете запустить проект Flink непосредственно в IDEA без изменения кода, просто проверьте профиль в панели Maven IDEA:
Четыре, случай статистики частоты слов
После того, как проект создан, вы можете написать простой пример статистики частоты слов, чтобы попытаться запустить проект Flink.В следующем примере используется язык Scala, чтобы представить примеры программирования программы потоковой обработки и пакетной программы:
4.1 Пример партии
import org.apache.flink.api.scala._
object WordCountBatch {
def main(args: Array[String]): Unit = {
val benv = ExecutionEnvironment.getExecutionEnvironment
val dataSet = benv.readTextFile("D:\\wordcount.txt")
dataSet.flatMap { _.toLowerCase.split(",")}
.filter (_.nonEmpty)
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}
вwordcount.txt
Содержание следующее:
a,a,a,a,a
b,b,b
c,c
d,d
На этом компьютере не нужно настраивать какую-либо другую среду Flink, просто запустите метод Main напрямую.Результаты следующие:
4.2 Пример потоковой обработки
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WordCountStreaming {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
dataStream.flatMap { line => line.toLowerCase.split(",") }
.filter(_.nonEmpty)
.map { word => (word, 1) }
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
senv.execute("Streaming WordCount")
}
}
Здесь содержимое в указанном номере порта используется в качестве примера, используйте следующую команду для открытия службы порта:
nc -lk 9999
После ввода тестовых данных можно наблюдать обработку потоковой обработки программой.
5. Использование оболочки Scala
Для ежедневных демонстрационных проектов, если вы не хотите часто запускать IDEA для наблюдения за результатами тестов, вы можете напрямую использовать Scala Shell для запуска таких программ, как Spark, что является более интуитивно понятным и экономит время для ежедневного обучения. Адрес загрузки установочного пакета Flink выглядит следующим образом:
https://flink.apache.org/downloads.html
Большинство версий Flink предоставляют установочные пакеты для Scala 2.11 и Scala 2.12 для загрузки:
После завершения загрузки разархивируйте его.Оболочка Scala находится в каталоге bin каталога установки.Вы можете запустить его в локальном режиме напрямую с помощью следующей команды:
./start-scala-shell.sh local
После запуска командной строки она предоставила среду выполнения пакетной (benv и btenv) и потоковой обработки (senv и stenv), и вы можете напрямую запустить программу Scala Flink. Пример выглядит следующим образом:
Наконец, объясняется распространенное исключение: здесь я использую FLINK версии 1.9.1, которая при запуске выдает следующее. Согласно официальному описанию, все версии установочного пакета Scala 2.12 не поддерживают Scala Shell, поэтому, если вы хотите использовать Scala Shell, вы можете выбрать только версию установочного пакета SCALA 2.11.
[root@hadoop001 bin]# ./start-scala-shell.sh local
错误: 找不到或无法加载主类 org.apache.flink.api.scala.FlinkShell
Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным