Reaktiv in Java mit RxJava

Reaktiv in die Praxis: Reaktive Programmierung mit RxJava

Christian Lambert, Matthias Ebbing

© Shutterstock / Kaspars Grinvalds

Das „Reactive Manifesto“ ist mittlerweile vielen Entwicklern ein Begriff. Insbesondere der Streaminganbieter Netflix setzt stark auf diesen Architekturansatz und hat nicht nur viele Blogeinträge und Präsentationen zu dem Thema veröffentlicht, sondern auch zentrale, intern entwickelte und eingesetzte Bibliotheken Open Source zur Verfügung gestellt. Auf Basis zweier dieser Bibliotheken, RxJava und Hystrix, wollen wir eine praktische Einführung in die reaktive Programmierung geben.

Obwohl bereits viel über reaktive Programmierung und das reaktive Manifesto gesprochen und geschrieben wurde, ist das Thema in der Praxis häufig noch nicht angekommen. Manchmal wird es hinter vorgehaltener Hand doch eher als esoterisch angesehen oder als etwas, das nur für die Systeme großer Konzerne wie Amazon, eBay oder Netflix interessant ist. In unserem täglichen Projektgeschäft spielt die reaktive Programmierung jedenfalls selten eine Rolle. Wir möchten mit diesem Artikel dazu beitragen, anhand praktischer Beispiele das Verständnis für den reaktiven Ansatz zu erhöhen und die Hemmschwelle für den Einsatz zu senken.

Aber was bedeutet eigentlich reaktive Programmierung? Im Wesentlichen geht es darum, Zustandsänderungen aktiv an alle Systembestandteile zu propagieren, die sich für diesen Zustand interessieren. Ein Standardbeispiel, um den Unterschied zum imperativen Paradigma zu verdeutlichen, ist die Umsetzung der einfachen Funktion c = a + b. Imperativ würde c zu einem bestimmten Zeitpunkt die Summe von a und b zugewiesen werden. Wenn sich die Werte von a oder b nach der Zuweisung noch ändern, hat dies keinen Einfluss auf den Wert von c. Er bleibt konstant bis zu einer neuen expliziten Zuweisung. In einem reaktiven System hingegen würde c immer die Summe der aktuellen Werte von a und b widerspiegeln. Selbstverständlich ist dies eine sehr vereinfachte Darstellung. Im weiteren Verlauf dieses Artikels werden wir aber zeigen, wie, basierend auf diesem Prinzip, RxJava und Hystrix dazu beitragen können, den im reaktiven Manifest formulierten Ansprüchen an ein System zu entsprechen.

Reaktiv in Java mit RxJava

Netflix entschied sich bereits vor einigen Jahren, zur Optimierung ihrer Services ein reaktives Programmiermodell einzusetzen. Die Gründe und Ziele sind detailliert in einem Eintrag des Netflix Tech Blog dargestellt. Es stellte sich schnell heraus, dass die Verwendung der Möglichkeiten, in Java asynchron über Änderungen informiert zu werden, also Futures, nicht ausreichend für die Implementierung eines reaktiven Systems ist. Insbesondere bei verschachtelten oder parallelen asynchronen Aufrufen, deren Ergebnisse kombiniert werden müssen, wird es schnell kompliziert und damit fehleranfällig. Die Verwendung von Callbacks ist effizienter, führt bei komplexeren Abhängigkeiten und Verschachtelung aber zwangsläufig in die aus JavaScript berühmte Callback Hell und somit zu schwer wartbarem Code. Netflix entschied sich daher die von Microsoft entwickelten „Reactive Extensions (Rx)“ nach Java zu portieren. Unter dem Namen RxJava wurde es 2014 Open Source zur Verfügung gestellt, nachdem es intern bereits ein Jahr in Produktion verwendet wurde. Unter dem Namen ReactiveX ist unter www.reactivex.io mittlerweile eine Sammlung von Open-Source-Projekten entstanden, die diese Bibliothek für verschiedene Plattformen umsetzen. Neben .NET und Java existieren beispielsweise noch Implementierungen für JavaScript, Scala, Swift und einige andere Sprachen. Die ReactiveX-Seite bietet eine gute grundlegende Dokumentation und die Verlinkung auf viele weiterführende Tutorials.

RxJava ist unter der Apache-2.0-Lizenz auf GitHub verfügbar. Es besteht aus einem einzigen JAR und hat keine weiteren Abhängigkeiten. Eingebunden wird RxJava einfach per Maven Dependency.

<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.16</version>

Entsprechende Dependency-Informationen für andere Tools wie Gradle oder Ivy sind ebenfalls auf der GitHub-Startseite zu finden. RxJava ist ab Java 6 lauffähig und lässt sich, da keine weiteren Abhängigkeiten bestehen, prinzipiell in jedem Java-Umfeld einsetzen. Neben der Unterstützung von Java-8-Lambdas kann die Bibliothek auch mit Scala, Groovy, Clojure und Kotlin verwendet werden.

Aber was genau bietet RxJava nun eigentlich? Am einfachsten beschreibt es der einleitende Satz auf der ReactiveX-Seite: „An API for asynchronous programming with observable streams“. Was das genau bedeutet, lässt sich am besten anhand konkreter Beispiele darstellen.

Observable-Pattern um Operatoren erweitert

RxJava erweitert das bekannte Observer-Pattern um Operatoren und Funktionalitäten, die es erlauben, asynchrone Datenströme und Events zu transformieren und zu orchestrieren. Ein so genanntes Observable bildet eine Menge von asynchron zur Verfügung gestellten Daten ab und füllt damit eine Lücke in Java.

lambert-tab1

Tabelle 1: Ein Observable bildet eine Menge von asynchron zur Verfügung gestellten Daten ab

Die in Java 8 eingeführten Streams, die sequenzielle und parallele Operationen auf Datensequenzen (beispielsweise Collections) unterstützten, ähneln RxJava sehr in der Art wie Datenmengen kontrolliert, gefiltert, aggregiert, transformiert und orchestriert werden.

Starten wir also mit einem einfachen Beispiel, in dem wir dieselbe Funktionalität zunächst mit Standard-Java-8-Streams und anschließend mit RxJava implementieren.

List<String> source = Arrays.asList("Hello", "World", "Test");
String result = 
  source.stream()
    .filter(s -> s.contains("e"))
    .map(s -> s.toUpperCase() + " ")
    .reduce("", (a, b) -> a + b);
System.out.println(result + "!");

Es werden alle String-Elemente aus der Eingabeliste nach dem Buchstaben „e“ gefiltert, in Großbuchstaben konvertiert und schließlich zu einem String zusammengefügt. Wenn man diesen Code ausführt, erhält man die Ausgabe „HELLO TEST!“. Mit RxJava kann dieselbe Logik wie folgt umgesetzt werden:

Observable.from(source)
  .filter(s -> s.contains("e"))
  .map(s -> s.toUpperCase() + " ")
  .reduce(new StringBuilder(), StringBuilder::append)
  .subscribe(System.out::print, e -> {}, () -> System.out.println("!"));

Die Ähnlichkeit der beiden Lösungen ist unübersehbar. Es ist jedoch zu beachten, dass einige Methoden, beispielsweise reduce, eine andere Semantik haben. Um auch im Java-8-Beispiel einen StringBuilder zu verwenden, muss man collect(Collectors.joining()) verwenden.

Nun aber zu ein paar wichtigen Grundlagen von RxJava. Zunächst fällt Observable.from(source) ins Auge. Damit wird ein Observable erzeugt, das die Werte der übergebenen Liste als einzelne Werte emittiert. Diese Werte können nun gefiltert und manipuliert werden (filter, map, reduce). Mittels der subscribeMethode wird ein Observer registriert. Dieser Methode können bis zu drei Parameter übergeben werden. Der erste Parameter ist eine Funktion, die für jeden der emittierten Werte aufgerufen werden soll (onNext). Die zweite übergebene Funktion wird im Fehlerfall aufgerufen (onError) und die dritte, wenn das Observable alle Werte emittiert hat, sich also sozusagen beendet (onCompleted).

Ein Observable aus einer statischen Liste zu erzeugen, ist relativ langweilig und erklärt auch nicht, warum man statt des Stream-API RxJava verwenden sollte. Deswegen folgt jetzt ein etwas dynamischeres Beispiel. Listing 1 zeigt ein einfaches Observable, das Eingaben über den bei der Erzeugung angegebenen InputStream entgegennimmt und sie über die onNextMethode zur Verfügung stellt.

public class OnSubscribeFromInputStream implements OnSubscribe<String>{
  private final BufferedReader reader;
  public OnSubscribeFromInputStream(InputStream is) {
    this.reader = new BufferedReader(new InputStreamReader(is));
  }
  @Override
  public void call(Subscriber<? super String> subscriber) {
    try {
      String line;
      while (! subscriber.isUnsubscribed() 
        && (line = reader.readLine()) != null && !line.isEmpty()) {
    subscriber.onNext(line);
      }
    } catch (IOException ex) {
      subscriber.onError(ex);
    }
    if (!subscriber.isUnsubscribed()) {
      subscriber.onCompleted();
      }
  }
}

Sobald kein Wert mehr vorhanden ist, wird die onCompleteMethode und im Fehlerfall onError aufgerufen. Die Implementierung von Observables erfolgt prinzipiell immer nach diesem relativ einfachen Muster. Der folgende Code verwendet nun das OnSubscribeFromInputStream Observable.

Observable.create(new OnSubscribeFromInputStream(System.in)).
subscribe(System.out::println);

Hiermit werden Eingaben auf System.in zeilenweise entgegengenommen und auf System.out ausgegeben. Genauso einfach kann jeder beliebige InputStream, beispielsweise basierend auf einem File, verwendet werden.

Wie schon an den ersten beiden Beispielen erkennbar ist, gibt es verschiedene Möglichkeiten, Observables zu erzeugen. Mit Observable.from können Observables für Arrays und alle Datenstrukturen erzeugt werden, die java.lang.Iterable implementieren. Observable.create dient dazu, Observables basierend auf eigenen OnSubscribeImplementierungen zu erzeugen.

Operatoren lassen keine Wünsche offen

In den Beispielen haben wir bereits einige Operatoren von RxJava benutzt, nämlich create, from, map, reduce, filter und subscribe. Darüber hinaus existieren viele weitere, die alle wesentlichen Bereiche abdecken, wie Erzeugung, Transformation, Filterung, Kombination/Orchestrierung, Fehlerbehandlung und Aggregation. Später werden wir noch auf weitere Operatoren eingehen, beispielsweise auf zip zur Kombination von Ausgaben verschiedener Observables. Eine ausführliche und dokumentierte Auflistung aller Operatoren gibt es hier. Dort werden zur Verdeutlichung der Funktionsweise jedes Operators so genannte Marble-Diagramme verwendet.

Ein MarbleDiagramm (Abb. 1) beschreibt die Auswirkung eines Operators auf das Observable. Es liest sich wie folgt: Von links nach rechts ist der zeitliche Verlauf abgebildet, während die Transformation der Daten von oben nach unten dargestellt ist. Die Linien oberhalb des Operators sind die Quell-Observables. Die Symbole stellen die emittierten Elemente da. Im Operator-Kasten ist die Transformationsfunktion angegeben und darunter das Ausgabe-Observable. Wer die Wirkungsweise eines Rx-Operators interaktiv ausprobieren möchte, kann den Marble-Generator ausprobieren.

ebbing_rxjava_1

Abb. 1: Marble-Diagramm mit Legende

Multicast fasst mehrere Observer zusammen

Normalerweise beginnt ein Observable seine Daten zu emittieren, sobald seine subscribeMethode aufgerufen wird. Mithilfe der publishOperation ist es möglich, dieses Verhalten zu verhindern und mehrere Observer für ein Observable zu registrieren. Diese Observer erhalten dann alle exakt denselben Datenstrom. Andernfalls wird für jeden Subscriber die OnSubscribeFunktion aufs Neue ausgeführt.

  ConnectableObservable<String> io = Observable.create(
    new OnSubscribeFromInputStream(System.in)).publish(); 
  io.filter(s -> s.contains("ok")).subscribe(System.out::println);
  io.filter(s -> s.contains("error")).subscribe(System.err::println);
  io.connect(); 

Der publishAufruf bei der Erzeugung des Observables sorgt dafür, dass das Observable nicht beim ersten Aufruf von subscribe seine Arbeit aufnimmt, sondern erst nach Aufruf der connectMethode. Es ist auch noch danach möglich, Observer zu registrieren, die mithilfe der replayMethode sogar dieselbe Datensequenz erhalten können, wie die schon zuvor registrierten.

Beispiel: Produktempfehlung

Nachdem wir uns mit den Grundlagen von RxJava vertraut gemacht haben, kommen wir nun zu einem praktischen Beispiel. Nehmen wir an, in einem Webshop sollen dem Kunden bis zu zehn individuelle Produktempfehlungen angezeigt werden. Eine Empfehlung besteht aus dem Produkt selbst, der Bewertung und dem aktuell gültigen Preis. Die Daten werden über REST-Schnittstellen von verschiedenen Microservices zur Verfügung gestellt (Abb. 2):

  • Produkt: Verwaltet die Stammdaten der Produkte und erstellt eine Liste mit Produkten, für die sich der Kunde interessieren könnte.
  • Bewertungssystem: Liefert zu einem Produkt die durchschnittliche Kundenbewertung inklusive Kommentaren.
  • Preiskalkulation: Berechnet zu einem Produkt anhand von verschiedenen Faktoren den aktuell gültigen Preis.
ebbing_rxjava_2

Abb. 2: Webshop Systemarchitektur

Abbildung 3 zeigt den Ablauf der Aufrufe. Zuerst wird die Liste der Produkte abgefragt. Davon abhängig muss jedes Produkt um Bewertung und Preis angereichert werden, d. h. diese Aufrufe können erst getätigt werden, sobald das Ergebnis des ersten Aufrufs vorliegt.

Insgesamt haben wir bei zehn Produkten 1+2*10 = 21 REST-Aufrufe. Diese sequenziell synchron aufzurufen, wäre keine gute Idee. Angenommen jeder REST-Aufruf benötigt durchschnittlich 200 ms, bräuchte man über vier Sekunden. So lange möchte kein Kunde warten. Zum Glück kann man dies aber optimieren. Aufrufe, die nicht voneinander abhängig sind, lassen sich gleichzeitig parallel ausführen. So lässt sich die gesamte Ausführungsdauer auf unter eine halbe Sekunde senken. Leider macht das die Umsetzung auch komplizierter. Man hat es nun für gewöhnlich mit geschachtelten (pro Abhängigkeit eine Ebene) Future Callbacks zu tun und muss sich über Synchronisation der Threads Gedanken machen, beispielsweise mithilfe von CountDownLatch. Wir wollen nun zeigen, dass es auch anders geht. Die reaktive Variante ist von der Laufzeit identisch zur komplizierten asynchronen Lösung, aber genauso gut lesbar wie der synchrone Ansatz.

ebbing_rxjava_3

Abb. 3: Ablauf der Webservices-Aufrufe

Reaktiver REST-Client macht es einfacher

Fangen wir mit dem REST-Client an. Mit dem Fluent-Client-API von JAX-RS sieht ein asynchroner HTTP-GET-Aufruf wie folgt aus:

Future<T> response = client.target(uri).request().async().get(responseType);

Für den aufzurufenden URI wird ein WebTarget erzeugt und für dieses ein asynchroner Invoker, dessen getMethode das Future auf die Antwort des Servers liefert. Der Response-Body wird automatisch in den übergebenen Typ konvertiert. Bei Futures hat man allerdings das Problem, dass der getAufruf blockiert. Möchte man dies vermeiden, kann man dem Invoker ein InvocationCallback mitgeben, der aufgerufen wird sobald der Client die Antwort empfangen hat. Alternativ gibt es aber für die JAX-RS-Referenzimplementierung Jersey auch eine reaktive Erweiterung, die wir verwenden wollen und mit folgender Maven Dependency erhalten:

  <groupId>org.glassfish.jersey.ext.rx</groupId>
  <artifactId>jersey-rx-client-rxjava</artifactId>
  <version>2.22.1</version>

Die Benutzung ist ebenfalls fluent und erfolgt analog zum vorherigen Beispiel. Einstiegspunkt ist die Klasse RxObservable. Statt async() ruft man nun rx() auf und erhält ein Observable zurück.

Observable<T> response = RxObservable.from(client)
  .target(uri)
  .request()
  .rx()
  .get(responseType)

Da wir diese Aufrufkette im Folgenden häufiger benötigen, haben wir sie in die Hilfsmethode createRestCallObservable (Listing 2) ausgelagert. Diese liefert, wie oben beschrieben, ein Observable zurück, das ein Element mit dem Ergebnis des Aufrufs emittiert, sobald die Antwort des Servers vorliegt.

private Client client = ClientBuilder.newClient();
  public <T> Observable<T> createRestCallObservable(Class<T> clazz, String uri) {
  return
    RxObservable.from(client)
    .target(uri)
    .request()
    .rx()
    .get(clazz)
  ;
}

Genau wie die obige Variante blockiert der Aufruf den aktuellen Thread nicht. In beiden Fällen wird die Asynchronität über einen Executor-Thread-Pool erreicht, dessen Threads blockieren und sich auch nicht unterbrechen lassen. Damit die Threads nicht unendlich lange blockiert sind, sollte man unbedingt darauf achten entsprechende Time-outs zu setzen. In Version 3 wird Jersey übrigens endlich NIO unterstützen.

Betrachten wir nun noch den Fehlerfall. Wenn beispielsweise eine SocketTimeoutException geworfen wird, weil der Server zu langsam antwortet, so emittiert das Observable keinen Wert, sondern ruft stattdessen die onErrorMethode des Subscribers auf. Nun gibt es aber häufig die Situation, dass man den Datenstrom trotzdem mit einem Fallback-Wert fortsetzen möchte. So wollen wir eine Produktempfehlung auch dann anzeigen, wenn die dazugehörige Bewertung nicht ermittelt werden kann. Dies erreicht man wie folgt:

createRestCallObservable(…).onErrorReturn(error -> new Bewertung())

Die REST-Calls richtig orchestrieren

Jetzt müssen wir die ganzen REST-Calls miteinander verknüpfen und zu einem Ergebnis zusammenführen (Listing 3). Zum besseren Verständnis wird der Datenstrom mithilfe eines Marble-Diagramms dargestellt (Abb. 4).

String URI_PRODUKTEMPFEHLUNG = "http://localhost:8080/empfehlungen/"; 
String URI_BEWERTUNG = "http://localhost:8081/bewertungen/";
String URI_PREIS = "http://localhost:8082/preise/";

public List<Empfehlung> getEmpfehlungen(String user) {
    return createRestCallObservable(Produkt[].class,URI_PRODUKTEMPFEHLUNG+user)
.onErrorReturn(error -> new Produkt[] {})
  .flatMap(Observable::from)
  .flatMap(produkt-> Observable.zip(
      createRestCallObservable(Bewertung.class, URI_BEWERTUNG+produkt.getId())
        .onErrorReturn(error -> new Bewertung()), 
      createRestCallObservable(Double.class, URI_PREIS+produkt.getId())
           .onErrorReturn(error -> null), 
      (bewertung, preis) -> new Empfehlung(produkt, bewertung, preis)))
  .toSortedList()
  .toBlocking()
  .single();
}
ebbing_rxjava_4

Abb. 4: Marble-Diagramm für die Ermittlung der Produktempfehlungen

Zuerst rufen wir mit createRestCallObservable den REST Service für die empfohlenen Produkte auf und erhalten ein Observable<Produkt[]>, auf dem wir flatMap aufrufen.

<R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func)

Mit flatMap wird auf jedes Ausgabeelement eines Quell-Observables mittels einer angegebenen Transformationsfunktion jeweils ein neues Observable erzeugt. Die von diesen Observables emittierten Werte werden dann zu der Ausgabe eines einzigen Observables zusammengefasst (flattened). In unserem Beispiel haben wir ein Quellelement, nämlich das Array von Produkten. Hieraus erzeugen wir mit Observable.from ein Observable, das die einzelnen Produkte emittiert.

Als Nächstes ermitteln wir für jedes Produkt sowohl die Bewertung als auch den Preis, um daraus eine Empfehlung zu erstellen, sobald beide Antworten vorliegen. Hierfür verwenden wir wieder den flatMap-Operator. Diesmal wird für jedes Inputelement genau ein Outputelement (nämlich die Empfehlung) erstellt. Mihilfe des Operators zip kann man zwei Observables zu einem Observable vereinigen.

<T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction)

Dabei wird für die beiden Elemente an der jeweils gleichen Position die übergebene zip-Funktion aufgerufen, die den zu emittierenden Wert berechnet.

Wichtig an dieser Stelle ist zu erwähnen, dass aufgrund der flatMapOperation die Reihenfolge der emittierten Empfehlungen nicht mehr der Reihenfolge des Produkt-Observables entsprechen muss, sondern von dem Eintreffen der REST-Antworten abhängt. Daher ist es nicht möglich, ein Observable mit allen Bewertungen und ein Observable mit allen Preisen zu erzeugen und diese dann mit zip zu vereinigen, da man Gefahr liefe, die Bewertung mit dem Preis eines anderen Produkts zu vereinigen. Das ist ein beliebter Fehler.

Eigentlich wären wir an diese Stelle schon fertig, wenn man beispielsweise eine WebSocket-Verbindung zum Client hätte und, sobald eine Empfehlung vorliegt, diese an den Webclient sendet.

Wir möchten an dieser Stelle aber noch zeigen, wie man alle Empfehlungen als sortierte Liste synchron zurückgibt. Der Operator sort sammelt alle Elemente der Quelle und sortiert diese, sobald das Completed-Event kommt. Als Ergebnis liefert er ein Element mit der sortierten Liste.

Zum Schluss muss der aktuelle Thread analog zu Future.get() so lange blockiert werden, bis das Ergebnis vorliegt, um dieses als Serviceergebnis zurückzugeben. Dafür wandeln wir das Observable mit toBlocking in ein BlockingObservable um, das Methoden anbietet, die warten, bis das zugrunde liegende Observable den gewünschten Wert emittiert, und diesen dann zurückgeben. In unserem Beispiel rufen wir single() auf, da wir genau ein Element erwarten, und erhalten als Rückgabewert die sortierte Liste. Bei mehreren Elementen könnte man beispielsweise toIterable() aufrufen.

Resilience mit Hystrix

Was jetzt noch fehlt, sind die Resilience-Eigenschaften. Wenn ein verwendeter Microservice – beispielsweise das Bewertungssystem – überlastet ist und gar nicht oder nur noch sehr langsam antwortet, soll das unseren Empfehlungsservice nicht negativ beeinflussen. Auch sollen im Fehlerfall nicht unnötig viele Ressourcen gebunden werden. Beispielsweise sollen I/O-Threads nicht bis zum Time-out vergeblich auf eine Antwort warten. Daher wollen wir unser Beispiel noch um das Circuit-Pattern erweitern. Genau hierfür wurde die Bibliothek Hystrix von Netflix entwickelt, die mit folgender Maven Dependency eingebunden wird

<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.4.21</version>

Wir wollen an dieser Stelle nicht genauer auf die Details von Hystrix eingehen, da sie schon in vorherigen Artikeln [1] ausführlich beschrieben wurden. Aber wir wollen zeigen, wie man ein Hystrix Command reaktiv ausführen und somit einfach in unser Beispiel integrieren kann.

Für die reaktive Verwendung leitet man von der Klasse HystrixObservableCommand ab und implementiert die Methode construct, die ein Observable zurückgibt. In unserem Fall geben wir das Observable des REST-Service-Aufrufs zurück. Optional kann noch die Methode resumeWithFallback implementiert werden, die ebenfalls ein Observable zurückgibt, das den Fallback-Wert emittiert.

Zur Ausführung des Hystrix Commands stehen zwei Methoden zur Verfügung, die jeweils ein Observable zurückgeben. Das Observable emittiert die Werte des construct-Observable und im Fehlerfall die des resumeWithFallback-Observable.

  • observe(): Startet die Ausführung sofort (hot observable)
  • toObservable(): Startet die Ausführung, sobald sich ein Subscriber registriert (cold observable)

Mithilfe der Klasse HystrixWrapper (Listing 4) können wir ein beliebiges Observable mit der Hystrix-Funktionalität dekorieren und sparen uns innere Klassen.

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

public class HystrixWrapper<T> extends HystrixObservableCommand<T> {

  private Observable<T> observable;
  private Observable<T> fallback;
  
  public static <T> Observable<T> from(Observable<T> observable, T fallbackValue, String commandKey) {
      return new HystrixWrapper<>(
    observable, Observable.just(fallbackValue), commandKey)
.toObservable();
  }

  public HystrixWrapper(Observable<T> o, Observable<T> fallback, String commandKey) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandKey))
    .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey))  );
    this.observable = o;
    this.fallback = fallback;
  }

  @Override
  protected Observable<T> construct() {
    return observable;
  }

  @Override
  protected Observable<T> resumeWithFallback() {
    return fallback != null ? fallback : super.resumeWithFallback();
  }
}

Die statische Methode from erzeugt eine Instanz und liefert ein cold Observable zurück. Observable.just ist identisch zu Observable.from, nur mit dem Unterschied, dass nicht mehrere Elemente, sondern nur ein einzelnes (der Fallback-Wert) emittiert wird.

new HystrixWrapper<>(observable, Observable.just(fallbackValue), commandKey)
.toObservable();

Der dritte Parameter gibt den Key des Hystrix Commands an. Somit sieht die Hystrix-Variante von unserem Beispiel wie folgt aus:

HystrixWrapper.from(createRestCallObservable(…), new Produkt[] {}, "Produkt")
.flatMap(Observable::from)
.flatMap(produkt-&gt Observable.zip(
  HystrixWrapper.from(createRestCallObservable(…), new Bewertung(), "Bewertung"), 
  HystrixWrapper.from(createRestCallObservable(…), null, "Preis"), 
    (bewertung, preis) -&gt new Empfehlung(produkt, bewertung, preis)))

Fazit

Natürlich konnten wir in diesem Artikel nur einen kleinen Einblick vermitteln und haben Features wie Scheduler und viele Operatoren nicht vorstellen können. Für den effektiven Einsatz von RxJava ist unserer Meinung nach einige Einarbeitung und auch ein gewisses Umdenken notwendig. Unserer Erfahrung nach ist häufig ein Verständnis der genauen Funktionsweise erforderlich, um korrekten und robusten Code zu schreiben. Ist diese Hürde allerdings erst einmal genommen, lässt sich auch komplexe asynchrone Datenverarbeitung elegant, verständlich und effektiv umsetzen. Durch die nahtlose Integration mit Hystrix lässt sich einfach die Fehlertoleranz und Stabilität erhöhen, kurz das, was der Begriff Resilience allgemein zusammenfasst.

Aufmacherbild: Netflix is a global provider via Shutterstock / Urheberrecht: Kaspars Grinvalds

Verwandte Themen:

Geschrieben von
Christian Lambert
Christian Lambert
Christian Lambert ist Diplom-Informatiker und seit vielen Jahren für die adesso AG tätig. Als Softwarearchitekt ist er in Kundenprojekten für die Konzeption und Entwicklung von Java-basierten Geschäftsanwendungen verantwortlich.
Matthias Ebbing
Matthias Ebbing
Matthias Ebbing ist als Senior-Softwarearchitekt für die adesso AG tätig. Seine Schwerpunkte liegen in dem Entwurf Java-basierter Geschäftsanwendungen und der Performanceanalyse und -optimierung.
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: