Первое знакомство с Akka Stream

задняя часть MVC реактивное программирование Akka

задний план

реактивное программирование

Реактивное программирование в настоящее время является популярным словарем в области программирования.Как создатель Scala, Lightbend (ранее Typesafe) запустил Reactive Manifesto (Reactive Manifesto). Akka, семейство Rx и даже последняя версия WebFlux для Spring присоединились к поп-культуре.

Реактивное программирование может быть не сразу понято в одном предложении.На самом деле ничего нового в себе нет.Основной упор делается на событийно-управляемый быстрый асинхронный ответ.Конечно,чтобы достичь цели,нужно подумать еще и как терпеть ошибки (четыре принципа реагирования: чувствительность, масштабируемость, отказоустойчивость, управление событиями). Простая аналогия заключается в том, что мы обычно используем шаблон MVC, Изменение модели будет уведомлять о быстром изменении уровня представления (уведомление о событии) без необходимости представления постоянно запрашивать изменения уровня модели (опрос). Это быстрый отклик через механизм событий.Конечно, реактивный режим ориентирован не только на событийный и быстрый отклик, но и на масштабирование и расширение приложений под разные нагрузки, а также на высокую отказоустойчивость в асинхронном режиме.

响应式编程准则

Поток ответов

Одной из основных проблем реактивного программирования являются реактивные потоки. Инженеры из Netflix, Pivotal, Typesafe и других выступили в качестве спонсоров инициативы 2013 года по разработке «Спецификации реактивной потоковой передачи (Reactive Stream Specification)». Он описывает характеристики потока ответа:

  • Имеет возможность обрабатывать неограниченное количество элементов
  • Процесс по порядку
  • Асинхронно передавать элементы
  • Должен обеспечиваться неблокирующее противодавление

Akka Stream полностью реализует «Спецификацию реактивной потоковой передачи (Reactive Stream Specification)».

Что угодно может быть адаптивным потоком, например переменные, пользовательский ввод, свойства, кэши, структуры данных и многое другое. Вы можете создавать, объединять, фильтровать потоки ответов, один поток ответов может использоваться в качестве входных данных для другого потока ответов, и даже несколько потоков ответов могут быть входными данными для другого потока ответов. Например, мы хотим обрабатывать события двойного щелчка или нескольких щелчков из потока ответов на щелчки:

点击响应流

обратное давление

Обратное давление — это способ, с помощью которого процессор сообщает восходящему потоку, чтобы он отрегулировал скорость, чтобы устранить несоответствие скорости обработки источника потока ответов и получателя. При отсутствии противодавления ответный поток может вести себя следующим образом:

没有背压
Тогда есть два типа обратного давления: один заключается в том, что источник отправляет данные слишком быстро, а другой - в том, что отправка слишком занята. Принимающая сторона сообщит, чтобы принять соответствующую скорость отправки. Конечно, есть разные стратегии для конкретной обработки, я не буду здесь вдаваться в подробности, вот простой пример:
背压
背压
На самом деле противодавление появилось очень рано, раньше противодавление обычно достигалось блокировкой производителя и ожиданием того, что потребитель обработает сообщение на своей скорости. Этот метод опоры на синхронную обработку сообщений между системами очень неэффективен и сводит на нет преимущества асинхронной обработки (большая масштабируемость и лучшее использование ресурсов), поэтому требуется метод реализации обратного давления.Неблокирующее решение. В контексте реактивных потоков противодавление является неотъемлемой частью модели асинхронной обработки и реализуется посредством асинхронной передачи сообщений.

Начинать

Давайте начнем писать код, связанный с Akka Stream, чтобы понять его. Сначала создайте проект sbt и добавьте в сборку:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"

Чтобы иметь возможность запускать все потоки, мы сначала добавляем дваActorSystemиActorMaterializerНеявная переменная :

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MyFirstStream {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    
  }
}

Создайте инфраструктуру Stream

Akka Stream включает в себя три основных компонента Source, Sink и Flow.

Source

Source
Источник — источник потока ответов, источник имеет выход данных, как показано на рисунке выше, что более наглядно описывает источник. Мы можем создать источник из различных данных:

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")

Sink

Sink
Приемник — это конечный пункт назначения потока, включая ввод данных, мы можем создать приемник следующим образом:

val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore

Flow

Flow
Поток является промежуточным компонентом потока, включая ввод и вывод данных. Мы можем создать Flow следующим образом:

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 当buffer满了后进行背压

определить поток

Потоки могут быть представлены графами и сетями, состоящими из базовых компонентов.Начнем с самого простого способа их определения.Соединение источника и приемника может сформировать поток:

Source Sink

val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)

Keep.right здесь показывает, что нас интересует только последнее значение, которое получает Sink.

Мы можем добавить Flow между ними, чтобы сформировать слегка дублированный поток:

Source Flow Sink

val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)

поток выполнения

Теперь мы можем использовать метод run для выполнения ранее созданного потока, и результат будет помещен в Future.

val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12

Мы можем определять и выполнять потоки более простым способом, без промежуточных величин:

// 使用指定的sink来执行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold所有元素的sink来执行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)