Effizient, robust und sicher

Flexible Vernetzung mit dem HiveMQ MQTT Client

Silvio Giebl

© Shutterstock / Diyana Dimitrova

Eine Vielzahl von Anwendungen setzt mittlerweile auf das Kommunikationsprotokoll MQTT. Das Protokoll findet beispielsweise Einsatz in IoT-Anwendungen wie Industrie 4.0, Connected Cars und Logistik sowie generell bei der Kommunikation zwischen Microservices.

Mit dem Begriff „Internet der Dinge“ verbindet man oft nur eingebettete Geräte. Tatsächlich umfassen typische Anwendungen eine Vielfalt von Systemen, z. B. Steuergeräte, Gateways, mobile Geräte, Backend-Systeme und eine Vielzahl weiterer Microservices. Aufgrund der Diversität dieser Systeme erfordert ihre Vernetzung über das Internet eine robuste und plattformübergreifende Kommunikation. Hier kommt MQTT ins Spiel.

Bei MQTT handelt es sich um ein Publish/Subscribe-Protokoll, das sich sehr flexibel für die zuvor genannten Anwendungsfälle einsetzen lässt. Der HiveMQ MQTT Client ermöglicht die zuverlässige Vernetzung diverser Applikationen und Geräte über MQTT. Die Bibliothek lässt sich leicht in alle Java-Projekte integrieren und setzt ein besonderes Augenmerk auf Effizienz, sodass sie sich für den Einsatz im gesamten Spektrum von Embedded-Geräten bis hin zu Backend-Systemen eignet.

Sie ist über Maven Central verfügbar und kann somit als Dependency über Maven, Gradle oder ähnliche Build-Tools sehr leicht in Projekte eingebunden werden. Entwickelt wurde die unter Apache 2 lizenzierte Open-Source-Bibliothek von HiveMQ – den Entwicklern des gleichnamigen Enterprise MQTT Broker – sowie von BMW Car IT. Die Weiterentwicklung ist sehr agil, jeder Interessierte kann sich über GitHub beteiligen und Feedback abgeben.

MQTT behandelt alle Applikationen und Geräte als MQTT-Clients, die über einen MQTT Broker bidirektional miteinander kommunizieren können. Clients können sowohl Nachrichten senden als auch empfangen. Das Senden wird als Publishen bezeichnet.

Nachrichten werden mit Topics gepublisht – Clients empfangen dann Nachrichten mit bestimmten Topics, indem sie sich zuvor mit Topic-Filtern beim Broker subscriben. MQTT bietet drei Quality-of-Service-(QoS-)Level, die unterschiedliche Nachrichtenzustellungsgarantien auch bei Verbindungsunterbrechungen gewährleisten: at most once, at least once und exactly once.

Für jede Nachricht kann je nach Anwendungsfall das passende QoS-Level gewählt werden.

Die HiveMQ-MQTT-Client-Bibliothek implementiert alle Features der neuesten MQTT-Version: MQTT 5. Die MQTT-5-Spezifikation wurde Ende 2017 erstmals veröffentlicht und schließlich Anfang 2019 als OASIS- und ISO-Standard bestätigt. Die Version 5 komplettiert die Vorgängerversion 3.1.1 und ist für zeitgemäße IoT-Anwendungsfälle optimiert. Natürlich wird auch die noch weit verbreitete Vorgängerversion durch den HiveMQ MQTT Client unterstützt.

Damit ist der HiveMQ MQTT Client mit allen aktuellen MQTT Brokers kompatibel. Für neue Projekte empfiehlt sich die Verwendung von MQTT 5. Die wichtigsten Neuerungen sind:

  • Shared Subscriptions für clientseitiges Load Balancing, wobei Nachrichten auf bestimmten Topics auf eine Gruppe mehrerer MQTT-Clients aufgeteilt werden.
  • Flow Control, die Backpressure Handling ermöglicht, also Schutz von Client und Broker vor einer Überlast zu vieler gleichzeitiger Nachrichten.
  • Session and Message Expiry Intervals, die eine feingranulare Steuerung der maximalen Gültigkeit von Sessions und Nachrichten ermöglichen.
  • Diverse Metadaten und nutzerdefinierbare Metadaten, sogenannte User Properties.
  • Negative Acknowledgements für eine bessere Behandlung einzelner Fehler, wie beispielsweise abgelehnte Nachrichten.
  • Topic Aliase, die die benötigte Netzwerkbandbreite für lange Topic-Namen verringern.
  • Request/Response über das Publish/Subscribe-Protokoll: Im Gegensatz zum klassischen Request/Response ermöglicht MQTT die Entkoppelung von Requester und Responder.

Auf die Umsetzung dieser Features mit der HiveMQ-MQTT-Client-Bibliothek wird später noch genauer eingegangen. Generell legt die Bibliothek neben kompletter Unterstützung der MQTT-Spezifikation großen Wert auf Leichtgewichtigkeit und das API-Design. Letzteres folgt folgendem Prinzip:

„APIs should be easy to use and hard to misuse. It should be easy to do simple things; possible to do complex things; and impossible, or at least difficult, to do wrong things.“

Ganz diesem Zitat gemäß bietet der HiveMQ MQTT Client sehr viele und komplexe Konfigurationsmöglichkeiten, die jedoch nur bei Bedarf verwendet werden müssen; größtenteils werden automatisch sinnvolle Standardwerte verwendet. Damit lässt sich Anwendungscode sowohl einfach halten als auch für bestimmte Teile detailliert konfigurieren. Ermöglicht wird diese Flexibilität durch den konsistenten Einsatz eines Entwurfsmusters: kontextsensitives, fluides Builder-Pattern. Das Fluent API unterstützt hier bei den im jeweiligen Kontext möglichen Operationen. Außerdem sorgt das Builder-Pattern für eine gute Lesbarkeit, Verständlichkeit und Robustheit des Codes, da alle Parameter einen sprechenden Namen besitzen. Der Verwechslung von Parametern desselben Typs wird dadurch vorgebeugt. Das folgende Beispiel zeigt die einfache Konfiguration eines MQTT-Clients:

Mqtt5Client client = Mqtt5Client.builder()
  .serverHost("broker.hivemq.com")
  .build();

Diese Konfiguration kann beliebig detailliert sein, wie in Listing 1 zu sehen ist.

Mqtt5Client client = Mqtt5Client.builder()
    .identifier("test-client")
  .serverHost("broker.hivemq.com")
  .serverPort(1883)
  .sslWithDefaultConfig()
  .automaticReconnect()
    .initialDelay(1, TimeUnit.SECONDS)
    .maxDelay(30, TimeUnit.SECONDS)
    .applyAutomaticReconnect()
  .simpleAuth()
    .username("test-username")
    .password("test-password".getBytes())
    .applySimpleAuth()
  .build();

Des Weiteren bietet die HiveMQ-MQTT-Client-Bibliothek drei klar definierte Arten von APIs an – blockierend, asynchron und reaktiv:

  • Das blockierende API eignet sich zum Testen und für Proof of Concepts.
  • Die asynchronen und reaktiven APIs eignen sich für den Produktiveinsatz.
  • Die reaktive Schnittstelle setzt auf die Reactive-Streams-Spezifikation und als Implementierung auf RxJava, und bietet so die besten Möglichkeiten für anwendungsspezifisches Backpressure Handling.
  • Die Verwendung eines API-Stils ist jedoch nicht exklusiv und starr – mehrere Stile können simultan verwendet werden. Das Stichwort ist auch hier Flexibilität. Für jede Operation kann flexibel entschieden werden, welcher Stil verwendet wird. Das Beispiel in Listing 2 verbindet einen MQTT-Client mit dem blockierenden API und sendet dann eine Nachricht mit dem asynchronen API.

    client.toBlocking().connect();
    // der Client ist hier bereits verbunden
     
    client.toAsync().publishWith()
      .topic("test/topic")
      .qos(MqttQos.AT_LEAST_ONCE)
      .payload("message".getBytes())
      .send()
      .whenComplete((publishResult, throwable) -> {
        if (throwable == null) {
    // Nachricht erfolgreich gepublisht
        } else {
    // Fehler beim Publishen
        }
    });
    

    Ein komplett asynchroner Ablauf mit Verbinden, Senden einer Nachricht und Trennen der Verbindung sieht beispielsweise aus, wie in Listing 3 gezeigt.

    Mqtt5AsyncClient client = Mqtt5Client.builder()
      .serverHost("broker.hivemq.com")
      .buildAsync();
     
    client.connect()
      .thenCompose(connAck -> client.publishWith()
        .topic("test/topic")
        .qos(MqttQos.AT_LEAST_ONCE)
        .payload("message".getBytes())
        .send())
      .thenCompose(publishResult -> client.disconnect());
     
    // alle obigen Operationen sind asynchron
    // hier kann der Applikationscode weitergeführt werden
    

    Die vorangegangenen Beispiele zeigten das Senden einer Nachricht. Ein MQTT-Client kann Nachrichten empfangen, indem er sich auf bestimmte Topics subscribt. Das asynchrone API bietet die Möglichkeit, einen Callback zu registrieren, der für jede empfangene Nachricht aufgerufen wird:

    client.toAsync().subscribeWith()
      .topicFilter("test/#")
      .callback(publish -> performComputation(publish))
      .send();
    

    Werden Nachrichten gestreamt und asynchron weiterverarbeitet, ist das reaktive API die bessere Wahl. Es verwendet RxJava – eine Bibliothek, die die Reactive-Streams-Spezifikation implementiert – und basiert auf asynchronen Streams, die deutlich mehr Möglichkeiten bieten als ein simpler Callback.

    RxJava stellt eine Fülle nützlicher Streaming-Operationen zur Verfügung, die weit mehr als die üblichen Funktionen wie map, flatMap, filter und reduce umfassen. Beispielsweise lässt sich die parallele Verarbeitung von Nachrichten mit dem reaktiven API sehr leicht umsetzen, wie Listing 4 zeigt.

    client.toRx().subscribeStreamWith()
      .topicFilter("test/#")
      .applySubscribe()
      .parallel()
      .runOn(Schedulers.computation())
      .doOnNext(publish -> performParallelComputation(publish))
      .sequential()
      .doOnNext(publish -> performSequentialComputation(publish))
      .subscribe();
    

    Um die Reihenfolge der Nachrichten pro Topic aufrechtzuerhalten, kann die Parallelisierung auf Gruppen von Topics wie in Listing 5 beschränkt werden.

    client.toRx().subscribeStreamWith()
      .topicFilter("test/#")
      .applySubscribe()
      .groupBy(Mqtt5Publish::getTopic)
      .parallel()
      .runOn(Schedulers.computation())
      .flatMap(groupedStream -> groupedStream)
      .doOnNext(publish -> performParallelComputation(publish))
      .sequential()
      .subscribe();
    

    Der große Vorteil des reaktiven API zeigt sich bei den Kombinationsmöglichkeiten der asynchronen Streams. Sollen beispielsweise Nachrichten auf bestimmte Topics transformiert und dann wiederum auf andere Topics gepublisht werden, lässt sich das komplett asynchron und mit implizitem Backpressure Handling wie in Listing 6 umsetzen.

    Flowable<Mqtt5Publish> stream = client.toRx().subscribeStreamWith()
      .topicFilter("test/#")
      .applySubscribe();
     
    client.toRx()
      .publish(stream.map(publish -> performTransformation(publish)))
      .subscribe();
    

    Noch ein weiteres Beispiel des reaktiven API: RxJava bietet auch auf Zeit basierende Operatoren, wie etwa Windowing oder Sampling (Listing 7).

    client.toRx().subscribeStreamWith()
      .topicFilter("test/#")
      .applySubscribe()
      .sample(2, TimeUnit.SECONDS)
      .onBackpressureLatest()
      .subscribe(publish -> performComputation(publish));
    

    Die subscribe-Methodenaufrufe in den obigen Beispielen stammen aus dem Vokabular der Reactive-Streams-Spezifikation. Subscriben bedeutet hierbei das Konsumieren asynchroner Streams und ist nicht zu verwechseln mit dem Subscriben bei MQTT.

    Zurück zum API-Design der HiveMQ-MQTT-Client-Bibliothek: weitere Grundprinzipien sind Immutability und Thead Safety.

    Alle Objekte sind unveränderlich. Eine Publish-Nachricht kann so nicht unabsichtlich verändert werden, während eine asynchrone Operation sie noch verwendet. Um aber trotzdem flexibel zu sein, bieten die unveränderlichen Objekte eine Erweiterungsmöglichkeit:

    Mqtt5Publish publish = Mqtt5Publish.builder()
      .topic("test/topic")
      .qos(MqttQos.AT_LEAST_ONCE)
      .build();
     
    Mqtt5Publish retained = publish.extend().retain(true).build();
    

    Neben dem Schnittstellendesign ist – wie bereits erwähnt – auch Leichtgewichtigkeit ein Kernprinzip der HiveMQ-MQTT-Client-Bibliothek. Dazu zählen effiziente Ressourcenverwendung und geringer Ressourcenbedarf. Das ermöglicht sowohl den Einsatz auf eingeschränkter Hardware als auch sehr hohen Datendurchsatz auf Backend-Systemen.

    Geringer Ressourcenbedarf bedeutet konkret, dass der Speicherverbrauch einer Clientinstanz möglichst gering ist und dass ein Client keine Ressourcen – wie Threads – blockiert. Das Ressourcen-Pooling muss also nicht von der Anwendung bewerkstelligt werden. Es ist so möglich, sehr viele MQTT-Clients gleichzeitig zu verwenden, und es müssen nicht einzelne MQTT-Clients künstlich zwischen mehreren Komponenten eines Systems geteilt werden, was eine unnötige Kopplung verursacht.

    Trotz Leichtgewichtigkeit bietet der HiveMQ MQTT Client auch Funktionen eines vollständigen Frameworks. Ein Client kümmert sich selbst um seine Connection- und Session-Verwaltung und das Puffern von Nachrichten sowie das Threadmanagement.

    Das automatische Reconnect Handling kann aber je nach Anwendungsfall flexibel angepasst werden. Auch das Threading-Modell ist konfigurierbar. Der Codeausschnitt in Listing 8 zeigt, dass das Reconnect Handling individuell angepasst werden kann, obwohl die automatische Konfiguration verwendet wird. Im Beispiel muss zuerst ein OAuth-Token angefragt werden, bevor eine neue Verbindung aufgebaut werden kann:

    CompletableFuture<byte[]> getOAuthToken() { ... }
     
    Mqtt5Client client = Mqtt5Client.builder()
      .serverHost("broker.hivemq.com")
      .automaticReconnectWithDefaultConfig()
      .addDisconnectedListener(context -> {
      context.getReconnector().reconnectWhen(
        getOAuthToken(), 
        (token, throwable) -> {
          ((Mqtt5ClientDisconnectedContext) context)
            .getReconnector()
            .connectWith()
            .simpleAuth().password(token).applySimpleAuth()
           ..applyConnect();
        });
      })
      .build();

    Neben Authentifizierung und Autorisierung über Tokens ist auch die Transportverschlüsselung ein essenzieller Teil, wenn über öffentliche Netzwerke wie das Internet kommuniziert wird.

    Der HiveMQ MQTT Client unterstützt verschlüsselte Kommunikation sowie Zertifikatsauthentifizierung mit Transport Layer Security (TLS). Vor allem TLS 1.3 eignet sich aufgrund der hohen Sicherheit und überschaubaren Anzahl von Cipher Suites für zeitgemäße Anwendungen.

    Mqtt5Client.builder()
      .sslConfig()
        .protocols(Arrays.asList("TLSv1.3"))
        .cipherSuites(Arrays.asList("TLS_AES_128_GCM_SHA256"))
        .trustManagerFactory(myTrustManager)
        .keyManagerFactory(myKeyManager)
    

    MQTT 5 hat außerdem eine weitere Möglichkeit für Authentifizierung und Autorisierung hinzugefügt: Enhanced Auth. Darüber lassen sich diverse Authentifikations- und Autorisationsmechanismen als Challenge-Response-Verfahren implementieren. Im Folgenden wird gezeigt, wie die neuen MQTT-5-Features mit der HiveMQ-MQTT-Client-Bibliothek umgesetzt werden können. Eine Shared Subscription ist eine Subscription, die von einer Gruppe mehrerer MQTT-Clients geteilt wird. Die Gruppe wird durch einen String repräsentiert.

    Es ist möglich, einen Shared Topic Filter komplett in einem String anzugeben, wie es in der MQTT-Spezifikation definiert ist:

    client.toAsync().subscribeWith()
      .topicFilter("$share/group/test/#")
      .callback(System.out::println)
      .send();
    

    Für einfachere Konfigurierbarkeit können Topic Filter und Shared Name auch separat angegeben werden:

    client.toAsync().subscribeWith()
      .topicFilter(MqttSharedTopicFilter.of("group", "test/#"))
      .callback(System.out::println)
      .send();
    

    Session Expiry kann ganz einfach beim Verbinden eines MQTT-Clients angegeben werden:

    client.toAsync().connectWith()
      .cleanStart(false)
      .sessionExpiryInterval(120)
      ...
    

    Während beim Publishen einer Nachricht oft nur Topic, QoS-Level und Payload angegeben werden, können auch diverse Metadaten und User Properties gesetzt werden. Hier kommt wieder das fluide Builder-Pattern ins Spiel; eigentlich muss nur das Topic angegeben werden, alle anderen Parameter einer Publish-Nachricht haben Standardwerte oder sind optional.

    Je nach Bedarf können Metadaten wie Message Expiry, Content Type und User Properties wie in Listing 9 gesetzt werden.

    client.toBlocking().publishWith()
      .topic("test/topic")
      .qos(MqttQos.AT_LEAST_ONCE)
      .payload("payload".getBytes())
      .retain(true)
      .contentType("text/plain")
      .messageExpiryInterval(120)
      .userProperties()
        .add("sender", "test-sender")
        .add("receiver", "you")
        .applyUserProperties()
      .send();
    

    Um das Zuweisen von Topic-Aliassen, die die benötigte Netzwerkbandbreite für Topic-Namen verringern, muss sich der Nutzer nicht selbst kümmern. Es passiert sozusagen unter der Haube, indem die meist verwendeten Topics möglichst gut automatisch Aliassen zugewiesen werden. Das vermeidet auch Protokollfehler durch aus Versehen vom Nutzer falsch gesetzte Topic-Aliasse.

    Für Request/Response bietet der HiveMQ MQTT Client (aktuell) keine dedizierten Operationen. Tatsächlich lässt sich das Verfahren aber sehr leicht umsetzen. Der antwortende Client muss sich auf das Request Topic subscriben und publisht dann auf das Response Topic, wenn er eine Nachricht erhält. Durch das reaktive API lässt sich das so umsetzen, dass er mehrere Requests behandeln kann, wie in Listing 10 gezeigt.

    responder.toRx()
      .publish(responder.toRx()
        .subscribeStreamWith()
        .topicFilter("request/topic")
        .applySubscribe()
        .map(publish -> Mqtt5Publish.builder()
          .topic(publish.getResponseTopic().get())
          .qos(publish.getQos())
          .payload("response".getBytes())
          .correlationData(publish.getCorrelationData().orElse(null))
          .build()))
      .subscribe();
    

    Der anfragende Client muss sich zuvor auf das Response Topic subscriben und dann die Request-Nachricht publishen (Listing 11).

    requester.toAsync()
      .subscribeWith()
      .topicFilter("response/topic")
      .callback(publish -> System.out.println("received response"))
      .send()
      .thenCompose(subAck -> requester.toAsync()
        .publishWith()
        .topic("request/topic")
        .responseTopic("response/topic")
        .correlationData("1234".getBytes())
        .qos(MqttQos.EXACTLY_ONCE)
        .payload("request".getBytes())
        .send());
    

    Über die Correlation Data kann eine Response eindeutig einem Request zugeordnet werden, wenn der anfragende Client mehrere Requests gleichzeitig startet. In diesem Artikel wurden soweit die wichtigsten Designkriterien der HiveMQ-MQTT-Client-Bibliothek beleuchtet: MQTT-Konformität, API-Design und Leichtgewichtigkeit.

    Zuletzt noch ein spezielleres Thema: die Verwendung auf Android. Die Bibliothek macht von Java-8-Sprachfunktionen wie Lambdas und APIs wie Optional, CompletableFuture und den funktionalen Interfaces Gebrauch. Die Java 8 APIs werden leider erst ab Android 7.0 beziehungsweise API-Level 24 vollständig unterstützt. Viele Projekte benötigen aber noch ein niedrigeres API-Level, um alle Nutzergeräte zu unterstützen. Tatsächlich kann die HiveMQ-MQTT-Client-Bibliothek bereits ab API-Level 19 respektive Android 4.4 verwendet werden, was mehr als 96 Prozent der weltweit verwendeten Android-Geräte abdeckt. Um dies zu ermöglichen, benötigt man nur das Android RetroFix Gradle Plugin, das die benötigten APIs automatisch auf älteren Android-Versionen zurück portiert. Der Vorteil des Gradle-Plug-ins ist, dass es keine Änderung am Applikationscode benötigt und in Zukunft einfach wieder entfernt werden kann, wenn das minimal unterstützte API-Level auf 24 angehoben wird.

    Fazit

    Die HiveMQ-MQTT-Client-Bibliothek ermöglicht effiziente, robuste und sichere Kommunikation im Internet. Sie lässt sich als MQTT-Standardbibliothek in allen Java-Projekten inklusive Android einsetzen. Als Open-Source-Projekt finden die Pflege und Weiterentwicklung mit Feedback und Teilnahme der Community auf GitHub statt.

Geschrieben von
Silvio Giebl

Silvio Giebl ist Software-Entwickler bei HiveMQ und Entwickler und Maintainer der HiveMQ-MQTT-Client-Bibliothek. Er beschäftigt sich mit API-Design, Softwarearchitektur und interessiert sich für hochperformante JVM-Applikationen sowie reaktive Programmierung.

Kommentare

Hinterlasse einen Kommentar

avatar
4000
  Subscribe  
Benachrichtige mich zu: