Suche
Reaktive Programmierung mit Java: CompletableFuture

Checkpoint Java: Reaktive Programmierung mit Fiber und CompletableFuture

Sven Ruppert

© Shutterstock/Suttha Burawonk

Nach den ersten Schritten in der Reaktiven Programmierung mit dem Observer Pattern, schauen wir uns diesmal das CompletableFuture genauer an. Hier ist es wichtig ein Auge auf blockierende und nicht-blockierende Aufrufe zu haben.

Im letzten Teil dieser Serie haben wir uns angesehen, wie wir einen minimalen reaktiven Ansatz mittels Observer Pattern realisieren können. Dabei wurde deutlich, dass es einiges an Implementierung bedarf, um die Beschränkungen der ersten Version aufzuheben. Das JDK selbst bietet seit Java 8 eine komfortable Implementierung. Genau das werden wir uns nun ansehen.

Zuzüglich zu den Quelltextbeispielen in diesem Artikel verwende ich auch die Sourcen des Open-Source-Projekts Functional-Reactive. Die Sourcen liegen auf GitHub.

Es gibt verschiedene Möglichkeiten für Entwickler Abläufe konkurrierend ablaufen zu lassen. Diese Möglichkeiten nutzen unterschiedliche Isolationsstufen und verschiedene Ressourcen. Auf Betriebssystemebene sind wir zum Beispiel in der Lage, verschiedene Dinge in unterschiedliche Prozesse auszulagern. Innerhalb einer Anwendung kennt man den Thread, mittels dem das Programm bzw. die Laufzeitumgebung Dinge auf verschiedene Prozessoren verteilen kann. Ich lasse an dieser Stelle aus, das es unterschiedliche Mappings gibt und nehme vereinfachend an, dass ein Thread auf einen Betriebssystem-Thread gemappt wird.

Jedoch gibt es auch hier noch etwas Leichtgewichtigeres. Hier kommt der Begriff Fiber ins Spiel.
Wer mehr zu der Begriffsdefinition lesen möchte, den verweise ich als Anfangslektüre auf Wikipedia.
An dieser Stelle möchte ich nicht im Detail die Unterschiede erläutern, sondern als Gedankenstütze formulieren, dass ein Fiber etwas Kleineres als ein Thread ist. Ganz vereinfachend ausgedrückt, ein Thread kann mehrere Fiber verarbeiten.

Abb. 1: Zwei Werte werden aus verschiedenen Quellen entnommen und mittels Operator zu einem Ergebnis verarbeitet

Wir werden uns nun das CompletableFuture genauer ansehen. Ich nehme vereinfachend an, dass ein CompletableFuture auf der Ebene eines Fibers zu verorten ist. Das bedeutet, dass wir einen Pool von Threads haben, der dafür verwendet wird eine beliebige Menge an Aufgaben mittels CompletableFuture abzuarbeiten. Als Beispiel nehmen wir an dieser Stelle an, dass zwei Werte aus verschiedenen Quellen entnommen werden müssen und mittels Operator zu einem Ergebnis verarbeitet werden. Dieses Ergebnis und ein weiterer Wert aus einer dritten Quelle wird mit einem weiteren Operator zu einem finalen Ergebnis verarbeitet. Dabei ist wichtig, dass die Quellen mit unterschiedlichen Zeiten antworten. Das bedeutet, dass immer auf einen der beiden Eingangswerte gewartet werden muss. Die Wahrscheinlichkeit, dass beide Werte gleichzeitig zur Verfügung stehen ist sehr gering. Es lässt sich leicht erkennen, dass es hier zu blockierenden Aufrufen kommen wird (Abb. 1). Das wollen wir nun in reaktiver Manier vermeiden. Wie können wir das mit CompletableFuture abbilden?

CompletableFuture: Der Anfang

Die Klasse CompletableFuture eröffnet prinzipiell zwei Wege, eine Aufgabe zu formulieren. Dabei werden wiederum zwei Fälle unterscheiden. Die Aufgabe liefert ein Ergebnis zurück und es wird kein Ergebnis zurückgeliefert. Aufgaben, die kein Ergebnis zurückliefern, werden mit Runnable formuliert. Dieses Interface ist schon aus der klassischen Java-Programmierung mit Threads bekannt. Das es sich um ein FunctionalInterface handelt, können wir für die Formulierung auf Lambdas zurückgreifen.

    final CompletableFuture<Void> cfRunnableA = CompletableFuture.runAsync(new Runnable() {
      @Override
      public void run() {
        System.out.println("Hello reactive World");
      }
    });

    final CompletableFuture<Void> cfRunnableB = CompletableFuture
        .runAsync(() -> System.out.println("Hello reactive World"));

Wird ein Ergebnis erwartet, kann man die Methode supplyAsync verwenden. Hier wird ein Supplier übergeben. Der Supplier soll es dem CompletableFuture ermöglichen, diesen dann mit dem übergebenen Supplier anzufordern, wenn der Wert für die Verarbeitung benötigt wird.

    final CompletableFuture<String> cfCallableA = CompletableFuture
        .supplyAsync(new Supplier<String>() {
          @Override
          public String get() {
            return "Hello reactive World";
          }
        });

    final CompletableFuture<String> cfCallableB = CompletableFuture
        .supplyAsync(() -> "Hello reactive World");

In den gerade gezeigten Beispielen erhalten wir jeweils eine Instanz der Klasse CompletableFuture. Im ersten Beispiel ein CompletableFuture und im zweiten Beispiel ein CompletableFuture.

CompletableFuture: Das Ende

Mit diesen Instanzen kann man auf die Verarbeitung warten. Hier gibt es verschiedene Möglichkeiten. Die Methoden join(), get() und getNow() sind alle blockierend. Es wird also gewartet, bis das Ergebnis eingetroffen oder eine Ausnahmesituation aufgetreten ist. Mit der Methode join() kann gewartet werden, bis das Ergebnis vorhanden ist. Die Methode liefert das Value direkt, ohne es eventuell nochmal in ein Optional zu verpacken. Es kann aber natürlich dazu kommen, dass ewig gewartet wird. Sobald ein Fehler in der Verarbeitung in Form einer Exception auftritt, wird dieser als RuntimeException verpackt geworfen. Bei der Methode get(), die es in den zwei Ausprägungen mit und ohne Angabe eines Time-outs gibt, kann ebenfalls auf das Ergebnis gewartet werden. Die Methode enthält im Unterschied zu der Methode join() zwei bzw. drei Exceptions in der Signatur: InterruptedException, TimeoutException und ExecutionException.

    final CompletableFuture<Void> cfRunnableB = CompletableFuture
        .runAsync(() -> System.out.println("Hello reactive World"));

    try {
      final Void aVoidA = cfRunnableB.get();
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }

    try {
      final Void aVoidB = cfRunnableB.get(1_000 , MILLISECONDS);
    } catch (InterruptedException | TimeoutException | ExecutionException e) {
      e.printStackTrace();

Wenn zu einem definierten Zeitpunkt ein Ergebnis geholten werden soll, kann die Methode getNow() zum Einsatz kommen. Allerdings muss man hier auch immer einen alternativen Wert angeben, da es nicht garantiert ist, dass der finale Wert schon erzeugt worden ist. Dafür hat diese Methode keine Exception in der Signatur. Ob und wann diese Methode eingesetzt werden kann, muss der Entwickler selbst entscheiden.

    final CompletableFuture<String> cfCallableB = CompletableFuture
        .supplyAsync(() -> "Hello reactive World");

    final String now = cfCallableB.getNow("alternative");

Es gibt jedoch auch die Möglichkeit, zu definieren, was passieren soll, wenn die Verarbeitung bis zu diesem Punkt gekommen ist, ohne dass auf das Ergebnis blockierend gewartet wird. Der Rückgabewert ist wiederum eine Instanz vom Typ CompletableFuture. Um das zu erreichen, wird mit der Methode thenAccept(..) ein Consumer übergeben, der das bis dahin erzeugte Ergebnis final verarbeiten soll.

    final Supplier<String> task = ()-> "Hello reactive World";

    final CompletableFuture<String> cf = CompletableFuture.supplyAsync(task);

    final CompletableFuture<Void> acceptA = cf.thenAccept(System.out::println);

Async und ThreadPools

Nun gibt es zum Beispiel die Methode thenAcccept() auch in der Geschmacksrichtung asynchron und nennt sich dann thenAcceptAsync(). Diese namentliche Unterscheidung gibt es in der Klasse CompletableFuture öfter und kennzeichnet immer die asynchrone Variante. Eine weitere Besonderheit ist, dass diese Methode ebenfalls mit der Möglichkeit ausgestattet worden ist, einen dedizierten Executor mit zu übergeben. Dadurch lässt sich der für diesen Schritt zur Verfügung stehende Ressourcenpool explizit angeben. Wenn dem nicht so ist, wird immer der CommonForkJoinPool verwendet.

    final Supplier<String> task = () -> "Hello reactive World";

    final CompletableFuture<String> cf = CompletableFuture.supplyAsync(task);

    final CompletableFuture<Void> acceptA = cf.thenAcceptAsync(System.out::println);


    final ExecutorService fixedThreadPool = Executors
        .newFixedThreadPool(Runtime.getRuntime()
                                   .availableProcessors());

    final CompletableFuture<Void> acceptB = cf.thenAcceptAsync(System.out::println, fixedThreadPool);

Spätestens jetzt stellt sich die Frage, wie man das kompakter schreiben kann. Zum Glück ist das API nett zu uns und wir können mit dem FluentAPI recht einfach zum Ziel kommen. Nachfolgend drei verschiedene Möglichkeiten.

    final ExecutorService fixedThreadPool = Executors
        .newFixedThreadPool(Runtime.getRuntime()
                                   .availableProcessors());

    final Supplier<String> task = () -> "Hello reactive World";
    final Consumer<String> consumer = System.out::println;

    final CompletableFuture<Void> cfA = CompletableFuture
        .supplyAsync(task)
        .thenAcceptAsync(consumer , fixedThreadPool);


    final CompletableFuture<Void> cfB = CompletableFuture
        .supplyAsync(() -> "Hello reactive World")
        .thenAcceptAsync(System.out::println , fixedThreadPool);

    final CompletableFuture<Void> cfC = CompletableFuture
        .supplyAsync(() -> "Hello reactive World")
        .thenAcceptAsync(System.out::println);

Kommen wir nun zurück zu der am Beginn gestellten Aufgabe. Es gilt einen Pfad abzubilden, der aus mehr als nur einem Arbeitsschritt besteht. Dabei gibt es mehrere Zwischenergebnisse. Jedes Zwischenergebnis kann von einem anderen Typ sein. Der erste Schritt besteht in diesem Beispiel darin, einen Wert aus der Quelle A zu holen. Die Quelle steht sinnbildlich für eine beliebige Quelle, kann also ein Wert aus einer Datenbank,aus einer REST-Ressource oder was auch immer sein. Eines haben alle unsere hier angedeuteten Quellen gemeinsam: Die Werte kommen mit einer nicht genau definierbaren Zeitverzögerung.

Als erstes definieren wir für unser Beispiel die drei Eingangstypen. Diese sind alle gleich aufgebaut und bestehen aus einem Pseudowert vom Typ String und einer Instanz vom Typ LocalDateTime.

  public class InputA extends Pair<String, LocalDateTime> {
    public InputA(String value , LocalDateTime timeStamp) {
      super(value , timeStamp);
    }
    public String value(){return getT1();}
    public LocalDateTime timestamp(){return getT2();}
  }

  public class InputB extends Pair<String, LocalDateTime> {
    public InputB(String value , LocalDateTime timeStamp) {
      super(value , timeStamp);
    }
    public String value(){return getT1();}
    public LocalDateTime timestamp(){return getT2();}
  }

  public class InputC extends Pair<String, LocalDateTime> {
    public InputC(String value , LocalDateTime timeStamp) {
      super(value , timeStamp);
    }
    public String value(){return getT1();}
    public LocalDateTime timestamp(){return getT2();}
  }

Starten wir nun mit der Anforderung, den Wert aus der Quelle A, B und C zu holen. Hier werden keine zeitlichen Verzögerungen simuliert und der Zugriff auf die jeweilige Quelle ist der Einfachheit halber gleich realisiert.

  private static Supplier<InputA> supplierA() {
    return () -> {
      //some time consuming stuff
      final int nextInt = new Random().nextInt(10);
      return new InputA(String.valueOf(nextInt) , LocalDateTime.now());
    };
  }

  private static Supplier<InputB> supplierB() {
    return () -> {
      //some time consuming stuff
      final int nextInt = new Random().nextInt(10);
      return new InputB(String.valueOf(nextInt) , LocalDateTime.now());
    };
  }

  private static Supplier<InputC> supplierC() {
    return () -> {
      //some time consuming stuff
      final int nextInt = new Random().nextInt(10);
      return new InputC(String.valueOf(nextInt) , LocalDateTime.now());
    };
  }

Unter der Annahme, dass die jeweiligen Quellen Verzögerungen im System produzieren, werden die
Anfragen in dafür extra einzeln definierten und dimensionierten Ressourcen abgearbeitet. Bitte beim Einsatz in einem realen System unbedingt beachten, dass die Dimensionierung der Ressourcen ein nicht triviales Problem sein kann. Hier werden wir auch dies der Einfachheit halber wieder gleichförmig darstellen.

  private static final int nThreads = Runtime.getRuntime()
                                             .availableProcessors();
  private static final ExecutorService poolInputA = Executors
      .newFixedThreadPool(nThreads);
  private static final ExecutorService poolInputB = Executors
      .newFixedThreadPool(nThreads);
  private static final ExecutorService poolInputC = Executors
      .newFixedThreadPool(nThreads);

Nun haben wir die Supplier, um den technischen Vorgang abzubilden, wie die Daten aus der Quelle geholt werden, und die dafür vorgesehenen Ressourcen definiert. Der nächste Schritt besteht darin die Instanzen vom Typ CompletableFuture zu erzeugen.

  public static CompletableFuture<InputA> sourceA(){
    return CompletableFuture.supplyAsync(supplierA(), poolInputA);
  }  
  public static CompletableFuture<InputB> sourceB(){
    return CompletableFuture.supplyAsync(supplierB(), poolInputB);
  }  
  public static CompletableFuture<InputC> sourceC(){
    return CompletableFuture.supplyAsync(supplierC(), poolInputC);
  }

An dieser Stelle angekommen, können nun alle Werte geholt werden. Es beginnt also jetzt die Definition der Verarbeitung selbst. Auch hier kann wieder bei jedem Schritt der Ergebnistyp ein anderer sein. Das haben wir hier zur Demonstrationszwecken auch gemacht. Demnach kommt jetzt erst einmal die Definition des ersten Ergebnistypen und die Definition der zu verwendenden Ressource.

  public static class ResultOne extends Pair<String, LocalDateTime> {
    public ResultOne(String value , LocalDateTime timeStamp) {
      super(value , timeStamp);
    }

    public String value() {return getT1();}
    public LocalDateTime timestamp() {return getT2();}
  }

  private static final ExecutorService poolOperatorA = Executors
      .newSingleThreadExecutor();

Soweit ist alles schon bekannt. Es fehlt nun noch die Definition des Operators an sich. Es können
Instanzen vom Typ CompletableFuture mit der Methode thenCombine(..) oder auch wieder asynchron mit der Methode thenCombineAsync(..) kombiniert werden. Dazu wird eine BiFunction benötigt, die in der Lage ist, die beiden Ergebniswerte der beiden beteiligten CompletableFuture zu einem neuen Ergebniswert zu verarbeiten.

  private static BiFunction<InputA, InputB, ResultOne> operatorOne() {
    return (a , b) -> {
      //for Demo
      System.out.println("operatorOne.a = " + a);
      System.out.println("operatorOne.b = " + b);
      return new ResultOne(a.value() + " + " + b.value() , LocalDateTime.now());
    };
  }

An dieser Stelle angekommen, kann man die Verarbeitungskette wie folgt formulieren:

    final CompletableFuture<ResultOne> combineAsync = sourceA()
        .thenCombineAsync(sourceB() , operatorOne() , poolOperatorA);

Und wieder erhalten wir eine Instanz vom Typ CompletableFuture. Um den letzten Arbeitsschritt abzubilden, gehen wir genauso vor. Es wird der Ergebnistyp, der Operator und die zu verwendende Ressource definiert.

  public static class ResultTwo extends Pair<String, LocalDateTime> {
    public ResultTwo(String value , LocalDateTime timeStamp) {
      super(value , timeStamp);
    }

    public String value() {return getT1();}
    public LocalDateTime timestamp() {return getT2();}
  }

  private static final ExecutorService poolOperatorB = Executors
      .newSingleThreadExecutor();

  private static BiFunction<ResultOne, InputC, ResultTwo> operatorTwo() {
    return (a , b) -> {
      //for Demo
      System.out.println("operatorTwo.a = " + a);
      System.out.println("operatorTwo.b = " + b);
      return new ResultTwo(a.value() + " - " + b.value() , LocalDateTime.now() );
    };
  }

Nun haben wir alle Schritte zusammen und können final den gesamten Ablauf formulieren.

    sourceA()
        .thenCombineAsync(sourceB() , operatorOne() , poolOperatorA)
        .thenCombineAsync(sourceC() , operatorTwo() , poolOperatorB)
        .thenAcceptAsync(System.out::println)
        .join();

Was man hier schön erkennen kann, ist die Tatsache, dass die funktionalen Aspekte wunderbar zu der Verwendung von CompletableFuture passen. Funktionen definieren die Operatoren, die technischen Aspekte der Dimensionierung ist in der Definition der Ressourcen extrahiert und der Ablauf lässt sich recht linear formulieren. Nun sehen wir uns noch an, wie dieser Ablauf n-mal parallel zum Einsatz kommen kann.

Multiple Pipelines umsetzen

Sobald eine Instanz vom Typ CompletableFuture erzeugt worden ist, beginnt die Abarbeitung. Demnach stellt sich die Frage, wie nun Pipelines definiert werden können, ohne diese direkt in die Verwendung führen zu müssen. Und wie man mehrere Instanzen komfortabel handhaben kann.

So kommen wir zu der Kombination von Streams und dem CompletableFuture. Der Grundgedanke ist recht simpel. Sobald wir in der Lage sind, die Quelle zu definieren, können wir diese dazu verwenden eine Stream-Quelle zu erzeugen. Daraus lassen sich Streams erzeugen, die entweder unendlich die Verarbeitung multipler Datenquellen repräsentieren oder fest definierte Arbeitspakete verarbeiten. Wir nehmen der Einfachheit halber erst einmal an, dass eine feste Menge von 1.000 Ergebnissen produziert werden muss. Dazu kann man zum Beispiel den IntStream verwenden.

    IntStream
        .range(0,1_000)
        .parallel()
        .mapToObj(value -> new Pair<>(value , sourceA()
            .thenCombineAsync(sourceB() , operatorOne() , poolOperatorA)
            .thenCombineAsync(sourceC() , operatorTwo() , poolOperatorB)
            .thenAcceptAsync(System.out::println)))
        .map(p -> {
          final Void join = p.getT2().join();
          return p.getT1();
        })
        .forEach(System.out::println);

Checkpoint Java

In dieser Kolumne klopft der Autor Sven Ruppert (Vaadin) Java auf alltägliche Probleme ab. Er gibt hilfreiche Tipps und Tricks, wie Entwickler gängige Stolperfallen vermeiden und klareren Code schreiben können. Einen besonderen Blick wirft er auf die neuen Möglichkeiten von Funktionaler und Reaktiver Programmierung. Alle Teile der Kolumne Checkpoint Java finden sich hier.

Fazit

In diesem Beispiel werden 1.000 Elemente aus den jeweiligen Quellen genommen und verarbeitet. Wenn man dieselbe Lösung mit Threads realisieren möchte, ist einiges mehr an Aufwand zu erwarten.
Zumal der Entwickler dann ebenfalls mit den Idiomen der verwendeten Synchronisationsmechanismen wie dem StampedLock umgehen muss.

Hier wurde für jede Stufe ein eigener Ressourcenpool definiert und jeweils ein eigener Ergebnistyp definiert. Wenn wir all das kompakter schreiben, kommen wir auf wesentlich weniger Zeilen Quelltext. In jeden Projekt wird es Vereinfachungen oder Annahmen geben, die einzelne Element wegfallen lassen. Zum Beispiel kann man lediglich einen Threadpool zum Warten auf die Eingangswerte definieren und einen für die Durchführung der Operatoren. Die Anzahl der Typen für die jeweiligen Zwischenergebnisse werden dann eventuell nicht so zahlreich sein. Und einige der Operatoren können gleich an Ort und Stelle formuliert werden. Alles zusammen könnte dann zu dem nachfolgenden Beispiel führen:

  private static final int nThreads = Runtime.getRuntime()
                                             .availableProcessors();


  private static final ExecutorService poolToWait = Executors
      .newFixedThreadPool(nThreads * 10);

  private static final ExecutorService poolToWork = Executors
      .newFixedThreadPool(nThreads);

  private static CompletableFuture<Void> createCF() {
    return supplyAsync(() -> valueOf(new Random().nextInt(10)) , poolToWait)
        .thenCombineAsync(supplyAsync(() -> valueOf(new Random().nextInt(10)) , poolToWait) ,
                          (a , b) -> a + " + " + b ,
                          poolToWork)
        .thenCombineAsync(supplyAsync(() -> valueOf(new Random().nextInt(10)) , poolToWait) ,
                          (a , b) -> a + " - " + b ,
                          poolToWork)
        .thenAcceptAsync(System.out::println);
  }

  
  public static void main(String[] args) {

    IntStream
        .range(0 , 1_000)
        .parallel()
        .mapToObj(value -> new Pair<>(value , createCF()))
        .map(p -> {
               p.getT2().join();
               return p.getT1();
             }
        )
        .forEach(System.out::println);

    poolToWait.shutdown();
    poolToWork.shutdown();
  }

Allerdings ist hier schnell ein Punkt erreicht, bei dem der zu wartende Quelltext recht unübersichtlich wird. Demnach neige ich persönlich eher dazu, eine etwas ausführlichere Version zu realisieren. Insgesamt gesehen ist das CompletableFuture allerdings ein sehr mächtiges und dabei doch einfaches Werkzeug, um reaktive Aspekte in einer Anwendung zu realisieren.

In den nächsten Teilen werden wir die gerade gezeigten Elemente um die neuen Möglichkeiten erweitert, die uns Java 9 mitgebracht hat. Den Quelltext findet ihr auf GitHub. Bei Fragen oder Anregungen einfach melden unter sven@vaadin.com oder per Twitter @SvenRuppert.

Happy Coding!

Geschrieben von
Sven Ruppert
Sven Ruppert
Sven Ruppert arbeitet seit 1996 mit Java und ist Developer Advocate bei Vaadin. In seiner Freizeit spricht er auf internationalen und nationalen Konferenzen, schreibt für IT-Magazine und für Tech-Portale. Twitter: @SvenRuppert
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.