Завершите свою первую программу Spark Streaming

Spark

предисловие

На самом деле Spark Streaming в основном использует операторы и вводит больше кодов. В то время я думал, что о Spark Streaming особо нечего упоминать, поэтому я просто пропустил его. Потом···

Тогда я вернусь и поговорю

1. Искра потокового вещания

1.1 Запуск процесса

  1. Прежде всего, вне зависимости от того, отправляем ли мы Spark core, Spark SQL или Spark Streaming, мы будем называть их одним целым."Заявление"
  2. И нам нужно знать,Spark SQLа такжеSpark StreamingНижележащие слои зависят отSpark Core, поэтому, прежде чем мы захотим их использовать, мы должны инициализировать запись программы Spark Core.Из кода StreamingContext должен зависеть от SparkContext, а в SparkContext мы должны инициализировать службу Driver, поэтому их структура выглядит следующим образом
  1. После того, как все вышеперечисленное инициализировано, наши воркеры естественно выделили Executors
  2. Затем служба Driver отправитReceiverВозражатьExecutorНа Ресивере по умолчанию стоит только один Ресивер, конечно, его тоже можно поставить на несколько по коду
  3. После того, как Receiver запущен, его фактически можно рассматривать как задачу Task (данная Task начинает непрерывно получать данные, и инкапсулирует их в блок, а затем записывает блок в память Executor)
  4. Приемник сообщит водителю информацию об этих блоках.

Хотя картинка очень простая, хорошо точно передать информацию.

  1. Водитель поставит этиBlockорганизована вRDD,фактическиБлок — это раздел (1 раздел -> 1 задача)

1.2 Блоковый интервал и пакетный интервал

Сколько данных будет формировать блок в это время? Ответ состоит в том, чтобы формировать блок каждые 200 миллисекунд.

Сколько времени потребуется, чтобы объединить эти блоки в один RDD? Ответ - это предложение в вашем коде

// 这里的2是2秒的意思
val ssc = new StreamingContext(conf,Seconds(2));

Драйвер будет обрабатывать данные в этих 2-х как RDD

Это наши BlockInterval и BatchInterval, эти два более важных временных параметра, один по умолчанию 200 миллисекунд, а другой контролируется нашими пользователями

Мы также можем проверить параметры этих конфигураций на официальном сайте.

Затем ctrl+f, поиск по blockInterval или 200 можно найти

Это все, что нам нужно знать на данный момент, давайте напишем код

1.3 Начало работы с программой WordCount

Это неоднократно упоминалось в предыдущем сообщении.Если это программа реального времени, нас волнует ееВвод данных, обработка данных и вывод данных

1.3.1 POM-файл

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.1</spark.version>
    <hadoop.version>2.7.5</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>


</dependencies>



<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1.3.2 Ввод данных

Сначала нам нужно создать запись программы для потоковой передачи Spark.

// 1.数据的输入
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("wordCount")

Здесь мы объясняем, первая строка - это то, что мы только что упомянули, создание записи программы SparkCore, это не проблема, setAppName - установить имя, что такое любовь, здесь нечего сказать.

1.3.3 Добавлено расширение setMaster(local[2])

Во второй строке мы устанавливаемMaster(local[2]).Здесь следует отметить, что мы используем данные Kafka двумя способами: Receiver и Direct, которые отличаются.

На официальном сайте есть два пакета JAR,Один основан на интеграции версии 0.8, обеспечивающей интеграцию ресиверной и прямой версий.,Одним из них является интеграция версии 0.10, которая предоставляет только прямой метод.

Интеграция приемника заключается в запуске потока приемника в исполнителе Spark для извлечения данных. Получатель извлеченных данных не поможет с обработкой, поэтому приемник должен перемещать вещи, и он потребляется на основе высокоуровневого API Kafka. ,Смещение автоматически сохраняется в zookeeper, поэтому вам не нужно поддерживать его самостоятельно..

В это время поток извлечения данных и поток обработки данных не взаимодействуют друг с другом.Когда наш поток обработки данных зависает, поток извлечения данных не может его воспринять и продолжает извлекать данные. В это время все данные будут накапливаться в памяти исполнителя, и возникнут проблемы.

Прямой режим больше не запускает отдельный поток для вытягивания данных, и полученные данные больше не хранятся в экзекьюторе, а полученные данные обрабатываются напрямую.Вытягивание и обработка — это полностью группа людей.

Конечно, у него тоже есть проблемы: использование низкоуровневого API Kafka для потребления требует ручного обслуживания значения смещения. В старой версии мы также сохраним его в zookeeper.Новая версия по умолчанию хранится в топике Kafka по умолчанию.Конечно, для некоторых особых нужд существуют MySQL и Hbase... это тоже возможно.

На данный момент мы можем установитьMaster(local[1])? Если это прямой метод, то это возможно, но в данный момент мы тянем и обрабатываем его в одном потоке, но если он потребляется на основе метода Receiver, он будет завершен.Local[1] означает только запуск один поток, в это время вы обнаружите, что ваша программа не сообщает об ошибке, но данные не являются живыми, поэтому все должны обратить внимание, если это локально [*], то есть сколько процессорных ядер компьютера включено на

Позже

val ssc = new StreamingContext(conf, Seconds(2))

Получите запись программы SparkStreaming, установите 2s для формирования RDD, вы можете

1.3.3 Сбор и обработка данных

На самом деле это простой вопрос получения данных из сокета.

// 2.数据的处理
val dataDStream = ssc.socketTextStream("localhost", 8888)
val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

1.3.4 Вывод данных

// 3.数据的输出
result.print()

Если мы приступим к корпоративной работе, мы также обнаружим, что в основном нам нужно написать только ту часть кода, которая обрабатывает данные, а конфигурация получения передней части и нисходящего вывода не будет уделять слишком много внимания.

На этом этапе мы можем запустить код и использовать netcat, упомянутый во второй статье flink, для прослушивания порта 8888 (не 8888, просто делайте что угодно)

Но в настоящее время наша статистика не является кумулятивной, и для кумулятивных требований необходимо использовать некоторые расширенные операторы.

finally

После этого случайным образом появились Flink,ES и Spark.Сейчас планирую переучить все компоненты.Заинтересованные друзья могут обратить внимание на номер паблика:Говорите свои пожелания