Серия Flink (8) — развертывание автономного кластера Flink

Flink

1. Режим развертывания

Flink поддерживает использование нескольких режимов развертывания для удовлетворения потребностей приложений разного масштаба.Обычные из них включают автономный режим и режим автономного кластера.Flink также поддерживает развертывание на других сторонних платформах, таких как YARN, Mesos, Docker, Kubernetes. , и т.д. Далее в основном представлено развертывание его автономного режима и режима автономного кластера.

2. Автономный режим

Автономный режим — это стандартный режим, который может работать на одном сервере и подходит для повседневной разработки и отладки. Конкретные этапы операции следующие:

2.1 Установка и развертывание

1. Предварительные условия

Работа Flink зависит от среды JAVA, поэтому необходимо предварительно установить JDK. Конкретные шаги см.Установка JDK в среде Linux

2. Скачайте, разархивируйте и запустите

Установочные пакеты для всех версий Flink можно загрузить непосредственно с егоОфициальный сайтСкачать, версия Flink, которую я скачал здесь1.9.1, требуемая версия JDK1.8.x +. После загрузки извлеките его в указанный каталог:

tar -zxvf flink-1.9.1-bin-scala_2.12.tgz  -C /usr/app

Никакой настройки не требуется, просто используйте следующую команду, чтобы запустить автономную версию Flink:

bin/start-cluster.sh

3. ВЕБ-интерфейс

Flink предоставляет веб-интерфейс для интуитивно понятного управления кластерами Flink.8081:

https://github.com/heibaiying

Интерфейс пользовательского веб-интерфейса Flink поддерживает наиболее распространенные функции, такие как отправка заданий, отмена заданий, проверка рабочего состояния каждого узла, проверка состояния выполнения задания и т. д. Вы можете перейти на эту страницу для подробного просмотра после завершения развертывания.

2.2 Представление задания

После запуска вы можете запустить случай статистики частоты слов, который поставляется с установочным пакетом.Конкретные шаги заключаются в следующем:

1. Откройте порт

nc -lk 9999

2. Отправьте задание

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

Исходный код пакета JAR можно найти в официальном репозитории Flink на GitHub по адресу:SocketWindowWordCount, необязательные параметры включают имя хоста, порт и соответствующие данные о частоте слов должны быть разделены пробелами.

3. Введите тестовые данные

a a b b c c c a e

4. Просмотр вывода консоли

Вы можете просмотреть текущий статус системы заданий через консоль веб-интерфейса:

https://github.com/heibaiying

Вы также можете просмотреть статистические результаты через WEB-консоль:

https://github.com/heibaiying

2.3 Остановить работу

Вы можете напрямую щелкнуть соответствующее задание в веб-интерфейсе.Cancel Jobкнопку для отмены или используйте командную строку для отмены. При использовании командной строки для отмены вам необходимо сначала получить JobId задания, вы можете использоватьflink listКоманда для просмотра, вывод выглядит следующим образом:

[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

После получения JobId вы можете использоватьflink cancelКоманда для отмены задания:

bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc

2.4 Остановить мерцание

Команда выглядит следующим образом:

bin/stop-cluster.sh

3. Автономный кластер

Автономный режим кластера — это режим кластера, который поставляется с Flink.Конкретные шаги настройки следующие:

3.1 Предварительные условия

Перед использованием этого режима необходимо убедиться, что между всеми серверами настроена служба входа без пароля SSH. Здесь я беру в качестве примера три сервера, имена хостов — hadoop001, hadoop002, hadoop003, из которых hadoop001 — главный узел, а два других — подчиненные узлы, этапы построения следующие:

3.2 Этапы строительства

Исправлятьconf/flink-conf.yamlКоммуникационный адрес узла менеджера заданий — hadoop001:

jobmanager.rpc.address: hadoop001

Исправлятьconf/slavesФайл конфигурации для настройки hadoop002 и hadoop003 в качестве подчиненных узлов:

hadoop002
hadoop003

Распространите настроенный установочный пакет Flink на два других сервера:

 scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
 scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app

Запустите кластер на hadoop001 с помощью той же команды, что и в автономном режиме:

bin/start-cluster.sh

На данный момент вывод консоли выглядит следующим образом:

https://github.com/heibaiying

Можно использовать после запускаJpsКомандой или через WEB интерфейс, чтобы проверить успешность запуска.

3.3 Дополнительная конфигурация

Помимо вышеперечисленногоjobmanager.rpc.addressВ дополнение к обязательной конфигурации Flink h также поддерживает использование других необязательных параметров для оптимизации производительности кластера, в основном следующим образом:

  • jobmanager.heap.size: размер кучи JVM для JobManager, по умолчанию 1024 м .
  • taskmanager.heap.size: размер памяти кучи JVM диспетчера задач, по умолчанию 1024 м .
  • taskmanager.numberOfTaskSlots: количество слотов в диспетчере задач, обычно равное количеству ядер процессора или его половине.
  • parallelism.default: параллелизм задачи по умолчанию.
  • io.tmp.dirs: путь для хранения временных файлов, если он не настроен, по умолчанию используется временный каталог сервера, например, LInux's/tmpсодержание.

Дополнительные сведения о настройке см. в официальном руководстве Flink:Configuration

4. Автономный кластер HA

Автономный кластер, который мы настроили выше, на самом деле имеет только один JobManager, и в настоящее время существует единственная точка отказа, поэтому режим высокой доступности автономного кластера официально предоставляется для достижения высокой доступности кластера.

4.1 Предварительные условия

В режиме Standalone Cluster HA кластер может иметь несколько диспетчеров заданий, но только один из них находится в активном состоянии, а остальные находятся в состоянии ожидания Flink использует ZooKeeper для выбора активного диспетчера заданий и полагается на него для предоставления услуг координации согласованности. поэтому необходимо предварительно установить ZooKeeper.

Кроме того, в режиме высокой доступности необходимо использовать распределенную файловую систему для постоянного хранения метаданных JobManager.Наиболее часто используется HDFS, поэтому Hadoop также необходимо предварительно установить. Для построения кластера Hadoop и кластера ZooKeeper см.:

4.2 Этапы строительства

Исправлятьconf/flink-conf.yamlфайл, добавьте следующую конфигурацию:

# 配置使用zookeeper来开启高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery

Исправлятьconf/mastersфайл для настройки и hadoop001, и hadoop002 в качестве главных узлов:

hadoop001:8081
hadoop002:8081

Убедившись, что Hadoop и ZooKeeper работают, запустите кластер с помощью следующей команды:

bin/start-cluster.sh

На данный момент вывод следующий:

https://github.com/heibaiying

Вы видите, что кластер был запущен в режиме HA, и вам нужно использовать его на каждом узле в это время.jpsкоманда для проверки того, успешно ли запущен процесс, нормальная ситуация выглядит следующим образом:

https://github.com/heibaiying

Только когда процессы JobManager Hadoop001 и Hadoop002 и процессы TaskManager Hadoop002 и Hadoop003 полностью запущены, это означает, что режим высокой доступности автономного кластера успешно установлен.

4.3 Общие исключения

Если процесс не запускается, вы можете проверитьlogЖурнал в каталоге, чтобы найти ошибку, общая ошибка выглядит следующим образом:

2019-11-05 09:18:35,877 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file 
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no 
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in 
the classpath/dependencies.
......

Видно, что связанные зависимости Hadoop не могут быть найдены в каталоге пути к классам.В настоящее время вам необходимо проверить, настроен ли путь установки Hadoop в переменной среды.Если путь был настроен, но вышеуказанная проблема все еще существует, вы можете нажатьофициальный сайт ФлинкаЗагрузите соответствующую версию пакета Hadoop:

https://github.com/heibaiying

После завершения загрузки загрузите пакет JAR ввсеКаталог установки Флинкаlibкаталог.

использованная литература

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным