Suche
MQTT: Schnelleinstieg in das schlanke IoT-Protokoll mit Java

IoT-Allrounder

Dominik Obermaier

© Shutterstock.com/123dartist

Das schlanke und leichtgewichtige Kommunikationsprotokoll MQTT hat sich als wichtigstes Internet-of-Things-(IoT-)Standardprotokoll durchgesetzt. Die Anwendungsfälle für MQTT reichen von industriellen Anwendungen über Connected Cars bis zu Home Automation und Do-it-yourself-(DIY-)Projekten. Grund genug, das IoT-Allrounder-Protokoll aus der Java-Perspektive zu betrachten und zu erforschen, welche Möglichkeiten sich mit dem Einsatz von MQTT in Java-Applikationen ergeben.

Das Protokoll MQTT wurde 1999 als ein M2M-Kommunikationsprotokoll für SCADA-Systeme mit minimalem Protokoll-Overhead entwickelt. Es bietet vernetzten Geräten eine Möglichkeit, bandbreiten- und batterieschonend miteinander zu kommunizieren. Das Protokoll MQTT zeichnet sich außerdem durch seine außerordentliche Leichtgewichtigkeit aus und ist clientseitig sehr einfach zu implementieren.

Seit 2010 ist die Protokollspezifikation in Version 3.1 lizenzfrei verfügbar und seit 2014 auch die aktuellste Version 3.1.1, die erstmals bei dem Standardisierungsgremium OASIS spezifiziert wurde. Das Protokoll schlägt mit einem ereignisgesteuerten Push-Ansatz einen anderen Weg ein als beispielsweise HTTP, das auf Request/Response basiert.

Lesen Sie auch: 8 Gründe, warum sich jeder Software-Entwickler mit MQTT beschäftigen sollte

Warum MQTT?

MQTT ist besonders geeignet für eine zuverlässige Nachrichtenübertragung in unzuverlässigen und instabilen Netzwerken, wie etwa bei Mobilfunknetzwerken. Folgende Aspekte machen MQTT zu einem optimalen Protokoll für das Internet der Dinge und die mobile Kommunikation:

  • MQTT ist komplett datenagnostisch. Es ist daher geeignet, Daten jeder Art zu übertragen, egal ob es sich um Text oder binärkodierte Inhalte handelt.
  • MQTT ist einfach. Die Konzepte sind leicht zu erlernen und eigene Clientimplementierungen problemlos zu realisieren.
  • MQTT erfindet das Rad nicht neu. Es baut auf TCP auf, und die Übertragung kann jederzeit mittels SSL/TLS verschlüsselt werden.
  • MQTT ermöglicht echte Push-Kommunikation: Anders als bei anderen Protokollen gibt es bei MQTT kein Polling. Nachrichten werden sofort verteilt, wenn ein Ereignis auftritt. Das schont Bandbreite und CPU, da MQTT-Clientanwendungen reagieren können, sobald eine Nachricht ankommt, anstatt beim Server nach neuen Nachrichten zu fragen.

Publish/Subscribe

MQTT implementiert das Publish/Subscribe-Pattern (Abb. 1). Das bedeutet, dass jede Kommunikation über einen zentralen Verteiler, den so genannten MQTT Message Broker, stattfindet. Jede Nachricht, die ein Client sendet, enthält ein so genanntes Topic und die tatsächlichen Nutzdaten.

Abb. 1: Das Publish/Subscribe-Pattern

Abb. 1: Das Publish/Subscribe-Pattern

Ein Topic ist ein Text, der Trennzeichen enthalten kann, und er kann, ähnlich dem POSIX-Dateisystempfad, eine Hierarchie abbilden. Die Struktur heimautomatisierung/peters_haus/wohnzimmer/gluehbirne/1 wäre beispielsweise ein gutes Topic für Nachrichten von Glühbirne 1 im Wohnzimmer von Peters Haus. Jeder MQTT-Client, der Nachrichten für dieses Topic empfangen möchte, abonniert es beim Message Broker. Da die interessierten Clients beim Eintreffen neuer Nachrichten durch den Broker benachrichtigt werden, anstatt selbst beim Server nach Änderungen zu fragen, wird eine hocheffiziente Kommunikation zwischen den Teilnehmern gewährleistet. Somit ist eine echte Push-Kommunikation möglich. Entscheidend ist, dass die Teilnehmer der Kommunikation nichts voneinander wissen, da jeder Client nur den Message Broker kennt, nicht jedoch die anderen Teilnehmer.

Lesen Sie auch: Eclipse IoT für Dummies

Protokollfeatures

MQTT besitzt neben dem reinen Publish/Subscribe eine Reihe von bemerkenswerten Protokollfeatures, die im Kontext des IoT und von Mobile optimal sind.

  • Quality of Service: Um sicherzustellen, dass eine gesendete Nachricht beim Empfänger ankommt, definiert MQTT drei verschiedene Quality of Service Levels (QoS), mit denen eine Nachricht gesendet werden kann: at most once (0), at least once (1) und exactly once (2).
  • Retained Messages: Die letzte gesendete Nachricht eines Topics kann am Broker hinterlegt werden. Jeder neue Subscriber auf diesem Topic erhält automatisch die zuletzt gesendete Nachricht.
  • Last Will and Testament (LWT): Ein Client kann eine MQTT-Nachricht als „letzten Willen“ beim Verbindungsaufbau am Broker hinterlegen. Wenn der Broker feststellt, dass dieser Client nicht mehr verbunden ist und der Client sich vorher nicht abgemeldet hat, wird dieser letzte Wille an alle Subscriber versendet. Der Broker ist also verantwortlich dafür, den letzten Willen des Clients auszuführen.
  • Persistent Sessions: In Anwendungsfällen, bei denen mit häufigen Verbindungsabbrüchen von Clients zu rechnen ist, kann der Broker eine persistente Session vorhalten. Wenn ein Client sich erneut verbindet, schickt der Broker alle verpassten Nachrichten für seine Subscriptions an den Client. Natürlich muss der Client sich nicht neu auf vorher abonnierte Topics subscriben, da der Broker die vorherige Session einfach fortführt.

Eclipse Paho

Eclipse Paho ist ein Projekt unter dem Schirm der Eclipse Foundation, das MQTT-Clientimplementierungen in verschiedenen Programmiersprachen zur Verfügung stellt. Das sind neben C die folgenden: C++, C#, Python, JavaScript, Go und natürlich auch Java. Das Paho-Projekt existiert seit 2012, die Java- und C-Implementierungen werden jedoch seit vielen Jahren bei IBM, die auch die initiale Implementierungen bereitgestellt haben, eingesetzt. Sie sind für den produktiven Einsatz freigegeben. Neben einem synchronen API bietet Paho auch ein asynchrones API, das mehr Performance verspricht, dafür etwas komplizierter in der Benutzung ist. In den folgenden Beispielen wird das synchrone Paho API verwendet.

Lesen Sie auch: Messaging im Internet der Dinge – Eclipse Paho und MQTT

Download von Paho

Eclipse Paho ist als .jar-Datei zum Download verfügbar und kann in ein Java-Projekt eingebunden werden. Wer Apache Maven für sein Dependency Management bevorzugt, kann den Eintrag aus Listing 1 in der pom.xml zu seinem Projekt hinzufügen.

Paho ist aktuell nicht im Maven Central Repository verfügbar. Es existiert jedoch ein eigenes Repository, das man, wie in Listing 1 gezeigt, in seiner pom.xml hinzufügen kann.

...
<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.0.2</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>Eclipse Paho Repo</id>
        <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
    </repository>
</repositories>
...

Publish

Der nachfolgende Code baut eine Verbindung zu einem MQTT-Broker auf, schickt eine Nachricht an das Topic javamagazin/mqttarticle mit dem Text „Hallo Welt“ und trennt die Verbindung zum MQTT wieder.

 MqttClient client = new MqttClient(
"tcp://broker.mqttdashboard.com", generateClientId());

client.connect();

MqttMessage message = new MqttMessage("Hallo Welt".getBytes());
client.publish("javamagazin/mqttarticle", message);

client.disconnect();

Wie man sieht, sind für das Senden von Nachrichten  nur wenige Zeilen Code notwendig. Auffallend in dem Codebeispiel ist, dass der String Hallo Welt als Bytearray übergeben wird. Das hat damit zu tun, dass die Daten bei MQTT immer binär sind und es dem Protokoll egal ist, ob ein Text, ein Bild oder ein sonstiges Bytearray übertragen wird.

Folgender Code schickt dieselbe MQTT-Nachricht wie das vorherige Codebeispiel, diesmal jedoch als Retained Message und mit Quality of Service 2. Damit stellt das Protokoll sicher, dass die Nachricht exakt ein Mal beim Empfänger ankommt.

 client.publish("javamagazin/mqttarticle", "Hallo Welt".getBytes(), 2, true);

Lesen Sie auch: Kapua – die ultimative IoT-Plattform

Subscribe

Natürlich wäre es witzlos, wenn zwar Nachrichten gesendet werden, aber niemand diese Nachrichten empfangen würde. Einen Subscriber-Client mit Paho zu implementieren, ist dabei eine einfache Sache. Der in Listing 2 dargestellte Beispielcode zeigt die Implementierung eines Clients, der sämtliche Nachrichten für den Topic javamagazin/mqttarticle empfängt und die Nachrichten auf der Konsole ausgibt.

...
 MqttClient client = new MqttClient(
"tcp://broker.mqttdashboard.com", generateClientId());

client.setCallback(new MqttCallback() {
       @Override
       public void connectionLost(Throwable throwable) { }

       @Override
       public void messageArrived(String t, MqttMessage m) throws Exception {
         System.out.println(new String(mqttMessage.getPayload()));
       }

       @Override
       public void deliveryComplete(IMqttDeliveryToken t) { }
       });

client.connect();

client.subscribe("javamagazin/mqttarticle");
...

Um auf ankommende Nachrichten reagieren zu können, kann man dem MqttClient mittels der setCallback()-Methode einen MqttCallback anhängen. Dieser bietet mittels der messageArrived()-Methode die Möglichkeit, auf eingehende Nachrichten zu reagieren. Der Callback wird in einem Thread im Hintergrund aufgerufen, daher wird der Hauptthread der Applikation nicht geblockt, wenn eine neue Nachricht eingeht. Der Code ist reaktiv.

Im MqttCallback können noch weitere Methoden implementiert werden: connectionLost() wird aufgerufen, sobald die TCP-Verbindung getrennt wurde. Diese Methode kann benutzt werden, um beispielsweise einen Reconnect zu implementieren. Die Methode deliveryComplete() ruft Paho auf, sobald eine Nachricht erfolgreich abgeschickt wurde.

Optionen beim Verbindungsaufbau

Paho bietet eine volle Implementierung aller MQTT-Features, u. a. Last-Will-and-Testament-(LWT-) und Persistent Sessions. Listing 3 zeigt, wie eine LWT-Nachricht beim Verbindungsaufbau mitgegeben werden kann und wie man eine persistente Session für einen Client aktiviert. Zusätzlich werden ein Benutzername und ein Passwort für die Authentifizierung am Broker übergeben.

...
MqttClient client = new MqttClient(
"tcp://broker.mqttdashboard.com", generateClientId());

MqttConnectOptions options = new MqttConnectOptions();
//Setzen einer Persistent Session
options.setCleanSession(false);
options.setUserName("username");
options.setPassword("password".toCharArray());

options.setWill(
        "will/topic",               //Topic
        "Disconnected!".getBytes(), //Nachrichteninhalt
        2,      //QoS
        false); //Retained message?

client.connect(options);
...

Das MqttConnectOptions-Objekt bietet noch mehr Eingriffsmöglichkeiten in die Standardkonfiguration von Paho. SSL kann beispielsweise aktiviert und es können Timeouts gesetzt werden.

Wie gezeigt, ist es sehr leicht, einen eigenen MQTT-Client mit Eclipse Paho zu entwickeln. Dabei eignet sich Paho genauso zur Einbindung in Java-Enterprise-Anwendungen, wie auch in Android-Applikationen oder für Embedded Devices. Neben dem synchronen API gibt es noch ein asynchrones API, das optimal geeignet ist, wenn sehr viele Nachrichten gleichzeitig geschickt werden sollen oder falls es nötig ist, Paho tiefer in die eigene Applikation zu integrieren. Paho hat eine aktive Community und wird aktiv weiterentwickelt.

MQTT-Broker

Der MQTT-Broker ist das Herzstück der Publish/Subscribe-Kommunikation, da alle MQTT-Clients über den Broker via Topics kommunizieren. Unter den verschiedenen verfügbaren MQTT-Brokern haben wir uns in unserem Beispiel für den Java-basierten Broker HiveMQ entschieden. HiveMQ ist ein MQTT-Broker für den professionellen Einsatz und bietet neben seiner hohen Performance und der Cluster-Funktionalität ein Java-basiertes Open-Source-Plug-in-System.

In den nachfolgenden Beispielen soll das Java-basierte Plug-in-System genutzt werden, um den MQTT-Broker um folgende Funktionalitäten zu erweitern:

  • Authentifizierung: Ein MQTT-Client muss eine valide Username/Passwort-Kombination beim Verbindungsaufbau benutzen; andernfalls wird der Verbindungsaufbau vom MQTT-Broker abgelehnt.
  • Jede MQTT-Nachricht wird vom MQTT-Broker persistiert, damit andere Applikationen die Daten später auswerten können.

Das HiveMQ-Plug-in-System

Mit dem HiveMQ-Plug-in-System ist es möglich, die oben genannten Anforderungen, wie das Abspeichern von MQTT-Messages und die Authentifizierung von MQTT-Clients, mit wenigen Zeilen Code umzusetzen. Dazu werden von HiveMQ verschiedene Callbacks angeboten, die in einem Plug-in implementiert werden können. Neben Callbacks gibt es auch noch so genannte Services, die benutzt werden können, um aus dem Plug-in-System heraus mit dem Broker zu interagieren. Es ist beispielsweise möglich, manuell neue Subscriptions für bestimmte Clients anzulegen oder eine Nachricht direkt aus dem Plug-in-System heraus an bestimmte Topics zu schicken. Das Plug-in-System ist unter der Apache-Lizenz frei verfügbar, und der Quellcode kann auf GitHub eingesehen werden. Um mit der Plug-in-Entwicklung zu starten, gibt es einen Maven Archetype, eine ausführliche Dokumentation und viele Beispiel-Plug-ins auf GitHub.

Lesen Sie auch: Ein neuer Cluster für HiveMQ 3.1

Authentifizierung

In Produktionsumgebungen ist es üblich, dass ein MQTT-Client sich authentifizieren muss, bevor er Nachrichten senden oder empfangen kann. In kleineren Umgebungen werden dazu oft Access Control Lists (ACLs ) benutzt, also eine Datei mit erlaubten Benutzernamen und dem entsprechendem Passwort. Für HiveMQ gibt es dazu bereits ein fertiges Plug-in, das ohne programmatischen Aufwand benutzt werden kann. Natürlich reichen ACLs oft nicht aus, denn die Accounts müssen oft aus dem firmeneigenen LDAP in Datenbanken oder über (Micro-)Services ausgelesen werden. Da jeder Anwendungsfall hier unterschiedlich ist, bietet es sich an, ein eigenes Plug-in für das genaue Einsatzszenario zu entwickeln. Das HiveMQ-Plug-in-System bietet dazu den OnAuthenticationCallback an. Eine beispielhafte Implementierung ist in Listing 4 zu sehen.

public class AuthenticationExample implements OnAuthenticationCallback {

    @Override
    public Boolean checkCredentials(final ClientCredentialsData clientData)
            throws AuthenticationException {

        if (clientData.getClientId().startsWith("mqtt-client")) {
            return true;
        }
        return false;
    }

    @Override
    public int priority() {
        return CallbackPriority.HIGH;
    }
}

Die Methode checkCredentials wird von HiveMQ aufgerufen, sobald ein neuer Client sich verbinden möchte. In Listing 4 wird überprüft, ob der Client einen Client Identifier besitzt, der mit dem String mqtt-client beginnt. Natürlich würden in einem realen Anwendungsfall eher der Benutzername und das Passwort überprüft und ein Lookup in einer Datenbank, einem LDAP oder einem firmeninternen Service durchgeführt werden. Das Beispiel veranschaulicht jedoch, wie einfach eine solche Integration zu realisieren ist: Es kann jede beliebige Java-Bibliothek eingebunden werden, weshalb es also ohne Weiteres möglich ist, z. B. ein ORM-Framework wie Hibernate im Plug-in-System zu benutzen.

Speichern von Nachrichten

Ein weiterer typischer Anwendungsfall für ein MQTT-Broker-Plug-in ist es, Nachrichten direkt vom Broker in eine Datenbank zu speichern oder an ein nachgelagertes System zu übergeben. HiveMQ bietet dazu den Callback OnPublishReceived an, der implementiert werden kann, um die MQTT-Publish-Message zu verarbeiten. Das Codebeispiel in Listing 5 zeigt, wie eine solche Nachricht in einer Datenbank gespeichert werden kann, falls der Absender der Nachricht einen Client Identifier besitzt, der mit important-client beginnt.

public class PublishExample implements OnPublishReceivedCallback {

    @Override
    public void onPublishReceived(PUBLISH publish, ClientData clientData) throws OnPublishReceivedException {

        if (clientData.getClientId().startsWith("important-client")) {
            saveToDB(publish.getTopic(), publish.getPayload());
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.HIGH;
    }

    public void saveToDB(String topic, byte[] content) {
        //Persist to database! 
    }
}

Es ist also ziemlich einfach, mit diesem Callback Nachrichten vorzufiltern, bevor sie in der Datenbank gespeichert werden. Dazu können wahlweise Sessiondaten des sendenden Clients benutzt werden oder aber auch eine Eigenschaft der gesendeten Nachricht. Dieser Callback kann natürlich auch mehrmals und jeweils mit einer anderen Businesslogik implementiert werden. So wäre es möglich, später ein Plug-in zu entwickeln, das eine SMS bei jeder Nachricht mit dem Topic alert an das Operations-Team schickt. Die Callbacks werden dann nach Priorität ausgeführt, wie dies in der priority()-Methode geschieht. Ein gängiger Anwendungsfall ist auch, im OnPublishReceivedCallback die Daten der Nachricht zu überprüfen, bevor sie weiterverarbeitet werden. Im Fehlerfall kann dann eine OnPublishReceivedException geworfen werden, die eine Weiterverarbeitung und ein Weitersenden der Nachricht verhindert.

Lesen Sie auch: Node-RED: Ein Baukasten für das Internet der Dinge

Fazit

Der Einstieg in MQTT ist für Java-Entwickler dank MQTT-Clients wie Eclipse Paho und Java-basierten Brokern wie HiveMQ so einfach wie nie zuvor. Es ist mit wenigen Zeilen Code möglich, aus bestehenden Java-Applikationen vollwertige MQTT-Clients zu machen, die sowohl Nachrichten senden als auch empfangen können. Zusätzliche Geschäftslogik, die über das Verteilen von MQTT-Nachrichten hinausgeht, lässt sich sehr leicht mit erweiterbaren MQTT-Brokern wie HiveMQ umsetzen. Dabei kann jede beliebige Java-Bibliothek in Plug-ins verwendet werden, was die Produktivität bei der Entwicklung stark erhöht.

Aufmacherbild: Abstract network connection background – chrome on white von Shutterstock / Urheberrecht: 123dartist

Geschrieben von
Dominik Obermaier
Dominik Obermaier
Dominik Obermaier ist Geschäftsführer bei der dc-square GmbH, die hochskalierbare IoT-Lösungen, wie den MQTT-Broker HiveMQ, entwickelt. Dominik berät Unternehmen im Bereich MQTT, Messaging und IoT. Er entwickelte MQTT 3.1.1 bei OASIS mit und ist regelmäßiger Autor und Speaker auf Konferenzen.
Kommentare
  1. Rainer Schleevoigt2016-06-07 10:39:25

    Hallo,
    freut mich, dass es eine Java-Implentierung gibt. Nun lese ich auf der RepoSeite https://github.com/eclipse/paho.mqtt.java dass wichtige Feature wie AutoReconnect immer noch nicht zur Verfügungs stehen. Ich hatte mich genau wegen dieser Robustheit für MQtt entschieden. Wann ist damit zu rechnen?

  2. Christian Treber2017-01-27 19:09:03

    ECHO... Echo... echo...

    Antworten gibt es eher nicht?

  3. 2017-02-06 07:14:02

    Nur per MQTT.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.