1. Режим развертывания
Flink поддерживает использование нескольких режимов развертывания для удовлетворения потребностей приложений разного масштаба.Обычные из них включают автономный режим и режим автономного кластера.Flink также поддерживает развертывание на других сторонних платформах, таких как YARN, Mesos, Docker, Kubernetes. , и т.д. Далее в основном представлено развертывание его автономного режима и режима автономного кластера.
2. Автономный режим
Автономный режим — это стандартный режим, который может работать на одном сервере и подходит для повседневной разработки и отладки. Конкретные этапы операции следующие:
2.1 Установка и развертывание
1. Предварительные условия
Работа Flink зависит от среды JAVA, поэтому необходимо предварительно установить JDK. Конкретные шаги см.Установка JDK в среде Linux
2. Скачайте, разархивируйте и запустите
Установочные пакеты для всех версий Flink можно загрузить непосредственно с егоОфициальный сайтСкачать, версия Flink, которую я скачал здесь1.9.1
, требуемая версия JDK1.8.x +
. После загрузки извлеките его в указанный каталог:
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C /usr/app
Никакой настройки не требуется, просто используйте следующую команду, чтобы запустить автономную версию Flink:
bin/start-cluster.sh
3. ВЕБ-интерфейс
Flink предоставляет веб-интерфейс для интуитивно понятного управления кластерами Flink.8081
:
Интерфейс пользовательского веб-интерфейса Flink поддерживает наиболее распространенные функции, такие как отправка заданий, отмена заданий, проверка рабочего состояния каждого узла, проверка состояния выполнения задания и т. д. Вы можете перейти на эту страницу для подробного просмотра после завершения развертывания.
2.2 Представление задания
После запуска вы можете запустить случай статистики частоты слов, который поставляется с установочным пакетом.Конкретные шаги заключаются в следующем:
1. Откройте порт
nc -lk 9999
2. Отправьте задание
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Исходный код пакета JAR можно найти в официальном репозитории Flink на GitHub по адресу:SocketWindowWordCount, необязательные параметры включают имя хоста, порт и соответствующие данные о частоте слов должны быть разделены пробелами.
3. Введите тестовые данные
a a b b c c c a e
4. Просмотр вывода консоли
Вы можете просмотреть текущий статус системы заданий через консоль веб-интерфейса:
Вы также можете просмотреть статистические результаты через WEB-консоль:
2.3 Остановить работу
Вы можете напрямую щелкнуть соответствующее задание в веб-интерфейсе.Cancel Job
кнопку для отмены или используйте командную строку для отмены. При использовании командной строки для отмены вам необходимо сначала получить JobId задания, вы можете использоватьflink list
Команда для просмотра, вывод выглядит следующим образом:
[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
После получения JobId вы можете использоватьflink cancel
Команда для отмены задания:
bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc
2.4 Остановить мерцание
Команда выглядит следующим образом:
bin/stop-cluster.sh
3. Автономный кластер
Автономный режим кластера — это режим кластера, который поставляется с Flink.Конкретные шаги настройки следующие:
3.1 Предварительные условия
Перед использованием этого режима необходимо убедиться, что между всеми серверами настроена служба входа без пароля SSH. Здесь я беру в качестве примера три сервера, имена хостов — hadoop001, hadoop002, hadoop003, из которых hadoop001 — главный узел, а два других — подчиненные узлы, этапы построения следующие:
3.2 Этапы строительства
Исправлятьconf/flink-conf.yaml
Коммуникационный адрес узла менеджера заданий — hadoop001:
jobmanager.rpc.address: hadoop001
Исправлятьconf/slaves
Файл конфигурации для настройки hadoop002 и hadoop003 в качестве подчиненных узлов:
hadoop002
hadoop003
Распространите настроенный установочный пакет Flink на два других сервера:
scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app
Запустите кластер на hadoop001 с помощью той же команды, что и в автономном режиме:
bin/start-cluster.sh
На данный момент вывод консоли выглядит следующим образом:
Можно использовать после запускаJps
Командой или через WEB интерфейс, чтобы проверить успешность запуска.
3.3 Дополнительная конфигурация
Помимо вышеперечисленногоjobmanager.rpc.addressВ дополнение к обязательной конфигурации Flink h также поддерживает использование других необязательных параметров для оптимизации производительности кластера, в основном следующим образом:
- jobmanager.heap.size: размер кучи JVM для JobManager, по умолчанию 1024 м .
- taskmanager.heap.size: размер памяти кучи JVM диспетчера задач, по умолчанию 1024 м .
- taskmanager.numberOfTaskSlots: количество слотов в диспетчере задач, обычно равное количеству ядер процессора или его половине.
- parallelism.default: параллелизм задачи по умолчанию.
-
io.tmp.dirs: путь для хранения временных файлов, если он не настроен, по умолчанию используется временный каталог сервера, например, LInux's
/tmp
содержание.
Дополнительные сведения о настройке см. в официальном руководстве Flink:Configuration
4. Автономный кластер HA
Автономный кластер, который мы настроили выше, на самом деле имеет только один JobManager, и в настоящее время существует единственная точка отказа, поэтому режим высокой доступности автономного кластера официально предоставляется для достижения высокой доступности кластера.
4.1 Предварительные условия
В режиме Standalone Cluster HA кластер может иметь несколько диспетчеров заданий, но только один из них находится в активном состоянии, а остальные находятся в состоянии ожидания Flink использует ZooKeeper для выбора активного диспетчера заданий и полагается на него для предоставления услуг координации согласованности. поэтому необходимо предварительно установить ZooKeeper.
Кроме того, в режиме высокой доступности необходимо использовать распределенную файловую систему для постоянного хранения метаданных JobManager.Наиболее часто используется HDFS, поэтому Hadoop также необходимо предварительно установить. Для построения кластера Hadoop и кластера ZooKeeper см.:
4.2 Этапы строительства
Исправлятьconf/flink-conf.yaml
файл, добавьте следующую конфигурацию:
# 配置使用zookeeper来开启高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery
Исправлятьconf/masters
файл для настройки и hadoop001, и hadoop002 в качестве главных узлов:
hadoop001:8081
hadoop002:8081
Убедившись, что Hadoop и ZooKeeper работают, запустите кластер с помощью следующей команды:
bin/start-cluster.sh
На данный момент вывод следующий:
Вы видите, что кластер был запущен в режиме HA, и вам нужно использовать его на каждом узле в это время.jps
команда для проверки того, успешно ли запущен процесс, нормальная ситуация выглядит следующим образом:
Только когда процессы JobManager Hadoop001 и Hadoop002 и процессы TaskManager Hadoop002 и Hadoop003 полностью запущены, это означает, что режим высокой доступности автономного кластера успешно установлен.
4.3 Общие исключения
Если процесс не запускается, вы можете проверитьlog
Журнал в каталоге, чтобы найти ошибку, общая ошибка выглядит следующим образом:
2019-11-05 09:18:35,877 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in
the classpath/dependencies.
......
Видно, что связанные зависимости Hadoop не могут быть найдены в каталоге пути к классам.В настоящее время вам необходимо проверить, настроен ли путь установки Hadoop в переменной среды.Если путь был настроен, но вышеуказанная проблема все еще существует, вы можете нажатьофициальный сайт ФлинкаЗагрузите соответствующую версию пакета Hadoop:
После завершения загрузки загрузите пакет JAR ввсеКаталог установки Флинкаlib
каталог.
использованная литература
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным