Suche
Reaktive Programmierung mit Java

Checkpoint Java: Saubere reaktive Verarbeitungsketten aufbauen

Sven Ruppert

© Shutterstock/Suttha Burawonk

Mit CompletableFuture kann man Arbeitspakete auf verschiedene Resource Pools verteilen und diese auch als eine Kette von asynchronen Aufrufen formulieren. Das kann aber zu umständlichem und schwer wartbarem Code führen, vor allem, wenn es darum geht, eine beliebige Anzahl an Funktion in einer Kette zu verschalten.

Im letzten Teil dieser Serie haben wir uns angesehen, wie wir mit CompletableFuture Arbeitspakete auf verschiedene Resource Pools verteilen können und wie wir diese Arbeit als eine Kette von asynchronen Aufrufen formulieren können. In diesem Teil werden wir uns damit auseinandersetzen, wie wir die Ketten kompakter formulieren können und welche Möglichkeiten zur Steuerung zur Verfügung stehen.

Beginnen wir mit der trivialen Aufgabe einen String in einen Integer-Wert zu transformieren. Hierbei gibt es einige typische Fallstricke, obwohl der Vorgang selbst einfach und auch schon innerhalb der JDK gelöst ist. Die hier verwendete Ausgangslage ist wie im folgenden Code-Abschnitt definiert. Wir haben dabei im Hinterkopf, dass die beiden Funktionen Stellvertreter für aufwendigere Arbeitspakete sind.

final String input = .... // some String
final Integer value = Integer.parseInt(input);

Auch bei diesem einfachen Beispiel kann schon eine Menge schief gehen. Nehmen wir an, dass der String null sein kann oder nicht. Oder das der Inhalt des gelieferten String schlicht und ergreifend keine Zahl ist oder eine, die nicht in den Wertebereich eines Integer passt. Die Kurzversion lautet also: Es können Fehler auftreten, die wiederum zu Exceptions führen.

Zuzüglich zu den Quelltextbeispielen in diesem Artikel verwende ich auch die Sourcen des Open-Source-Projekts Functional-Reactive. Die Sourcen liegen auf GitHub.

Teile und verteile

Als erstes werden wir die Schritte wieder aufteilen und daraus einzelne Instanzen der Klasse completableFuture erzeugen.

    final Supplier<String> nextStringValue = () -> valueOf(new Random().nextInt(10));
    final CompletableFuture<String> step01 = CompletableFuture.supplyAsync(nextStringValue);
    
    final CompletableFuture<Integer> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(new Function<String, CompletionStage<Integer>>() {
          @Override
          public CompletionStage<Integer> apply(String s) {
            return CompletableFuture.completedFuture(Integer.parseInt(s));
          }
        });

In diesem Fall kommt eine neue Methode zum Einsatz. Gemeint ist hier das Erzeugen einer Instanz vom Typ CompletableFuture, bei dem das Ergebnis schon feststeht. Gemeint ist die Methode CompletableFuture.completedFuture(Integer.parseInt(s)). In diesem Beispiel ist demnach nichts mehr zu erledigen. Es wurden lediglich die einzelnen Werte verpackt. Das geht aber natürlich wesentlich kompakter.

    final CompletableFuture<Integer> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)));

Exceptions einfangen mit handleAsync

Der Umgang mit einer auftretenden Exception wird in diesem Fall mit der Methode handleAsync definiert. Hier kann man zwei Dinge festlegen: Das erste ist der positive Fall, dass ein Ergebnis vorliegen wird, und der zweite Fall ist die Verarbeitung der geworfenen Exception.

    final CompletableFuture<String> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync(new BiFunction<Integer, Throwable, String>() {
          @Override
          public String apply(Integer value , Throwable throwable) {
            return (throwable == null)
                   ? "all is ok, value is " + value
                   : throwable.getMessage();
          }
        });

Natürlich kann man es mit einem Lambda-Konstrukt auch kompakter schreiben.

    final CompletableFuture<String> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync((value , throwable) -> (throwable == null)
                                            ? "all is ok, value is " + value
                                            : throwable.getMessage());                                

In diesem Fall ist die Unterscheidung mit BiFunction realisiert. Das bedeutet, dass es einen Rückgabewert geben muss. Nun sind wir wieder an der Stelle, an der die Klasse Optional oder eben die Optional-Weiterentwicklung Result sinnvoll ist.

    final CompletableFuture<Result<Integer>> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync((value , throwable) -> (throwable == null)
                                            ? Result.success(value)
                                            : Result.failure(throwable.getMessage()));

Diese Fallunterscheidung lässt sich aber auch mit der schon vorher vorgestellten CheckedFunction abbilden. Die Fallunterscheidung ist damit schon definiert und sieht wie folgt aus:

    final CompletableFuture<Result<Integer>> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(s)));

Wenn man an dieser Stelle angelangt ist, ist der nächste Schritt mehr als eine Verarbeitungsstufe zu kombinieren. Ist das Ergebnis doch nun in einem Result verpackt. Man hat auch die Möglichkeit, unnötige Schritte zu verkürzen, da meistens Fehler in einer Stufe die restlichen, nachfolgenden Verarbeitungsstufen überflüssig machen.

Verarbeitungskette mit thenComposeAsync

Als nächstes erweitern wir also den Verarbeitungsprozess um eine weitere Stufe. Es soll im Erfolgsfall eine Nachricht generiert werden. Im Fehlerfall wird eine mehr oder weniger aussagekräftige Nachricht erstellt und zurückgeliefert.

    final CompletableFuture<Result<String>> result = supplyAsync(nextStringValue)
        .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
        .thenComposeAsync(input -> completedFuture(input.isPresent()
                                                   ? input.map(integer -> "Result is now " + integer)
                                                   : Result.<String>failure("Value was not available")));

Hier liegt implizit eine Reduktion um eine Stufe vom Typ Result vor. Im Erfolgsfall wird mit der Methode map eine neue Instnaz vom Typ Result erzeugt. Genauso ist es mit der Fehlernachricht. Das lässt sich expliziter schreiben, um zu verdeutlichen was genau passiert.

    final BiFunction<Result<Integer>, Function<Integer, Result<String>>, Result<String>> flatMap
        = (input , funct) -> (input.isPresent())
                             ? funct.apply(input.get())
                             : input.asFailure();

In unserem Fall gibt es eine Methode flatMap sowohl in der Klasse Optional als auch in der Klasse Result. Hiermit ist das Problem an dieser Stelle gelöst. Innerhalb einer Methode flatMap kann natürlich auch wieder eine Exception verarbeitet werden.

        supplyAsync(nextStringValue)
            .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
            .thenComposeAsync(input -> completedFuture(input.flatMap((CheckedFunction<Integer, String>) integer -> "Result is now " + integer)))

In den Quelltexten zu diesem Teil der Kolumne befindet sich noch eine kleine Komfortfunktion, mit der das Ergebnis der erzeugten Instanzen vom Typ CompletableFuture auf der Kommandozeile ausgegeben werden kann.

    Consumer<CompletableFuture<Result<?>>> print = (cf) -> cf
        .whenCompleteAsync((result , throwable) -> {
          if (throwable == null)
            result.ifPresentOrElse(
                System.out::println ,
                (Consumer<String>) System.out::println
            );
          else System.out.println("throwable = " + throwable);
        })
        .join();

Damit sieht der Aufruf wie folgt aus:

    print.accept(
        supplyAsync(() -> "oh no")
            .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
            .thenComposeAsync(input -> completedFuture(input.flatMap((CheckedFunction<Integer, String>) integer -> "Result is now " + integer)))
    );

Funktionen als Kette verdrahten

Wenn man sich die bisher erstellten Beispiele ansieht, erkennt man schnell, dass der Aufwand immer höher wird je mehr Funktionen hintereinander geschaltet werden sollen. Außerdem müssen wir uns um den Fall kümmern, dass eine vorher nicht definierte Anzahl an Funktionen in einer Verarbeitungskette hintereinander geschaltet werden sollen. Definieren wir einfach drei Funktionen, im Moment noch alle mit dem selben Eingangs- und Ausgangstyp.

    final Function<String, String> step1 = (input) -> input.toUpperCase();
    final Function<String, String> step2 = (input) -> input + " next A";
    final Function<String, String> step3 = (input) -> input + " next B";

Wenn man diese in klassischer Art und Weise hintereinander schaltet, erhält man folgende Funktion:

    final String hello = step1
        .andThen(step2)
        .andThen(step3)
        .apply("hello"); // blocking call

Das wird jetzt in eine Kette nicht-blockierender Aufrufe überführt. An dieser Stelle soll eigentlich eine TriFunction erreicht werden, die basierend auf den drei Funktionen eine Funktion liefert, die einen Eingangswert erhält und ein CompletableFuture zurückliefert. Damit hat man etwas recht Interessantes erreicht: Es kann eine Kette basierend auf mehreren Instanzen vom Typ CompletableFuture gebaut werden, ohne dass diese direkt in Aktion tritt.

    TriFunction<
        Function<String, String>,
        Function<String, String>,
        Function<String, String>,
        Function<String, CompletableFuture<String>>> inputTriA = (f1 , f2 , f3) -> {
      return (value) -> {
        final CompletableFuture<String> result1 = supplyAsync(() -> f1.apply(value));
        final CompletableFuture<String> result2 = result1.thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)));
        final CompletableFuture<String> result3 = result2.thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));
        return result3;
      };
    };

Das kann man natürlich auch wieder kompakter schreiben.

    TriFunction<
        Function<String, String>,
        Function<String, String>,
        Function<String, String>,
        Function<String, CompletableFuture<String>>> inputTriB
        = (f1 , f2 , f3)
        -> (value)
        ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> f1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));

Und auch noch allgemeiner definieren.

  public <A, B, C, D> TriFunction<
      Function<A, B>, 
      Function<B, C>,
      Function<C, D>,
      Function<A, CompletableFuture<D>>> genericTriFunction() {

    return (f1 , f2 , f3)
        -> (value)
        ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> f1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));
  }

Wenn man diese Funktion in eine curried Version transformiert, erhält man die Möglichkeit, die einzelnen Funktionen nacheinander zu übergeben. Somit kommen wir dem gewünschten Verhalten ein wenig näher, dass eine beliebige Anzahl an Funktionen übergeben werden kann. Zur Erinnerung: Für ein einfaches Beispiel sieht das Ergebnis wie folgt aus.

    TriFunction<String, String, String,Integer> triDemo 
        = (s1,s2,s3)->{return -1;};
    
    final Function<String, Function<String, Function<String, Integer>>> 
    triDemoCurried 
        = Transformations.<String, String, String, Integer>curryTriFunction().apply(triDemo);
    
    final Integer i = triDemoCurried.apply("A").apply("B").apply("C");

Wenn man sich allerdings die transformierte Funktion basierend auf den Instanzen vom Typ CompletableFuture ansieht, wird man recht schnell zu dem Entschluss kommen, dass es nicht sonderlich wartbarer Quelltext ist.

     final Function<
            Function<String, String>, 
            Function<Function<String, String>, 
                     Function<Function<String, String>, 
                              Function<String, CompletableFuture<String>>>>> apply
        = Transformations.<Function<String, String>, 
                           Function<String, String>, 
                           Function<String, String>, 
                           Function<String, CompletableFuture<String>>>
                           curryTriFunction()
                           .apply(inputTriA);

    final Function<String, 
                   CompletableFuture<String>> resultCF 
                   = apply
                          .apply(step1)
                          .apply(step2)
                          .apply(step3);
    
    final CompletableFuture<String> cf = resultCF.apply("hello World");

Es muss also noch einen anderen Weg geben, dieses Ziel zu erreichen. Gehen wir dazu nochmal einen Schritt zurück und nehmen die manuelle Version als Ausgangslage.

    //manual
    Function<String, CompletableFuture<String>> f = (value) ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> step1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step3.apply(v)));

Erreicht werden soll, dass der Anteil mit composeAsync definiert wird, ohne dass schon eine Ausführung gestartet wird. Der Startwert steht also noch nicht zur Verfügung. Ziel soll auch sein, dass eine Funktion zur Verfügung steht, die dafür sorgt, dass der zu verarbeitende Wert übergeben wird und als Rückgabewert eine Instanz von CompletableFuture erhalten wird. Die Anzahl der Verarbeitungsschritte soll nicht vorher fest definiert werden. Erst beim Aufbau der Kette werden die Funktionen nach und nach zur Verfügung gestellt. Definieren wir nun erneut drei Funktionen, so dass sich bei jeder Verarbeitungsstufe auch der Ergebnistyp ändert.

    final Function<String, Integer> step1A = Integer::parseInt;
    final Function<Integer, String> step2A = (input) -> input + " next A";
    final Function<String, Pair<String, Integer>> step3A = (input) -> new Pair<>(input , input.length());

Manuell ergibt sich wieder folgendes Ergebnis:

    Function<String, CompletableFuture<Pair<String, Integer>>> fB = (value) ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> step1A.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step2A.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step3A.apply(v)));

Wenn man den Anteil mit thenComposeAsync extrahiert und generisch formuliert, ist ein mögliches Ergebnis die nachfolgende Methode. Abstrakt gesprochen passiert hier folgendes: Nehme die Funktion Function<T, CompletableFuture> aus dem Schritt n-1 und hänge dich selbst mit supplyAsync(() -> nextTransformation.apply(v)) an. Die Instanz der Klasse CFQ ist der Rahmen um alles. Dazu kommen wir gleich.

    private Function<T, CompletableFuture<R>> resultFunction;

    public <N> CFQ<T, N> thenCombineAsync(Function<R, N> nextTransformation) {
      final Function<T, CompletableFuture<N>> f = this.resultFunction
          .andThen(before -> before.thenComposeAsync(v -> supplyAsync(() -> nextTransformation.apply(v))));
      return new CFQ<>(f);
    }

Nun fehlt noch der initiale Schritt. Hier definieren wir die Methode, die uns aus einer normalen Function<T,R> eine Funktion vom Typ Function<T, CompletableFuture> erzeugt.

    public static <T, R> CFQ<T, R> define(Function<T, R> transformation) {
      return new CFQ<>(t -> CompletableFuture.completedFuture(transformation.apply(t)));
    }

Jetzt haben wir alle Schritte zusammen, um eine Kette von beliebigen Funktionen in eine Kette von Arbeitsschritten zu transformieren. Ausgelassen habe ich an der Stelle, dass natürlich jede Funktion auch in einer definierten Ressource abgearbeitet werden könnte. Das wäre nur ein weiterer Parameter vom Typ Executor. Die Funktionen, die übergeben werden, müssen natürlich dem Kriterium entsprechen, dass der Ausgangstyp der Funktion n-1 gleich dem Eingangstyp der Funktion n ist.

  public static class CFQ<T, R> {

    private Function<T, CompletableFuture<R>> resultFunction;

    private CFQ(Function<T, CompletableFuture<R>> resultFunction) {
      this.resultFunction = resultFunction;
    }

    public static <T, R> CFQ<T, R> define(Function<T, R> transformation) {
      return new CFQ<>(t -> CompletableFuture.completedFuture(transformation.apply(t)));
    }

    public <N> CFQ<T, N> thenCombineAsync(Function<R, N> nextTransformation) {
      final Function<T, CompletableFuture<N>> f = this.resultFunction
          .andThen(before -> before.thenComposeAsync(v -> supplyAsync(() -> nextTransformation.apply(v))));
      return new CFQ<>(f);
    }

    public Function<T, CompletableFuture<R>> resultFunction() {
      return this.resultFunction;
    }
  }

In der Verwendung sieht das dann so aus:

    // functions die chain
    final Function<String, Integer> step1A = Integer::parseInt;
    final Function<Integer, String> step2A = (input) -> input + " next A";
    final Function<String, Pair<String, Integer>> step3A = (input) -> new Pair<>(input , input.length());

    //transformation
    final Function<String, CompletableFuture<Pair<String, Integer>>> f = CFQ
        .define(step1A)
        .thenCombineAsync(step2A)
        .thenCombineAsync(step3A)
        .resultFunction();
    
    //usage - activate
    final CompletableFuture<Pair<String, Integer>> cf = f.apply("hello");

    //usage - get result
    final String hello = cf
        .join()
        .getT1();

Checkpoint Java

In dieser Kolumne klopft der Autor Sven Ruppert (Vaadin) Java auf alltägliche Probleme ab. Er gibt hilfreiche Tipps und Tricks, wie Entwickler gängige Stolperfallen vermeiden und klareren Code schreiben können. Einen besonderen Blick wirft er auf die neuen Möglichkeiten von Funktionaler und Reaktiver Programmierung. Alle Teile der Kolumne Checkpoint Java finden sich hier.

Fazit

Mit der Klasse CFQ ist man in der Lage, eine beliebige Anzahl an Funktionen in eine Kette zu verschalten. Jeder Schritt kann in einen eigenen Ressourcenpool ausgelagert werden und blockiert die Ausführung an dieser Stelle nicht. Ebenfalls können solche Ketten definiert werden, ohne dass die Verarbeitung direkt beginnen muss. Der Aufwand dies zu formulieren, ist im Vergleich zu der manuellen Variante erheblich geringer und dabei auch noch typsicher. Die Klasse CFQ ist im Projekt auf GitHub unter dem Namen CompletableFutureQueue zu finden. Den Quelltext findet ihr auf GitHub. Bei Fragen und Anregungen einfach melden unter sven@vaadin.com oder per Twitter @SvenRuppert.

Happy Coding!

Geschrieben von
Sven Ruppert
Sven Ruppert
Sven Ruppert arbeitet seit 1996 mit Java und ist Developer Advocate bei Vaadin. In seiner Freizeit spricht er auf internationalen und nationalen Konferenzen, schreibt für IT-Magazine und für Tech-Portale. Twitter: @SvenRuppert
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.