Suche
Eins, zwei oder drei

Extreme Java Concurrency: CountDownLatch vs CyclicBarrier vs Phaser

Dr. Heinz Kabutz

© shutterstock.com | Protasov

AN

 

Java 7 brachte uns eine neue Klasse namens Phaser, mit der wir Aktivitäten zwischen Threads koordinieren können. Sie ersetzt sowohl CountDownLatch als auch CyclicBarrier, die zwar einfacher zu verstehen, aber schwieriger zu bedienen sind.

 

Treffen Sie Dr. Heinz Kabutz auf dem Extreme Java Camp!
Das Extreme Java Camp besteht aus zwei Intensivseminaren, die umfassendes und aktuellstes Know-how zu fortgeschrittenen Java-Themen und zu Java Concurrency Performance vermitteln. Es ist ein einzigartiges Hands-on-Training, in dem auch die erfahrensten Java-Profis intensiv angeregt und gefordert werden.Infos unter www.extreme-java-camp.de

 

Java Concurrency: CountDownLatch vs CyclicBarrier vs Phaser

Ich spreche seit vielen Jahren in meinem Extreme-Java-Concurrency-Performance-Kurs über Phaser und habe noch keinen Teilnehmer gefunden, der mir sagt: „Oh ja, das ist eine tolle Klasse, wir benutzen sie ständig!“ Die Teilnehmer haben in der Regel schon von CountDownLatch und vielleicht CyclicBarrier gehört, aber selten von Phaser. Wie kann das sein, wenn Phaser seit Java 7 existiert und die Synchronisierung zwischen Threads so viel einfacher macht als andere ähnliche Konstrukte?

CountDownLatch ist einfach zu verstehen, aber schwierig zu handhaben. Phaser hingegen ist schwer zu verstehen, aber einfach anzuwenden. Kürzlich unterrichtete ich eine Gruppe cleverer Programmierer in Athen. Eine der vielen klugen Fragen lautete: „Wie können wir eine Reihe von Aufgaben koordinieren, die alle unterschiedlich viel Zeit in Anspruch nehmen?“ Meine erste, spontane Antwort lautete, CompletionStage zu benutzen. Aber je intensiver wir uns mit dem Problem auseinandersetzen, desto besser schien Phaser zu passen. Im Kurs habe ich zuerst mittels Phaser codiert. Jemand fragte dann, ob dasselbe mit CountDownLatch möglich wäre. Also haben wir auch das kodiert. Hier werde ich es umgekehrt machen. Wir beginnen mit CountDownLatch und refaktorieren dann, um stattdessen Phaser zu verwenden.

Extreme Java – die Serie von Dr. Heinz Kabutz:

Teil 1: Concurrency in Java – das Kreuz mit der Parallelität

Teil 2: Serialisierung in Java – der Klotz am Bein

Teil 3: String-Komprimierung in Java 9 – in der Kürze liegt die Würze

Teil 4: Concurrency – CountDownLatch vs CyclicBarrier vs Phaser

 

Erst mal aufteilen

Wir werden fünf Batches ausführen. Jedes Batch besteht aus drei Aufgaben, die zwischen 500 Millisekunden und 3 Sekunden dauern. Aufgaben innerhalb eines Batches sollten alle gleichzeitig beginnen. Um den Code leichter lesbar zu machen, definieren wir eine gemeinsame Oberklasse LockStepExample:

import java.util.concurrent.*;

public abstract class LockStepExample {
  protected final static int TASKS_PER_BATCH = 3;
  protected final static int BATCHES = 5;

  protected final void doTask(int batch) {
    System.out.printf("Task start %d%n", batch);
    int ms = ThreadLocalRandom.current().nextInt(500, 3000);
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    System.out.printf("Task in batch %d took %dms%n", batch, ms);
  }
}

Als Nächstes erweitern wir den Code mit unserem LockStepCountDownLatch. Da das CountDownLatch nicht zurückgesetzt werden kann, müssen wir für jeden Batch ein neues latch erstellen. latch verfügt über ein ziemlich veraltetes Interrupt Handling. Es gibt keine Möglichkeit, den Interrupt so lange zu speichern, bis wir fertig sind, wie wir es mit Semaphore.acquireUninterruptibly() oder Lock.lock() tun können. Der Code in task() ist wichtig. Wir rufen zuerst latch.countDown() auf, um zu signalisieren, dass unser Thread bereit zur Ausführung ist. Danach rufen wir latch.await() auf, müssen aber die InterruptedException selbst verwalten. Wir tun dies, indem wir den unterbrochenen Zustand mit Thread.interrupted() löschen. Wir rufen dann weiter latch.await() auf, bis wir kontrolliert beenden können.

Wenn wir während weiterer latch.await()-Ausführungen erneut unterbrochen werden, merken wir uns den Zustand, warten aber weiter. Sobald wir die while(true)-Schleife mit dem break beenden, unterbrechen wir uns letztendlich selbst, wenn wir an einem Punkt in unserem wartenden Code unterbrochen wurden. interrupt() stoppt den Thread nicht, es ändert lediglich den Zustand zu unterbrochen. Danach rufen wir rufen die Methode doTask() auf und übergeben ihr die Batch-Nummer.

import java.util.concurrent.*;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class LockStepCountDownLatch extends LockStepExample {
  public static void main(String... args) {
    LockStepCountDownLatch lse = new LockStepCountDownLatch();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      // We need a new CountDownLatch per batch, since they
      // cannot be reset to their initial value
      CountDownLatch latch = new CountDownLatch(TASKS_PER_BATCH);
      for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(latch, batchNumber));
      }
    }
    pool.shutdown();
  }

  public void task(CountDownLatch latch, int batch) {
    latch.countDown();
    boolean interrupted = Thread.interrupted();
    while (true) {
      try {
        latch.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
    doTask(batch);
  }
}

Unsere erste Herausforderung bestand darin, für jedes Batch einen neuen CountDownLatch zu erstellen. Dies können wir durch den Einsatz einer CyclicBarrier vermeiden. Das erlaubt es uns, die barrier wiederzuverwenden, aber das Interrupt Handling ist immer noch aus dem letzten Jahrtausend.

import java.util.concurrent.*;

import static java.util.concurrent.Executors.*;

public class LockStepCyclicBarrier extends LockStepExample {
  public static void main(String... args) {
    LockStepCyclicBarrier lse = new LockStepCyclicBarrier();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    CyclicBarrier barrier = new CyclicBarrier(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(barrier, batchNumber));
      }
    }
    pool.shutdown();
  }

  public void task(CyclicBarrier barrier, int batch) {
    boolean interrupted = Thread.interrupted();
    while (true) {
      try {
        barrier.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      } catch (BrokenBarrierException e) {
        throw new AssertionError(e);
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
    doTask(batch);
  }
}

Nun wenden wir uns dem LockStepPhaser zu. Wir können den Phaser für die Batches wiederverwenden, wie die CyclicBarrier. Der Phaser weiß auch, in welchem Batch er sich befindet, sodass wir die Batch-Nummer nicht weitergeben müssen. Und die task()-Methode? Der gesamte komplizierte Interrupt-Handling-Code wird auf ein einzeiliges phaser.arriveAndAwaitAdvance() reduziert. Einfach großartig!

import java.util.concurrent.*;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class LockStepPhaser extends LockStepExample {
  public static void main(String... args) {
    LockStepPhaser lse = new LockStepPhaser();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    Phaser phaser = new Phaser(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      for (int i = 0; i < TASKS_PER_BATCH; i++) { pool.submit(() -> lse.task(phaser));
      }
    }
    pool.shutdown();
  }

  public void task(Phaser phaser) {
    phaser.arriveAndAwaitAdvance();
    doTask(phaser.getPhase());
  }
}

Wer das Rennen macht

Weitere Gründe, warum Phaser die bevorzugte Lösung gegenüber CountDownLatch und CyclicBarrier ist: Er wird mit einem ManagedBlocker realisiert; das bedeutet, dass, wenn unser Phaser einen Thread im gemeinsamen Fork-Join-Pool blockiert, ein weiterer erstellt wird, um die Parallelität auf dem gewünschten Niveau zu halten. Außerdem können wir Phaser in einem Baum einrichten, um weniger Konflikte zu erhalten. Das ist ein bisschen kompliziert, zugegeben, aber es ist möglich. Das können wir mit den anderen Synchronisatoren wie CountDownLatch und CyclicBarrier nicht machen.

API Summit 2018
Christian Schwendtner

GraphQL – A query language for your API

mit Christian Schwendtner (PROGRAMMIERFABRIK)

Verwandte Themen:

Geschrieben von
Dr. Heinz Kabutz
Dr. Heinz Kabutz
Dr. Heinz Kabutz ist regelmäßiger Referent auf allen wichtigen Java-Konferenzen mit den Schwerpunkten Java Concurrency und Performance. Kabutz schreibt den beliebten „The Java Specialists’ Newsletter“, der von Zehntausenden von begeisterten Fans in über 140 Ländern gelesen wird.
Kommentare

Hinterlasse einen Kommentar

Hinterlasse den ersten Kommentar!

avatar
400
  Subscribe  
Benachrichtige mich zu: