Apache Spark: Mehr als ein Hadoop-Ersatz

Kai Spichale

©Shutterstock/Claudia Paulussen

Hadoop wird in verschiedenen Branchen erfolgreich zur Analyse großer Datenmengen eingesetzt. Sein Erfolg ist begründet durch sein einfaches Programmiermodell MapReduce, das fehlertolerant, skalierbar und kosteneffektiv für unterschiedlich große Datenmengen genutzt werden kann. Ein Nachteil von MapReduce ist allerdings dessen hohe Latenz, sodass man für die Durchführung von Abfragen vergleichsweise lange warten muss. Unter diesem Gesichtspunkt wurde Spark von der Apache Software Foundation zur Beschleunigung von Datenverarbeitungsprozessen entwickelt.

Um Missverständnissen vorzubeugen, sei zu Beginn dieses Artikels explizit darauf hingewiesen, dass Spark keine modifizierte Hadoop-Version ist, sondern eine unabhängige Entwicklung mit eigener Cluster-Verwaltung. Man kann Hadoop jedoch zur Implementierung von Spark nutzen, wenn man möchte. Aufgrund der eigenen Cluster-Verwaltung kann Spark als Standalone-Version deployt werden. Spark könnte dennoch das verteilte und ausfallsichere Dateisystem HDFS aus dem Hadoop-Stack nutzen, um darin gespeichert Dateien zu verarbeiten. Das klassische MapReduce-Beispiel Wörterzählen könnte mit Spark folgendermaßen aussehen:

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

In diesem Scala-Beispiel wird das RDD-API genutzt. RDD steht für Resilient Distributed Dataset und ist das zentrale Konzept des Spark API. Ein RDD ist eine fehlertolerante Collection, deren Elemente nebenläufig verarbeitet werden können. Ein RDD kann entweder durch Parallelisierung einer im Spark-Programm existierenden Collection oder durch Referenzierung eines Datenbestandes in einem externen Speichersystem erzeugt werden. Letzteres wird im obigen Beispiel gemacht. Frameworks wie MapReduce und Dryad bieten wie Spark Fehlertoleranz und Verteilungstransparent, aber eine Abstraktion für verteilten Speicher bietet nur Spark. RDDs sind vorteilhaft für die Wiederverwendung von Zwischenergebnissen, die beispielsweise beim iterativen maschinellen Lernen und Graphenalgorithmen auftreten.

Im obigen Scala-Beispiel entsteht durch wenige Transformationen das RDD counts bestehend aus (Spring, Int) Tupeln. Der Scala-Ausdruck (_ + _) ist eine anonyme Funktion mit zwei Parametern. Der erste Parameter wird mit +-Operator aufgerufen. Der zweite Parameter wird beim Aufruf übergeben, sodass im Reduce-Schritt die Einsen pro Wort zusammengezählt werden. Das Ergebnis ist eine Liste aller Wörter und deren Häufigkeit im Text. Dieses Ergebnis wird am Ende in eine Datei im HDFS zurückgeschrieben. Mit dem Java-API wird das Beispiel übrigens nicht einfacher. Wer sich einen Eindruck verschaffen möchte, kann sich dieses Beispiel anschauen.

Spark-Komponenten

Eine Spark-Applikation wird mit mehreren unabhängigen Prozessen in einem Computer-Cluster ausgeführt. Für die Koordination ist ein Spark Context zuständig. Der Spark Context ist ein Objekt im Treiberprogramm, das sich mit einem Cluster Manager verbindet. Je nach Betriebsszenario stammt der Cluster Manager von Spark, Mesos oder YARN. Mithilfe des Cluster Managers kann das Treiberprogramm schließlich die Executor-Prozesse auf den Maschinen des Clusters anweisen, die Berechnungen durchzuführen und notwendige Daten zu speichern. Spark nutzt Datenlokalität aus, wann immer es möglich ist. Der Applikationscode in Form einer JAR-Datei oder in Form von Python-Dateien werden deswegen zu den Exekutor-Prozessen kopiert und lokal ausgeführt.

Spark im Hadoop-Cluster

Spark kann noch mehr nutzen als nur das Dateisystem HDFS. Falls bereits eine YARN-Installation vorhanden ist, könnte auf dieser Spark ohne weitere Installation ausgeführt werden. Apache YARN (Yet Another Resource Negotiator) ist die Clusterware von Hadoop und dient der Verwaltung von Ressourcen.

$ ./bin/spark-submit --class SomeSelfContainedProgram
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 2g \
    --executor-memory 1g \
    --total-executor-cores 2 \
    lib/spark-programs*.jar

In diesem Beispiel wird eine Self-Contained-Applikation gestartet. Mit den übergebenen Parametern wird der Deploy-Modus cluster ausgewählt. In diesem Fall läuft das Treiberprogramm im Application-Master-Prozess von YARN, sodass der Benutzer nach dem Start des Treibers nicht mehr benötigt wird. Alternativ könnte der Deploy-Modus client gewählt werden, um das Treiberprogramm lokal beim Benutzer auszuführen.

Als Master-URL wird im Beispiel yarn angegeben, sodass sich das Spark-Programm zur Ausführung mit einem YARN-Cluster verbindet. Ebenfalls könnte man auch spark://HOST:PORT oder mesos://HOST:PORT angeben, um das Programm auf einem Spark Standalone Cluster oder einem Mesos-Cluster auszuführen. Das Spark-Programm könnte ebenfalls mit 1, n oder möglichst vielen lokalen Spark-Threads mit der Master-URL local, local[n] oder local[*] ausgeführt werden. Es gibt demzufolge viele unterschiedliche Möglichkeiten, ein Spark-Programm lokal oder in einem Cluster mit unterschiedlicher Parallelisierung auszuführen.

Interaktive Spark Shell

Spark bietet mit der Spark Shell eine elegante Möglichkeit, das API kennenzulernen. In der Shell können Daten interaktiv ausgewertet werden. Man kann entweder Scala auf der Java VM oder Python in der Shell nutzen.

<em>./bin/spark-shell --master spark://IP:PORT</em>

In der Spark Shell könnte man beispielsweise Spark MLlib, eine Bibliothek für verteiltes maschinelles Lernen, ausprobieren. Spark MLlib bietet verschiedene Clustering-Algorithmen wie K-Means++, um aus einer gegebenen Menge ähnlicher Objekte eine gewünschte Anzahl von Gruppen (Cluster) zu bilden. Das folgende Beispiel kann direkt in der Spark-Scala-Shell ausgeführt werden, um zwei Cluster für die gegebenen Vektoren zu berechnen:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val v1 = Vectors.dense(0.0, 0.0, 0.0)
val v2 = Vectors.dense(0.1, 0.1, 0.1)
val v3 = Vectors.dense(9.0, 9.0, 9.0)
val v4 = Vectors.dense(9.1, 9.1, 9.1)
val distVectors = sc.parallelize(Array(v1, v2, v3, v4))

val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(distVectors, numClusters, numIterations)
clusters.clusterCenters.foreach { println }

Caching

Spark kann Cluster-weit Daten in den Hauptspeicher laden, um die Performance von Applikationen um Faktor 10 zu verbessern. Diese Maßnahme zahlt sich aus, wenn Daten wiederholt benutzt werden. Für iterative Algorithmen ist diese Maßnahme deswegen empfehlenswert. Wenn ein RDD gespeichert wird, übernimmt jeder Spark-Knoten einen Teil der Daten. Der Cache ist fehlertolerant: Falls ein Cache ausfällt, wird dieser auf einem anderen Spark-Knoten neuberechnet. Das Speichern kann mit den Methoden persist() oder cache() durchgeführt werden. Die Methode persist() unterstützt verschiedenen Storage Level von MEMORY_ONLY bis DISK_ONLY. Die Methode cache() entspricht persist(MEMORY_ONLY).

$ ./bin/spark-shell  --driver-memory 8g

val bigdata = sc.textFile("s3n://path_to_bucket/big.data")
bigdata.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
bigdata.count()
bigdata.count()

Spark-Prozesse sind prinzipiell lazy, denn beim Erzeugen der RDD bigdata passiert noch nichts. Erst beim Aufruf von count() wird die Datei gelesen, um die Zeilenanzahl zu bestimmen. Die Methoden cache() und persist() sind ebenfalls lazy, sodass erst beim Aufruf von count() die Datei geladen wird. Der wiederholte Aufruf von count() ist durch das Caching dann spürbar schneller.

Spark SQL

Das Modul Spark SQL ist ideal zur Verarbeitung von strukturierten Daten. Insbesondere im Vergleich zum zuvor vorgestellten Spark RDD API bietet Spark SQL mehr Informationen über die Struktur der Daten und die durchgeführten Operationen. Die Informationen über die Struktur der Daten werden intern dazu verwendet, die Ausführung der Operationen zu optimieren. Wahlweise kann man Datenabfragen mit SQL-Syntax oder HiveQL schreiben.

<em>val hiveContext = org.apache.spark.sql.hive.HiveContext(sc)</em>

<em>hiveContext.sql("SELECT * FROM users")</em>

Mit einem SQLContext können Spark-Applikationen DataFrames für RDDs und Data Sources erzeugen. Als Data Sourcen eignen sich Hive-Tabellen, um eventuell existierende Hive-Installationen wiederverwenden zu können. Genauso könnte man auch Tabellen einer relationalen Datenbank wie MySQL nutzen. Auch Dateien im Hadoop-Spalten-Speicherformat Apache Parquet oder einfachem JSON-Format können mit Spark verarbeitet werden:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Notwendig zur Konvertierung von RDD nach DataFrame
import sqlContext.implicits._
val df = sqlContext.read.json("users.json")
df.select(df("age"), df("name"), df("active")).show()

+----+-----+------+
| age| name|active|
+----+-----+------+
|null|Alice|  true|
|  30|  Bob|  true|
|   9|Kevin|  true|
|  28|Maria| false|
|  41|  Tim| false|
+----+-----+------+

Die DataFrames von Spark bieten ein Fluent API zur Formulierung von Abfragen. Filter, Gruppierungen und Aggregationen können miteinander kombiniert werden:

df.filter(df("age")&gt;=18).groupBy("active").count().show()

&nbsp;

+------+-----+

|active|count|

+------+-----+

| true|   1|

| false|   2|

+------+-----+

Beim Entwurf der Spark API wurde viel Wert darauf gelegt, dass die Interaktion mit verteilten Daten möglichst elegant und mit nur wenig Boilerplate-Code funktioniert. Sowohl mit RDDs als auch mit DataFrames können mehrere Transformationen miteinander verkettet werden, um die Daten wie gewünscht zu verarbeiten.

Apache Cassandra

Mit dem Spark-Cassandra-Connector bzw. Spark-Cassandra-Connector-Java können Cassandra-Tabellen als Spark RDDs genutzt werden. Im folgenden Beispiel wird eine CQL-Abfrage auf die in Cassandra gespeicherte Tabelle users gestartet.

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1");

import com.datastax.spark.connector._;

val sc = new SparkContext("local","My Cluster",conf);

val ts = sc.cassandraTable("testkeyspace","users");

ts.select("name").where("active = ?", true).toArray.foreach(println)

Cassandra unterstützt nur einfache Abfragen. Die Server-seitigen Filter (Where-Klauseln) sind jedoch nützlich, um die Datenmenge vor ihrer Übertragung zu reduzieren. Außerdem werden die Daten lokal im Cluster verarbeitet, falls das möglich ist. Aus diesem Grund werden die Spark Worker lokal auf den Maschinen der Cassandra-Knoten ausgeführt. Für alle in der Where-Klausel benutzten Spalten müssen in Cassandra Secondary Indexes angelegt worden sein.

Lambda-Architektur

Die Lambda-Architektur kombiniert Batch Processing und Stream Processing zur Verarbeitung großer Datenmengen. Spark ist eine der wenigen Technologien, die nahtlos beide Ansätze miteinander vereint. Das Modul Spark Streaming kann Datenströme beispielsweise von HDFS, Flume, Kafka oder ZeroMQ lesen. Die Ergebnisse werden dann typischerweise in einem Dashboard visualisiert oder zurück in HDFS und andere Datenbanken geschrieben. Ein Technologie-Stack bestehend aus Spark Streaming, Apache Kafka, Apache Cassandra, Akka und Scala kann zur Umsetzung einer Lambda-Architektur verwendet werden. Eine Lambda-Architektur besteht grob aus drei Ebenen:

  • In der Batch-Ebene liegen mehrfach redundant gesicherte Daten. Diese Daten sind das Ergebnis von langlaufenden Berechnungen. Je nach Anforderung und Datenvolumen können diese Berechnungen mehrere Stunden in Anspruch nehmen. Falls während einer Berechnung bereits neue Daten hinzugefügt werden, können diese Daten nicht unmittelbar berücksichtigt werden. Erst bei der folgenden Berechnung werden die Daten einbezogen.
  • Weil die Batch-Ebene eventuell viele Stunden zur Berechnung der Ergebnisse braucht, ist eine separate Serling-Ebene notwendig. Diese Ebene kann externe Benutzer bedienen. Wenn neue Ergebnisse in der Batch-Ebene erstellt wurden, können die Daten in der Serving-Ebene ersetzt werden.
  • Theoretisch könnte man schon mit der Batch- und Serving-Ebene eine sinnvolle Applikation bauen. Man muss jedoch beachten, dass die Daten in der Serving-Ebene niemals den aktuellen Stand enthalten. Diese Lücke kann durch eine dritte Ebene, die Speed-Ebene, geschlossen werden. Diese Ebene erhält ebenfalls die neuen Daten und aktualisiert kontinuierlich ihre Ergebnisse. Weil diese Ergebnisse möglicherweise nicht so genau oder umfassend sind wie die der Batch-Ebene, werden diese ersetzt sobald die Batch-Ebene einen neueren Zwischenstand berechnet hat.

In einer möglichen Umsetzung würde man die neuen Daten mit Kafka zu Cassandra streamen und dort ausfallsicher für die folgende Batch-Verarbeitung speichern. Dank der von Kafka unterstützen Topics können die Nachrichten von mehreren Empfängern konsumiert werden, sodass die eingehenden Daten ebenfalls in einer Spark-Streaming-Applikation konsumiert werden können.

Apache Zeppelin

Zur Analyse von Daten gehört typischerweise auch die grafische Darstellung der Berechnungsergebnisse. Um die Ergebnisse in einem PDF-Bericht oder auf einer Website anzeigen zu können braucht man ein Datenvisualisierungs-Tool wie Apache Zeppelin. Diese Web-basierte Notizbuch bietet eine eingebaute Spark-Integration. Beispielsweise kann man die SQL-Shell von Apache Zeppelin nutzen, um SQL-Abfragen zu schreiben und mit Spark ausführen zu lassen. Die Ergebnisse werden in Textform, als HTML oder mit verschiedenen Diagrammtypen (z.B. Kuchen- oder Säulendiagramme) dargestellt.

Fazit

Apache Spark macht interaktive Datenverarbeitung im Computercluster vergleichsweise einfach. Mit der Spark API können mit nur wenig Boilerplate-Code und Konfigurationsaufwand Berechnungen durchgeführt werden. Sehr variabel zeigt sich Spark in der Kombination mit anderen Technologien wie Hadoop, Cassandra oder Kafka. Ein großer Pluspunkt ist auch der Performance-Vorteil im Vergleich zu Hadoop MapReduce, der durch Hauptspeicherberechnungen begründet ist. Dieser rundum positive Eindruck wird ebenfalls durch die Google Trends bestätigt: Während das Interesse an Hadoop auf einem niedrigen Niveau stagniert, wächst die Anzahl der Suchanfragen für Spark kontinuierlich.

Aufmacherbild: A young elephant right next to an adult one.von Shutterstock / Urheberrecht: Claudia Paulussen

Verwandte Themen:

Geschrieben von
Kai Spichale
Kai Spichale
Kai Spichale (@kspichale) beschäftigt sich leidenschaftlich seit mehr als 10 Jahren mit Softwarearchitekturen und sauberen Code. Für innoQ Deutschland GmbH arbeitet er als IT-Berater mit modernen Architekturansätzen, API-Design und NoSQL. Er ist regelmäßiger Autor in verschiedenen Fachmagazinen und Sprecher auf Konferenzen.
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: