Praxis-Tutorial JXTA, Teil 2

Peer-to-Peer mit JXTA

Markus Oliver Junginger

Peer-to-Peer-Netzwerke erlauben die dynamische Bildung zusammenarbeitender Gruppen, die nicht über einen Server koordiniert werden müssen. Dieser Artikel stellt ein objektorientiertes Konzept vor, das die Peer-to-Peer-Entwicklung vereinfacht und auch bei bestehenden Anwendungen nachträglich angewandt werden kann. Als Beispielanwendung dient eine Arbeitsfläche, die gemeinsam von mehreren Peers benutzt werden kann.

Abb. 1: Collaborative Workbench: Screenshot

Manche Arbeiten lassen sich einfacher und mit mehr Spaß in der Gruppe lösen. In unserem Fall arbeiten mehrere Peers an einer gemeinsamen Aufgabe und können somit voneinander profitieren. Die hier vorgestellte Beispielsanwendung basiert auf einem Konzept von Vivien Dollinger [2]. Graphen, die aus Knoten und Verknüpfungen bestehen, pictureen eine strukturierte Repräsentation beliebiger Ressourcen und setzen diese untereinander in Relation. Ein einfaches Anwendungsbeispiel ist das so genannte Mindmapping, eine Brainstorming-Technik, bei der spontane Assoziationen mit Hilfe von Knoten und Verbindungen ausgedrückt und strukturiert werden. Die hier vorgestellte Anwendung überträgt diese Funktionalität in den Peer-to-Peer-Bereich in der Hoffnung, Synergieeffekte unter den Peers zu erzielen.

Jeder Peer kann Knoten benennen, anlegen, anordnen und Verknüpfungen zu anderen Knoten herstellen. Unsere Beispielanwendung beschränkt sich zu diesem Zeitpunkt auf die grundlegende Funktionalität, kann aber ohne Weiteres zum Beispiel zu komplexen Wissens- und Ressourcen-Netzen [3] ausgebaut werden. Die Funktionalität für die Graphen sowie deren Darstellung werden durch die NodeDiagram-Komponente [4] zur Verfügung gestellt, die hier nur am Rande erwähnt wird. Unser Fokus liegt vielmehr darin, diese Komponente mit dem Peer-to-Peer-Netzwerk zu verbinden. Beliebige Gruppen von Peers sollen in der Lage sein, gemeinsam an einem Graphen zu arbeiten.

System.setProperty("net.jxta.tls.principal","-- hier der Login-Name --");
System.setProperty("net.jxta.tls.password","-- hier das Passwort --");

Verteilte Systeme bestehen meist aus voneinander getrennten Einzelsystemen, die folglich keinen gemeinsamen Adressraum besitzen. Von der Java-Seite betrachtet bedeutet dies, dass Objekte einer Virtuellen Maschine zunächst nicht von anderen VMs zugänglich sind. Erst durch Einsatz von objektorientierter Middleware wie CORBA, RMI oder auch EJB entsteht die Illusion, dass auf verteilte Objekte von beliebigen Orten aus zugreifbar ist. Die Illusion entsteht dadurch, dass die Middleware Stellvertreterobjekte (Proxy/Stub) zur Verfügung stellt, die genau wie die eigentlichen Objekte angesprochen werden. Der Unterschied ist, dass die Stellvertreterobjekte die Aufrufe an das eigentliche Objekt delegieren und die dafür benötigte Netzwerkkommunikation mit dem System, auf dem sich das Objekt tatsächlich befindet, übernehmen. Dies bringt mit sich, dass das Objekt einen definierten Ort besitzt, auf den zugegriffen werden kann. Bei Peer-to-Peer-Netzwerken wird dies anders gehandhabt, wenn es um gemeinsame Objekte geht. Hier ist die Zuordnung nicht mehr eindeutig, da Objekte eher einer Gruppe als einzelnen Peers angehören. Ein erster naheliegender Ansatz könnte darin bestehen, einen Peer zum Objektmanager zu ernennen, der Objekte in seiner Umgebung hält und anderen Peers zugänglich macht. Dies bringt uns allerdings wieder Client/Server-Systemen und deren Problematik näher, da das System somit auf einzelne Netzwerkknoten angewiesen ist. Besonders in einem Netzwerk aus unzuverlässigen Peers ist das sicherlich keine Musterlösung. Peer-to-Peer-Netzwerke sind anders und erfordern eine angepasste Denkweise, damit man sich deren Vorteile zunutze machen kann.

Ein Lösungsansatz für objektorientierte Entwicklung in massiv verteilten Systemen basiert auf abstrakten Objekten, die logisch einer Gruppe angehören. Ein solches Objekt ist in dem Sinne abstrakt, dass dessen Existenz nicht durch einen bestimmten Speicherbereich definiert ist. Vielmehr wird es durch lokale Instanzen in verschiedenen Adressräumen repräsentiert, die an sich zunächst unabhängig voneinander sind. Die Zuordnung lokaler Instanzen zueinander erfolgt durch eine zeitlich und räumlich eindeutige ID. Dadurch wird es möglich, die lokalen Instanzen untereinander konsistent zu halten. Die Konsistenzwahrung erfolgt über den Austausch von Nachrichten innerhalb der Gruppe. Ändert sich die lokale Instanz eines Peers, so wird eine Nachricht mit den nötigen Informationen über die Änderung an die Gruppe gesendet. Jedes Gruppenmitglied wertet die Nachrichten aus, identifiziert die eigene lokale Instanz anhand der ID und führt die Änderungen lokal durch. Somit scheinen die lokalen Instanzen Spiegel des imaginären globalen Objekts zu sein. Ein wichtiger Vorteil des Konzeptes ist, dass existierende Anwendungen für den Einsatz in der verteilten Umgebung erweitert werden können, ohne zwingend in das System eingreifen zu müssen. Die NodeDiagram-Komponente, die nichts von einem Netzwerk weiß, ist ein gutes Beispiel dafür.

Dieses Vorgehen schließt Konflikte nicht aus. Beispielsweise können mehrere Peers unabhängig voneinander ein globales Objekt gleichzeitig ändern. Das Objekt hat in diesem Fall zumindest temporär keinen eindeutigen Zustand; es hat eventuell sogar abhängig vom Adressraum gegensätzliche Zustände. Derartige Konflikte könnte man zum Beispiel über verteilte Semaphore lösen. Ein Peer könnte sich das globale und ausschließliche Zugriffsrecht für ein Objekt sichern, das Objekt ändern und danach das Zugriffsrecht wieder zurückgeben. Dies führt jedoch zu neuen Problemen. Zunächst ist ein Semaphorenservice, der die Ressourcen global verwaltet, kaum durch einzelne Peers in einem dynamischen und dezentralen Netzwerk zuverlässig realisierbar. Des Weiteren sollte man das Funktionieren der Gesamtheit nicht von einzelnen Peers abhängig machen, da sie das Netz spontan verlassen könnten. Zudem könnten bösartige Peers die Abhängigkeit sogar für Attacken ausnutzen, indem diese beispielsweise die Semaphore nicht freigeben und somit die Ressource für andere Peers unzugänglich machen. Außerdem sind verteilte Semaphore mit einem beträchtlichen Performance-Overhead verbunden.

Aus diesen Gründen ist eine andere Vorgehensweise in Peer-to-Peer-Netzwerken sinnvoll. So erlauben wir allen Peers, jederzeit Objekte zu verändern und vertrauen auf Selbstregulierung. Konflikte werden bewusst nicht automatisch aufgelöst. Vielmehr können die Peers (Anwender) Konflikte selbst beheben, eventuell nach vorheriger Absprache im Chat. Das System entscheidet lediglich anhand des Zeitpunkts der Änderung darüber, welche Änderung übernommen wird. Im Falle von konkurrierenden und einander ausschließenden Änderungen wird der zeitlich näherliegenden Änderung der Vorzug gegeben.

Implementierung

Um dieses Konzept mit Hilfe von JXTA umzusetzen, bedienen wir uns der PropagatePipe, mit der wir Zustandsänderungen von Objekten in der Gruppe publik machen. Ändert sich der Zustand eines Objektes, muss eine Nachricht mit entsprechenden Informationen erstellt und mit der PropagatePipe versendet werden. Die anderen Gruppenmitglieder erhalten diese Nachrichten über einen Pipe-Listener und können die Änderungen übernehmen. In unserer Collaborative Workbench-Anwendung wird der Vorgang durch drei Klassen realisiert, die nachfolgend detailliert besprochen werden.

Das Versenden und Empfangen von Nachrichten wird jeweils durch eine eigene Klasse realisiert. Beide Klassen teilen eine gewisse Funktionalität und haben deshalb eine gemeinsame Superklasse. In Listing 1 sind die wesentlichen Teile dieser Superklasse dargestellt. Das Groups.Listener Interface der im ersten Teil besprochenen Groups-Komponente wird durch diese Klasse implementiert. Sobald einer PeerGroup beigetreten wird oder eine neue PeerGroup erstellt, wird die Methode joinedGroup bzw. createdGroup aufgerufen. In diesem Fall dienen beide Methoden dazu, den Einsatz der PropagatePipe vorzubreiten und ein entsprechendes Advertisement zu erstellen. Um mehrere unterschiedliche Advertisements innerhalb einer PeerGroup auszuschließen, erstellen wir die PipeID mit der Methode getDiagramUpdatePipeID selbst. Dieses Vorgehen wurde bereits im ersten Teil des Tutorials bei dem ChatPipe-Advertisement angewandt. Nach der Erstellung des Advertisement wird es veröffentlicht und zuletzt an die abstrakte connectPipe Methode übergeben, die von den erweiternden Klassen implementiert wird. Die Hilfs-Methoden attachObject und retrieveObject werden von denselben Klassen benutzt, um die Einbettung von Objekten in Nachrichten zu vereinfachen und zu vereinheitlichen.

Listing 1

public abstract class DiagramUpdateBase implements Groups.Listener
{
public void joinedGroup(PeerGroup group)
{
this.group=group;
pipes=group.getPipeService();

PipeID pid=getDiagramUpdatePipeID(group.getPeerGroupID());
PipeAdvertisement adv=(PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
adv.setPipeID(pid);
adv.setType(PipeService.PropagateType);
adv.setName("DiagramUpdatePipe");
DiscoveryService discovery=group.getDiscoveryService();
try
{
discovery.publish(adv, DiscoveryService.ADV);
discovery.remotePublish(adv, DiscoveryService.ADV);
connectPipe(adv);
} catch (Exception ex)
{
ex.printStackTrace();
}
}

public void createdGroup(PeerGroup group) {joinedGroup(group);}

protected void attachObject(Message msg, String elementName, Object object) throws IOException
{
ByteArrayOutputStream byteOut=new ByteArrayOutputStream();
ObjectOutputStream out=new ObjectOutputStream(byteOut);
out.writeObject(object);
out.close();
msg.addElement(msg.newMessageElement(elementName,null,byteOut.toByteArray()));
}

protected Object retrieveObject(Message msg, String elementName) throws IOException, ClassNotFoundException
{
MessageElement element=msg.getElement(elementName);
byte[] bytes=element.getBytesOffset();
ByteArrayInputStream byteIn=new ByteArrayInputStream(bytes);
ObjectInputStream in=new ObjectInputStream(byteIn);
Object obj=in.readObject();
in.close();
return obj;
}
}

Die Klasse DiagramUpdateSender erweitert die DiagramUpdateBase Klasse und dient zum Versenden von Änderungs-Nachrichten an die Gruppe (Listing 2). Die Methode connectPipe wird von joinedGroup der Superklasse aufgerufen. Eine eventuell bestehende OutputPipe wird hier zunächst geschlossen und eine neue gemäß dem vorbereiteten Advertisements erzeugt. Die beiden Methoden resourceAddedEvent und resourceModifiedEvent stammen aus dem Interface NodeDiagramModificationListener, welches direkt von der Klasse implementiert wird. Dieses Interface gehört zu der NodeDiagram-Komponente. Im Moment reicht es zu wissen, dass die Methoden aufgerufen werden, sobald bestimmte Ressourcen geändert worden sind. Diese Ereignisse sind der Auslöser dafür, dass die Veränderungen der restlichen Gruppe mitgeteilt werden. Mit der Methode sendMessage wird ein Message-Objekt inklusive zwei MessageElement-Objekten erzeugt. Das erste Element ist ein StructuredDocument, das den Event-Typ und die Sender-ID enthält. Das zweite Element ist das Ressourcen-Objekt, welches mit der attachObject Methode an das Message-Objekt angehängt wird. Ein komplettes Ressourcen-Objekt zu versenden vereinfacht in diesem Fall die Handhabung. Für andere Anwendungen können andere Lösungen durchaus sinnvoll sein, wie zum Beispiel sich auf die geänderten Daten zu beschränken. Das eigentliche Versenden der Nachricht über die OutputPipe ist mit einer Verzögerung verbunden. Da wir diese Verzögerung Anwendern nicht zumuten möchten, erfolgt der Nachrichtenversand asynchron. Dies bedeutet, dass die Nachricht zunächst lediglich in eine Message-Queue (in unserem Fall ist diese Bezeichnung etwas übertrieben) eingereiht wird. Somit kann der weitere Programmablauf praktisch ohne Verzögerung erfolgen. Das eigentliche Versenden erfolgt in einem eigenen Thread, der durch die innere Klasse Sender repräsentiert wird. Die run-Methode besteht aus einer Endlosschleife, in der geprüft wird, ob die Message-Queue Objekte enthält. Ist dies der Fall, werden die Nachrichten nach FIFO-Schema aus der Message-Queue entfernt und mit dem send Befehl der OutputPipe verschickt.

Listing 2

public class DiagramUpdateSender extends DiagramUpdateBase implements NodeDiagramModificationListener
{
public void connectPipe(PipeAdvertisement adv) throws java.io.IOException
{
if (outPipe != null) outPipe.close();
outPipe=null;
outPipe=pipes.createOutputPipe(adv, -1);
}

public void resourceAddedEvent(NodeDiagram diagram, Object resource)
{sendMessage(RESOURCE_ADDED_EVENT_TYPE, resource);}

public void resourceModifiedEvent(NodeDiagram diagram, Object resource)
{sendMessage(RESOURCE_MODIFIED_EVENT_TYPE, resource);}

protected void sendMessage(String eventType, Object resource)
{
StructuredDocument doc=StructuredDocumentFactory.newStructuredDocument (mimeType, "biz.junginger.jxta:Resource");
PeerID pid=group.getPeerID();
doc.appendChild(doc.createElement("SenderPeerID",pid.toString()));
doc.appendChild(doc.createElement(EVENT_TYPE,RESOURCE_ADDED_EVENT_TYPE));

Message msg=pipes.createMessage();
try
{
msg.addElement(msg.newMessageElement(EVENT_MESSAGE_ELEMENT, mimeType, doc.getStream()));
attachObject(msg, "Resource", resource);

synchronized(sendQueue)
{
sendQueue.add(msg);
sendQueue.notifyAll();
}
}
catch (IOException ex)
{ex.printStackTrace();}
}

private class Sender extends Thread
{
public void run()
{
while(true)
{
Message msg=null;
synchronized(sendQueue)
{
while(true)
{
if(!sendQueue.isEmpty()) msg=(Message) sendQueue.removeFirst();
if(msg!=null) break;
try{sendQueue.wait();}
catch(Exception ex) {ex.printStackTrace();}
}
}
try
{
outPipe.send(msg);
}
catch (Exception ex)
{ex.printStackTrace();}
}
}
}
}

Die versendeten Nachrichten werden durch die Klasse DiagramUpdateReceiver empfangen und ausgewertet. Die für uns wichtigen Teile des Quellcodes finden Sie in Listing 3. Die Bedeutung der Methode connectPipe wurde bereits vorher besprochen und dient in dieser Klasse zur Verwaltung der InputPipe. Bei der Erzeugung der InputPipe wird das this-Objekt als PipeMsgListener übergeben. Dadurch erreichen wir, dass die Methode pipeMsgEvent aufgerufen wird, sobald eine neue Nachricht eintrifft. Als Argument hat diese Methode ein PipeMsgEvent-Objekt, das im Wesentlichen ein Message-Objekt enthält. Zunächst wird das MessageElement ausgewertet, welches die Sender-ID und den Event-Typ enthält. Ist die Sender-ID die eigene, bedeutet dies, dass der empfangende Peer selbst der Sender war. Somit muss die eigene Nachricht nicht auswertet werden, weil das eigene lokale Objekt aktuell ist. Beschreibt der Event-Typ eine Änderung oder Erstellung einer Ressource, so wird das Objekt mit der Methode retrieveObject aus der Message extrahiert und die Objekt-Aktualisierung an die synchronize-Methode delegiert. Hier wird zunächst anhand der ID überprüft, ob eine lokale Instanz des imaginären globalen Objektes bereits existiert. Ist dies nicht der Fall, wird es neu angelegt, beziehungsweise das empfangene Objekt wird der lokalen Datenstruktur hinzugefügt.

Existiert unter der ID jedoch ein lokales Objekt, muss es synchronisiert werden. An dieser Stelle wenden wir uns kurz diesen lokalen Objekten zu. Da der zeitliche Aspekt bei der Synchronisierung eine wichtige Rolle spielt, wird der Zeitpunkt einer Änderung für jede Objekt-Eigenschaft (Property) festgehalten. Dies wurde mit einer Map realisiert, die den Property-Namen als Schlüssel und die Zeit als long-Wert enthält. Es muss dafür gesorgt werden, dass der Zeitwert unabhängig von abweichenden Uhren und Zeitzonen ist, aber diese zusätzliche Komplexität wollen wir an dieser Stelle nicht weiter betrachten. Wichtig für die Properties ist, dass sie gemäß dem JavaBeans-Standard über get und set-Methoden angesprochen werden. Dies erlaubt eine generische Synchronisation mit Methoden der Reflection APIs. Damit sind wir wieder bei der Methode synchronize angelangt, die mit getMethods alle vorhandenen Methoden des zu synchronisierenden Objektes beziehungsweise dessen Klasse holt. Jede Methode, deren Name mit get beginnt, wird als Property erkannt, welches daraufhin näher überprüft wird. Dazu werden mit getModificationTime die Zeitpunkte der Änderungen abgefragt. Sollte das Property des lokalen Objektes älter als das Property des empfangenen Objektes sein, muss das lokale Property aktualisiert werden. Die Aktualisierung greift wieder auf den Reflection Mechanismus zurück. Zunächst wird mit invoke die zum Property gehörende get-Methode aufgerufen, um den neuen Wert zu erhalten. Danach kann dieser Wert mit einem Aufruf der set-Methode an das lokale Objekt übergeben werden. Dafür wird erneut die invoke-Methode benutzt, welche hier den Wert als Methodenparameter in ein Objekt-Array erwartet. Jetzt fehlt nur die Aktualisierung der Änderungszeit, um das lokale Objekt vollständig zu synchronisieren. Schließlich wird commitDiagramChanges aufgerufen, um der NodeDiagram-Komponente die Aktualisierung mitzuteilen.

Listing 3

public class DiagramUpdateReceiver extends DiagramUpdateBase implements PipeMsgListener
{
public void connectPipe(PipeAdvertisement adv) throws java.io.IOException
{
if (inPipe != null) inPipe.close();
inPipe=null;
inPipe=pipes.createInputPipe(adv,this);
}

public void pipeMsgEvent(PipeMsgEvent event)
{
try
{
Message msg=event.getMessage();
MessageElement element=msg.getElement(EVENT_MESSAGE_ELEMENT);
StructuredDocument doc=StructuredDocumentFactory.newStructuredDocument(mimeType, element.getStream());

String pid=null;
String eventType=null;
Enumeration enum=doc.getChildren();
while (enum.hasMoreElements())
{
Element el=(Element) enum.nextElement();
if (el.getKey().equals("SenderPeerID")) pid=(String) el.getValue();
else if (el.getKey().equals(EVENT_TYPE)) eventType=(String) el.getValue();
}

if(group.getPeerID().toString().equalsIgnoreCase(pid)) return;

if(eventType.equals(RESOURCE_ADDED_EVENT_TYPE) || eventType.equals(RESOURCE_MODIFIED_EVENT_TYPE))
{
Resource resource=(Resource) retrieveObject(msg, "Resource");
synchronize(resource);
}
}
catch (Exception ex)
{
ex.printStackTrace();
return;
}
}

protected void synchronize(Resource resource)
{
String uid=resource.getUID();
Resource existing=diagram.getResource(uid);
if(existing==null)
{
...
diagram.add(resource);
return;
}

Method [] methods=resource.getClass().getMethods();
for (int i=0; i timeExisting)
{
try
{
Object result=methods[i].invoke(resource, null);
Class [] pt={methods[i].getReturnType()};
Method setter=existing.getClass().getMethod("set"+property, pt);
Object[] parameters={result};
setter.invoke(existing, parameters);
existing.setModificationTime(property, timeNew);
}
catch (Exception ex)
{ex.printStackTrace();}
}
}
}
diagram.commitDiagramChanges();
}
}

Wie das vorgestellte Konzept sowie die dazugehören Klassen in die Applikation einfließt, lässt sich gut anhand der Klasse CollaborationTutorial skizzieren. Der Konstruktor der Klasse ist in Listing 4 zu finden. Nach der Initialisierung von JXTA werden hier zunächst die aus dem ersten Teil des Tutorials bekannten Groups- und ChatBoard-Komponenten erzeugt. Darauf folgen die NodeDiagram-Komponente und die soeben vorgestellten neuen Klassen. Der DiagramUpdateReceiver erhält dabei das NodeDiagram-Objekt, welches benötigt wird, um Zugriff auf die lokalen Daten zu erhalten und diese zu aktualisieren, sobald entsprechende Nachrichten eintreffen. Auf die Klasse DiagramInitializer wird noch später eingegangen. Danach wird der DiagramUpdateSender mit der NodeDiagram-Komponente anhand der Methode addModificationListener verknüpft. Somit kann der DiagramUpdateSender auf Veränderungen von Daten reagieren und entsprechende Nachrichten versenden. Die Groups Komponente ist für mehrere Klassen relevant und wird deshalb mit der Methode addListener mit ihnen verknüpft. Um mit der Collaborative Workbench-Anwendung abzuschließen, werfen wir noch einen kurzen Blick auf die Klasse DiagramInitializer, welche die Initialisierung der NodeDiagram-Objekte übernimmt, sobald einer PeerGroup beigetreten wird. Dazu wird das eigene, zuvor gespeicherte Diagramm geladen und mit den aktuellen Daten der Gruppe synchronisiert.

Listing 4

public CollaborationTutorial()
{
PeerGroup netGroup=initJxta();

Groups groups=new Groups(netGroup,groupPrefix);
ChatBoard board=new ChatBoard();
groups.addListener(board);

NodeWorkbench workbench=NodeWorkbench.createInstance();
NodeDiagram diagram=workbench.getDiagram();
DiagramUpdateReceiver receiver=new DiagramUpdateReceiver(diagram);
DiagramUpdateSender sender=new DiagramUpdateSender();
DiagramInitializer initializer=new DiagramInitializer(diagram);
diagram.addModificationListener(sender);
groups.addListener(sender);
groups.addListener(receiver);
groups.addListener(initializer);

joinDefaultGroup(netGroup,groups);
initGUI(groups,board,workbench);
}

Damit sind wir auch schon fast am Ende des Tutorials. Als kleine Ergänzung werden abschließend noch zwei weitere JXTA-Konzepte erwähnt, die für die Entwicklung eigener Peer-to-Peer-Projekte interessant sein könnten: Sicherheit und der Content Management Service (CMS). Sicherheitsmechanismen haben gerade im Peer-to-Peer-Bereich einen großen Stellenwert. Die Sicherheitsinfrastruktur von JXTA basiert auf den Klassen der entsprechenden Java Card APIs, damit die Funktionalität auch bei kleinen Geräten garantiert werden kann. Zur Verfügung stehen unter anderem RSA und Secret Keys, RC4 Verschlüsselung, SH-1 bzw. MD5 Message Digests und Signaturen. Außerdem existieren Pipes für eine verschlüsselte Datenübertragung, die ohne weiteren Aufwand benutzt werden können. Einem anderen Zweck dient CMS, der die Funktionalität für Austausch von Dateien zur Verfügung stellt. Dieser Service vereinfacht es, Dateien im Netzwerk zugänglich zu machen, sie zu finden und herunterzuladen.
JXTA hat jetzt hoffentlich Ihr Interesse geweckt und vielleicht haben Sie schon eine Idee für eine eigene Anwendung, die von dem Peer-to-Peer-Netzwerk profitieren kann.

Geschrieben von
Markus Oliver Junginger
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.