Suche
Das Game-Changing Apache Kafka Release

Unter der Lupe: Apache Kafka 0.11 – Data Streaming der nächsten Generation

Hans-Peter Grahsl

© Shutterstock / Haver

 

Weder schwarze noch weiße Magie – und auch kein Feenstaub. Dennoch setzt das Release Apache Kafka 0.11 neue Maßstäbe. Wir schauen uns an, warum.

Apache Kafka konnte sich über die letzten Jahre als Herzstück unternehmensweiter Datenarchitekturen zahlreicher namhafter Firmen etablieren. Ob als Message Broker für einfache Producer- / Consumer-Verarbeitung oder als vollwertige Datenintegrationsplattform bis hin zur Verwendung als echtzeitnahe Stream Processing Lösung –  Kafka ist mittlerweile in der Lage, die wichtigsten Anwendungsszenarien abzudecken und den damit verbundenen Herausforderungen beim Datenmanagement Herr zu werden.

Dieser Artikel befasst sich mit den lang ersehnten und bahnbrechenden Neuerungen des Ende Juni erschienenen Release 0.11 von Apache Kafka. Ein Highlight des Release ist die neue, wesentlich stärkere Übermittlungsgarantie für Nachrichten namens exactly once. Wir betrachten die dafür benötigten Änderungen und Verbesserungen rund um idempotentes sowie transaktionales Verhalten von Producern und Consumern. Auf Basis dieser neuen Funktionalität können kritische Anwendungsfälle wie das Verarbeiten von Finanztransaktionen abgedeckt werden, bei denen Daten selbst bei Serverausfällen oder Netzwerkproblemen fehlerfrei bearbeitet werden müssen. Dadurch wird Kafka nicht nur leistungsfähiger sondern auch wesentlich attraktiver für Industrien wie Banken, Versicherungen oder Logistik sowie für das Gesundheitswesen.

Exactly-Once-Semantik

Im Kontext verteilter Datenverarbeitung können involvierte Systeme gänzlich, oder ausgewählte Teil-Komponenten der Systeme unabhängig voneinander fehlschlagen. Für Kafka bedeutet dies, dass sich je nachdem, wie in bestimmten Fehlerfällen reagiert wird, unterschiedliche Übermittlungsgarantien für zu versendende Nachrichten ergeben. In vergangenen Kafka-Versionen – bis einschließlich Release 0.10.x – standen die folgenden beiden Semantiken zur Verarbeitung von Nachrichten zur Verfügung:

  • at most once: Für den Fall, dass ein Producer aufgrund eines Timeouts keine Rückmeldung erhält oder ein Fehler auftritt, erfolgt kein erneutes Senden. Betroffene Nachrichten landen folglich nicht im jeweiligen Kafka Topic. Um potentielle Duplikate durch wiederholtes Senden ausschließen zu können, wird ein möglicher Nachrichtenverlust akzeptiert.
  • at least once: Producer können entsprechend konfiguriert werden, um bei Timeouts oder Fehlern Nachrichten automatisch erneut zu senden, was je nach Fehlerfall zu doppelten Nachrichten im Kafka Topic führen kann. Um einen Nachrichtenverlust zu vermeiden, werden mögliche Duplikate in Kauf genommen, welche je nach Anwendungsfall nachträglich und außerhalb von Kafka zu behandeln sind.

Die Wichtigkeit der von Producern versendeten Nachrichten entscheidet darüber, welche Verarbeitungssemantik gewählt wird. Sobald für den jeweiligen Anwendungsfall ein potentieller Verlust einzelner Nachrichten untragbar wäre, gab es in Kafka bislang lediglich die Möglichkeit, auf at least once zu setzen. Damit verbunden ist allerdings die Herausforderung seitens der Anwendungsentwickler, sich selbst um mögliche Duplikate von Nachrichten zu kümmern. Zwangsläufig steigt dadurch die Komplexität von Lösungen, und es kommt unter Umständen zu einem spürbaren Mehraufwand im Rahmen der Umsetzung. Das neue Kafka Release 0.11 schafft genau in diesem Punkt Abhilfe und führt eine weitere Alternative mit wesentlich stärkerer Übermittlungsgarantie ein:

  • exactly once: Diese neue Verarbeitungssemantik stellt den sogenannten Sweetspot dar, womit es weder zu einem nachhaltigen Nachrichtenverlust, noch zu ungewollten Duplikaten von Nachrichten in Kafka Topics kommt. Damit dieses oftmals wünschenswerte und von einigen Anwendungen zwingend benötigte Verhalten überhaupt möglich wird, müssen alle involvierten Kafka-Komponenten (Producer, Broker, Consumer) entsprechend abgestimmt und konfiguriert sein, um richtig zusammenzuarbeiten.

Dieses Feature stellt einen revolutionären Meilenstein in der Entwicklungsgeschichte von Apache Kafka dar, zählt exactly once doch zu den großen Herausforderungen im Rahmen verteilter Daten- bzw. Datenstromverarbeitung. Zur Realisierung von der  Exactly-Once-Semantik waren in erster Linie zwei fundamentale Verbesserungen nötig, die bereits seit längerer Zeit akribisch geplant wurden und schließlich innerhalb des letzten Jahres erfolgreich implementiert werden konnten. Ebenso wurden mittels tausender Codezeilen verteilte Chaos-Tests realisiert, die über mehrere Wochen liefen, um potentiell fehlerhaftes Verhalten unter realen Bedingungen identifizieren und beheben zu können. Schauen wir uns diese beiden fundamentalen und rigoros getesteten Neuerungen etwas näher an.

Idempotenz für den Nachrichtenversand

Das neue Release bietet die Möglichkeit, Nachrichten eines Producers an den Kafka Broker idempotent zu übermitteln. Idempotenz bedeutet im vorliegenden Kontext, dass Sende-Operationen, die aufgrund von Problemen ggf. mehrfach erfolgen müssen, jedenfalls in exakt einer geschriebenen Nachricht im adressierten Topic resultieren. Innerhalb von Topic-Partition können dadurch Duplikate vermieden werden.

Um dieses Verhalten zu erreichen, wird die neue Producer-Konfigurationseinstellung namens enable.idempotence auf true gesetzt. Die darunterliegende Funktionalität verwendet ein einfaches Sequenznummernverfahren, auf Basis dessen ein Kafka Broker die Deduplizierung empfangener Nachrichten vornehmen kann. Jede Nachricht bzw. jeder Batch von Nachrichten erhält demnach eine fortlaufende Sequenznummer, die ebenfalls in einem replizierten Kafka Log persistiert wird. Dadurch ist auch im Failover-Fall – Broker B übernimmt nach Ausfall von Broker A – sichergestellt, dass der aktuell zuständige Broker ein Duplikat aufgrund wiederholter Sendevorgänge seitens des Producers erkennen kann. Ein typisches Konfigurationsbeispiel zeigt Listing 1:

enable.idempotence=true
max.in.flight.requests.per.connection=1
ack=all
retries=Integer.MAX_VALUE (jedenfalls > 1)

Abgesehen von diesen Konfigurationseinstellungen sind für existierende Applikationen keinerlei Code-Anpassungen zur idempotenten Übermittelung von Nachrichten nötig. Wichtig ist jedoch die Erkenntnis, dass Kafka keine Möglichkeit zur automatischen Deduplizierung hat, sofern Applikationen sich unabhängig der getätigten Konfiguration selbst verantwortlich fühlen, Nachrichten im Fehlerfall aktiv und bewusst mehrfach zu senden, anstatt sich auf den Kafka internen Retry-Mechanismus zu verlassen.

Transaktionales Verhalten mittels atomarer Schreibvorgänge

Zusätzlich zum idempotentem Producerverhalten steht im neuen Release erstmals ein Kafka internes Transaction API bereit, auf Basis dessen atomare Schreibvorgänge sowohl über Partitionsgrenzen hinweg als auch mehrere Topics betreffend erzielt werden können. Das Transaktionsverhalten in Kafka garantiert dabei, dass ein Producer mehrere Nachrichten an unterschiedliche Partitionen bzw. Topics senden kann und entweder alle oder aber keine einzige dieser Nachrichten für Consumer verfügbar gemacht werden.

Zur Verwendung transaktionaler Semantik ist es erforderlich, die Producer-Konfiguration transactional.id auf einen eindeutigen Wert zu setzen. Dieser eindeutige Bezeichner wird typischerweise in Anlehnung an die Partitionierungsstrategie definiert und wird benötigt, um den transaktionalen Zustand auch im Fall von Neustarts der Anwendung korrekt aufrechterhalten zu können. Zusätzlich bewirkt diese Einstellung automatisch die Aktivierung von idempotentem Schreibverhalten seitens des Producers.

Lesen Sie auch: Kafka 101 – Massive Datenströme mit Apache Kafka

Der Transaktionsmechanismus in Kafka benötigt einen Transaction Coordinator, der eine neue weitere Komponente innerhalb des Brokers darstellt. Dieser ist für die Verwaltung des Transaktionsstatus von Producern zuständig und verwendet einen Transaction Log zur dauerhaften Speicherung. Listing 2 zeigt die denkbar einfache Verwendung des Transaction API aus Java. Der Umgang fühlt sich im Vergleich zu herkömmlichen Persistenztechnologien sehr vertraut an.

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(recordN);
  producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException
| AuthorizationException e) {
//can't recover from these exceptions: only option is to close producer & exit
producer.close();
} catch (KafkaException e) {
// for other kafka exceptions we abort the transaction and may try again
producer.abortTransaction();
 }

 

Transaktionsschritte im Detail

Die wichtigsten Schritte während der Abarbeitung einer Producer-Transaktion werden nachfolgend beschrieben und anhand eines einfachen Beispiels (1 Producer, 1 Broker, 1 Koordinator und 2 Partitionen) illustriert.

  • initTransactions: registriert die id des Producers beim Coordinator. Die Initialisierung erfolgt pro Producer-Instanz einmalig zu Beginn. Weiters wird sichergestellt, dass vorhergehende Transaktionen anderer Producer-Instanzen zur selben ID abgeschlossen sind: Falls eine Transaktion offen war und der Producer vorzeitig fehlschlug, wird diese abgebrochen. Sofern sich eine Transaktion mitten im Abschluss befunden hat, wird auf deren Fertigstellung gewartet.

  • beginTransaction: Signalisiert den Start einer neuen Transaktion innerhalb des jeweiligen Producers. Es kann pro Producer-Instanz zeitgleich nicht mehrere, sondern nur jeweils eine offene Transaktion geben.
  • send: Jede adressierte Partition der Sende-Operationen wird dem Coordinator mitgeteilt und dort als zur aktiven Transaktion zugehörig erfasst. Ebenso werden die als transaktional gekennzeichneten Nachrichten in die Topic-Partitionen geschrieben. Sofern im Zuge der Sende-Operationen Fehler auftreten, wird dies durch Exceptions kommuniziert, was letztlich zum Abbruch der Transaktion führt.
  • commitTransaction: Löst im Wesentlichen einen 2 Phase Commit aus. In Phase 1 erfolgt die Vorbereitung der Transaktion. Sobald der Coordinator diese Transaktionsanfrage nachhaltig ins Transaction Log übertragen hat, wird die Transaktion zu einem späteren Zeitpunkt erfolgen können. Sofern dies nicht möglich sein sollte, oder die Producer Instanz stirbt, wird die Transaktion letztlich automatisch abgebrochen werden. In Phase 2 schreibt der Coordinator Transaktionsmarker in sämtliche Topic-Partitionen, in welche im Kontext der vorliegenden Transaktion Nachrichten geschrieben wurden. Diese Markierungen signalisieren, ob eine Producer-Transaktion erfolgreich war (commit-marker) oder abgebrochen (abort-marker) werden musste. Zuletzt wird die Transaktion vom Coordinator im Transaction Log als durchgeführt vermerkt. Transaktionsmarker gelten als Control Messages, welche ausschließlich zur Kafka internen Verwendung bestimmt sind und daher für Client-Applikationen nicht direkt nach außen sichtbar gemacht werden.

  • abortTransaction: Aufgrund diverser Fehlerfälle wird die offene Transaktion damit aktiv abgebrochen, was den Coordinator veranlasst, Transaktionsmarker vom Typ abort in die betroffenen Topic-Partitionen für alle erfolgreich gespeicherten Nachrichten zu schreiben. Ebenso wird die vorliegende Transaktion im Transaction Log als abgebrochen vermerkt.

Consumer-Verhalten im Kontext von Transaktionen

Ob auf der konsumierenden Seite atomare Schreibvorgänge berücksichtigt werden, hängt wiederum von der Konfiguration der verwendeten Consumer ab. Die neu verfügbare Einstellung namens isolation.level wirkt sich wie folgt aus:

  • read_uncommitted: Unverändert zum bisherigen Verhalten des Kafka Consumer API, werden sämtliche Nachrichten gelesen, ohne dabei auf konsumierender Seite einen transaktionalen Kontext zu berücksichtigen. Partitions-Offsets, an denen sich Transaktionsmarker befinden, werden beim Lesen der Nachrichten ignoriert bzw. übersprungen.
  • read_committed: Es werden nur jene Nachrichten gelesen, für welche die Transaktion erfolgreich abgeschlossen werden konnte. Mit anderen Worten lesen Consumer Nachrichten so lange, bis sie auf die erste Nachricht innerhalb einer offenen Transaktion stoßen – dieser Punkt innerhalb der Partition wird als latest stable offset (LSO) bezeichnet. Sämtliche transaktionale Nachrichten einer abgebrochenen Transaktion werden gefiltert. Nachrichten, die ursprünglich nie Teil einer Transaktion waren, können zu jeder Zeit gelesen werden. Die Beschreibungen zu zwei einfachen konkreten Partitionszuständen sollen das Verhalten näher erläutern:

Bsp. 1: Nachdem sich in dieser Partition zwei erfolgreich abgeschlossene Transaktionen gefolgt von nicht-transaktionalen Nachrichten befinden, liest ein Consumer mit isolation.level=read_committed durchgehend bis einschließlich Offset 9. Dabei werden die commit-Marker 2c sowie 6c übersprungen.

Bsp. 2: Ein Consumer mit isolation.level=read_committed liest in dieser Partition nur bis exklusive Offset 8. Zunächst werden die beiden nicht-transaktionalen Nachrichten gefolgt von jenen der erfolgreich abgeschlossenen Transaktion gelesen. Danach wird der commit-Marker 4c übersprungen, die transaktionalen Nachrichten der abgebrochenen Transaktion gefiltert sowie der abort-Marker 7a ebenso übersprungen. Da die beiden Nachrichten am Ende (Offsets 8 + 9) Teil einer noch offenen Transaktion sind, werden diese auch nicht gelesen.

Bei näherer Betrachtung der gelesenen Offsets während des Konsumierens von Nachrichten wird klar, dass Lücken auftreten können. Einerseits müssen transaktionale Nachrichten aufgrund abgebrochener Transaktionen beim Lesen übersprungen werden. Auf der anderen Seite belegen die Transaktionsmarker (commit/abort) selbst ebenso Offsets in den entsprechenden Partitionen, die nicht an konsumierende Clients zurückgeliefert werden.

Unabhängig vom gewählten isolation.level werden Nachrichten weiterhin garantiert in der Reihenfolge ihrer jeweiligen Partitions-Offsets der dazugehörigen Topics gelesen. Das gilt auch für den Fall von überlappenden Transaktionen aufgrund mehrerer, parallel agierender Producer. Kafka erlaubt es konsumierenden Anwendungen nach wie vor, Nachrichten aus Topic-Partitionen durch explizites Zurücksetzen des Offsets mehrmals zu lesen. Macht eine Anwendung davon Gebrauch, gibt es keine Möglichkeit für Kafka, ein mehrfaches Konsumieren derselben Nachrichten zu verhindern.

Anwendungsfall: Datenstromverarbeitung

Werden die zuvor isoliert diskutierten Möglichkeiten hinsichtlich Idempotenz und transaktionalem Schreibverhalten gemeinsam betrachtet, wird klar, dass im Bereich der Datenstromverarbeitung enorme Vorteile entstehen. Auf Basis des Release 0.11 und Kafkas Streams API können erstmals client-seitige Java-Anwendungen für Stream Processing mit durchgängiger Exactly-Once-Semantik entwickelt werden. Das typische Muster dabei lautet: 1) Nachrichten aus Topic-Partitionen zu lesen, 2) diese mittels Transformationen, Filterungen, Anreicherungen etc. entsprechend zu verarbeiten und 3) die Resultate daraus erneut in Form von Nachrichten in Topic-Partitionen zu schreiben.

Dieser gesamte Ablauf (end-to-end) kann dank der Neuerungen atomar erfolgen und produziert daher selbst im Fall bestimmter auftretender Fehler valide Ergebnisse. Aus Entwicklersicht genügt es, für die Kafka-Streams-Anwendung eine einzige Konfigurationseinstellung (processing.mode = exactly_once) zu setzen und diese Garantien damit „gratis“ zu bekommen.

Um die Ergebnisdaten einer Java-Anwendung, die das Kafka Streams API verwendet, weiterarbeiten zu können, bieten sich im Grunde zwei Optionen an. Für simple Anwendungsfälle kann es ausreichend sein, aktuelle Ergebnisse mittels sogenannter interaktiver Abfragen direkt aus der laufenden Java-Anwendung zu beziehen. Die Daten dieser interaktiven Abfragen könnten beispielsweise von einer NodeJS-Applikation an ein Web-Frontend ausgeliefert werden. Eine Alternative wäre es, die Ergebnisdaten aus der Java-Anwendung wieder zurück nach Kafka zu schreiben und mittels Kafka Connect kontinuierlich und echtzeitnahe in beliebige Zieldatenbanken wie MongoDB oder Elasticsearch einzuspeisen. Auch eine Kombination beider Ansätze ist möglich, um komplexere Anwendungsfälle abzudecken.

Implikationen hinsichtlich Performance

Von Beginn an wurde Kafka für hohen Schreib- und Lesedurchsatz bei gleichzeitig geringer Latenzzeit entworfen. Aus diesem Grund ist es für viele potentielle Nutzer interessant, abzuschätzen und erste Anhaltspunkte zu bekommen, wie sich die neuen Features des Release 0.11 auf die Performance auswirken. Zumindest erste veröffentliche Hersteller-Benchmarks zeigen sehr vielversprechende Ergebnisse. Als Beispiel dafür weisen Transaktionen, welche aus 1 KB großen Producer-Nachrichten bestehen und in 100ms Abständen committet werden, einen durchschnittlichen Overhead von nur ca. 3-5% auf.

Im Allgemeinen gilt, dass länger andauernde Transaktionen bestehend aus größeren Nachrichten-Batches eine bessere Performance (i.S.v. geringerem Overhead) im direkten Vergleich zu nicht-transaktionalem Verhalten aufweisen. Zieht man einen Vergleich gegen die schwächste Übermittelungsgarantie (at most once), kann ein relativer Overhead von etwa 20% beobachtet werden. Die bloße Verwendung idempotenter Producer abseits von transaktionalem Verhalten zeigt im Grunde vernachlässigbar geringe Performanceeinbußen.

Hintergrund für die überraschend guten Ergebnisse sind wohl auch allgemeine Verbesserungen im neuen Release, allen voran ein optimiertes und damit effizienteres Message-Format, das unter bestimmten Voraussetzungen bis zu 20% höheren Producer-Durchsatz und bis zu 50% höheren Consumer-Durchsatz bei gleichzeitig bis zu 20% geringerer Plattenspeichernutzung erlaubt. Dieses neue Message-Format kompensiert das Hinzufügen neuer benötigter Metadaten auf Header-Ebene durch einen Delta-Encoding-Ansatz auf Nachrichten-Ebene. Erste geringe Größeneinsparungen hinsichtlich des gesamten Metadaten-Overhead können sich dadurch bereits ab einer Batch-Größe von nur zwei Nachrichten ergeben.

Signifikante, immer größere Einsparungen können erzielt werden, je kleiner einzelne Nachrichten sind und je mehr davon in einen Batch zusammengefasst werden können. Wie bei allen Benchmarks gilt es jedoch, keine voreiligen oder übereifrigen Schlüsse zu ziehen. Stattdessen sollten die eigenen Anwendungsfälle samt ihren Nachrichteneigenschaften sowie die vorherrschenden Interaktionsmuster mit Kafka unter den jeweiligen System- und Hardware-Umgebungen einem individuellen und detaillierten Benchmark für das neue Release 0.11 unterzogen werden.

Abschließend darf man bereits heute dem mit großer Spannung erwarteten Kafka Release 1.0 entgegenblicken, das voraussichtlich im Oktober dieses Jahres erscheinen wird.

Der Autor bedankt sich bei Michael G. Noll (Twitter @miguno) von Confluent Inc. für hilfreiche Anmerkungen und wertvolles Feedback zum vorliegenden Artikel.

Mehr zum Thema:

Kafka 101 – Massive Datenströme mit Apache Kafka

Verwandte Themen:

Geschrieben von
Hans-Peter Grahsl
Hans-Peter Grahsl
Hans-Peter Grahsl arbeitet im Java-Bereich als Technical Trainer und ist für das interne Education Department bei Netconomy Software & Consulting GmbH in Graz verantwortlich. Außerdem unterstützt er Kunden als selbstständiger Trainer und Berater bei der Konzeption und Umsetzung von on-premise- oder Cloud-basierten Datenarchitekturen im NoSQL-Umfeld. Nebenberuflich unterrichtet er an der FH CAMPUS 02.
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.