задний план
реактивное программирование
Реактивное программирование в настоящее время является популярным словарем в области программирования.Как создатель Scala, Lightbend (ранее Typesafe) запустил Reactive Manifesto (Reactive Manifesto). Akka, семейство Rx и даже последняя версия WebFlux для Spring присоединились к поп-культуре.
Реактивное программирование может быть не сразу понято в одном предложении.На самом деле ничего нового в себе нет.Основной упор делается на событийно-управляемый быстрый асинхронный ответ.Конечно,чтобы достичь цели,нужно подумать еще и как терпеть ошибки (четыре принципа реагирования: чувствительность, масштабируемость, отказоустойчивость, управление событиями). Простая аналогия заключается в том, что мы обычно используем шаблон MVC, Изменение модели будет уведомлять о быстром изменении уровня представления (уведомление о событии) без необходимости представления постоянно запрашивать изменения уровня модели (опрос). Это быстрый отклик через механизм событий.Конечно, реактивный режим ориентирован не только на событийный и быстрый отклик, но и на масштабирование и расширение приложений под разные нагрузки, а также на высокую отказоустойчивость в асинхронном режиме.
Поток ответов
Одной из основных проблем реактивного программирования являются реактивные потоки. Инженеры из Netflix, Pivotal, Typesafe и других выступили в качестве спонсоров инициативы 2013 года по разработке «Спецификации реактивной потоковой передачи (Reactive Stream Specification)». Он описывает характеристики потока ответа:
- Имеет возможность обрабатывать неограниченное количество элементов
- Процесс по порядку
- Асинхронно передавать элементы
- Должен обеспечиваться неблокирующее противодавление
Akka Stream полностью реализует «Спецификацию реактивной потоковой передачи (Reactive Stream Specification)».
Что угодно может быть адаптивным потоком, например переменные, пользовательский ввод, свойства, кэши, структуры данных и многое другое. Вы можете создавать, объединять, фильтровать потоки ответов, один поток ответов может использоваться в качестве входных данных для другого потока ответов, и даже несколько потоков ответов могут быть входными данными для другого потока ответов. Например, мы хотим обрабатывать события двойного щелчка или нескольких щелчков из потока ответов на щелчки:
обратное давление
Обратное давление — это способ, с помощью которого процессор сообщает восходящему потоку, чтобы он отрегулировал скорость, чтобы устранить несоответствие скорости обработки источника потока ответов и получателя. При отсутствии противодавления ответный поток может вести себя следующим образом:
Тогда есть два типа обратного давления: один заключается в том, что источник отправляет данные слишком быстро, а другой - в том, что отправка слишком занята. Принимающая сторона сообщит, чтобы принять соответствующую скорость отправки. Конечно, есть разные стратегии для конкретной обработки, я не буду здесь вдаваться в подробности, вот простой пример: На самом деле противодавление появилось очень рано, раньше противодавление обычно достигалось блокировкой производителя и ожиданием того, что потребитель обработает сообщение на своей скорости. Этот метод опоры на синхронную обработку сообщений между системами очень неэффективен и сводит на нет преимущества асинхронной обработки (большая масштабируемость и лучшее использование ресурсов), поэтому требуется метод реализации обратного давления.Неблокирующее решение. В контексте реактивных потоков противодавление является неотъемлемой частью модели асинхронной обработки и реализуется посредством асинхронной передачи сообщений.Начинать
Давайте начнем писать код, связанный с Akka Stream, чтобы понять его. Сначала создайте проект sbt и добавьте в сборку:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
Чтобы иметь возможность запускать все потоки, мы сначала добавляем дваActorSystem
иActorMaterializer
Неявная переменная :
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object MyFirstStream {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
}
}
Создайте инфраструктуру Stream
Akka Stream включает в себя три основных компонента Source, Sink и Flow.
Source
Источник — источник потока ответов, источник имеет выход данных, как показано на рисунке выше, что более наглядно описывает источник. Мы можем создать источник из различных данных:val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
Sink
Приемник — это конечный пункт назначения потока, включая ввод данных, мы можем создать приемник следующим образом:val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
Flow
Поток является промежуточным компонентом потока, включая ввод и вывод данных. Мы можем создать Flow следующим образом:val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 当buffer满了后进行背压
определить поток
Потоки могут быть представлены графами и сетями, состоящими из базовых компонентов.Начнем с самого простого способа их определения.Соединение источника и приемника может сформировать поток:
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
Keep.right здесь показывает, что нас интересует только последнее значение, которое получает Sink.
Мы можем добавить Flow между ними, чтобы сформировать слегка дублированный поток:
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
поток выполнения
Теперь мы можем использовать метод run для выполнения ранее созданного потока, и результат будет помещен в Future.
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12
Мы можем определять и выполнять потоки более простым способом, без промежуточных величин:
// 使用指定的sink来执行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold所有元素的sink来执行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)