Tuesday, 8 November 2016

Countdown Latch

Countdown Latch

Countdown Latch is one of the other synchronizing mechanism that’s made available in Java. Although it may appear to be very similar to Cyclic Barrier.
Countdown Latch works very similar to a multi-lever latch lock. Access through the lock is possible only when all the levers are exercised (count down)
Java Countdown latch also works on the same principle, one or more threads can be made to wait on the latch using the await() method. This latch is made open only when all the count downs are performed on the latch. The number of count downs requires to open the latch is specified during the initialization of the Countdown Latch.
Few notable differences with Cyclic Barrier.
  • A single thread can perform all the count downs if required as it performs various operations.
  • In case of Cyclic barrier, distinct threads have to arrive and wait at the barrier for it be crossed.
  • Threads performing the countdown do not wait at the latch after the count down.
Create a CountDownLatch with the number of countdowns
CountDownLatch latch = new CountDownLatch(2);
We, will create a simple program that would perform addition of two matrices (2*2). To achieve parallelism, each row of the matrix is worked on by a seperate thread.
class Matrix {
    CountDownLatch latch;
    ExecutorService executorService;
    int[][] result;
    Matrix() {
        this.latch = new CountDownLatch(2);
        executorService = Executors.newFixedThreadPool(2);
    }
}
Here,
latch has been initialized with value two
executor service created with fixed Thread Pool for two threads
int[][] add(int[][] matrix_a, int[][] matrix_b) {
        int[][] result = new int[matrix_a.length][matrix_a[0].length];

        class AddWorker implements Runnable {
            int row_number;

            AddWorker(int row_number) {
                this.row_number = row_number;
            }

            @Override
            public void run() {
                for (int i = 0; i < matrix_a[row_number].length; i++) {
                    result[row_number][i] = matrix_a[row_number][i]
                            + matrix_b[row_number][i];
                }
                latch.countDown();
            }
        }

        executorService.submit(new AddWorker(0));
        executorService.submit(new AddWorker(1));

        try {
            System.out.println("Waiting for Matrix Addition");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
        return result;
    }
}
The add() method creates two AddWorker instances. Each instance works on performing addition on one row of the matrix determined by the row_number passed in the constructor.
The add() method calls latch.await() which will cause it to block as it is the only thread currently waiting at the latch.
The latch is open when the two worker threads does a countdown on the latch. Two countdowns are required on the latch.
public class CountDownLatchDemo {

    public static void main(String[] args) {
        int[][] matrix_a = { { 1, 1 }, { 1, 1 } };
        int[][] matrix_b = { { 2, 2 }, { 3, 2 } };

        MatrixCountDown _matrix = new MatrixCountDown();
        int[][] result = _matrix.add(matrix_a, matrix_b);
        printMatrix(result);

    }

    public static void printMatrix(int[][] x) {
        for (int[] res : x) {
            for (int val : res) {
                System.out.print(val + ", ");
            }
            System.out.println(" ");
        }
    }

}
The key difference from a similar application written using cyclic barrier would be
  • Worker threads are not waiting after calling the countdown operations. Only threads that make call to await() operation on the latch wait
  • latch is the initialized to the number of countdown operations and not on the number of threads required to wait on it. In case of Cyclic barrier, barrier is initialized with the number of threads that needs to wait at the barrier.
Output from the Demo application
Waiting for Matrix Addition
3, 3,  
4, 3,  

Cyclic Barrier

Cyclic Barrier

Cyclic Barrier is one of the synchronizing mechanism made available in Java. Lets imagine it like a barrier in the actual sense which require a fixed number of parties to arrive to cross it.
Below line creates a barrier that requires three threads to cross it.
CyclicBarrier barrier = new CyclicBarrier(3);
We, will create a simple program that would perform addition of two matrices (2*2). To achieve parallelism, each row of the matrix is worked on by a seperate thread.
class Matrix {
    CyclicBarrier barrier;
    ExecutorService executorService;
    int[][] result;
    Matrix() {
        this.barrier = new CyclicBarrier(3);
        executorService = Executors.newFixedThreadPool(2);
    }
}
Here,
- barrier waiting on three threads to arrive.
- ExecutorService using a fixedThreadPool to schedule the threads
- int [][]result stores the exectuion result of a matrix operation.
int[][] add(int[][] matrix_a, int[][] matrix_b) {
        class AddWorker implements Worker {
            int row_number;
            AddWorker(int row_number) {
                this.row_number = row_number;
            }
            @Override
            public void run() {
                for (int i = 0; i < matrix_a[row_number].length; i++) {
                    result[row_number][i] = matrix_a[row_number][i] + matrix_b[row_number][i];
                }
                try {
                    System.out.println("Completed for Row  " + row_number);
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
        result = new int[matrix_a.length][matrix_a[0].length];
        executorService.submit(new AddWorker(0));
        executorService.submit(new AddWorker(1));
        try {
            System.out.println("Waiting for Matrix Addition");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        return result;
    }
The add() method creates two AddWorker instances. Each instance works on performing addition on one row of the matrix determined by the row_number passed in the constructor.
The add() method calls barrier.await() which will cause it to block as it is the only thread currently waiting at the barrier.
The barrier is overcome only when both the AddWorker instances also call barrier.await().
public class CyclicBarrierDemo {
    public static void main(String[] args) {

        int[][] matrix_a = { { 1, 1 }, { 1, 1 } };
        int[][] matrix_b = { { 2, 2 }, { 3, 2 } };

        Matrix _matrix = new Matrix();
        int[][] result = _matrix.add(matrix_a, matrix_b);
        printMatrix(result);

        int[][] matrix_c = { { 10, 10 }, { 12, 11 } };
        int[][] matrix_d = { { 22, 22 }, { 13, 12 } };

        result = _matrix.add(matrix_c, matrix_d);
        printMatrix(result);
    }

    public static void printMatrix(int[][] x) {
        for (int[] res : x) {
            for (int val : res) {
                System.out.print(val + ", ");
            }
            System.out.println(" ");
        }
    }
}
Demo Application performs addition of two matrices using the add() method.
Since barriers can be reused. We can call the add() method mutliple times without having to reset the synchronizer.

Output of the Matrix Operation.


Waiting for Matrix Addition
Completed for Row  0
Completed for Row  1
3, 3,  
4, 3,  
Waiting for Matrix Addition
Completed for Row  0
Completed for Row  1
32, 32,  
25, 23,