Suche
Programmieren ohne Blockieren

Reaktive Programmiermodelle im Vergleich

Oliver Heger

© Shutterstock/whiteMocca

Reaktive Programmierung verspricht, moderne Hardware effizient zu nutzen und den Anwender von lästigen Wartezeiten zu befreien. Doch sie hat ihren Preis: Sie stellt hohe Anforderungen an uns Entwickler und zwingt uns, ausgetretene Pfade zu verlassen. Dieser Artikel vergleicht den reaktiven Programmierstil mit dem herkömmlichen.

Es kann ein sehr zufriedenstellendes Gefühl sein, eine Aufgabenliste von oben nach unten abzuarbeiten. Man fokussiert sich exklusiv auf das, was gerade ansteht, und setzt anschließend einen Haken darunter. Offensichtlich führt solch ein rein serieller Ansatz nicht unbedingt zu optimalen Ergebnissen. Lautet eine der Aufgaben beispielsweise „Wäsche waschen“, macht es wenig Sinn, die Waschmaschine zu bestücken und dann gebannt durch das Bullauge zu starren, bis das Programm abgelaufen ist, während man doch nebenher bereits den Fußboden wischen könnte. Das andere Extrem besteht darin, eine Vielzahl von Aufgaben nahezu gleichzeitig zu starten, den Fortschritt zu überwachen und aktiv zu werden, sobald es erforderlich wird oder ein Ergebnis eintritt. Durch geschicktes Jonglieren mit den einzelnen Aktivitäten lässt sich die eigene Produktivität deutlich steigern – vorausgesetzt, Sie wissen noch, wo Ihnen der Kopf steht.

Die Analogie lässt sich ganz gut auf die Softwareentwicklung übertragen. Die meisten von uns werden wohl mit einem seriellen Ansatz aufgewachsen sein und sich damit am wohlsten fühlen. Ein Programm besteht aus einzelnen Anweisungen, die der Computer Schritt für Schritt ausführt. Das ist intuitiv, alle Ergebnisse liegen dann vor, wenn man sie braucht, und man muss sich nicht mit Nebenläufigkeit und anderen esoterischen Konzepten herumschlagen. Leider ist ein solches Vorgehen für moderne Hardware ähnlich ineffizient wie das Waschmaschinenanalogon. Aus der Sicht eines Prozessorkerns dauert eine I/O-Operation eine halbe Ewigkeit. Zeit, die er wesentlich sinnvoller nutzen könnte, als untätig auf das Eintreffen der Ergebnisse zu warten. Um den Anwender mit Software zufriedenzustellen, die auf seinem Rechner mit der erwarteten Geschwindigkeit läuft, müssen wir uns schon mehr anstrengen. Ein Lösungsansatz besteht darin, den Prozessor eben nicht warten zu lassen, sondern die Zeit bis zum Eintreffen der Daten mit alternativen Aufgaben zu überbrücken. Das erhöht natürlich den Koordinationsaufwand.

Dieser Artikel untersucht verschiedene Ansätze zur reaktiven Programmierung und ihren Einfluss auf die Komplexität des resultierenden Codes. Dabei werden sowohl Methoden betrachtet, die Java von sich aus mitbringt, als auch solche, die populäre Open-Source-Frameworks anbieten. Gegenstand der Untersuchung ist ein relativ einfacher Anwendungsfall: das Einlesen einer Datei. Das ist ziemlich übersichtlich und wurde vermutlich von jedem bereits in der einen oder anderen Form implementiert. Die Dateioperation steht dabei stellvertretend für eine ganze Klasse ähnlicher Probleme, bei denen Daten von externen Quellen – typischerweise über blockierende Aufrufe – angefordert werden. Beispiele sind Zugriffe auf das Netzwerk, Aufrufe von entfernten Services oder Datenbankabfragen. Die angestellten Betrachtungen lassen sich größtenteils auf alle diese Probleme übertragen. Um eine Grundlage für weitere Vergleiche zu haben, starten wir mit einer möglichst einfachen, blockierenden Implementierung zum Einlesen einer Datei. Mit den Möglichkeiten einer modernen Java-Version könnte eine Lösung dieses Problems so aussehen (der vollständige Code für alle im Artikel referenzierten Beispiele ist in einem GitHub-Repository verfügbar):

public void readFile(Path path) throws IOException {
byte[] bytes = Files.readAllBytes(path);
// tue irgendwas mit den Daten
}

Mithilfe von Bequemlichkeitsfunktionen aus der java.nio.file.Files-Klasse genügt eine einzelne Zeile, um die Aufgabe zu lösen. Das ist natürlich eine sehr naive und nicht gerade optimierte Implementierung. Zunächst kann die Verarbeitung der Daten erst dann erfolgen, wenn die komplette Datei gelesen wurde. Außerdem geht dieses Fragment von der Annahme aus, dass die Datei als Ganzes in den Hauptspeicher passt. Geht es um die Verarbeitung von Massendaten, dürfte sich dieser Ansatz als wenig geeignet erweisen. Beide Kritikpunkte lassen sich durch den Einsatz einer Schleife entschärfen, die über einen traditionellen InputStream die Datei blockweise liest und die einzelnen Blöcke an einen Konsumenten zur Weiterverarbeitung übergibt. Das ist immer noch recht überschaubar und sollte den meisten vertraut sein. Der Nachteil, dass der ausführende Thread jeweils für die Leseoperationen blockiert wird, ist beiden Lösungen gemeinsam.

(Brave) New I/O

In den Anfangstagen von Java konnten I/O-Operationen ausschließlich blockierend durchgeführt werden. Eine andere Option hat die Standardbibliothek schlichtweg nicht geboten. Seitdem hat die Welt sich weiter gedreht, und mit Java 1.4 ist das nio-Paket (für New I/O) hinzugekommen. Darin enthalten sind erstmals Klassen, über die sich Datei- oder Netzwerkoperationen asynchron, also im Hintergrund, ausführen lassen. Ich weiß nicht, wie es Ihnen geht, aber ich bin mit dem nio-Paket nie so richtig warm geworden. Die verwendeten Abstraktionen unterscheiden sich stark von den gewohnten Stream-Klassen aus dem alten io-Paket, was ihren Einsatz nicht gerade intuitiv macht: Sie müssen Buffer flippen, mit Channels hantieren und Selektoren orchestrieren. Das scheint die These zu bestätigen, dass man tief in die Trickkiste greifen muss, um effiziente Ein-/Ausgabeoperationen hinzukriegen.

Mit Java 1.7 sind dem Paket einige neue Klassen hinzugefügt worden, die sich etwas einfacher nutzen lassen. Eine davon ist AsynchronousFileChannel, die im folgenden Beispiel Verwendung findet. Die Klasse bietet eine read()-Methode, die einen Puffer zur Ablage der Ergebnisse, die Position, ab der gelesen werden soll, sowie ein Objekt erwartet, das nach Beendigung der Operation aufgerufen werden soll. Hier begegnen wir bereits einem grundlegenden Muster der Reaktiven Programmierung: Methoden liefern kein Ergebnis zurück, sondern erwarten ein Rückruf- oder Callback-Objekt als Parameter. Der aktuelle Thread wartet nicht auf das Ergebnis der Methode, sondern läuft direkt weiter. Die eigentliche Aktion wird dann in einem weiteren Thread im Hintergrund ausgeführt. Sobald Ergebnisse vorliegen, werden sie an das Callback-Objekt zur weiteren Verarbeitung übergeben. Da der Callback-Aufruf durch einen anderen Thread erfolgt, sind entsprechende Vorkehrungen zur Synchronisation der Daten erforderlich. AsynchronousFileChannel arbeitet mit einem Callback-Objekt vom Typ CompletionHandler. Das Interface definiert eine Methode für den Erfolgsfall und eine zur Fehlerbehandlung. Hier zeigt sich, dass die Klasse noch aus der Ära vor Java 8 stammt: CompletionHandler ist kein funktionales Interface und kann daher nicht durch einen Lambdaausdruck umgesetzt werden.

Listing 1 zeigt eine mögliche Implementierung basierend auf dieser Klasse. Einstiegspunkt ist die readFile()-Methode, die den ersten Block der zu lesenden Datei anfordert. Sie übergibt dabei eine Callback-Implementierung, die die weitere Koordination der Leseoperation übernimmt. Wenn Daten eintreffen, wird der nächste Block angefordert, so lange, bis das Dateiende erreicht ist. Zur Verwaltung des aktuellen Zustands der Operation (wie die aktuelle Leseposition oder die bereits erhaltenen Daten) kommt ein spezielles Kontextobjekt zum Einsatz. Das umgeht das Problem der Datensynchronisation zwischen mehreren Threads: Der Kontext wird direkt an die read()-Methode übergeben und ist lokal für die aktuelle Operation; er wird also nicht gemeinsam von verschiedenen Threads genutzt. Als Ergebnis liefert readFile() ein CompletableFuture-Objekt zurück. Es wird als komplett markiert, wenn alle Daten gelesen wurden, oder ein Fehler aufgetreten ist. Offensichtlich ist diese Lösung um Größenordnungen komplexer als die blockierende Referenzimplementierung. In eigenen Projekten möchte man auf diese Komplexität sicher gerne verzichten. Daher wäre es praktisch, wenn man auf eine entsprechende Funktionalität in Frameworks zurückgreifen könnte.

public class AsyncFileReader {
  private static final int BUF_SIZE = 1024;
  private static final int CONTENT_BUFFER = 8192;

  /** The completion handler used by this instance. */
  private final CompletionHandler<Integer, ReadContext> handler =
    createHandler();

  public CompletableFuture readFile(Path path) {
    CompletableFuture future = new CompletableFuture<>();
    try {
      AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
        StandardOpenOption.READ);
      ReadContext context = new ReadContext(channel, future);
      readBlock(context);
    } catch (IOException e) {
        future.completeExceptionally(e);
    }
    return future;
  }

  // Triggers another read operation for a chunk of data.
  private void readBlock(ReadContext context) {
    context.buffer.clear();
    context.channel.read(context.buffer, context.position, context,
      handler);
  }

  // Creates a handler to process the results of a read operation.
  private CompletionHandler<Integer, ReadContext> createHandler() {
    return new CompletionHandler<Integer, ReadContext>() {
      @Override
      public void completed(Integer count, ReadContext context) {
        if (count < 0) { // Dateiende erreicht
          context.close();
          context.future.complete(context.content.toString());
        } else { // Lese nächsten Block
            context.buffer.flip();
            byte[] data = new byte[count];
            context.buffer.get(data);
            context.content.append(new String(data));
            context.position += count;
            readBlock(context);
          }
        }

        @Override
        public void failed(Throwable exc, ReadContext context) {
          context.fail(exc);
        }
    };
  }

  // Context information for a file read operation.
  private static class ReadContext {
    private final AsynchronousFileChannel channel;
    private final CompletableFuture future;
    private final ByteBuffer buffer;
    private final StringBuilder content;
    private int position;

    ReadContext(AsynchronousFileChannel c, CompletableFuture f) {
      channel = c;
      future = f;
      content = new StringBuilder(CONTENT_BUFFER);
      buffer = ByteBuffer.allocate(BUF_SIZE);
    }

    public void fail(Throwable ex) {
      future.completeExceptionally(ex);
      close();
    }

    public void close() {
      try {
        channel.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

Vert.x

Solche Frameworks gibt es tatsächlich. Eins davon ist Vert.x. Entwickelt unter dem Dach der Eclipse Foundation, stellt dieses Framework eine Reihe nicht blockierender Implementierungen für unterschiedliche Anwendungsfälle zur Verfügung, unter anderem für HTTP-Aufrufe, Socket-Kommunikation und auch für Dateisystemoperationen. Alle diese Dienste folgen dabei einem einheitlichen Programmiermodell basierend auf (diesmal lambdafreundlichen) Callbacks. Außerdem erlaubt Vert.x, Logik in sogenannten Verticles zu organisieren. Ein Verticle entspricht in etwa einem Aktor aus dem Aktoren-Modell. Vert.x stellt sicher, dass ein Verticle immer nur von einem bestimmten Thread aus aufgerufen wird, auch wenn der Aufruf aus einem Callback heraus erfolgt. Dadurch wird es möglich, Zustand in Verticles zu halten, ohne besondere Vorkehrungen zur Synchronisation mit anderen Threads zu treffen. Verglichen mit der Handhabe von Callbacks der AsynchronousFileChannel-Klasse ist das ein deutlicher Vorteil. Betrachten wir nun, wie sich eine einfache Operation zum Lesen einer Datei mit Vert.x realisieren lässt (Listing 2).

private void readFile(String path) {
  vertx.fileSystem().readFile(path, res -> {
    if (res.succeeded()) {
      String content = res.result().toString();
      // Verarbeite Daten aus der Datei
      ...
    } else {
      LOG.error("Reading file failed!", res.cause());
    }
  });
}

Der FileSystem-Service von Vert.x bietet eine readFile()-Methode zum Lesen einer Datei als Ganzes. Sie erwartet als Callback ein Objekt vom Typ Handler<AsyncResult><Buffer>. Das ist der von Vert.x standardmäßig genutzte Typ für Operationen, die fehlschlagen können. Inklusive einfacher Fehlerbehandlung sind wir damit also wieder in Regionen, die nahe bei der Referenzimplementierung liegen.

Rückrufaktionen

Ermutigt durch dieses Resultat können wir uns jetzt eines etwas komplizierteren Problems annehmen. Die Datei soll nicht nur gelesen, sondern auch modifiziert und wieder geschrieben werden. Als Modifikation implementieren wir eine einfache Base64-Codierung. Außerdem soll die Aktion fehlschlagen, wenn die Ausgabedatei bereits existiert (Abb. 1). Eine Umsetzung dieser neuen Anforderungen sehen Sie in Listing 3.

Abb. 1: Umzusetzende Dateiverarbeitung

Abb. 1: Umzusetzende Dateiverarbeitung

private void processFile(Message<Objectglt; msg) {
  String path = String.valueOf(msg.body());
  String outPath = path + ".processed";
  vertx.fileSystem().exists(outPath, rEx -glt; {
    if (rEx.failed()) {
      sendResponse(msg, false, "Exists check failed");
    } else if (rEx.result()) {
      sendResponse(msg, false, "File already exists");
    } else {
      vertx.fileSystem().readFile(path, rRead -glt; {
        if (rRead.failed()) {
          sendResponse(msg, false, "Read failed: " + rRead.cause());
        } else {
          byte[] content = rRead.result().getBytes();
          String encoded = printBase64Binary(content);
          vertx.fileSystem().writeFile(outPath, Buffer.buffer(encoded), rWrt -glt; {
            if (rWrt.succeeded()) {
              sendResponse(msg, true, "Generated " + outPath);
            } else {
              sendResponse(msg, false, "Write failed: " + rWrt.cause());
            }
          });
        }
      });
    }
  });
}

Hier sehen wir deutlich die Nachteile des reinen Callback-Ansatzes. Anstelle eines einzelnen Callbacks haben wir es jetzt mit drei ineinander verschachtelten zu tun. In jedem davon passiert zunächst eine Fehlerprüfung. Solange noch alles in Ordnung ist, erfolgt der nächste nicht blockierende Aufruf unter Angabe eines weiteren Callbacks. Die Einrückungstiefe wird immer größer, und falls Sie einmal in die Verlegenheit kommen sollten, etwas an der Ablauflogik zu ändern, müssen Sie erst die passende Klammerungsebene identifizieren. Willkommen in der Callback-Hölle! Das sollte doch irgendwie besser gehen.

Eine bessere Zukunft

Die Grenzen eines auf Callbacks beruhenden Programmiermodells sind natürlich auch den Entwicklern von Vert.x bewusst. Daher gibt es einige Auswege aus dem Dilemma. Einer davon beruht auf Futures. Ein Future-Objekt hat die Semantik, dass es irgendwann einmal einen Wert enthalten wird. Wenn dieser Wert eintrifft, können weitere Aktionen ausgelöst werden, beispielsweise die Ausführung einer Aktion. Future-Objekte können als eine Art implizite Callbacks gesehen werden [1]. Das Konzept stammt ursprünglich aus der funktionalen Programmierung und kann daher gut kombiniert und zu größeren Einheiten zusammengefasst werden. Das Beispiel aus dem letzten Abschnitt lässt sich so umschreiben, dass für die einzelnen Schritte jeweils ein Future-Objekt angelegt wird. Vert.x stellt dafür eine Future-Implementierung bereit, die die Vert.x-typischen Handler-Interfaces erweitert, und daher anstelle eines Callbacks an einen Service übergeben werden kann. Daraus resultieren drei Future-Objekte, die wir per Komposition in ein neues überführen. Das erhält einen Callback, der nur noch für das Gesamtergebnis zuständig ist. Das deutlich besser lesbare Ergebnis findet sich in Listing 4.

private void processFile(Message<Object> msg) {
  String path = String.valueOf(msg.body());
  String outPath = path + ".processed";

  existsFile(outPath)
    .compose(res -> !res ? Future.succeededFuture() :
      Future.failedFuture(new IOException("File already exists")))
    .compose(v -> readFile(path))
    .map(b -> Buffer.buffer(printHexBinary(b.getBytes())))
    .compose(buf -> writeFile(outPath, buf))
    .setHandler(res -> sendResponse(msg, res.succeeded(),
      res.succeeded() ? "Generated " + outPath :
        res.cause().getMessage()));
}

private Future<Boolean> existsFile(String path) {
  Future<Boolean> result = Future.future();
  vertx.fileSystem().exists(path, result);
  return result;
}

private Future<Buffer> readFile(String path) { ... }
private Future<Void> writeFile(String outPath, Buffer data) { ... }

Aus der verschachtelten Hierarchie von Callbacks wurde eine lineare Kette von Future-Kompositionen. Das vereinfacht auch stark die Fehlerbehandlung, denn die Kette schlägt fehl, sobald ein Teil davon einen Fehler produziert.

Mit Future-Objekten haben wir ein weiteres Muster der reaktiven Programmierung kennen gelernt. Sie sind einem reinen Callback-basierten Ansatz überlegen. Aufgrund ihrer Kombinierbarkeit eignen sie sich auch gut dafür, eine Reihe voneinander unabhängiger Aufgaben zu parallelisieren [1]. Dazu werden die Aufgaben angestoßen und auf Futures abgebildet; sie werden damit im Hintergrund in eigenen Threads ausgeführt. Die einzelnen Future-Objekte lassen sich wiederum zu einem kombinierten Future-Objekt zusammenfassen, das automatisch aufgerufen wird, sobald alle Teilergebnisse vorliegen (Kasten: „Futures in Java“).

„Futures“ in Java

Bezüglich der in den betrachteten Frameworks verwendeten Future-Implementierungen kann es zu Verwechslungen mit Klassen aus der Java-Standardbibliothek kommen. Java besitzt schon seit Längerem eine Future-Klasse. Diese ist aber für reaktive Programmieransätze ungeeignet, denn sie bietet nur eine Möglichkeit, an das gespeicherte Ergebnis heranzukommen: blockierendes Warten.
Erst die mit Java 8 hinzugekommene CompletableFuture-Klasse besitzt die erforderlichen Eigenschaften und befindet sich damit in etwa auf dem Niveau der Frameworkklassen.

Akka

Als Ergänzung zu den Betrachtungen über Vert.x möchte ich noch kurz auf ein weiteres reaktives Programmiermodell eingehen, das von dem ebenfalls populären Framework Akka angeboten wird. Akka ist in Scala geschrieben und bevorzugt daher eher funktionale Ansätze. Es verzichtet weitestgehend auf Callbacks und setzt stattdessen auf Futures, um nebenläufige Aktivitäten zu koordinieren. Eine Einführung in Akka und das von dem Framework implementierte Aktorenmodell ist in einer früheren Ausgabe des Java Magazins erschienen [2]. Der Teil von Akka, um den es mir hier geht, ist das Streamingmodul. Es beinhaltet eine Implementierung der Reactive-Streams-Spezifikation. Dabei geht es im Wesentlichen um das Versenden, Empfangen und Verarbeiten großer Datenströme unter den Rahmenbedingungen, dass nur begrenzter Speicher genutzt wird und sich Sender und Empfänger auf eine für beide akzeptable Verarbeitungsgeschwindigkeit einigen können.

Akka-Streaming hat die eher technischen Interfaces aus der Spezifikation mit einem intuitiven und funktionalen API ergänzt. Es gibt Abstraktionen für Datenquellen (Sources), Datensenken (Sinks) und Operatoren (Flow stages), die man dazwischenschalten kann, um Daten zu filtern oder zu manipulieren. Da es auch Source- und Sink-Implementierungen für Dateien gibt, lässt sich unser Referenzbeispiel auch mit dieser Bibliothek umsetzen – mit nur geringfügigem Mehraufwand sogar in einer erweiterten Form. Listing 5 (in Akkas nativer Sprache Scala geschrieben) zeigt ein Fragment, das eine Datei zeilenweise einliest, Leer- und Kommentarzeilen herausfiltert, Text in Kleinbuchstaben umwandelt und das Ergebnis in eine neue Datei schreibt. Zurück kommt ein Future-Objekt, das benachrichtigt wird, wenn die Verarbeitung komplett ist.

def processFile(input: Path, output: Path)(implicit system: ActorSystem,
  mat: ActorMaterializer): Future[IOResult] = {
  val source = FileIO.fromPath(input)
  val sink = FileIO.toPath(output)
  source.via(Framing.delimiter(ByteString("\r"), 1024, allowTruncation = true))
    .map(_.utf8String.trim)
    .filter(s => s.length > 0 && !s.startsWith(CommentPrefix))
    .map(s => ByteString(s.toLowerCase(Locale.ENGLISH)+System.lineSeparator()))
    .runWith(sink)
}

Ich möchte nicht auf die Details eingehen, sondern den Code für sich selbst sprechen lassen. Wie schon bei Futures zeigt sich auch hier, dass Ansätze aus der funktionalen Programmierung gut mit dem reaktiven Programmierstil zusammenpassen. Meiner Meinung nach werden sich Streamingansätze in Zukunft weiter verbreiten. Viele Probleme lassen sich als ein Strom von Daten modellieren, der auf seinem Weg von der Quelle zur Senke diverse Manipulationen erfährt. Auch Java 9 hat mit dem Flow-API ein ähnliches Konstrukt eingeführt.

Fazit

Reaktive Programmierung kommt nicht zum Nulltarif. Es sind andere und mitunter ungewohnte Ansätze erforderlich, um unter Verzicht auf blockierende Aufrufe mit externen Diensten und Systemen zu kommunizieren und ihre Antworten zu koordinieren. Etablierte Frameworks wie Vert.x oder Akka bieten dabei Unterstützung durch geeignete Abstraktionen. Allerdings sollte man auch hier darauf achten, welche Abstraktion sich für welches Problem eignet. Nach einer gewissen Eingewöhnungsphase erscheint ein asynchrones Programmiermodell dann hoffentlich nicht mehr als Hexenwerk.

Geschrieben von
Oliver Heger
Oliver Heger
Oliver Heger arbeitet als Softwareentwickler und Architekt bei der Agfa HealthCare AG im Bereich Clinical Research Solutions.
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: