Flow API, CompletableFuture und Concurrency Bibliothek

JEP 266: Das steckt im Detail hinter „More Concurrency Updates“

Vadym Kazulkin, Rodion Alukhanov

© Shutterstock.com / Wright Studio

Seit Version 8 unterstützt Java mit dem CompletableFuture/CompletionStage API die Continuation-Style-Programmierung, mit Futures und Stream API gibt es in Java Pull-Style-Operationen auf Elementen der Collections. Allerdings mangelte es an der Unterstützung des APIs für Push-Style-Operationen auf Elementen, die aus einer aktiven Quelle verfügbar gemacht werden. Das ändert sich mit Java 9, denn damit haben die wichtigsten Interfaces wie Publisher und Subscriber Einzug in die Concurrency-Bibliothek als Flow API gehalten. Außerdem gibt es Erweiterungen des seit Java 8 existierenden CompletableFuture APIs und andere eher kleinere Anpassungen. Diese im Java Extension Proposal (JEP) 266 beschriebenen Erweiterungen tragen die Bezeichnung “More Concurrency Updates”.

Seit Java 5 gibt es den Type Future, der allerdings beim Aufruf der Methode get() den aktuellen Thread blockiert. Abhilfe dafür wurde in Java 8 mit der Einführung der APIs CompletableFuture und CompletionStage geschaffen. Diese ermöglichen es, die Verarbeitung in der definierten Callback-Methode asynchron fortzusetzen bzw. weitere Schritte daran zu knüpfen. JavaScript-Entwickler kennen diese Vorgehensweise unter der Bezeichnung “Promise”.

Diese Abstraktion hat ihre Einschränkungen. Wenn wir beispielsweise CompletableFuture<List<T>> haben, müssen wir solange warten, bis wir die Liste komplett aufgebaut haben. Erst danach können wir deren Elemente im Callback verarbeiten. Es wäre besser, sie verarbeiten zu können, sobald sie verfügbar sind.

Das Stream API, das ebenfalls seit Java 8 existiert, ist ein pull-basiertes API. Zum Ausführen von Latenz-sensitiven I/O-Operationen ist es nur begrenzt geeignet und wurde dafür auch nicht konzipiert. Ein weiterer Nachteil des Stream APIs liegt darin, dass es vor allem für die Verarbeitung der Elemente der Collections und Arrays bereitgestellt wurde. Genau an dieser Stelle glänzen die reaktiven APIs. Zu deren Stärken gehört allerdings nicht die blockierende Verarbeitung der potenziell unendlichen Sequenzen von beliebigen Events, die mit Latenz ankommen können.

Java 9 Flow API und die Reactive-Streams-Initiative

Mit Java 9 haben folgende vier API-Komponenten in Form von Interfaces Eingang ins Package java.util.concurrent.Flow gefunden. In folgendem Listing ist dies zu sehen:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
 
public interface Subscription {
    public void request(long n);
    public void cancel();
}
 
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Das Flow API in JDK 9 entspricht der Spezifikation Reactive Streams. Die Reactive-Streams-Spezifikation ist eine der Initiativen zur Standardisierung der reaktiven Programmierung. Sie wurde von den folgenden Firmen gegründet: Netflix, Pivotal, Red Hat, Twitter und Typesafe. Außer den besagten vier Interfaces wird die Regel (Spezifikation) für die Reactive Streams definiert und ein Technology Compatibility Kit (TCK) mitgeliefert. Mehrere Implementierungen unterstützen bereits die Reactive-Streams-Spezifikation, darunter unter anderem Spring Reactor 3, RxJava 2, Akka Stream etc., die jeweils von den Mitgliedern der Initiative stammen. Oracle hat lediglich die Interfaces aus der Spezifikation Reactive Streams als Flow API in Java 9 übernommen.

Lesen Sie auch: Java 9 ist da! Project Jigsaw & die neuen Features auf einen Blick

Das Flow API (und das Reactive Streams API) ist in gewisser Weise eine Kombination von Ideen aus Iterator- und Observer-Patterns. Der Iterator ist ein Pull-Modell, bei dem die Applikation Items aus der Quelle zieht. Der Observer ist ein Push-Modell, bei dem die Items aus der Quelle an die Applikation gepusht werden. Mit dem Flow API fordert der Subscriber zunächst n Items vom Publisher an, dann pusht der Publisher höchstens n Items an den Subscriber. Damit ist das Flow API eine Mischung aus Pull- und Push-Programmiermodellen (Abbildung 1).

Abb. 1: Pull- und Push-Programmiermodellen

Abb. 1: Pull- und Push-Programmiermodelle

Bevor wir uns aber die vier genannten API-Komponenten und deren Zusammenspiel im Detail anschauen, werfen wir einen Blick auf die reaktive Programmierung.

Zusammenfassend ist Reactive Streams eine Kombination aus einem Standard und einer Spezifikation für Stream-orientierte Bibliotheken für die JVM, die

  • potenziell eine unbegrenzte Anzahl an Elementen sequenziell verarbeitet,
  • dabei Elemente zwischen Komponenten asynchron übergibt
  • und mit zwingendem, nicht blockierendem Gegendruck (Backpressure) arbeitet.

Reaktive Programmierung

Die Eigenschaften, die reaktive Anwendungen erfüllen müssen, sind im Reaktiven Manifest zusammengefasst (Abbildung 2).

Abb. 2: Eigenschaften reaktiver Anwendungen

Abb. 2: Eigenschaften reaktiver Anwendungen

  • Responsive: Das System reagiert in akzeptabler Zeit.
  • Resilient: Das System reagiert auch im Falle der auftretenden Probleme. Die Resilienz wird durch Replikation, Isolation und Delegation erreicht. Ausfälle können in jeder Komponente vorkommen. Die Isolation der Komponenten voneinander gewährleistet, dass Teile des Systems ausfallen und sicher wiederhergestellt werden können, ohne einen Ausfall des ganzen Systems zu verursachen.
  • Elastisch: Das System reagiert auf wechselnde Arbeitsbelastung. Reaktive Systeme können auf Änderungen der Lastschwankungen reagieren, weil sie so entwickelt worden sind, dass sich die einzelnen voneinander unabhängigen Komponenten nach Bedarf replizieren lassen.
  • Message Driven: Reaktive Systeme setzen auf asynchrone Nachrichtenübermittlung, um eine Grenze zwischen Komponenten zu schaffen, die eine lose Kopplung oder Isolation gewährleistet. Diese Grenze bietet auch die Möglichkeit, Fehler als Nachrichten zu übermitteln.

Die Verarbeitung der Datenströme – insbesondere von „Live“-Daten, deren Volumen nicht eingeschränkt ist – erfordert eine besondere Sorgfalt innerhalb eines synchronen Systems. Der entscheidende Punkt ist, den Ressourcenverbrauch sorgfältig zu kontrollieren, sodass eine schnelle Datenquelle (Publisher) das Stream-Ziel (Subscriber) nicht überwältigt. Asynchronität wird benötigt, um die parallele Nutzung von Rechenressourcen sowie die Zusammenarbeit der Netzwerk-Hosts oder mehrerer CPU-Kerne innerhalb einer einzigen Maschine zu ermöglichen.

 API-Komponenten

Laut der Spezifikation von Reactive Streams besteht das API aus vier Komponenten. Der Anbieter einer JDK-9-konformen Implementierung muss folgende Komponenten bereitstellen:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Ein Publisher ist der Anbieter einer potenziell unbegrenzten Anzahl von sequenzierten Elementen. Er veröffentlicht sie entsprechend der Nachfrage, die er von einem oder mehreren Subscriber(n) erhält.

Wenn ein Subscriber sich beim Publisher registriert, benachrichtigt ihn der Publisher über die Ereignisse. Die Registrierung erfolgt durch den Aufruf der Methode Publisher.subscribe. Die Benachrichtigung vom Subscriber erfolgt über eine Reihe von Callback-Methoden. Siehe Abbildung 3.

Abb. 3: Kommunikation zwischen Publisher und Subscriber

Abb. 3: Kommunikation zwischen Publisher und Subscriber

Durch onSubscribe wird die Registrierung bestätigt. Danach folgt eine Reihe von onNext-Signalen. Je nach Publisher können unbegrenzt viele oder gar keine ankommen. Die onNext-Signale kommen nicht unangefordert, sondern müssen vom Subscriber zusätzlich bestellt werden.

Die Signal-Sequenz kann in folgenden Fällen unterbrochen werden: Der Publisher hat keine weitere Daten mehr und ruft onComplete auf. Der Publisher meldet ein Problem durch den onError-Aufruf. Die Subscription wird seitens des Subscribers storniert.

Das Interface Subscription ist eine Art Umfangskontrolle. Die request-Methode ist die Implementierung des Backpressure-Konzepts. Mit deren Hilfe wird dem Publisher mitgeteilt, so viele Ereignisse zu erzeugen, wie auch verarbeitet werden können.

Ein Processor repräsentiert eine Verarbeitungsstufe, die sowohl ein Subscriber als auch ein Publisher ist und den Kontrakten beider entsprechen muss.

README.md vom Projekt reactive-streams-jvm enthält eine Reihe von Regeln, die die Reactive-Streams-Implementierung zu erfüllen hat. Es lohnt sich, diese zu lesen.

Erweiterungen des CompletableFuture APIs

Das seit Java 8 existierende CompletableFuture API besitzt die Eigenschaften einer Future- und CompletionStage (beide aus dem Package java.util.concurrent). Diese Future kann explizit abgeschlossen und weitere Funktionen und Aktionen daraufhin getriggert werden.

In Java 9 gibt es einige Erweiterungen des CompletableFuture APIs. Dazu gehören die Unterstützung von Delays und Timeouts sowie die Verbesserung der Unterstützung von Subclassing- und Factory-Methoden. Es sind die zeitbasierten Methoden, die es einer Future ermöglichen, mit einem Wert oder einer Exception nach einer gewissen Dauer abzuschließen:

Die Methode completeOnTimeout(T value, long timeout, TimeUnit unit) schließt diese CompletableFuture mit dem als Parameter übergebenen Wert ab, falls diese nicht bereits vor dem als Parameter übergebenen Timeout abgeschlossen wurde.

Die alternative orTimeout(long timeout, TimeUnit unit) "exceptionally" schließt diese CompletableFuture mit einer TimeoutException ab, falls diese nicht bereits vor dem als Parameter übergebenen Timeout abgeschlossen wurde.

Es wurden verschiedene Verbesserungen an CompletableFuture vorgenommen, um es zu vereinfachen, diese Klasse zu erweitern. Beispielsweise ist es möglich, die neue Executor-Methode defaultExecutor() zu überschreiben, um einen alternativen Standard-Executor zu unterstützen.

Java 8 führte die Factory-Methode completeFuture(U value) ein. Eine solche Future ist bereits mit einem vorgegebenen Wert abgeschlossen. Java 9 ergänzt diese Methode mit einer Factory-Methode failedFuture(Throwable ex). Diese Future ist mit einem vordefinierten Wert „exceptionally“ abgeschlossen.

Darüber hinaus stellt Java 9 das folgende Paar der Factory-Methoden in CompletableFuture bereit, um korrekte und mit der Exception abgeschlossene Completion Stages zurückzugeben:

  • completedStage(U value) gibt eine neue CompletionStage zurück, die bereits mit dem angegebenen Wert abgeschlossen ist.
  • failedStage(Throwable ex) gibt eine neue CompletionStage zurück, die bereits mit der angegebenen Exception abgeschlossen ist und unterstützt nur die Methoden in der Schnittstelle CompletionStage.

Weitere kleine Anpassungen an der Concurrency-Bibliothek

Seit Java 8 wurden im Rahmen des JEP 193 zahlreiche Anpassungen an der Concurrency-Bibliothek vorgenommen. Zum Teil decken sich die Änderungen mit den angekündigten kleineren Anpassungen im Rahmen des JEP 226. Anbei eine kleine Übersicht.

Der Entwickler bekommt mit dem JDK 9 mehr Kontrolle über Speicherzugriffe. Die meisten neuen Features finden sich in den Klassen MethodHandles und VarHandle [1].

Man erhält die Möglichkeit, auf bereits existierende Variablen nach Bedarf als volatile oder plain zuzugreifen. Auf eine non-volatile-Variable kann dadurch mit volatile-Semantik zugegriffen werden und umgekehrt. Dazu zählen auch die Zugriffe auf Elemente eines Arrays, was vor JDK 9 nur über Umwege möglich gewesen ist. Eine CAS-Operation auf einem Array wird in folgendem dargestellt.

String[] sa = ...
VarHandle avh = MethodHandles.arrayElementVarHandle(String[].class);
boolean r = avh.compareAndSet(sa, 10, "expected", "new");

Die Klasse VarHandle selbst ist nicht für den täglichen Einsatz gedacht. Sie ist vielmehr eine dokumentierende Version von Suns Unsafe-Klasse und bildet eine Grundlage für die Entwicklung von Bibliotheken. Andere Klassen, z.B. AtomicReferenceArray, profitieren von den neuen Funktionen und bekommen weitere Methoden für die Steuerung der Speicherzugriffe.

Zwei der wichtigsten Neuerungen, die von der Java-Community schon länger gefordert wurden, sind die speziellen Zugriffsmöglichkeiten Opaque und Release/Acquire. Sie sind performanter und flexibler, als es mit volatile und synchronized möglich wäre [1]. Die Klassen VarHandle und AtomicReferenceArray bieten eine Reihe von Methoden, die dies ermöglichen.

Sehen Sie auch: Unsere Java 9 Infografik: Project Jigsaw auf einen Blick

Fazit

Mit der Einführung von Java 9 hat Oracle die Bedeutung der reaktiven Programmierung erkannt und von der Reactive-Streams-Initiative herausgearbeitete Interfaces in die Bibliothek aufgenommen. Dabei hat Java keinen Anspruch darauf, eigene Implementierungen zu liefern, sondern der Entwickler muss sich nach wie vor für eine der bestehenden Bibliotheken entscheiden. Im Moment implementieren diese Bibliotheken die Reactive Streams Interfaces. In der nahen Zukunft ist jedoch zu erwarten, dass sie auf die Java Flow Interfaces setzen. Im Moment werden aber Adaptoren angeboten, bspw. JdkFlowAdaptor für das Spring-Reactor-Projekt. Außerdem wurden im Rahmen des JEP 266 („More Concurrency Updates“) Erweiterungen am seit Java 8 existierenden CompletableFuture API und sonstige kleine Anpassungen umgesetzt.

Verwandte Themen:

Geschrieben von
Vadym Kazulkin
Vadym Kazulkin
Vadym Kazulkin ist Chief Software Architekt bei ip.labs GmbH, einer 100% Tochter der FUJIFLM Gruppe mit Sitz in Bonn. Ip.labs ist das weltweit führende White Label E-Commerce Unternehmen im Bereich Softwareanwendungen für den digitalen Fotoservice, mit einem modernen und motivierenden Arbeitsumfeld, das technologisch immer einen Schritt voraus ist. Vadym beschäftigt sich mit dem Java-Ökosystem seit über 15 Jahren. Dabei legt er sein Augenmerk auf die Themen rund um Effective Java, Design Patterns, Nebenläufigkeit und Performanz. Seine Schwerpunkte und Interessen gelten derzeit der Konzeption und Implementierung der hochskalierbaren und hochverfügbaren Anwendungen und AWS-Cloud.
Rodion Alukhanov
Rodion Alukhanov
Rodion Alukhanov ist Senior Software Entwickler bei ip.labs GmbH mit Sitz in Bonn. Seine mehrjährige Erfahrung streckt sich von MS-DOS über die Hausautomation bis in die AWS Cloud. Seine Schwerpunkte sind Cloud Migration, Big Data und Integration Tests.
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: