Maschinenkommunikation mit MQTT und Vaadin

Internet of Things mit Java 8 und TinkerForge, Teil 9

Sven Ruppert
© S&S Media

Im letzten Beitrag haben wir uns angesehen, wie wir unsere IoT-Einheiten twittern lassen. Nützlich und einfach, aber es gibt da noch das Protokoll MQTT. Was kann man damit anstellen und wie nutze ich es selbst? Was wird alles dafür benötigt?

 

Sensor an RaspberryPi: Die Daten werden geliefert.

Raspberry Pi an Sensor: Daten erhalten. Beginne mit der Verarbeitung. Ende. 

So oder so ähnlich kann man sich die Kommunikation zwischen zwei Geräten vorstellen. Wir werden heute an dieser Machine-to-Machine-, kurz: M2M-Kommunikation, teilnehmen. Die Sprache, die fast alle Geräte verstehen, ist MQTT (Message Queue Telemetry Transport).

IoT und MQTT

MQTT ist das Protokoll mit der größten Verbreitung im Bereich M2M. Das Protokoll selbst ist mit über zehn Jahren schon recht alt und wurde ehemals dafür entwickelt, Telemetrie-Daten über satellitengestützte Kommunikationskanäle zu übertragen. Es handelt sich um ein Publish-Subscribe-Protokoll. Was das genau bedeutet, werden wir uns noch ansehen. Wichtig ist zu wissen, dass es drei unterschiedliche Dienstgüte-Niveaus (Quality of Service Levels, QoS) gibt. Hier wird geregelt, ob Nachrichten ankommen dürfen, sollen oder müssen. Um MQTT zu verwenden, werden ein paar Dinge benötigt. Es gibt den Sender, der wiederum seine Nachrichten an einen Broker schickt. Der Broker ist die zentrale Anlaufstelle für alle Konsumenten, die sich für die jeweiligen Nachrichten interessieren. Seine Nachrichtenkanäle sind in einer baumartigen Struktur organisiert. Die Ebenen in diesem Baum werden Topics genannt. Topics können sowohl Knoten als auch Blätter darstellen. An jedem beliebigen Knoten kann man sich einklinken und alle Informationspakete ab dieser Ebene inklusive aller Unterebenen empfangen. Zur Adressierung selbst wird innerhalb des Baumes ein Muster verwendet.

Der Broker

Kernelement ist also der Broker, der sowohl für den Sender als auch den Empfänger die direkte Anlaufstelle ist. Hier gibt es verschiedene Implementierungen. Ich habe mich hier für die Implementierung „Mosquitto“ entschieden. Die Installation auf einem Raspberry Pi ist sehr einfach und in ein paar Minuten erledigt. Zu Beginn muss der GPG-Key der zu dem Mosquitto-Repository gehört, installiert werden. Sobald das erfolgt ist, kann man in dem Verzeichnis /etc/apt/sources.list.d die Referenzen ablegen. Ein nachfolgendes apt-get update und apt-get install erledigt den Rest. Damit ist nun der jeweils aktuelle Mosquitto-Broker auf dem System aktiv. Voraussetzung ist natürlich immer mal wieder ein Update über den üblichen Linux-Mechanismus per apt-get. Listing 1 zeigt die von mir verwendete Befehlsfolge.  

wget http://repo.mosquitto.org/debian/mosquitto-repo.gpg.key
sudo apt-key add mosquitto-repo.gpg.key
cd /etc/apt/sources.list.d/
sudo wget http://repo.mosquitto.org/debian/mosquitto-repo.list
sudo apt-get update
sudo apt-get install mosquitto

Nun ist der Broker installiert und kann verwendet werden. Für unsere ersten Tests muss nichts weiter konfiguriert werden.

Der Server

Der Broker ist nun soweit, und wir können beginnen, Daten zu pushen. Dafür verwende ich die Implementierung aus dem Eclipse-Projekt Paho. In diesem Projekt gibt es verschiedene Sprachanbindungen. Für uns ist die Java-Anbindung von Interesse. Um diese nutzen zu können, wird in der pom.xml lediglich der Eintrag wie in Listing 2 benötigt.  

    <!--MQTT-->
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>mqtt-client</artifactId>
      <type>jar</type>
      <version>0.4.0</version>
    </dependency>

Das Vorgehen ist recht einfach. Es wird eine Verbindung zum Broker aufgebaut, und nachfolgend kann unter einem beliebigen Topic eine Message gepusht werden. Für den Verbindungsaufbau wird die IP des Brokers benötigt und in unserem Fall der Standard-Port 1883 verwendet. Damit jeder Client von dem Broker unterschieden werden kann, wird jeder Verbindung eine UID zugeordnet. Diese kann man beliebig selber wählen, solange es keine Dopplungen gibt. In unserem Fall verwende ich die Klasse UUID und generiere eine 22 Einheiten lange Zufallsfolge, bestehend aus alphanumerischen Zeichen. Um die Initialisierung ein wenig komfortabler zu gestalten, wurde sie in einen Builder ausgelagert und ist in Listing 3 zu sehen. 

public class MqttClientBuilder {

  private String uri;
  private String clientUID;
  private boolean memoryPersistence = true;
  private boolean filePersistence = false;

  public  MqttClient build(){
    MqttClient client;
    try {
      if(memoryPersistence){
        client = new MqttClient( uri,clientUID, new MemoryPersistence() );
      } else{
        client = new MqttClient( uri,clientUID, 
                    new MqttDefaultFilePersistence());
      }
    } catch (MqttException e) {
      e.printStackTrace();
      client = null;
    }
    return client;
  }

  public MqttClientBuilder uri(String s) {
    this.uri = s;
    return this;
  }

  public MqttClientBuilder clientUID(String s) {
    this.clientUID = s;
    return this;
  }
  public MqttClientBuilder clientUIDGenerated() {
    String substring = UUID.randomUUID()
                            .toString().replace("-", "").substring(0, 22);
    this.clientUID = substring;
    System.out.println("clientUID = " + clientUID);
    return this;
  }

  public MqttClientBuilder memoryPersistence(boolean b) {
    this.memoryPersistence = b;
    this.filePersistence = !this.memoryPersistence;
    return this;
  }

  public MqttClientBuilder filePersistence(boolean b) {
    this.filePersistence = b;
    this.memoryPersistence = !this.filePersistence;
    return this;
  }
}

Die Verwendung, um eine Verbindung aufzubauen ist dann recht kurz (Listing 4).

public static final String TOPIC = "TinkerForge/Wetterstation/";
public static final String BROKER = "192.168.0.106";  //broker

    private static MqttClientBuilder builder = new MqttClientBuilder();

    MqttClient sender = builder
        .uri("tcp://" + BROKER+":1883")
        .clientUIDGenerated()
        .build();
    try {
      sender.connect();

      MqttBuffer buffer = new MqttBuffer() //Implementierung in Listing 6
          .client(sender).topic(TOPIC).qos(1).retained(true);

Die Verbindung steht, das Topic wurde gewählt. Nun können wir beginnen, die Daten zu senden.

Aus den vorangegangenen Teilen dieser Serie wissen wir, dass es recht einfach ist, in regelmäßigen Abständen einen Messwert aus dem Sensor zu erhalten (Listing 5). Wir werden uns in der Zeile mit dem System.out.println einklinken und an der Stelle die Daten senden. Fertig ist der Sender… fast!

public static final String HOST = "192.168.0.200";  //wetterstation
  public static final int PORT = 4223;
  private static int callbackPeriod = 5000;

final Temperature temperature = new Temperature("dXj",
                                            callbackPeriod, PORT, HOST);
    temperature.bricklet().addTemperatureListener(sensorvalue -> {
      final double tempNorm = sensorvalue / 100.0;
      final String text = LocalDateTime.now() + " - Temp  : " 
                            + tempNorm + " °C";
      System.out.println("text = " + text);
      buffer.sendAsync(text);
    });
temperature.run();

Wir haben hier eine Besonderheit zu bedenken: Das Senden der Daten benötigt eine Zeitspanne x. Wenn diese teilweise größer ist als die Zeit, bis der nächste Sensorwert bereitsteht, müssen wir einen Puffer dazwischen setzen. Dank Java 8 ist das kein Problem. Wir verwenden eine Instanz der Klasse CompletableFuture, um asynchrone Methodenaufrufe verwenden zu können. Damit wird die Implementierung einer puffernden Struktur recht einfach. Ein Beispiel ist in Listing 6 zu sehen.

public class MqttBuffer {

  static final ExecutorService fixedThreadPool 
                                  = Executors.newFixedThreadPool(4);

  private String topic;
  private MqttClient client;
  private int qos = 1;
  private boolean retained = true;

  public MqttBuffer topic(String s) {
    this.topic = s;
    return this;
  }

  public MqttBuffer client(MqttClient c) {
    this.client = c;
    return this;
  }

  public MqttBuffer qos(int q) {
    this.qos = q;
    return this;
  }
  public MqttBuffer retained(boolean b) {
    this.retained = b;
    return this;
  }

  public void sendAsync(String msg) {
    Supplier<String> task = () -> {
      try {
        client.publish(topic, (msg).getBytes("UTF-8"), 
                          qos, retained);
      } catch (MqttException
          | UnsupportedEncodingException e) {
        e.printStackTrace();
      }
      return "Done - " + msg;
    };
    CompletableFuture.supplyAsync(task, fixedThreadPool)
        .thenAccept(System.out::println);
  }
}

Der Client – ein Vaadin Chart

Daten zu senden, ohne sie dann weiter zu verarbeiten, ist doch recht unnötig. Daher werden wir die Daten mit dem Webframework Vaadin visualisieren. Vaadin hat einige sehr ansprechende GUI-Komponenten, von denen ich in diesem Fall den Typ Chart/Gauge verwende.

Da sich die Daten in regelmäßigen Abständen ändern, soll sich natürlich auch das GUI aktualisieren. Die Sendefrequenz muss dabei allerdings nicht der Aktualisierungsfrequenz entsprechen. Also auch hier ist es notwendig, das abzufedern. Prinzipiell ist das Vorgehen recht geradlinig. Um das Beispiel überschaubar zu halten, besteht die Anwendung lediglich aus einer Komponente. In der Methode init(..) findet die Initialisierung statt. Hier wird zum einen der MQTT-Client gestartet, zum anderen der Thread, der für die Aktualisierung notwendig ist (Listing 7). Der MQTT-Client schreibt den Wert, den er empfängt, immer in die Variable lastMessage. Der MQTT_Thread holt sich dort in regelmäßigen Abständen den geänderten Wert und schreibt diesen in die GUI-Komponente. Das ist an der Stelle nicht sauber implementiert, soll aber den prinzipiellen Weg beschreiben. Vaadin selber gibt einem die Möglichkeit, das GUI per „push“ zu aktualisieren. Notwendig ist dafür nur die Annotation @Push an der Klasse, die vom UI ableitet. Fertig ist die Anwendung. Für recht niederfrequente Abläufe ist diese Implementierung durchaus robust genug. 

//… in der init-Methode
try {
  empf = new MqttClient(
     "tcp://192.168.0.106:1883", "MyfirstMQTTEmpf",
     new MemoryPersistence());
  empf.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable throwable) { }
    @Override
    public void messageArrived(String str, MqttMessage mqttMessage) 
                                                    throws Exception {
      byte[] payload = mqttMessage.getPayload();
      lastMessage = new String(payload);
      System.out.println("s = " + str + " msg " 
                              + lastMessage);
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
  });

  Button button = new Button("refresh");
  button.addClickListener(event -> access(
    () -> chart.getConfiguration()
    .getSeries()
    .forEach(s -> ((ListSeries) s).updatePoint(
                0, Double.valueOf(lastMessage.split(":")[3])))));
  layout.addComponent(button);
  empf.connect();
  empf.subscribe(TOPIC, 1);

  mqqtThread.start();
} catch (MqttException e) {e.printStackTrace(); }


// Klasse MQTT_Thread
  class MQQT_Thread extends Thread {
    String lastPushed = "";
    @Override
    public void run() {
      try {
        while (true) {
          // Update the data for a while
          Thread.sleep(10000);
          if (!lastPushed.equals(lastMessage)) {
            access(() -> chart.getConfiguration()
                .getSeries()
                .forEach(s -> {
                  Double newValue = Double.valueOf(lastMessage
                                     .split(":")[3]);
                  ((ListSeries) s).updatePoint(0, newValue);
                }));
            System.out.println("pushed -> lastMessage = " 
                                + lastMessage);
            lastPushed = lastMessage;
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

Fazit

MQTT ist das Mittel der Wahl, wenn man in der Welt des IoT ein Protokoll zur Datenübertragung benötigt. Der Broker ist schnell aufgesetzt, und wahlweise kann man auch eine der freien Instanzen im Internet verwenden. Die Anbindung mit Java ist sehr einfach. Vaadin bietet mit den Charts sehr ansprechende GUI-Elemente, um schnell und unkompliziert die Daten zu visualisieren. Wir werden uns in den nächsten Teilen der Serie genauer mit den hier vorgestellten Technologien beschäftigen.

Diesmal gibt es zwei Repositories zu diesem Artikel. Zum einen das bekannte rapidpm-modules [1], zum anderen das Vaadin-Beispiel unter [2].

Bei Ideen, Anregungen oder Fragen: bitte einfach melden: Twitter: @SvenRuppert

Stay tuned. Happy coding!

Geschrieben von
Sven Ruppert
Sven Ruppert
Sven Ruppert arbeitet seit 1996 mit Java und ist Developer Advocate bei Vaadin. In seiner Freizeit spricht er auf internationalen und nationalen Konferenzen, schreibt für IT-Magazine und für Tech-Portale. Twitter: @SvenRuppert
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: