Eine Einführung in Apache Kafka

Kafka 101 – Massive Datenströme mit Apache Kafka

Lars Pfannenschmidt, Frank Wisniewski

© Shutterstock/ramcreations

Apache Kafka ist ein verteilter, partitionierender und replizierender Service für Datenströme. Er stellt seine Funktionen wie ein klassischer Messaging Broker zur Verfügung, allerdings unter der Verwendung von Konzepten, die sich von den bisher gängigen deutlich unterscheiden. Im Rahmen dieses Artikels werden wir die wichtigsten dieser Konzepte und den Grund für deren effektive Performance beleuchten und mithilfe von Anwendungsbeispielen veranschaulichen.

Kafka wird in der Regel in einem Cluster betrieben, bei dem jeder einzelne Knoten Broker genannt wird. Datenströme innerhalb des Clusters können in so genannte Topics kategorisiert werden. Ferner unterscheidet man zwischen Prozessen, die Nachrichten erzeugen (Producer), und Prozessen, die Nachrichten konsumieren (Consumer) (Abb. 1). Die Kommunikation zwischen Producer, Broker und Consumer findet mithilfe eines sprachenunabhängigen TCP-Protokolls statt. Durch diese Abstraktionsschicht werden Clientbibliotheken für unterschiedliche Programmiersprachen ermöglicht.

Abb. 1: Topologische Übersicht: Producer, Broker, Consumer nach der offiziellen Kafka-Dokumentation

Abb. 1: Topologische Übersicht: Producer, Broker, Consumer nach der offiziellen Kafka-Dokumentation

Topics und Partitionen

Nachrichten werden auf Topics veröffentlicht, wobei eine Nachricht aus einem optionalen Schlüssel und dessen Wert besteht. Kafka erstellt für jedes Topic konfigurierbar viele Partitionen, welche jeweils ein sortiertes und unveränderliches Protokoll darstellen, an das kontinuierlich Nachrichten angehängt werden, das commit log (Abb. 2). Jeder Nachricht in einer Partition wird ein sequenzieller Schlüssel zugewiesen, das Offset. Dieses Offset ist die einzige Metainformation, die zusätzlich zur eigentlichen Nachricht gespeichert wird.

Abb. 2: Die Anatomie eines Topics nach Jay Kreps

Abb. 2: Die Anatomie eines Topics nach der offiziellen Kafka-Dokumentation

Das Aufteilen von Topics auf n Partitionen ermöglicht es, dass ein Server nicht den Platz für ein gesamtes Topic bereitstellen muss, sondern nur für dessen n-ten Teil. Zusätzlich kann die Last von Producern und Consumern im Broker Cluster verteilt und jede Partition für den Fehlerfall repliziert werden.

Ein Broker behält jede veröffentlichte Nachricht für einen konfigurierbaren Zeitraum vor, standardmäßig sieben Tage, oder bis eine definierbare Maximalgröße erreicht ist, unabhängig davon, ob diese bereits konsumiert wurde oder nicht. Nach Ablauf dieses Zeitfensters wird die Nachricht gelöscht. Da Kafkas Performance unabhängig von der Größe des commit logs konstant bleibt, kann eine große Nachrichtenmasse problemlos gespeichert werden.

Offset-Management

Das Offset, das definiert, welche Nachricht gelesen werden soll, wird anders als beim klassischen Messaging vom Consumer selbst gesteuert. Da außerdem das commit log kontinuierlich erweitert wird, können so Lese- und Schreibzugriffe auf der Festplatte größtenteils sequenziell erledigt werden. Diese Art der Persistierung von Daten ist in einem solchen verteilten System deutlich leistungsfähiger als der klassische Ansatz.

In der Regel wird beim Empfang von Nachrichten das Offset kontinuierlich erhöht, um sequenziell zu lesen. Es ist aber ebenso möglich, Nachrichten in einer willkürlichen Reihenfolge oder bereits konsumierte Nachrichten zu lesen. Dieses Vorgehen kann beispielsweise angewandt werden, wenn eine erneute Datenverarbeitung (beispielsweise im Fehlerfall) nötig sein sollte oder ein zweiter Consumer dieselben Nachrichten zeitgleich lesen möchte. Das Entwerfen einer passenden Strategie zur Nachrichtenabarbeitung wird Offset-Management genannt. Im Zuge der Betrachtung eines Anwendungsbeispiels werden wir im Abschnitt Consumer näher auf konkrete Vorgehensweisen eingehen.

Log Compaction

Wie bereits beschrieben, speichert Kafka im normalen Betrieb seine Nachrichten für einen vorab definierten Zeitraum oder bis eine maximale Größe des commit logs erreicht ist. Log Compaction stellt sicher, dass zu einem Nachrichtenschlüssel wenigstens eine Nachricht erhalten bleibt. Dies ist in Anwendungfällen sinnvoll, in denen nur die letzte Nachricht für einen bestimmten Schlüssel aufgehoben werden soll. Zum Löschen einer Nachricht muss beispielsweise lediglich eine Nachricht mit identischem Schlüssel, aber ohne Wert, übertragen werden.

Zur Veranschaulichung hier ein kurzes Beispiel: Für das Topic fussball.ticker hat jede Nachricht einen Primärschlüssel. Jede dieser Nachrichten soll ein Ereignis aus einem Fußballspiel darstellen:

...
123 -> "Ronaldo trifft zum 1:0"
...
123 -> "Ronaldo im Abseits, fast das 1:0"
...

Nach der Compaction wird nur die letzte Nachricht im commit log aufbewahrt. Dies vereinfacht es dem Konsumenten, den aktuellen Status abzuleiten, ohne dabei alle Nachrichten einer Historie durchforsten zu müssen (Abb. 3).

Abb. 3: Log Compaction veranschaulicht nach der offiziellen Kafka-Dokumentation

Abb. 3: Log Compaction veranschaulicht nach der offiziellen Kafka-Dokumentation

Der Compaction-Prozess verläuft kontinuierlich im Hintergrund. Dieser Aufräumvorgang behindert die Schreibzugriffe nicht und kann, um weitere Auswirkungen auf Consumer- und Producer-Prozesse zu vermeiden, zusätzlich gedrosselt werden.

Story behind Kafka
Kafka ist als internes Projekt bei LinkedIn entstanden, um eine einheitliche Plattform für die Abwicklung und Verarbeitung von großen Echtzeitdatenströmen zu schaffen. Die enormen Datenmassen verursachten vor sieben Jahren Skalierungsprobleme in den einzelnen Systemen. Diese konnten durch Umstellung auf verteilte Architekturen bis auf ein Problem gelöst werden: Die Daten zwischen den einzelnen Systemen des internen Data Warehouses wurden direkt ausgetauscht, sodass nicht jedes System mit frischen und akuraten Daten versorgt werden konnte (Abb. 4).

Abb. 4: Beispiel für einen schlechten Datenfluss über verteilte Systeme nach Jay Kreps

Abb. 4: Beispiel für einen schlechten Datenfluss über verteilte Systeme nach Jay Kreps

Um dieses Problem zu lösen, sollte eine einheitliche Messaging-Infrastruktur geschaffen werden, wie sie in Abbildung 5 skizziert ist. Die Architektur sollte folgenden Kernanforderungen genügen:

  • Kurze Latenz für Echtzeitsysteme
  • Gute Skalierbarkeit für eine große Anzahl an Logs und Events
  • Hohe Fehlertoleranz für sichere Verfügbarkeit kritischer Daten

Klassische Systeme wie beispielsweise RabbitMQ oder ActiveMQ erwiesen sich unter diesen Gesichtspunkten als nicht zweckmäßig, da u. a. mit starken Performanceeinbußen aufgrund von nicht sequenziellen IO-Zugriffen zu rechnen ist, sobald Nachrichten persistiert werden müssen. Insgesamt fasst Jay Kreps bei der Beurteilung von existierenden Systemen den Stand der Technik wie folgt zusammen: „None were built like a modern distributed system that you could safely dump all the data of a growing company into and scale along with your needs.“

Abb. 5: Topologie für ein entkoppeltes Nachrichtensystem nach Jay Kreps

Abb. 5: Topologie für ein entkoppeltes Nachrichtensystem nach Jay Kreps

Aus diesem Grund wurde Kafka als internes Projekt bei LinkedIn vorangetrieben und entwickelte sich zum „zentralen Nervensystem“ der Firma, durch das 300 Milliarden Nachrichten am Tag fließen. Da das Team um Jay Kreps von der Idee und den Konzepten überzeugt war, entschloss man sich, das Projekt als Open Source freizugeben. Wie sich später herausstellte mit großem Erfolg.

Abschließend ist darauf hinzuweisen, dass sich das Hauptentwicklerteam von Kafka mit dem Unternehmen Confluent Inc. selbstständig gemacht hat, um kommerzielle Produkte und Dienstleistungen rund um Kafka anzubieten. Kafka selbst bleibt dabei ein Open-Source-Projekt.

Eine ausführlichere Darstellung der Entstehung und des Werdegangs von Kafka geben ein Paper und ein Blogeintrag von LinkedIn.

Anwendungsbeispiel

Um die unterschiedlichen Funktionen und Konzepte besser zu veranschaulichen, werden wir in einer Beispielanwendung das Senden und Empfangen von Nachrichten demonstrieren. Die Domänenschicht ist trivial und besteht deshalb nur aus der selbsterklärenden Klasse News. Da zur (De-)Serialisierung JSON verwendet wird, sind einige Jackson-Annotationen in der Klasse zu finden (Listing 1).

  public final UUID id;
  public final String author;
  public final String title;
  public final String body;

  @JsonCreator
  public News(@JsonProperty("id") UUID id,
    @JsonProperty("author") String author,
    @JsonProperty("title") String title,
    @JsonProperty("body") String body) {
      this.id = id;
      this.author = author;
      this.title = title;
      this.body = body;
  }
}

Producer

In Kafkas Version 0.8.2 wurde ein neues und deutlich vereinfachtes Producer-API veröffentlicht. Der Konstruktor unserer NewsProducer-Klasse erzeugt mithilfe der createProducer-Methode eine KafkaProducer-Instanz (Listing 2). In den Kafka-APIs ist es üblich, Konfigurationsparameter über Properties anzugeben. Eine detaillierte Beschreibung aller Parameter ist in org.apache.kafka.clients.producer.ProducerConfig zu finden. Der BOOTSTRAP_SERVERS_CONFIG-Parameter ist eine Liste von Kafka-Brokern, die dazu genutzt wird, eine initiale Verbindung aufzubauen und die restlichen Knoten im Cluster zu ermitteln. Somit müssen nicht alle Knoten des Clusters in dieser Liste stehen. Letztere wird wie folgt als String angegeben: host1:port1,host2:port2,….

public class NewsProducer {
  private final String topic;
  private final KafkaProducer<String, News> producer;

  public NewsProducer(String topic, String broker) {
    this.topic = topic;
    this.producer = createProducer(broker);
  }

  private KafkaProducer<String, News> createProducer(String broker) {
    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      NewsSerializer.class.getName());
    return new KafkaProducer<>(config);
  }

  public RecordMetadata send(News news)
  throws ExecutionException, InterruptedException {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic,
      news.id.toString(), news);
    return this.producer.send(record).get();
  }

  public Future sendAsync(News news) {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic,
      news.id.toString(), news);
      return this.producer.send(record);
  }

  public void close() {
    producer.close();
  }
}

Die KafkaProducer-Klasse hat zwei Typenparameter, den ersten für den Schlüssel K, den zweiten für den Wert V. So gibt KEY_SERIALIZER_CLASS_CONFIG an, welche Klasse für das Serialisieren der Schlüssel verantwortlich ist. Da wir in unserem Beispiel UUIDs verwenden, kann ein von Kafka mitgelieferter Serializer genutzt werden, der StringSerializer. Für den Wert und dessen Konfiguration VALUE_SERIALIZER_CLASS_CONFIG gebrauchen wir eine eigene Implementierung, da wir das Domänenobjekt News in unserem Beispiel nach JSON übertragen wollen. Um eine eigene Implementierung für das Serialisieren registrieren zu können, muss eine Klasse das Serializer-Interface wie beispielsweise in Listing 3 aus dem Kafka-API implementieren. Die Methode configure kann genutzt werden, um spezielle Konfigurationen für den Serializer vorzunehmen. Mithilfe des booleschen Parameters isKey kann eine Fallunterscheidung für Schlüssel- oder Wert-Serializer vorgenommen werden. Mittels close können am Ende Ressourcen freigegeben werden. Die Serializer-Methode selbst muss schlussendlich nur die Transformation eines Objekts in ein Byte-Array vornehmen, welches in unserem Fall durch die JSON-Bibliothek Jackson leicht zu bewerkstelligen ist. Falls bei der Transformation ein Fehler auftreten sollte, wird dieser in eine SerializationException umgewandelt.

public class NewsSerializer implements Serializer {

  ObjectMapper mapper = new ObjectMapper();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // nop
  }

  @Override
  public byte[] serialize(String topic, News data) {
    try {
      return mapper.writeValueAsBytes(data);
    } catch (JsonProcessingException ex) {
      throw new SerializationException(
        "Could not transform Object to JSON: " + ex.getMessage(), ex);
    }
  }

  @Override
  public void close() {
    // nop
  }
}

Mithilfe der nun vorhandenen KafkaProducer-Instanz können Nachrichten auf unterschiedliche Weise an den Kafka-Cluster übertragen werden. In der Methode sendAsync der Klasse NewsProducer (Listing 2) erzeugen wir eine neue ProducerRecord-Instanz. Diese Klasse hat verschiedene Konstruktoren, die für die richtige Verteilung der Nachrichten entscheidend sind. In unserem Beispiel gibt der erste Parameter das Ziel-Topic, der zweite den Schlüssel der Nachricht und der letzte die Nachricht selbst an. Der Producer gibt bei einem Aufruf ein Future zurück, anstatt zu blockieren. Soll allerdings auf eine Antwort gewartet und somit blockiert werden, kann auf dem Future get aufgerufen werden, wie beispielsweise in der send-Methode in Listing 2. Um beim Beenden einer Applikation sicherzustellen, dass alle Threads ordnungsgemäß geschlossen und somit alle Nachrichten korrekt verarbeitet werden, muss man am Producer close aufrufen.

ProducerRecord

Wie zuvor erwähnt, kann ein Topic in mehrere Partitionen aufgeteilt werden. Da die Reihenfolge der Nachrichten nur innerhalb einer Partition garantiert wird, gibt es bei der Nachrichtenverteilung einige Kniffe und Heuristiken zu beachten. Die Klasse ProducerRecord ist bestimmend, da hier der Anwender entscheidet, wohin eine Nachricht tatsächlich versendet wird. Die Klasse kann vier Konstruktorparameter entgegennehmen:

  1. Topic: Name des Ziel-Topics
  2. Partition: Zielpartition des Topics
  3. Key: der Nachrichtenschlüssel
  4. Value: die Nachricht selbst

Die zahlreichen Möglichkeiten zur Konfiguration des Topics werden im Folgenden stichpunktartig zusammengefasst:

  • Wenn Key- und Zielpartition nicht angegeben werden, wird die Nachricht an eine zufällige Partition geschickt.
  • Wird ein Key übergeben, wird darüber ein Hash gebildet, damit Nachrichten mit diesem Schlüssel immer in der gleichen Partition abgelegt werden. Somit kann beispielsweise bei einem Stream von Benutzerevents sichergestellt werden, dass Nachrichten eines Benutzers immer in derselben Partition abgespeichert werden und eine richtige Reihenfolge garantiert ist (etwa unter Verwendung des Benutzerschlüssels).
  • Wird eine Partition angegeben, findet die Übertragung der Nachricht immer an diese statt. Allerdings kann der Key dann für LogCompaction nützlich werden.

Zu beachten ist bezüglich LogCompaction, dass hierbei nur die letzte Nachricht zu einem bestimmten Schlüssel existieren kann.

Consumer

Wie bereits erwähnt, unterscheiden sich Kafka und klassische Message Broker hauptsächlich dadurch, dass der Empfänger selbstständig nachhalten muss, ob er eine Nachricht schon verarbeitet hat oder nicht. Es existiert also keine Methode zur Empfangsbetätigung einer Nachricht, da diese Information vom Broker weder verarbeitet werden kann, noch anderweitig Beachtung findet.

Derzeit bietet Kafka für das Empfangen von Nachrichten inklusive Offset-Management zwei unterschiedliche APIs an. Das so genannte High Level Consumer API lagert das Verwalten des Offsets nach ZooKeeper oder Kafka aus (Kasten: „Exkurs ZooKeeper“), während der SimpleConsumer hier mehr Kontrolle zulässt. Beide Varianten haben ihre Vor- und Nachteile. In Ersterer werden Details abstrahiert, die zum „einfachen“ Konsumieren von Nachrichten irrelevant sind. Im Gegensatz dazu wird in Letzterer auf eben diese Details geachtet. Hiermit ist es u. a. möglich, bestimmte Nachrichten mehrmals zu lesen oder nur spezifische Nachrichten zur Verarbeitung auszuwählen.

Zudem wird in der Version 0.9 ein neues API zum Konsumieren von Nachrichten eingeführt, die erhebliche Änderungen enthält. Wir haben uns im Hinblick auf das Anwendungsbeispiel für das High Level Consumer API entschieden, da dieses für den Einstieg besser geeignet ist.

Konfiguration

Analog zum Producer wird ein Consumer über ein Properties-Objekt konfiguriert. In Listing 4 ist die ConsumerConfig für das Anwendungsbeispiel vorgegeben. Die einzelnen Parameter werden mit den dahinterstehenden Konzepten im Folgenden einzeln erläutert.

private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
  Properties props = new Properties();
  props.put("zookeeper.connect", zookeeper);
  props.put("group.id", groupId);
  props.put("offsets.storage", "kafka");
  props.put("dual.commit.enabled", "false");
  return new ConsumerConfig(props);
}

Consumer Groups

Mit Kafka ist es möglich, auf Consumer-Ebene zu entscheiden, nach welchem Konzept die Nachrichten verarbeitet werden sollen. Ob jede Nachricht einzeln vom jeweiligen Empfänger bezogen wird (Stichwort Queue) oder als Broadcast von allen (publish-subscribe), lässt sich implizit über die so genannten Consumer Groups regeln, die jedem Consumer zugewiesen werden muss (group.id).

Ein Consumer-Client registriert sich für ein Topic; für jede Nachricht, die an dieses Topic gesendet wurde, wird vom Cluster eine Instanz einer Konsumentengruppe zum Empfang und zur Verarbeitung ebendieser Nachricht ausgewählt.

Abb. 6: Zusammenfassen von Consumern in Konsumentengruppen

Abb. 6: Zusammenfassen von Consumern in Konsumentengruppen

In Abbildung 6 werden die vorgestellten Konstellationen skizziert. Für Topic A gibt es eine Consumer-Gruppe, die zwei Instanzen zur Nachrichtenverarbeitung bereithält. Das heißt, jede Nachricht wird entweder von A0 oder A1 empfangen. Für Topic B existieren hingegen mehrere Gruppen, in denen wiederum eine unterschiedliche Anzahl an Instanzen zur Verfügung steht. Somit wird jede Nachricht sowohl von B0 als auch von einem Consumer aus Gruppe C empfangen.

Abschließend sei angemerkt, dass nicht alle Partitionen für ein Topic auf dem gleichen Broker liegen müssen, sondern im Cluster verteilt sein können, wohingegen eine Partition immer vollständig auf einem Knoten des Clusters anzutreffen ist.

Verwaltung des Offsets

Die Verwaltung des Offsets ist eines der zentralen Themen beim Entwurf eines Kafka-Consumers, und die Referenzimplementierung befindet sich derzeit im Wandel. Mit der aktuellen Version 0.8.2 wurde ein neuer Offset-Manager eingeführt, welcher die Verwaltung von ZooKeeper nach Kafka verlagert (Kasten: „Exkurs ZooKeeper“). Dazu bietet die ConsumerConfig Konfigurationsmöglichkeiten, sowohl für einen Migrationspfad durch zweifaches Commiten (dual.commit.enabled) nach Kafka und ZooKeeper als auch zur Wahl des Backends (offsets.storage) an.

Das Offset wird von dem hier vorgestellten API je Topic für jede Gruppe und Partition automatisch gespeichert. Falls der Anwender dies explizit über ConsumerConnector::commitOffsets erledigen möchte, muss der Schalter auto.commit.enable umgelegt werden. Klärend sei noch einmal darauf hingewiesen, dass die komplette Verwaltung des Offsets auch ohne diese Infrastruktur in Eigenregie erledigt werden kann.

Exkurs ZooKeeper
Beim ersten Installieren und Starten eines Kafka-Brokers kommt man zwangsweise mit dem Apache ZooKeeper in Berührung. Die naheliegende Frage ist, warum muss man eine zweite „Serverfarm“ aufsetzen, um ein Kafka-Cluster zu betreiben? Da dieses zusätzliche Stück Infrastruktur nicht unerheblich zum Verständnis von der Arbeitsweise Kafkas ist, wollen wir hier einen kurzen Exkurs geben.
ZooKeeper
Apache ZooKeeper [10] stellt Koordinierungsdienste zur Synchronisation von Prozessen auf verteilten Systemen bereit. Ursprünglich wurde es bei Yahoo! entworfen, um verteilte Applikationen nicht manuell auf dem jeweiligen Rechner konfigurieren zu müssen und eine veraltete oder nicht einheitliche Konfiguration zu vermeiden [11]. Aus diesem Kontext haben sich primär bestimmte Grundeigenschaften ergeben, die hier nur kurz vorgestellt werden sollen. Für eine tiefergehende Recherche lohnt es sich, die offizielle Dokumentation zu studieren.
Abb. 7: Schematische Darstellung einer ZooKeeper-Clusters

Abb. 7: Schematische Darstellung einer ZooKeeper-Clusters

Das API stellt analog zu einem Dateibaum Methoden bereit, um so genannte ZNodes hierarchisch abzuspeichern. Die Daten werden allerdings im Speicher vorgehalten, was den Service deutlich performanter macht als ein „echtes“ Dateisystem. Ferner repliziert sich ZooKeeper selbstständig über mehrere Server und gewährt Ausfallsicherheit, solange eine Überzahl an Instanzen verfügbar ist. Über die einzelnen Knoten herrscht eine globale Ordnung, d. h., jeder Transaktion wird eine Versionsnummer zugewiesen. Hierdurch ergibt sich, dass für den Lesezugriff eine Anfrage an einen einzelnen Knoten des Ensembles (so wird das ZooKeeper-Cluster genannt) ausreicht, während zum Schreiben der Leader die jeweilige Information an die Slave-Knoten weitergeben muss. Insgesamt wird ein Verhältnis zwischen Lese- und Schreibzugriffen von 10:1 als sinnvoll angegeben. Ein Beispiel für ein derartiges ZooKeeper-Ensemble mit mehreren Clients ist in Abbildung 7 skizziert.

Zusammenspiel mit Kafka

Die Broker im Kafka-Cluster benutzen ZooKeeper zum Erkennen von Serverausfällen, Datenpartionierung, synchroner Datenreplikation und zur Verwaltung von Consumern. Ursprünglich bestanden auch Abhängigkeiten bei den Client-APIs zum Senden und Empfangen von Nachrichten. Der Producer hat beispielsweise die Broker-Adressen abgefragt, während das High Level Consumer API immer noch das Offset-Management über ZooKeeper betreibt. Seit Version 0.8.2 sind die Abhängigkeiten aus dem Producer-Client verschwunden. Zur Performanceverbesserung existiert ein neues API zum nativen Verwalten des Offsets in Kafka über ein eigenes Topic mittels Log Compaction. Mit Version 0.9 wird dann auch ein Consumer-Client eingeführt, der vollständig von ZooKeeper abstrahiert ist. Eine Übersicht über die Datenstrukturen in ZooKeeper gibt das Wiki.

Nachrichten lesen aus dem KafkaStream

Zum Konsumieren von Nachrichtenströmen aus Kafka existiert die Klasse KafkaStream<K, V>, wobei K der Schlüsseltyp und V der Payloadtyp der Nachricht ist. Also in unserem Fall String und News.

KafkaStream ist von Iterable abgeleitet und kann einfach mit hasNext() und isNext() ausgelesen werden. Die Implementierungen sind blockierend, weshalb sie jeweils in einem eigenen Thread gestartet werden sollten. In Listing 5 wird eine Runnable-Implementierung skizziert, die ein News-Objekt vom Kafka-Nachrichtenstrom auf eine Instanz des Interface NewsConsumer weitergibt. Diesem haben wir lediglich die Methode void consume(News news) spendiert.

public class KafkaConsumerThread implements Runnable {

  private final NewsConsumer consumer;
  ...
  public void run() {
    Thread.currentThread().setName(String.format(%s-%s", name, id));
    logger.info("Started consumer thread {}", name);
    ConsumerIterator<String, News> it = messageStream.iterator();
    while (it.hasNext()) {
      relayMessage(it.next());
    }
    logger.info("Shutting down consumer thread {}", name);
  }

  private void relayMessage(MessageAndMetadata<String, News> kafkaMessage) {
    logger.trace("Received message with key '{}' and offset '{}' on partition '{}' for topic '{}'",
    kafkaMessage.key(), kafkaMessage.offset(),
    kafkaMessage.partition(), kafkaMessage.topic());
    consumer.consume(kafkaMessage.message());
  }

}

Da Kafka keine Kenntnisse über unsere Domäne hat, wird als Defaultwert ein Byte-Array als Typenparameter sowohl für Schlüssel als auch Nachrichteninhalt verwendet. Zum Übertragen in unsere Domäne kann das Interface Decoder<T> implementiert werden. Für die News-Klasse zeigt dies Listing 6, für unseren Schlüssel vom Typ String wird ein StringDecoder mitgeliefert.

public class NewsDecoder implements Decoder {

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public News fromBytes(byte[] bytes) {
    try {
      return mapper.readValue(bytes, News.class);
    } catch (IOException ex) {
      throw new DecoderException("Cannot read message.", ex);
    }
  }
}

ConsumerConnector

Als Schnittstelle zum Kafka-Cluster sieht das API den ConsumerConnector vor, der sich über die statische Methode createJavaConsumerConnector gemäß der oben vorgestellten ConsumerConfig instantiieren lässt:

…
ConsumerConfig config = createConsumerConfig(zookeeper, groupId);
ConsumerConnector consumerConnector =
   ConsumerConnector.createJavaConsumerConnector(config);
…

Wenn der Connector eingerichtet ist, kann man durch verschiedene Implementierungen der createMessageStream-Methode Zugriff auf die Nachrichten über KafkaStream<K, V> bekommen. Als Parameter sind für jeden Anwendungsfall unterschiedliche Kombinationen aus Topic-Filtern, Decodern (in unserem Falle StringDecoder und NewsDecoder) und/oder eine Map zur Konfiguration der Anzahl von Threads für ein Topic zulässig. Bevor wir darauf näher eingehen, ein kurzer Quelltextauszug zum Beziehen von KafkaStreams aus unserer Beispielanwendung:

…
Map<String, List<KafkaStream<String, News>>> consumerMap =
  consumerConnector.createMessageStreams(
    topicCountMap, new StringDecoder(null), new NewsDecoder());
List<KafkaStream<String, News>> streams = consumerMap.get(topic);
…

Neben den Decodern übergeben wir hier eine Map topicCountMap. Diese hält einen Integer-Wert je Topic bereit, der die Anzahl der zu erzeugenden KafkaStream-Instanzen definiert. Im Weiteren wird gezeigt, wie für jede Instanz ein Thread zum Auslesen des Streams angelegt wird.

Threading

Die korrekte Konfiguration zwischen der Anzahl von Threads und Partitionen eines Topics veranschaulicht Abbildung 8. Sollten einem Topic mehr Threads als Partitionen zugeordnet sein, erhalten die überschüssigen Threads keine zu konsumierenden Nachrichten. In unserem Beispiel entspricht dies dem Thread T0. Im Umkehrschluss bekommen Threads Nachrichten von mehreren Partitionen, wenn die Konstellation invers ist. In der Konsequenz geht die globale Ordnung über die Nachrichten verloren. Das heißt, für jede Partition werden die Nachrichten sequenziell an den Consumer-Thread übergeben, aber für Nachrichten aus unterschiedlichen Partitionen ist dies nicht zwingend der Fall.

Abb. 8: Consumer-Threading im Hinblick auf Anzahl der Topic-Partitionen

Abb. 8: Consumer-Threading im Hinblick auf Anzahl der Topic-Partitionen

Im Überblick wurde bisher beschrieben, wie man sich mit dem Kafka-Cluster verbindet, wie Nachrichten über das Kafka-API deserialisiert werden können und wie ein Nachrichtenstrom ausgelesen wird. Zudem haben wir uns mit der Anzahl von Auslesethreads und deren Auswirkungen beschäftigt. Im Folgenden wird die Verkopplung dieser Teilaspekte demonstriert.

// create consumer threads to handle the messages
int threadNumber = 0;
for (final KafkaStream stream : streams) {
  String name = String.format("%s[%s]", consumer.getTopic(), threadNumber++);
  Runnable runnable = new KafkaConsumerThread(stream, name, consumer);
  pool.submit(runnable);
} 

Fazit und Ausblick

Zusammenfassend haben wir in komprimierten Auszügen gezeigt, wie Nachrichten mit dem Kafka-API verschickt und empfangen werden können. Der vollständige Beispielcode ist in einem GitHub Repository zugänglich. Hinzukommend finden sich dort auch Integrationstests, die ohne manuell aufgesetzte Kafka- und ZooKeeper-Cluster verwendet werden können.

Für Probeläufe auf dem heimischen System kann Kafka gemäß der Schnellstartanleitung problemlos installiert werden. Wird eine rekonstruierbare Konfiguration über ein Chef-Rezept bevorzugt, existiert bereits ein Lösungsansatz.

Da rund um das Kafka-Ökosystem noch viel in Bewegung und das Thema an sich sehr umfassend ist, konnten wir hier nur eine kurze Einführung geben und viele Bereiche lediglich skizzieren. Wir hoffen, wir konnten Appetit auf mehr machen und Sie für Kafka und die dahinterliegenden Konzepte interessieren.

Aufmacherbild: internet background with binary code von Shutterstock / Urheberrecht: ramcreations

Geschrieben von
Lars Pfannenschmidt
Lars Pfannenschmidt
Lars Pfannenschmidt beschäftigt sich vorrangig mit Themen rund um Internet of Things, Mobilen Applikationen, Machine Learning, Big Data und Agilen Vorgehensmodellen wie Scrum und Kanban. Lars arbeitet als Staff Software Engineer bei der Intuit Inc. und ist Mitgründer der mobile.cologne User Group in Kölle.
Frank Wisniewski

Frank Wisniewski arbeitet als Senior Software Engineer im Bereich Echtzeitapplikationen bei Intuit Data Engineering & Analytics. Sein Erfahrungshorizont umfasst außedem 3-D-Visualisierung und -Rendering (C++/OpenGL, 3D-PDFs), moderne Webentwicklung und Big Data in Echtzeit sowie agilen Vorgehensmodellen wie Scrum oder Kanban.

Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.