предисловие
На самом деле Spark Streaming в основном использует операторы и вводит больше кодов. В то время я думал, что о Spark Streaming особо нечего упоминать, поэтому я просто пропустил его. Потом···
Тогда я вернусь и поговорю
1. Искра потокового вещания
1.1 Запуск процесса
- Прежде всего, вне зависимости от того, отправляем ли мы Spark core, Spark SQL или Spark Streaming, мы будем называть их одним целым."Заявление"
- И нам нужно знать,Spark SQLа такжеSpark StreamingНижележащие слои зависят отSpark Core, поэтому, прежде чем мы захотим их использовать, мы должны инициализировать запись программы Spark Core.Из кода StreamingContext должен зависеть от SparkContext, а в SparkContext мы должны инициализировать службу Driver, поэтому их структура выглядит следующим образом
- После того, как все вышеперечисленное инициализировано, наши воркеры естественно выделили Executors
- Затем служба Driver отправитReceiverВозражатьExecutorНа Ресивере по умолчанию стоит только один Ресивер, конечно, его тоже можно поставить на несколько по коду
- После того, как Receiver запущен, его фактически можно рассматривать как задачу Task (данная Task начинает непрерывно получать данные, и инкапсулирует их в блок, а затем записывает блок в память Executor)
- Приемник сообщит водителю информацию об этих блоках.
Хотя картинка очень простая, хорошо точно передать информацию.
- Водитель поставит этиBlockорганизована вRDD,фактическиБлок — это раздел (1 раздел -> 1 задача)
1.2 Блоковый интервал и пакетный интервал
Сколько данных будет формировать блок в это время? Ответ состоит в том, чтобы формировать блок каждые 200 миллисекунд.
Сколько времени потребуется, чтобы объединить эти блоки в один RDD? Ответ - это предложение в вашем коде
// 这里的2是2秒的意思
val ssc = new StreamingContext(conf,Seconds(2));
Драйвер будет обрабатывать данные в этих 2-х как RDD
Это наши BlockInterval и BatchInterval, эти два более важных временных параметра, один по умолчанию 200 миллисекунд, а другой контролируется нашими пользователями
Мы также можем проверить параметры этих конфигураций на официальном сайте.
Затем ctrl+f, поиск по blockInterval или 200 можно найти
Это все, что нам нужно знать на данный момент, давайте напишем код
1.3 Начало работы с программой WordCount
Это неоднократно упоминалось в предыдущем сообщении.Если это программа реального времени, нас волнует ееВвод данных, обработка данных и вывод данных
1.3.1 POM-файл
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.1</spark.version>
<hadoop.version>2.7.5</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
1.3.2 Ввод данных
Сначала нам нужно создать запись программы для потоковой передачи Spark.
// 1.数据的输入
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("wordCount")
Здесь мы объясняем, первая строка - это то, что мы только что упомянули, создание записи программы SparkCore, это не проблема, setAppName - установить имя, что такое любовь, здесь нечего сказать.
1.3.3 Добавлено расширение setMaster(local[2])
Во второй строке мы устанавливаемMaster(local[2]).Здесь следует отметить, что мы используем данные Kafka двумя способами: Receiver и Direct, которые отличаются.
На официальном сайте есть два пакета JAR,Один основан на интеграции версии 0.8, обеспечивающей интеграцию ресиверной и прямой версий.,Одним из них является интеграция версии 0.10, которая предоставляет только прямой метод.
Интеграция приемника заключается в запуске потока приемника в исполнителе Spark для извлечения данных. Получатель извлеченных данных не поможет с обработкой, поэтому приемник должен перемещать вещи, и он потребляется на основе высокоуровневого API Kafka. ,Смещение автоматически сохраняется в zookeeper, поэтому вам не нужно поддерживать его самостоятельно..
В это время поток извлечения данных и поток обработки данных не взаимодействуют друг с другом.Когда наш поток обработки данных зависает, поток извлечения данных не может его воспринять и продолжает извлекать данные. В это время все данные будут накапливаться в памяти исполнителя, и возникнут проблемы.
Прямой режим больше не запускает отдельный поток для вытягивания данных, и полученные данные больше не хранятся в экзекьюторе, а полученные данные обрабатываются напрямую.Вытягивание и обработка — это полностью группа людей.
Конечно, у него тоже есть проблемы: использование низкоуровневого API Kafka для потребления требует ручного обслуживания значения смещения. В старой версии мы также сохраним его в zookeeper.Новая версия по умолчанию хранится в топике Kafka по умолчанию.Конечно, для некоторых особых нужд существуют MySQL и Hbase... это тоже возможно.
На данный момент мы можем установитьMaster(local[1])? Если это прямой метод, то это возможно, но в данный момент мы тянем и обрабатываем его в одном потоке, но если он потребляется на основе метода Receiver, он будет завершен.Local[1] означает только запуск один поток, в это время вы обнаружите, что ваша программа не сообщает об ошибке, но данные не являются живыми, поэтому все должны обратить внимание, если это локально [*], то есть сколько процессорных ядер компьютера включено на
Позже
val ssc = new StreamingContext(conf, Seconds(2))
Получите запись программы SparkStreaming, установите 2s для формирования RDD, вы можете
1.3.3 Сбор и обработка данных
На самом деле это простой вопрос получения данных из сокета.
// 2.数据的处理
val dataDStream = ssc.socketTextStream("localhost", 8888)
val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
1.3.4 Вывод данных
// 3.数据的输出
result.print()
Если мы приступим к корпоративной работе, мы также обнаружим, что в основном нам нужно написать только ту часть кода, которая обрабатывает данные, а конфигурация получения передней части и нисходящего вывода не будет уделять слишком много внимания.
На этом этапе мы можем запустить код и использовать netcat, упомянутый во второй статье flink, для прослушивания порта 8888 (не 8888, просто делайте что угодно)
Но в настоящее время наша статистика не является кумулятивной, и для кумулятивных требований необходимо использовать некоторые расширенные операторы.
finally
После этого случайным образом появились Flink,ES и Spark.Сейчас планирую переучить все компоненты.Заинтересованные друзья могут обратить внимание на номер паблика:Говорите свои пожелания