„Parallel“ einmal anders dank JDK8

Streams: Wie alles beginnen kann

Sven Ruppert
©Shutterstock.com/ilolab

„Classes to support functional-style operations on streams of elements, such as map-reduce transformations on collections.“ So lautet der erste Satz in der JDK8 API-Doc [1]. In der nachfolgenden Reihe „Streams“ werde ich auf die neuen Klassen aus dem Package java.util.stream eingehen.

Seit JDK8 gibt es das Package java.util.stream, indem die neuen Streams zu finden sind. Die Anzahl der Klassen und Interfaces ist relativ überschaubar.

Der Überblick

Beginnen wir mit dem allgemeinen Überblick über die zur Verfügung stehenden Klassen, Interfaces und Enums (Abb 1).

Abb. 1: Package java.util.stream Überblick

Der Aufbau beginnt mit der Annotation @FunctionalInterface (Listing 1). Diese Annotation ist seit JDK 8 neu dabei und wird verwendet um Interfaces zu definieren, die aus genau einer abstrakten Methode bestehen. Genau genommen ist es also keine Annotation sondern ein Interface-Typ. Instanzen eines @FunctionalInterface können mittels Lambda-Expressions, Methoden-Referenzen oder Konstruktor-Referenzen erzeugt werden. Ein solches @FunctionalInterface ist das Interface AutoCloseable, das es seit JDK 7 gibt. Hierbei handelt es sich um ein Interface aus dem Kontext der Ressourcenverwaltung. Die Methode void close() throws Exception; ist der Einstiegspunkt um die verwendete Ressource und alle damit verbundenen Ressourcen zu schließen wenn diese nicht mehr benötigt werden.

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface FunctionalInterface {}


Der nun wesentliche Teil beginnt mit dem Interface BaseStream, in dem der Basis-Funktionsumfang der Streams definiert ist. Auffällig sind hierbei die Methoden sequential() und parallel(). Die Streams gibt es also unterteilt in zwei Hauptgruppen. Die eine ist für die sequentielle, die andere für die parallele Verarbeitung vorgesehen. Der Grundgedanke ist, dass eine programmatische Entscheidung getroffen werden kann, die Verwendung sich implizit nur in dem Vorrat der zu verwendenden Methodenimplementierungen unterscheidet. Das ist für den Entwickler transparent. Wie sich das einsetzen lässt, werden wir uns nun ansehen.

Beginnen wir mit einem einfachen Interface: Der Worker (Listing 2), in dem zwei Methoden definiert werden, die zur synthetischen Lasterzeugung verwendet werden. Ziel hier ist es, zuerst eine Matrix mit Stützstellen aufzubauen (generateDemoValueMatrix), die danach mittels Splines interpoliert wird (generateInterpolatedValues).

public interface Worker {
public static final int ANZAHL_KURVEN = 200;
   public static final int ANZAHL_MESSWERTE = 10;
   public static final int MAX_GENERATED_INT = 100;

   public abstract List<List<Integer>> generateDemoValueMatrix();

   public abstract List<List<Double>>
        generateInterpolatedValues(List<List<Integer>> baseValues);
}


Für die Beispiele wurden zwei Teile ausgelagert. Der erste Teil ist die Erzeugung einer Wertereihe (DemoValueGenerator) zur Darstellung der Stützstellen (Listing 3). Die Implementierung wurde als Interface mit default-Methode gewählt, was einer JDK8 Notation entspricht. Die Implementierung selbst erfolgte an der Stelle noch ohne Streams.

public interface DemoValueGenerator {

   public default List<Integer> generateDemoValuesForY() {
   	final Random random = new Random();
        	final List<Integer> result = new ArrayList<> ();
        	for (int i = 0; i < Worker.ANZAHL_MESSWERTE; i++) {
            	final int nextInt = 
                      random.nextInt(Worker.MAX_GENERATED_INT);
            	result.add(nextInt);
        	}
        	return result;
    }
}


Der zweite Teil ist die Berechnung der interpolierten Werte (WorkLoadGenerator), was hier zum einen der Lasterzeugung dient und zum anderen als Beispiel zum Einbinden von ThirdParty-Code verwendet wird (Listing 4).

public class WorkLoadGenerator {

   public static final int STEP_SIZE = 100;

   private UnivariateFunction createInterpolateFunction(
                              final List<Integer> values) {
        	final double[] valueArrayX = new double[values.size()];
        	for (int i = 0; i < valueArrayX.length; i++) {
           	   valueArrayX[i] = (double) i * STEP_SIZE;
        	}

        	final double[] valueArrayY = new double[values.size()];
        	int i = 0;
        	for (final Integer value : values) {
         		valueArrayY[i] = (double) value.intValue();
            	        i = i + 1;
        	}

        	final UnivariateInterpolator interpolator 
                      = new SplineInterpolator();
        	final UnivariateFunction function = 
                      interpolator.interpolate(valueArrayX, valueArrayY);
        	return function;
    }

    public List<Double> generate(final List<Integer> v) {
        	final UnivariateFunction interpolateFunction = 
                      createInterpolateFunction(v);
        	//baue Kurve auf
        	final int anzahlValuesInterpolated 
                      = (v.size() - 1) * STEP_SIZE;
        	final List<Double> result = new ArrayList<> ();
        	for (int i = 0; i < anzahlValuesInterpolated - 1; i++) {
         		final double valueForY = interpolateFunction.value(i);
            	        result.add(valueForY);
        	}
        	return result;
    }
}


Sequenzielle Version

Die sequenzielle Version (Listing 5) ist sehr einfach gehalten, und dürfte einer ersten Implementierung meist sehr ähnlich sein.

public class WorkerSerial implements Worker {

    @Override
    public List<List<Double>>
                generateInterpolatedValues(List<List<Integer>> baseValues) {
   	        final WorkLoadGenerator generator = new WorkLoadGenerator();
        	final List<List<Double>> result = new ArrayList<> ();
        	for (final List<Integer> valueList : baseValues) {
         		final List<Double> doubleList = 
                           generator.generate(valueList);
            	result.add(doubleList);
        	}
        	return result;
    }

    private DemoValueGenerator valueGenerator 
          = new DemoValueGenerator(){};

public List<List<Integer>> generateDemoValueMatrix() {
   	        final List<List<Integer>> result = new ArrayList<> ();
        	for (int i = 0; i < ANZAHL_KURVEN; i++) {
         		final List<Integer> demoValuesForY = 
                          valueGenerator.generateDemoValuesForY();
            	result.add(demoValuesForY);
        	}
        	return result;
 	}
}


Der Vorgang ist recht einfach, da pro Messwertereihe eine Kurve erzeugt wird. Das wird nacheinander erledigt. Es liegt auf der Hand, dass dieses natürlich pro Kurve parallel erledigt werden kann.

Aufmacherbild: Streams of light abstract Cool waves background von Shutterstock / Urheberrecht: ilolab

[ header = Parallele Version von Threads, ExecutorService und Streams ]

Parallele Version mit Threads

Die erste parallele Version (Listing 6) wird mit den guten alten Threads realisiert. Hier wird pro Kurve ein Thread mit der Aufgabe betreut. Zum Schluss werden die Ergebnisse eingesammelt. Es ist sofort ersichtlich, dass diese Implementierung wesentlich auswendiger ist.

public class WorkerParallelThreads implements Worker {

   @Override
   public List<List<Integer>> generateDemoValueMatrix() {
   	final List<List<Integer>> result = new ArrayList<> ();
        final List<Task> taskList = new ArrayList<> ();
        for(int i = 0; i< ANZAHL_KURVEN; i++){
        	taskList.add(new Task());
        }
        for (final Task task : taskList) {
        	task.run();
        }
        for (final Task task : taskList) {
        	try {
                	task.join();
                	result.add(task.result);
            	} catch (InterruptedException e) {
                	e.printStackTrace();
            	}
        }
        return result;
    }

    @Override
    public List<List<Double>>
       generateInterpolatedValues(List<List<Integer>> baseValues) {
   	
       final List<List<Double>> result = new ArrayList<> ();
       final List<TaskInterpolate> taskList = new ArrayList<> ();
       for (final List<Integer> baseValue : baseValues) {
         	final TaskInterpolate taskInterpolate 
                        = new TaskInterpolate();
            	taskInterpolate.values.addAll(baseValue);
            	taskList.add(taskInterpolate);
        }
        for (final TaskInterpolate task : taskList) {
        	task.run();
        }
        for (final TaskInterpolate task : taskList) {
        	try {
                	task.join();
                	result.add(task.result);
            	} catch (InterruptedException e) {
                	e.printStackTrace();
            	}
        }
        return result;
    }


    public static class Task extends Thread {
    	public List<Integer> result = new ArrayList<> ();
        private DemoValueGenerator valueGenerator 
                = new DemoValueGenerator(){};

        @Override
        public void run() {
         	result.addAll(valueGenerator.generateDemoValuesForY());
        }
    }
    public static class TaskInterpolate extends Thread {
        	public final List<Integer> values = new ArrayList< > ();
        	public final List<Double> result = new ArrayList< > ();

        	private final WorkLoadGenerator generator 
                       = new WorkLoadGenerator();

        	@Override
        	public void run() {
         		result.addAll(generator.generate(values));
        	}
    }
}


Parallele Version mit ExecutorService

Seit JDK5 gibt es den ExecutorService, mittels dem die zu verwendenden Threads wiederverwendet werden können. Hier werden Callables instanziiert, was im Wesentlichen nichts anderes ist als die Definition der Threads im vorherigen Beispiel. Damit ist der Aufwand für den Entwickler an der Stelle fast identisch. Die Verwendung des ExecutorService ist ein klein wenig einfacher als die Handhabung der Threads selbst (Listing 7).

public class WorkerParallelExecutorService implements Worker{

   private final ExecutorService executorService;

   public WorkerParallelExecutorService(
      ExecutorService executorService) {
      this.executorService = executorService;
  }

   @Override
   public List<List<Integer>> generateDemoValueMatrix() {
   	final List<Task> taskList = new ArrayList<> ();
       	for(int i = 0; i< ANZAHL_KURVEN; i++){
       		taskList.add(new Task());
        	}	

       	final List<List<Integer>> result = new ArrayList<> ();
       	try {
            	final List<Future<List<Integer>>> futureList 
                     = executorService.invokeAll(taskList);
            	for (final Future<List<Integer>> future : futureList) {
                	final List<Integer> valueList = future.get();
                	result.add(valueList);
            	}

        	} catch (InterruptedException | ExecutionException e) {
          	e.printStackTrace();
        	}
        	return result;
    }

    @Override
    public List<List<Double>> generateInterpolatedValues(
           List<List<Integer>> baseValues) {
   	final List<TaskInterpolate> taskList = new ArrayList<> ();
        for (final List<Integer> baseValue : baseValues) {
         		final TaskInterpolate taskInterpolate 
                              = new TaskInterpolate();
            	taskInterpolate.values.addAll(baseValue);
            	taskList.add(taskInterpolate);
        }

        final List<List<Double>> result = new ArrayList<> ();
        try {
        	final List<Future<List<Double>>> futureList 
                              = executorService.invokeAll(taskList);
            	for (final Future<List<Double>> future : futureList) {
                	final List<Double> valueList = future.get();
                	result.add(valueList);
            	}

        } catch (InterruptedException | ExecutionException e) {
        		e.printStackTrace();
        }
        return result;


    }


   public static class Task implements Callable<List<Integer>> {
   	private DemoValueGenerator valueGenerator 
             = new DemoValueGenerator(){};

   	@Override
        public List<Integer> call() {
        	final List<Integer> result = new ArrayList<> ();
            	result.addAll(valueGenerator.generateDemoValuesForY());
            	return result;
        	}
 	}


   public static class TaskInterpolate 
        implements Callable<List<Double>>  {
   	
        public final List<Integer> values = new ArrayList<> ();
        public final List<Double> result = new ArrayList<> ();

        private final WorkLoadGenerator generator 
                = new WorkLoadGenerator();

        	@Override
        	public List<Double> call() {
         		result.addAll(generator.generate(values));
            	return result;
        	}
   }
}


Parallele Version mit Streams

Nun zu der Implementierung mittels Streams (Listing 8). Sofort ersichtlich ist die starke Reduktion der Codezeilen. Der gesamte Anteil, der normalerweise für die Parallelität notwendig ist, ist hier auf die Verwendung der Methode parallelStream() reduziert.

public class WorkerParallelStreams implements Worker{

    @Override
    public List<List<Integer>> generateDemoValueMatrix(){
   	return Stream
       		.generate(this::generateDemoValuesForY)
                .limit(ANZAHL_KURVEN)
                .collect(Collectors.toList());
 	}

    @Override
    public List<List<Double>> generateInterpolatedValues(
                            List<List<Integer>> baseValues) {
   	final List<List<Double>> baseValueMatrix 
               = generateDemoValueMatrix()
                	.parallelStream()
                	.map(v -> {
                    		final WorkLoadGenerator generator 
               = new WorkLoadGenerator();
                    	return generator.generate(v);
                	})
                	.collect(Collectors.toList());
        	return baseValueMatrix;
    }

   public List<Integer> generateDemoValuesForY(){
    	final Random random = new Random();
        	return Stream
         	.generate(() -> random.nextInt(MAX_GENERATED_INT))
                .limit(ANZAHL_MESSWERTE)
                .collect(Collectors.toList());
  	}
}


Und zu guter Letzt kann alles zusammen im Interface selbst definiert werden. Dazu muss man lediglich die Methoden der Klasse WorkerParallelStreams mittel default in dem Interface selbst definieren.

Fazit

Dieses kleine Beispiel sollte Ihr Interesse an der neuen Streams-API wecken. In den nächsten Artikeln der Serie werden wir uns im Detail mit Streams beschäftigen und die Möglichkeiten demonstrieren, die sich daraus ergeben.

Wenn Sie das Programm Part01 starten, werden Sie eine Grafik erhalten, in der die Ausführungszeiten der jeweiligen Implementierungen gegenübergestellt werden. Schön zu sehen ist, dass die Implementierung mittels Streams nicht nur sehr einfach ist, sondern auch noch die Implementierung mit der höchsten Performance ist.

Die Quelltexte zu diesem Text sind unter [1] zu finden. Wer umfangreichere Beispiele zu diesem Thema sehen möchte, dem empfehle einen Blick auf [2] (Modul cdi-commons).

Geschrieben von
Sven Ruppert
Sven Ruppert
Sven Ruppert arbeitet seit 1996 mit Java und ist Developer Advocate bei Vaadin. In seiner Freizeit spricht er auf internationalen und nationalen Konferenzen, schreibt für IT-Magazine und für Tech-Portale. Twitter: @SvenRuppert
Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.