Eine kurze Einführung

Reactive Streams mit Akka

Heiko Seeberger, Christian Uhl

© Shutterstock.com/M. Pellinni

Das Reaktive Manifest definiert nicht blockierende, asynchrone Nachrichtenübermittlung als Grundlage für antwortbereite, elastische und widerstandsfähige – kurz: reaktive – Systeme. Reactive Streams setzt darauf auf und definiert einen Standard für asynchrone Datenströme mit nicht blockierendem Rückdruck. Im Folgenden führen wir kurz anhand von Akka Streams, einer Implementierung von Reactive Streams, in das Thema ein.

Aktuelle Trends wie z. B. Real Time Analytics und Big Data lassen der Betrachtung von Daten als Datenströme eine gesteigerte Bedeutung zukommen. Gründe hierfür sind unter anderem, dass Daten genau auf diese Weise zwischen Systemen übertragen werden und insbesondere, dass die Gesamtheit der Daten oft zu groß ist, um lokal effektiv verarbeitet zu werden. Dabei ist es wichtig zu verstehen, dass es sich bei Datenströmen – anders als bei Collections – grundsätzlich um flüchtige und potenziell unbegrenzte Daten handelt: „Wer in dieselben Flüsse hinabsteigt, dem strömt stets anderes Wasser zu“ (Heraklit).

Bei Reactive Streams geht es im Kern um die Steuerung eines Datenstroms von einem Produzenten zu einem Konsumenten über eine asynchrone Grenze hinweg. Die Motivation für die Asynchronizität ergibt sich aus dem Reaktiven Manifest, insbesondere wegen Skalierbarkeit über Rechner und Rechnerkerne hinweg sowie wegen Widerstandsfähigkeit durch Kapselung und Isolation.

Abb. 1: Reactive-Streams-Flusskontrolle

Abb. 1: Reactive-Streams-Flusskontrolle

Um zu vermeiden, dass ein schneller Produzent einen langsameren Konsumenten überlastet, setzt Reactive Streams auf eine besondere Form von Flusskontrolle: Nur wenn der Konsument Bedarf signalisiert, darf der Produzent dementsprechend Daten senden. Dieser Bedarf wird übrigens – wie die Daten – nicht blockierend und asynchron übertragen. Mit anderen Worten: Der Konsument übt einen Rückdruck auf den Produzenten aus (Abb. 1). Im Endeffekt führt das dynamisch zu einem Pull- oder Push-Verfahren, je nachdem welche Seite schneller ist.

Das Reactive-Streams-API ist nicht für Endbenutzer gedacht, sondern für Implementierungen wie Akka Streams – zur Sicherstellung der Konformität mit der Reactive-Streams-Spezifikation dient ein TCK.

Akka Streams

Akka Streams implementiert – wenig verwunderlich – Reactive Streams auf Basis von Akka-Aktoren. Allerdings ist das, genau wie das Reactive-Streams-API, quasi ein Implementierungsdetail: Akka Streams bietet ein eigenständiges API für Java und Scala zum Prozessieren von Datenflüssen.

Um Wiederverwendbarkeit einfach zu machen, trennt Akka Streams zwischen Beschreibungen von Datenflusstopologien, z. B. linearen Flüssen oder fast beliebig komplexen Graphen und deren Ausführung. Im Rahmen dieser Einführung werden wir uns auf einfache lineare Flüsse beschränken, welche mit folgenden Bausteinen beschrieben werden können:

  • Source: ein Baustein mit einem Ausgang
  • Flow: ein Baustein mit einem Eingang und einem Ausgang
  • Sink: ein Baustein mit einem Eingang

Mithilfe von Kombinatoren können diese Bausteine zu einem RunnableGraph zusammengefügt werden, welcher ausgeführt werden kann. Abbildung 2 zeigt schematisch das Zusammensetzen anhand von zwei Beispielen, einem minimalen RunnableGraph bestehend aus einer Source und einer Sink, sowie einem weiteren, der zusätzlich einen Flow enthält.

Abb. 2: RunnableGraphs zusammensetzen

Abb. 2: RunnableGraphs zusammensetzen

Listing 1 zeigt ein einfaches konkretes Beispiel: Die Source basiert auf einer Collection, die Sink, welche mit der foreach-Methode erzeugt wird, führt die übergebene Funktion für jedes Element aus und die to-Methode verbindet beide zu einem RunnableGraph. Um diesen auszuführen, benötigen wir einen Materializer, welcher steuert, wie die abstrakte Flussbeschreibung konkret umgesetzt wird.

Listing 1
def demo1()(implicit mat: Materializer): Unit = {
  val printNumbers = Source(1.to(7)).to(Sink.foreach(println))
  println("Demo 1:")
  printNumbers.run()
}

Selbstverständlich ist dieses Beispiel – ebenso wie die weiteren – zu Demonstrationszwecken künstlich konstruiert. Der geneigte Leser möge sich als Source vielleicht einen Stream von Kafka oder Spark vorstellen und als Sink eine Datenbank wie Cassandra.

Wenn der RunnableGraph selbst nicht benötigt wird, können wir mit der runWith-Methode, welche eine Sink erwartet, direkt zur Ausführung übergehen. Listing 2 zeigt das entsprechend modifizierte Beispiel von vorher, wobei die mit der fold-Methode erzeugte Sink analog zur foldLeft-Methode, die vom Collection API bekannt ist, die übergebene Funktion auf die Zwischenwerte und die Elemente des Datenflusses anwendet – in unserem Beispiel wird einfach die Summe gebildet.

Listing 2
demo2().onSuccess { case sum => println(s"Demo 2: $sum") }
...
def demo2()(implicit mat: Materializer): Future[Int] =
 Source(1.to(7)).runWith(Sink.fold(0)(_ + _))
Ende

Listing 2 zeigt ein sehr wichtiges Konzept: Jeder Baustein – also jede Source, jeder Flow und jede Sink – besitzt einen materialisierten Wert, dessen Typ zur Kompilierzeit feststeht und dessen Wert sich zur Laufzeit durch die Ausführung ergibt. Obige Source(1.to(7)) ist vom Typ Source[Int, Unit], wobei das zweite Typargument – also Unit – für den materialisierten Wert steht. Ein bisschen interessanter wird es bei obiger Sink.foreach, welche vom Typ Sink[Int, Future[Unit]] ist. Hier können wir mit dem materialisierten Wert feststellen, ob die Ausführung fertig ist. So richtig interessant ist die Sink.fold, welche vom Typ Sink[Int, Future[Int]] ist, also ein „richtiges“ Ergebnis enthält.

Ein weiteres sehr wichtiges Grundprinzip von Akka Streams, wenngleich hier nicht direkt ersichtlich, ist: Alle Bausteine sind unveränderlich und können beliebig wiederverwendet werden. Erst durch das Materialisieren wird ein RunnableGraph ausgeführt – zuvor handelt es sich dabei, sowie bei allen enthaltenen Bausteinen, quasi um Blaupausen.

Wenn zwei Bausteine kombiniert werden, gilt dies auch für die materialisierten Werte. Die to-Methode übernimmt einfach den linken Wert, aber mit der toMat-Methode kann man eine Funktion angeben, welche beide materialisierten Werte als Argumente erhält. So kann man beispielsweise den rechten Wert übernehmen oder beide zu einem Tupel kombinieren. Listing 3 zeigt eine entsprechende Modifikation des ursprünglichen Beispiels, welches mit der vordefinierten Funktion Keep.right die Future[Unit] der Sink.foreach zurückgibt.

Listing 3
def demo1()(implicit mat: Materializer): Future[Unit] = {
  val printNumbers = Source(1.to(7)).toMat(Sink.foreach(println))(Keep.right)
  println("Demo 1:")
  printNumbers.run()
}

Die runWith-Methode übernimmt übrigens den materialisierten Wert der übergebenen Sink, also den rechten Wert. Deshalb ist die demo2-Methode auch vom Typ Future[Int], repräsentiert also den materialisierten Wert der mit Sink.fold erzeugten Sink.

Abschließend zeigen wir in Listing 4 noch ein etwas vollständigeres Beispiel, welches zum einen eine unbegrenzte Source auf Basis eines Iterators enthält, zum anderen einen Flow definiert und mithilfe der via-Methode mit der Source kombiniert. Des Weiteren kommen die Methoden take, map und filter zum Einsatz, die den Datenstrom modifizieren: Die Ausführung wird nach sieben Elementen beendet, zu jedem Element wird eins addiert und schließlich werden nur durch drei teilbare Elemente durchgelassen.

Listing 4
def demo3()(implicit mat: Materializer): Future[Int] = {
  val doubler = Flow[Int].map(_ * 2)
  Source(() => Iterator.from(1))
   .via(doubler)
   .take(7)
   .map(_ + 1)
   .filter(_ % 3 == 0)
   .runWith(Sink.fold(0)(_ + _))
}

Ausblick

Mit dieser kurzen Einführung haben wir nur an der Oberfläche gekratzt, sowohl was Reactive Streams, als auch Akka Streams angeht. Wir hoffen, dass wir Interesse für das immer wichtiger werdende Thema der Datenströme und deren Verarbeitung sowie der genannten Werkzeuge wecken konnten, denn es gibt noch so viel mehr zu entdecken, z. B. Flussgraphen in Akka Streams, Akka HTTP, das auf Akka Streams aufbaut oder die Interoperabilität zwischen verschiedenen Implementierungen von Reactive Streams. Der komplette Sourcecode für obige Beispiele steht auf GitHub zur Verfügung. Die Autoren freuen sich über Rückmeldungen!

Aufmacherbild: landscape with mountains trees and a river in front von Shutterstock / Urheberrecht: M. Pellinni

Verwandte Themen:

Geschrieben von
Heiko Seeberger
Heiko Seeberger
Heiko Seeberger is Fellow at codecentric. He has been a Scala enthusiast ever since he came to know this fascinating language in 2008. Deeply interested in distributed computing he became a contributor to the amazing Akka project in 2010. Heiko has more than 20 years of professional expertise in consulting and software development and regularly shares his expertise in articles and talks. He is also the author of the German Scala book "Durchstarten mit Scala". Heiko tweets as @hseeberger and blogs under heikoseeberger.de.
Christian Uhl

Christian Uhl ist Consultant bei codecentric, Experte für Search, Big-Data-Themen und leidenschaftlicher Scala-Entwickler. Er berät aktuell Kunden bei der Migration zu Microservices in funktionalen Sprachen.

Twitter: @chrisuhlcc

Kommentare

Hinterlasse einen Kommentar

avatar
4000
  Subscribe  
Benachrichtige mich zu: