Atmosphere

WebSocket-Portabilität auf der JVM

Jeanfrançois Arcand
©Shutterstock/artiomp

Mit dem Atmosphere-Framework lassen sich portierbare Anwendungen in Groovy, Scala und Java schreiben. Neben einer JavaScript-Komponente enthält Atmosphere mehrere Serverkomponenten, die alle wichtigen Java-basierten Webserver unterstützen. Ziel ist es, die Entwicklung von Anwendungen dadurch zu erleichtern, dass das Framework selbstständig den besten Kommunikationskanal zwischen Client und Server findet – und das codeunabhängig.

Mit Atmosphere im Einsatz ist es zum Beispiel möglich, dass eine Anwendung das WebSocket-Protokoll nutzt, solange sie mit einem Browser oder Server mit WebSocket-Unterstützung verwendet wird. Andernfalls kehrt das Framework codeunabhängig zu HTTP zurück. Mit dem Internet Explorer 6, 7, 8 und 9 läuft eine Atmosphere-Anwendung problemlos mit HTTP; im Zusammenspiel mit dem Internet Explorer 10 wird auf WebSocket umgestellt. Um die Mächtigkeit dieses Frameworks noch besser zu demonstrieren, wollen wir eine einfache Chat-Anwendung aufsetzen. Wir gehen davon aus, dass diese nur einen Chatroom unterstützt, um die Logik einfach zu halten. Zunächst schreiben wir die serverseitige Komponente. Atmosphere unterstützt folgende Komponenten:

  • die Atmosphere-Runtime: das Kernmodul von Atmosphere, auf dessen Grundlage alle anderen Module gebaut werden. Es stellt zwei einfache APIs für die Anwendungsentwicklung zur Verfügung: AtmosphereHandler und Meteor. Der AtmosphereHandler ist ein einfaches Interface für Implementierungen. Bei Meteor handelt es sich um eine Klasse, die aufgerufen und in Servlet-basierte Anwendungen injiziert werden kann.
  • Atmosphere Jersey: eine Erweiterung für das Jersey-REST-Framework, die einige zusätzliche Annotationen bereitstellt
  • Atmosphere GWT: eine Erweiterung für das GWT-Framework

Die Serverseite

Im Folgenden werde ich die Atmosphere-Runtime verwenden, um zu demonstrieren, wie einfach es ist, eine einfache asynchrone Anwendung zu schreiben. Beginnen wir mit der Serverkomponente, die einen AtmosphereHandler verwendet. Letzterer ist wie in Listing 1 definiert.

public interface AtmosphereHandler {

    void onRequest(AtmosphereResource resource) throws IOException;

    void onStateChange(AtmosphereResourceEvent event) throws IOException;

    void destroy();
}

Die onRequest-Methode wird jedes Mal aufgerufen, wenn ein Request auf dem Pfad abgebildet wird, der mit dem AtmosphereHandler in Verbindung steht. Der Pfad wird dadurch definiert, dass eine Implementierung des AtmosphereHandlers annotiert wird:

@AtmosphereHandlerService(path = "/<path>")

In Atmosphere steht eine AtmosphereResource für eine physische Verbindung. Mithilfe einer AtmosphereResource lassen sich Informationen über einen Request abrufen. Auf die Antwort hin kann eine Aktion ausgeführt werden und – was wichtiger ist – die Verbindung während der onRequest-Ausführung aufrechterhalten werden. Ein Webserver benötigt Informationen darüber, wann eine Verbindung für weitere Aktionen offen bleiben soll (z. B. für WebSockets) und wann ein Upgrade zwecks Protokollunterstützung erforderlich ist, damit die HTTP-Verbindung (Streaming, Long-Polling, JSONP oder serverseitige Events) für zukünftige Aktionen offen bleibt.

Abb. 1: Die onStateChange-Methode

Die onStateChange-Methode (Abb. 1) wird in Atmosphere in den folgenden Fällen aufgerufen:

  • bei einer Broadcast-Operation, die von einem Broadcaster initiiert wird und eine Aktion erfordert. Der Broadcaster ist eine Art Kommunikationskanal. Eine Anwendung kann viele solcher Kanäle erzeugen und per BroadcasterFactory-Klasse auf sie zugreifen. Eine AtmosphereResource steht grundsätzlich mit einer oder mehreren Broadcastern in Verbindung. Man kann Broadcaster auch als Event-Queue sehen, in der man auf neue Broadcast-Events lauschen und sich im Falle eines Events benachrichtigen lassen kann. Broadcast ist durch onRequest, onStateChange oder überall auf der Serverseite möglich.
  • Wenn die Verbindung geschlossen wurde oder ein Time-out aufgetreten ist (d. h. keine Aktivität zu verzeichnen ist).

Das klingt kompliziert? Nun, zum Glück wird das Framework mit AtmosphereHandlers ausgeliefert, die in fast allen Szenarien verwendet werden können. So kann sich der Entwickler auf die Anwendungslogik konzentrieren, während sich Atmosphere um den Lebenszyklus der Verbindung kümmert. In Listing 2 wird der OnMessage<T> AtmosphereHandler verwendet, um eine Anwendung zu schreiben.

  @AtmosphereHandlerService(
          path="/chat",
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                   BroadcastOnPostAtmosphereInterceptor.class})
  public class ChatRoom extends OnMessage<String> {
  
      private final ObjectMapper mapper = new ObjectMapper();
     
       @Override
     public void onMessage(AtmosphereResponse response, String message) throws IOException {
          response.getWriter()
                     .write(mapper.writeValueAsString(mapper.readValue(message, Data.class)));
    }   
  }   

Die Grundidee ist dabei, so viel wie möglich aus dem Lebenszyklus der Verbindung an die einsatzbereite Komponente von Atmosphere zu delegieren. Zunächst versehen wir die ChatRoom-Klasse mit der @AtmosphereHandlerService-Annotation und definieren den Pfad und die Interceptors. Ein AtmosphereInterceptor ist eine Art Filter, der grundsätzlich vor und nach dem AtmosphereHandler#onRequest aufgerufen wird. Der AtmosphereInterceptor ist für die Bearbeitung von Request und Response nützlich, aber auch für den Lebenszyklus, beispielsweise beim Suspend und beim Broadcast (Abb. 2).

Abb. 2: Suspend und Broadcast

Wie oben beschrieben, können zwei Interceptors dazu verwendet werden, zuerst den Request zu verlängern (AtmosphereResourceLifeCycleInterceptor) und dann die Daten, die bei jedem POST empfangen werden, per Broadcast zu verteilen. Wir können uns also ausschließlich auf die Anwendungslogik konzentrieren.

Statt also unseren kompletten AtmosphereHandler zu schreiben, können wir den OnMessage<T>-Handler verwenden, der die Broadcast-Operation an die onMessage-Methode delegiert (Zeile 10). Für unsere Chat-Anwendung bedeutet das einfach nur, dass wir schreiben, was wir empfangen haben. Wenn wir 50 verbundene User haben, bedeutet das, dass die onMessage-Methode 50 mal aufgerufen wird, damit die 50 User die Nachricht erhalten. Für die Kommunikation zwischen Client und Server verwenden wir JSON. Der Client sendet Folgendes:

{"message":"Hello World","author":"John Doe"}

Und der Server schickt über die Browser folgendes zurück:

{"message":"Hello World","author":"John Doe","time":1348578675087}

Wir verwenden die Jackson-Library, um die Nachricht zu lesen und zusammen mit der Zeit, zu der die Nachricht empfangen wurde, zurückzusenden. Die Data-Klasse ist ein einfaches POJO (Listing 3).

    public final static class Data {

        private String message;
        private String author;
        private long time;

        public Data() {
            this("","");
        }

        public Data(String author, String message) {
            this.author = author;
            this.message = message;
            this.time = new Date().getTime();
        }

        public String getMessage() {
            return message;
        }

        public String getAuthor() {
            return author;
        }

        public void setAuthor(String author) {
            this.author = author;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public long getTime() {
            return time;
        }

        public void setTime(long time) {
            this.time = time;
        }

    }

Die Clientseite – Atmosphere.js

Um die Clientseite zu schreiben, verwenden wir Atmosphere.js. Werfen wir zunächst einen Blick auf den Code (Listing 4).

  $(function () {
      "use strict";
  
      var header = $('#header');
      var content = $('#content');
      var input = $('#input');
      var status = $('#status');
      var myName = false;
      var author = null;
     var logged = false;
     var socket = $.atmosphere;
     var subSocket;
     var transport = 'websocket';
 
     // We are now ready to cut the request
     var request = { url: document.location.toString() + 'chat',
         contentType : "application/json",
         trackMessageSize: true,
         shared : true,
         transport : transport ,
         fallbackTransport: 'long-polling'};
 
     request.onOpen = function(response) {
         content.html($('<p>', { text: 'Atmosphere connected using ' + response.transport }));
         input.removeAttr('disabled').focus();
         status.text('Choose name:');
         transport = response.transport;
         
         if (response.transport == "local") {
             subSocket.pushLocal("Name?");
         }   
     };  
     
     request.onTransportFailure = function(errorMsg, request) {
         jQuery.atmosphere.info(errorMsg);
         if (window.EventSource) {
             request.fallbackTransport = "sse";
             transport = "see";
         }   
         header.html($('<h3>', { text: 'Atmosphere Chat. Default transport is WebSocket, fallback is ' + request.fallbackTransport }));
     };  
     
     request.onMessage = function (response) {
     
         // We need to be logged first.
         if (!myName) return;
         
         var message = response.responseBody;
         try {
            var json = jQuery.parseJSON(message);
         } catch (e) {
             console.log('This doesn't look like a valid JSON: ', message.data);
             return;
         }   
        
         if (!logged) {
             logged = true;
            status.text(myName + ': ').css('color', 'blue');
             input.removeAttr('disabled').focus();
            subSocket.pushLocal(myName);
         } else {
             input.removeAttr('disabled');
             
             var me = json.author == author;
             var date = typeof(json.time) == 'string' ? parseInt(json.time) : json.time;
             addMessage(json.author, json.message, me ? 'blue' : 'black', new Date(date));
         }   
     };  
     
     request.onClose = function(response) {
        logged = false;
     }   
     
     subSocket = socket.subscribe(request);
     
     input.keydown(function(e) {
         if (e.keyCode === 13) {
             var msg = $(this).val();
            if (author == null) {
                 author = msg;
             }
                 
             subSocket.push(jQuery.stringifyJSON({ author: author, message: msg }));
             $(this).val('');
             
             input.attr('disabled', 'disabled');
             if (myName === false) {
                 myName = msg;
             }
         }       
     });     
          
     function addMessage(author, message, color, datetime) {
         content.append('<p><span style="color:' + color + '">' + author + '</span> @ ' +
             + (datetime.getHours() < 10 ? '0' + datetime.getHours() : datetime.getHours()) + ':'
             + (datetime.getMinutes() < 10 ? '0' + datetime.getMinutes() : datetime.getMinutes())
             + ': ' + message + '</p>');
     }       
  });  

Listing 4 enthält eine Menge zusätzlichen Code, wir wollen uns aber auf die wichtigsten Teile von Atmosphere.js beschränken. Als Erstes starten wir eine Verbindung (die im Code socket heißt):

var socket = $.atmosphere;

Im nächsten Schritt definieren wir einige Callback-Funktionen. Wir beschränken uns auf eine kleine Gruppe. Als Erstes definieren wir eine onOpen-Funktion, die aufgerufen wird, sobald der zugrunde liegende transport mit dem Server verbunden wird. Dort zeigen wir nur den transport an, der für die Serververbindung benötigt wird. Er wird auf dem Request-Objekt spezifiziert, das wie folgt definiert wird:

var request = { url: document.location.toString() + 'chat',
          contentType : "application/json",
          transport : transport ,
          fallbackTransport: 'long-polling'};

Hier verwenden wir standardmäßig den WebSocket-transport und wechseln zu Long-Polling, falls WebSocket weder vom Browser noch vom Server unterstützt wird. In unserer onOpen-Funktion wird angezeigt, welcher transport verwendet wird. Übrigens lässt sich der transport auch bei einem Ausfall von WebSocket ändern. Dazu wird eine onTransportFailure-Funktion hinzugefügt:

   request.onTransportFailure = function(errorMsg, request) {
        if (window.EventSource) {
            request.fallbackTransport = "sse";
            transport = "see";
        }  

Um dies zu demonstrieren, suchen wir nach einem EventSource-Objekt (HTML5-Server-side-Events). Falls dieses verfügbar ist, stellen wir den transport entsprechend um. Das Elegante an dieser Vorgehensweise: Man braucht kein spezielles API. Bei allen transports kann man unter Verwendung von Atmosphere.js analog vorgehen.

Als Nächstes definieren wir die onMessage-Funktion, die immer dann aufgerufen wird, wenn wir Daten vom Server empfangen:

   request.onMessage = function (response) {
      .....
   }

Hier zeigen wir nur die empfangene Nachricht an. Um eine Verbindung aufzubauen und Daten an den Server zu schicken, müssen wir nur folgenden Aufruf starten:

subSocket = socket.subscribe(request);

Einmal subscribed, können wir Daten empfangen und senden. Um Daten zu senden, verwenden wir das subSocket-Objekt, das uns die Subscribe-Operation zurückliefert. Wenn der WebSocket-transport in Verwendung ist, referenziert das subSocket-Objekt die WebSocket-Verbindung, da das Protokoll ja bidirektional ist. Bei allen anderen Übertragungsarten wird für jede Push-Operation eine neue Verbindung geöffnet:

subSocket.push(jQuery.stringifyJSON({ author: author, message: msg }));

Als Nächstes fügen wir Unterstützung für ein besonders nützliches Atmosphere-Feature hinzu: die Möglichkeit, eine Verbindung unter geöffneten Fenstern bzw. Tabs zu teilen. Man braucht bei einem Request nur die geteilte Variable auf true zu setzen:

var request = { url: document.location.toString() + 'chat',
          contentType : "application/json",
          transport : transport ,
          shared : true,
          fallbackTransport: 'long-polling'};

Nun wird die Verbindung jedes Mal, wenn ein neues Fenster oder ein neuer Tab geöffnet sowie dieselbe Seite aufgerufen wird, geteilt. Um eine Benachrichtigung über die „Master“-Tabs bzw. -Fenster zu erhalten, implementiert man folgende Funktion:

    request.onLocalMessage = function(message) {
      ....
    }

Tabs bzw. Fenster können über die folgende Funktion auch direkt kommunizieren:

subSocket.pushLocal(...)

Aufmacherbild: Earth atmosphere greenhouse effect von Shutterstock / Urheberrecht: artiomp

[ header = WebSocket-Portabilität auf der JVM – Teil 2 ]

Voll einsatzbereit? Noch nicht ganz…

Wir haben jetzt eine voll funktionsfähige Chat-Anwendung, die allerdings in ihrem jetzigen Zustand noch mit zwei Problemen behaftet ist: Das erste ist Proxy- bzw. Firewall-bedingt: Gelegentlich lassen es Letztere nicht zu, dass eine Verbindung für längere Zeit inaktiv bleibt, weswegen die Verbindung automatisch vom Proxy geschlossen wird. Wenn die Verbindung aufrechterhalten werden soll, muss der Client jedes Mal eine neue Verbindung aufbauen. Lösen lässt sich dieses Problem durch Übertragung einiger Bytes zwischen Client und Server, um einen Verbindungsabbruch zu vermeiden. In unserem Fall muss lediglich der HeartbeatInterceptor hinzugefügt werden. Durch ihn bleibt die Verbindung aktiv (Listing 5), und zwar wiederum codeabhängig.

  @AtmosphereHandlerService(
          path = "/chat",
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                  BroadcastOnPostAtmosphereInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatRoom extends OnMessage<String> {

Nun schreibt der HeartbeatInterceptor periodisch einige Bytes (Whitespace), um die Verbindung aufrecht zu erhalten. Ausschließen lässt sich allerdings nicht, dass einige Proxies die Verbindung trotz Aktivität unterbrechen oder dass ein Netzwerkproblem auftritt und der Browser sich erneut verbinden muss.

Während eines Reconnects kann es immer sein, dass gerade eine Broadcast-Operation am Laufen ist. Der Browser wird den Broadcast in diesem Fall nie empfangen, weil die Verbindung gerade erst aufgebaut wird. So kann es vorkommen, dass der Browser eine Nachricht verpasst (oder verliert). Bei einigen Anwendungen könnte dies ein größeres Problem darstellen.

Zum Glück unterstützt Atmosphere das Prinzip des BroadcasterCache. Installiert man einen solchen, verliert oder verpasst der Browser keine einzige Nachricht mehr. Wenn der Browser sich neu verbindet, durchsucht Atmosphere den Cache und stellt sicher, dass alle Nachrichten, die während des erneuten Verbindungsversuchs versendet wurden, zum Browser gesendet werden. Das BroadcasterCache-API lässt sich einbinden, und Atmosphere wird mit einer einsatzbereiten Implementierung ausgeliefert. Für unsere Chat-Anwendung brauchen wir also nur Folgendes zu tun:

@AtmosphereHandlerService(
          path = "/chat",
          broadcasterCache = HeaderBroadcasterCache.class,
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                   BroadcastOnPostAtmosphereInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatAtmosphereHandler extends OnMessage<String> {

Unsere Anwendung verliert oder verpasst nun garantiert keine Nachricht mehr.

Das zweite Problem: Je nach Webserver können sich empfangene Nachrichten mischen und der Browser beispielsweise zwei oder anderthalb Nachrichten auf einmal empfangen. Nehmen wir an, dass wir JSON zum Schreiben unserer Nachricht verwenden. In dem Fall wird der Browser Nachrichten wie die folgenden nicht lesen können:

{"message":"Hello World","author":"John Doe","time":1348578675087}{"message":"Cool Man","author":"Foo Bar","time":1348578675087}

oder

{"message":"Hello World","author":"John Doe

oder

{"message":"Hello World","author":"John Doe","time":1348578675087}{"message":"Cool Man","author"

Wenn der Browser solche Nachrichten empfängt, wird er sie nicht lesen können:

var json = jQuery.parseJSON(message);

Um Abhilfe zu schaffen, müssen wir den TrackMessageSizeInterceptor installieren, der der Nachricht einige Hinweise hinzufügt, damit die atmosphere.js-onMessage-Funktion immer mit einer gültigen Nachricht aufgerufen wird (Listing 6).

@AtmosphereHandlerService(
          path = "/chat",
         broadcasterCache = HeaderBroadcasterCache.class,
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                   BroadcastOnPostAtmosphereInterceptor.class,
                                   TrackMessageSizeInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatRoom extends OnMessage<String> {

Auf der Clientseite brauchen wir nur noch die trackMessageLength im Request-Objekt zu setzen:

var request = { url: document.location.toString() + 'chat',
        contentType : "application/json",
        logLevel : 'debug',
        shared : true,
        transport : transport ,
        trackMessageLength : true,
        fallbackTransport: 'long-polling'};

Ab in die Cloud!

Wir können jetzt unsere erste Anwendung in der Cloud deployen. Nun, beinahe. Zuerst müssen wir durch ein noch hinzuzufügendes Feature angeben, wie Nachrichten auf die Server verteilt werden, wenn sie in der Cloud deployt werden. Das hierbei entstehende Problem ist in Abbildung 3 ersichtlich.

Abb. 3: Server in der Cloud

Wenn sich in diesem Szenario eine Broadcast-Aktion auf dem Tomcat-Server 1 ereignet, wird der Tomcat-Server 2 diese Nachrichten niemals erhalten. Das ist nicht nur mit Blick auf den Chat problematisch, sondern auch im Fall jeder anderen Anwendung, die in der Cloud deployt werden soll. Es kommt uns daher sehr entgegen, dass Atmosphere Cloud- oder Cluster-fähige Broadcaster unterstützt, mit denen man Nachrichten zwischen Serverinstanzen verbreiten kann. Atmosphere bietet derzeit nativen Support für bewährte Technologien wie Redis PubSub, Hazelcast, JGroups, JMS oder XMPP (z. B. für Gmail-Server). Im Folgenden verwenden wir Redis PubSub (Abb. 4).

Abb. 4: Redis PubSub

Durch Redis PubSub ist es möglich, sich mit einer Redis-Instanz zu verbinden und sich für bestimmte Topics zu subscriben. Für unsere Anwendung müssen wir lediglich ein „Chat“-Topic erzeugen und für alle unserer Server eine Subscription starten. Als Nächstes bringen wir unsere Anwendung dazu, den RedisBroadcaster statt des normalen Broadcasters zu verwenden. Das ist tatsächlich so einfach wie in Listing 7 dargestellt.

@AtmosphereHandlerService(
         path = "/chat",
         broadcasterCache = HeaderBroadcasterCache.class,
         broadcaster = RedisBroadcaster.class,
         interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                  BroadcastOnPostAtmosphereInterceptor.class,
                                  TrackMessageSizeInterceptor.class,
                                  HeartbeatInterceptor.class})
 public class ChatRoom extends OnMessage<String> {

Indem wir den RedisBroadcaster hinzufügen, ermöglichen wir Message Sharing zwischen den Servern, sodass unsere Chat-Anwendung durch eine einzige Zeile „Cloud-freundlich“ wird. Clientseitig müssen wir nichts verändern. Wir haben nun eine voll funktionsfähige Anwendung, die:

  • codeunabhängig alle existierenden Webserver unterstützt
  • codeunabhängig alle existierenden Browser unterstützt
  • Cloud-/Cluster-fähig ist

Unsere Anwendung wird zunächst den besten Übertragungsweg zwischen Client und Server ausfindig machen. Wenn wir also beispielsweise Jetty 8 verwenden, werden die folgenden Übertragungsarten verwendet:

  • Chrome 21: WebSockets
  • Internet Explorer 9: Long-Polling
  • FireFox 15: Server-side Events
  • Safari/iOS 6: WebSockets
  • Internet Explorer 10: WebSockets
  • Android 2.3: Long-Polling
  • FireFox 3.5 : Long-Polling

All das geschieht codeunabhängig, sodass der Entwickler sich auf die Anwendung konzentrieren kann, statt sich mit Übertragungs- und Portierungsproblemen herumschlagen zu müssen.

Fazit und Ausblick

WebSockets und Server-side Events sind unaufhaltsam auf dem Vormarsch. Ihre Verbreitung innerhalb von Unternehmen nimmt stark zu. Beim Schreiben eines Frameworks sollten daher folgende Kriterien unbedingt berücksichtigt werden:

  • Ist das API portierbar, d. h. wird es auf allen bekannten Webservern funktionieren?
  • Stellt das Framework bereits einen Fallback-Mechanismus bereit? Internet Explorer 7, 8 und 9 unterstützen beispielsweise weder WebSockets noch Server-side Events; zu unserem Nachteil sind diese Browser noch vielerorts im Einsatz.
  • Ist das Framework Cloud-fähig und – viel wichtiger – skalierbar?
  • Kann man damit problemlos Anwendungen schreiben? Hat sich das Framework gut etabliert?

In Bezug auf Atmosphere lassen sich alle vier Fragen positiv beantworten. Noch immer nicht überzeugt? Dann gehen Sie zum Wall Street Journal [1], öffnen Sie die Seite und sehen Sie nach dem Wordnik-Logo: mehr als 60 Millionen Requests pro Tag – powered by Atmosphere! Besuchen Sie auch unsere Website [2] und legen Sie los!

Von der Redaktion des Eclipse Magazins aus dem Englischen übersetzt.

Geschrieben von
Jeanfrançois Arcand
Jeanfrançois ist seit achtzehn Jahren in der Softwareindustrie tätig. Er studierte vorher reine Mathematik und arbeitete für eine kanadische Forschungseinrichtung. Dort entwarf er mathematische Modelle mit C++, bis ihm jemand Java vorstellte. Jeanfrançois arbeitete fast zehn Jahre für Sun Microsystems, schrieb Grizzly, eines der ersten NIO-Frameworks, und Grizzly Comet, eines der frühen Frameworks zur Implementierung asynchroner Webanwendungen. Danach begann er mit der Arbeit am Atmosphere-Framework.
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.