Suche
Reaktive Programmierung mit Java 9:

Checkpoint Java: Die ersten reaktiven Schritte mit dem Observer Pattern

Sven Ruppert

© Shutterstock/Suttha Burawonk

Von der Funktionalen Programmierung mit Java 9 wenden wir uns nun der Reaktiven zu. Dazu machen wir unsere ersten Schritte mit dem Observer Pattern. Das Fernziel ist, funktionale mit reaktiven Aspekten zu verbinden, um das Beste aus beiden Welten zu vereinen.

Vereinfacht kann man sagen, dass es sich beim reaktiven Programmieren in Java um alle möglichen Formen des Observer Patterns handelt. Genau an dieser Stelle werden viele Leser aufschreien, die sich schon damit beschäftigt haben. Aber sehen wir uns das erst einmal an. Das Grundprinzip ist, dass der Aufrufer nicht wartet bis das Ergebnis berechnet worden ist. Die Aufgabe wird formuliert und der nächsten Stufe übergeben. Wenn diese Stufe dann fertig ist, wird diese sich schon zurückmelden.

Zuzüglich zu den Quelltextbeispielen in diesem Artikel verwende ich auch die Sourcen des Open-Source-Projekts Functional-Reactive. Die Sourcen liegen auf GitHub.

Im ersten Schritt bedeutet das, dass ich nicht selbst den Aufruf starte, sondern dass er gestartet wird. Und da sind wir schon beim Observer Pattern. Die verarbeitende Einheit meldet sich beim Produzenten von Informationseinheiten an. Ist eine Informationseinheit fertig für die Verarbeitung, werden die Listener – die Menge kann auch null Elemente enthalten – oder ein Listener explizit mit diesem Wert aufgerufen. Dieser wiederum verarbeitet auf dieselbe Art und Weise sein Ergebnis und liefert es dann auch aus. Und schon haben wir eine beliebig lange Kette an Operatoren, die aufeinander aufbauend ein Ergebnis berechnen.

Das Observer Pattern

Implementieren wir als erstes eine sehr einfache Version des Observer Patterns. Hier ist es nichts anderes als ein Wrapper um eine Map<KEY, Consumer>. Die Map selbst wird mittels ConcurrentHashMap realisiert, damit wir zumindest an dieser Stelle keine Nebenläufigkeitsprobleme bekommen. Denn wir können nicht vorhersehen, wie viele unterschiedliche Threads zu welchem Zeitpunkt Ereignisse erzeugen werden. Ob das reichen wird? Wir kommen später noch dazu. Unter einem Schlüssel kann sich ein Verbraucher registrieren und auch selbst wieder entfernen, sobald er keine weiteren Events mehr empfangen möchte.

Wird ein Event erzeugt und dem Observer übergeben, wird dieses Event an alle registrierten Verbraucher übergeben. Was der Verbraucher damit macht, ist ihm selbst überlassen. Hier zeigt sich aber auch das erste Problem. Ein Event, also die Daten oder besser gesagt die Datenstruktur, wird jedem Verbraucher übergeben. Das bedeutet, dieselbe Instanz wird jedem Verbraucher übergeben. Hier wird stillschweigend angenommen, dass der jeweilige Verbraucher den Inhalt des Ereignises nicht verändert. Eine sehr gewagte Anforderung.

Kommen wir nun zur Implementierung selbst. Diese ist im Moment noch einfach.

  public class Observable<KEY, VALUE> {

    private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();

    public void register(KEY key , Consumer<VALUE> listener) {
      listeners.put(key , listener);
    }

    public void unregister(KEY key) { listeners.remove(key);}

    public void sendEvent(VALUE event) {
      listeners.values().forEach(c -> c.accept(event));
    }
  }

Nachfolgend eine beispielhafte Verwendung unseres Verbrauchers.

  public static void main(String[] args) {

    final Observable<String, String> observable = new Observable<>();

    observable.register("key1", System.out::println);
    observable.register("key2", System.out::println);

    observable.sendEvent("Hello World");

    observable.unregister("key1");
    observable.sendEvent("Hello World again");
  }

Die dazugehörige Ausgabe ist:

Hello World
Hello World
Hello World again

Selbst diese einfache Implementierung führt allerdings immer mal wieder dazu, dass Memory Leaks produziert werden. Memory Leaks und Java? Doch das geht!

Kommen wir zu einem recht typischen Anwendungsfall. Oft werden solche Observer genommen, um eine Art Event-Service aufzubauen. Dazu wird der Observer statisch zur Verfügung gestellt. Nun können von allen möglichen Stellen im Programm Listener registriert werden.

  public class Registry {
    private static final Observable<String, String> observable = new Observable<>();

    public static void register(String key, Consumer<String> consumer){
      observable.register(key, consumer);
    }

    public static void unregister(String key){
      observable.unregister(key);
    }

    public static void sendEvent(String input){
      observable.sendEvent(input);
    }
  }

In der Verwendung hat sich nicht viel geändert.

  public static void main(String[] args) {
    Registry.register("key1" , System.out::println);
    Registry.register("key2" , System.out::println);

    Registry.sendEvent("Hello World");

    Registry.unregister("key1");
    Registry.sendEvent("Hello World again");
  }

Leider passiert immer wieder, dass in einem Programm Verbraucher registriert werden, die nie wieder entfernt werden. Das kann zum einen dazu führen, dass der Observer selbst einfach überlaufen wird, zum anderen kann es auch verhindern, dass bestimmte Teile der Anwendung von dem Garbage Collector (GC) erfasst werden können.

Nehmen wir als Beispiel eine Webanwendung, die im Consumer eine grafische Einheit verwendet, zum Beispiel eine Instanz von einem Label. Wird die Session geschlossen, hängt diese Einheit leider immer noch im Consumer und damit im Observer. Solange dieser nicht aufräumt, oder selbst dem GC zum Opfer fallen kann oder darf, kann es sein, dass alle indirekt enthaltenen Daten über das Label gehalten werden. Was fehlt, ist ein komfortabler Weg für den Entwickler, dieses automatisch erledigen zu lassen.

Checkpoint Java

In dieser Kolumne klopft der Autor Sven Ruppert (Vaadin) Java auf alltägliche Probleme ab. Er gibt hilfreiche Tipps und Tricks, wie Entwickler gängige Stolperfallen vermeiden und klareren Code schreiben können. Einen besonderen Blick wirft er auf die neuen Möglichkeiten von Funktionaler und Reaktiver Programmierung. Alle Teile der Kolumne Checkpoint Java finden sich hier.

Lösung mit CallBack

Ein Lösungsansatz besteht darin, dass die Elemente, die mit dem Observer interagieren, über einen Lebenszyklus verfügen. Damit meine ich nicht die Methode finalize. Diese sollte man so lange wie möglich nicht verwendet. Als Beispiel hier wieder die Webanwendung. Wenn die Komponenten über etwas derartiges verfügen wie ein attach() und detach(), kann man sie verwenden, um sich wieder vom Observer zu lösen. Hierfür definieren wir ein Interface Registration, der als Rückgabewert der registrierenden Methode verwendet wird. Hier wird das Lösen der Verbindung schon hinterlegt.

  public static interface Registration {
    public void remove();
  }

  public class Observable<KEY, VALUE> {

    private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();

    public Registration register(KEY key , Consumer<VALUE> listener) {
      listeners.put(key , listener);

      return () -> listeners.remove(key);
    }

    public void sendEvent(VALUE event) {
      listeners.values().forEach(c -> c.accept(event));
    }
  }

Wie auch schon beim Observer, gibt es nun keine Methode mehr, um einen Eintrag explizit zu entfernen.

  public class Registry {

    private static final Observable<String, String> observable = new Observable<>();

    public static Registration register(String key , Consumer<String> consumer) {
      return observable.register(key , consumer);
    }

    public static void sendEvent(String input) {
      observable.sendEvent(input);
    }
  }

Die Verwendung ändert sich derart, dass der Vorgang zum Entfernen der Verbindung nun auf der Registration selbst gestartet wird. Diese Methode sollte dann direkt oder indirekt von dem Teil der Anwendung aufgerufen werden, der sich um den Lebenszyklus dieser Elemente kümmert.

  public static void main(String[] args) {
    final Registration registerA = Registry.register("key1" , System.out::println);
    final Registration registerB = Registry.register("key2" , System.out::println);
    Registry.sendEvent("Hello World");

    //done by life cycle
    registerA.remove();

    Registry.sendEvent("Hello World again");
  }

Mehr als ein Observer

Kommen wir nun zurück zum Observer und versuchen mehrere miteinander zu koppeln. Dazu werden wir die Ein- und Ausgänge der Observer miteinander verbinden. Als Beispiel kommt die folgende Klasse mit zwei Methoden zum Einsatz.

  public static class Worker {

    public String doWork(String input) {
      return input.toUpperCase();
    }

    public String[] split(String input) {
      return input.split(" ");
    }
  }

Wenn diese Arbeitsschritte hintereinander ausgeführt werden sollen, könnte man das wie folgt schreiben:

    final Worker worker = new Worker();
    final String[] split = worker.split(worker.doWork("Hello World"));
    System.out.println("result = " + Arrays.asList(split));

Möchte man dies mittels Oberserver miteinander koppeln, muss in der derzeitigen Lösung für jede Zwischenstufe ein Oserver definiert werden. Hier sind es drei Stück. Der Ablauf ist recht einfach:

  • Rufe die Methode worker.doWork(s) auf.
  • Rufe mit dem Ergebnis aus dem ersten Aufruf die Methode worker.split(..) auf.
  • Das finale Ergebnis sollte nun zur Verfügung stehen.

Um die erste Methode aufzurufen, kann man sich den folgenden Aufruf vorstellen:
observableA.sendEvent("Hello World ");. Damit die Daten verarbeitet werden, muss der Arbeitsschritt zuvor registriert worden sein. Nun kommen wir zum ersten Punkt, der ein wenig mehr Beachtung bedarf. Wenn ein Observer ein Ergebnis verarbeitet, erhalten wir kein Ergebnis zurück. Um das abzubilden, arbeiten wir indirekt auf dem Ergebnis mit observableB.sendEvent(worker.doWork(s)). Beides zusammen ergibt nun den ersten Observer.

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    observableA.register("A" , s -> observableB.sendEvent(worker.doWork(s)));

Innerhalb des zweiten Observers passiert noch nichts sinnvolles. Auch hier muss noch ein Verarbeitungsschritt registriert werden.: observableB.register("B" , s -> observableC.sendEvent(worker.split(s)));. Hier wird ebenfalls indirekt auf dem Ergebnis gearbeitet. Und so kommen wir zu unserem dritten und vorerst letzten Observer: observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));. Hier kommen wir nun zu dem Punkt, an dem wir das Ergebnis in irgendeiner Form aus dieser Kette herausholen wollen. In diesem Fall arbeiten wir auf einer Datenstruktur, die außerhalb definiert worden ist: List. Alles zusammen sieht nun wie folgt aus:

    final List<String> results = new ArrayList<>();
    
    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(worker.doWork(s)));
    observableB.register("B" , s -> observableC.sendEvent(worker.split(s)));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

Soweit haben wir dasselbe Ergebnis wie zuvor. Einen wirklichen Vorteil haben wir uns aber noch nicht erarbeitet. Es ist eher schwieriger geworden, da wir die Verarbeitungskette von hinten nach vorne definiert haben.

Unabhängige Schritte

Bisher wird die Kette von einer Instanz der Klasse Worker verarbeitet. Wenn mehr als ein Thread Events sendet, kann es leicht zu Nebenläufigkeitsproblemen kommen. Damit uns das erspart bleibt, verwenden wir als erstes mehr als einer Instanz der Klasse Worker.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(new Worker().doWork(s)));
    observableB.register("B" , s -> observableC.sendEvent(new Worker().split(s)));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

Der nächste Schritt entfernt erst einmal vollständig die Instanz der Klasse Worker inklusive der Implementierung und ersetzt jeden Schritt durch die Methodenimplementierung selbst. Hiermit haben wir eventuell vorhandene Zustände weiter reduziert.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(s.toUpperCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

Hier kann man schon erkennen, dass sich die Verwendung von Funktionen geradezu aufdrängt.

Von List zu Tree

Vorteile ergeben sich aber erst, wenn man anstelle einer linearen Liste von Verarbeitungen multiple Wege beschreitet. Wir bauen nun einen Baum auf. Gehen wir davon aus, dass wir nicht nur alles in Großbuchstaben haben möchten, sondern auch eine Variante, die nur aus Kleinbuchstaben besteht. Um das zu erreichen, kann man einen weiteren Observer einhängen.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A1" , s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register("A2" , s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

Das kann man an allen möglichen Stellen anbauen. Zum Beispiel kann man auch die terminale Operation, die das Ergebnis auf der Kommandozeile ausgibt, mittels Observer abbilden.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A1" , s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register("A2" , s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C1" , strings -> results.addAll(Arrays.asList(strings)));
    observableC.register("C2" , strings -> System.out.println("From C2 " + Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

ConcurrentHashSet einsetzen

In diesem Schritt werden wir die Map in dem Observer durch ein Set ersetzen. Zu Beachten ist hier die Art und Weise wie ein ConcurrentHashSet erzeugt wird. Dies folgende Version ist meiner Meinung nach der synchronisierten Version vorzuziehen:

  public static class Observable<VALUE> {

    private final Set<Consumer<VALUE>> listeners = ConcurrentHashMap.newKeySet();

    public Registration register(Consumer<VALUE> listener) {
      listeners.add(listener);

      return () -> listeners.remove(listener);
    }

    public void sendEvent(VALUE event) {
      listeners.forEach(c -> c.accept(event));
    }

Dadurch vereinfachen sich auch die Implementierungen.

    final List<String> results = new ArrayList<>();

    final Observable<String> observableA = new Observable<>();
    final Observable<String> observableB = new Observable<>();
    final Observable<String[]> observableC = new Observable<>();

    observableA.register(s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register(s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register(s -> observableC.sendEvent(s.split(" ")));
    observableC.register(strings -> results.addAll(Arrays.asList(strings)));
    observableC.register(strings -> System.out.println("From C2 " + Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

Fazit

An diesem Schritt angekommen ergeben sich einige Fragen. Zum Beispiel:

  • Wie kann man ganze Teilbäume de-registrieren ohne alles manuell zu realisieren?
  • Welcher Weg wird angeboten, um an ein Ergebnis einer Verarbeitungsstufe zu gelangen?
  • Wie kann man skalieren?
  • Ist asynchrone Abarbeitung möglich?

Schnell erkennt man, dass es recht viel Aufwand bedeuten kann, wenn man mehr als nur einen
einfachen Observer benötigt. Deutlich wird ebenso, dass es auch bei kleinen Aufgaben sehr unübersichtlich werden kann, den Weg wie Aufgabenketten zu formulieren. Ungünstig ist hier zum Beispiel, dass der Prozess eher von hinten nach vorne definiert werden muss. Aber dennoch kann man mit den reaktiven Ansätzen sehr performante und skalierbare Anwendungen produzieren. Schon in diesem Beispiel zeigt sich, das funktionale Aspekte hier sehr gut passen. Denn es gilt: Zustände sind mit Vorsicht zu genießen und sollten vermieden werden.

Wir werden uns in den nächsten Teilen ausführlich mit dieser Thematik beschäftigen und zusätzlich die Brücke zu den funktionalen Aspekten herstellen. Den Quelltext findet ihr auf GitHub. Bei Fragen oder Anregungen einfach melden unter sven@vaadin.com oder per Twitter @SvenRuppert.

Happy Coding!

Geschrieben von
Sven Ruppert
Sven Ruppert
Sven Ruppert arbeitet seit 1996 mit Java und ist Developer Advocate bei Vaadin. In seiner Freizeit spricht er auf internationalen und nationalen Konferenzen, schreibt für IT-Magazine und für Tech-Portale. Twitter: @SvenRuppert
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.