Tuesday 8 November 2016

Cyclic Barrier

CyclicBarrier is one of the synchronization mechanisms available in Java (in java.util.concurrent). Picture it as a physical barrier that opens only once a fixed number of parties have arrived at it.
The line below creates a barrier that requires three threads to arrive before any of them can cross it.
CyclicBarrier barrier = new CyclicBarrier(3);
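To see the "everyone must arrive first" behaviour in isolation, here is a minimal sketch. The class name BarrierBasics and its helper method are made up for illustration and are not part of the matrix program in this post. Each thread bumps a counter, waits at the barrier, and then reads the counter; if the barrier works as described, no thread can read a value smaller than the number of parties.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original program.
class BarrierBasics {
    // Starts `parties` threads that all meet at one barrier and returns the
    // smallest arrival count that any thread observed after crossing it.
    static int smallestCountSeenAfterBarrier(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger arrived = new AtomicInteger();
        AtomicInteger smallestSeen = new AtomicInteger(Integer.MAX_VALUE);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                arrived.incrementAndGet();
                try {
                    barrier.await(); // blocks until all parties have arrived
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                // Past the barrier: record how many arrivals this thread sees.
                smallestSeen.accumulateAndGet(arrived.get(), Math::min);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return smallestSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(smallestCountSeenAfterBarrier(3)); // prints 3
    }
}
```

Every thread sees a count of 3 after the barrier, because none of them is released until the third one has called await().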
We will create a simple program that adds two 2×2 matrices. To achieve parallelism, each row of the matrix is handled by a separate thread.
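import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Matrix {
    CyclicBarrier barrier;
    ExecutorService executorService;
    int[][] result;
    Matrix() {
        // Three parties: the two row workers plus the thread that calls add().
        this.barrier = new CyclicBarrier(3);
        // One pool thread per row worker.
        executorService = Executors.newFixedThreadPool(2);
    }
}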
Here,
- barrier waits for three threads to arrive: the two workers and the thread that calls add().
- executorService uses a fixed thread pool of two threads to run the workers.
- int[][] result stores the result of a matrix operation.
int[][] add(int[][] matrix_a, int[][] matrix_b) {
        class AddWorker implements Runnable {
            int row_number;
            AddWorker(int row_number) {
                this.row_number = row_number;
            }
            @Override
            public void run() {
                for (int i = 0; i < matrix_a[row_number].length; i++) {
                    result[row_number][i] = matrix_a[row_number][i] + matrix_b[row_number][i];
                }
                try {
                    System.out.println("Completed for Row  " + row_number);
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
        result = new int[matrix_a.length][matrix_a[0].length];
        executorService.submit(new AddWorker(0));
        executorService.submit(new AddWorker(1));
        try {
            System.out.println("Waiting for Matrix Addition");
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        return result;
    }
The add() method creates two AddWorker instances. Each instance adds one row of the matrices, selected by the row_number passed to its constructor.
The add() method then calls barrier.await(), which blocks the calling thread until all three parties have arrived at the barrier.
The barrier trips only once both AddWorker instances have also called barrier.await(). At that point all three threads are released and add() returns the result.
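The fact that the calling thread is itself one of the three parties can be checked with a small sketch. The class name CallerIsAParty is hypothetical and the code is separate from the matrix program. It relies on two real CyclicBarrier methods: getNumberWaiting(), which reports how many threads are currently blocked at the barrier, and the return value of await(), which is the arrival index (0 for the last thread to arrive).

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original program.
class CallerIsAParty {
    // Returns { threads waiting before the caller arrives,
    //           the caller's arrival index,
    //           threads waiting after the barrier has tripped }.
    static int[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Runnable worker = () -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };
        pool.execute(worker);
        pool.execute(worker);
        try {
            // Let both workers reach the barrier before the caller does.
            while (barrier.getNumberWaiting() < 2) {
                Thread.sleep(10);
            }
            int waitingBefore = barrier.getNumberWaiting();
            int arrivalIndex = barrier.await(); // third party: trips the barrier
            int waitingAfter = barrier.getNumberWaiting();
            return new int[] { waitingBefore, arrivalIndex, waitingAfter };
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // let the pool threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 0, 0]
    }
}
```

Two workers sit at the barrier until the caller arrives as the third and last party, after which nobody is left waiting.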
public class CyclicBarrierDemo {
    public static void main(String[] args) {

        int[][] matrix_a = { { 1, 1 }, { 1, 1 } };
        int[][] matrix_b = { { 2, 2 }, { 3, 2 } };

        Matrix _matrix = new Matrix();
        int[][] result = _matrix.add(matrix_a, matrix_b);
        printMatrix(result);

        int[][] matrix_c = { { 10, 10 }, { 12, 11 } };
        int[][] matrix_d = { { 22, 22 }, { 13, 12 } };

        result = _matrix.add(matrix_c, matrix_d);
        printMatrix(result);
    }

    public static void printMatrix(int[][] x) {
        for (int[] res : x) {
            for (int val : res) {
                System.out.print(val + ", ");
            }
            System.out.println(" ");
        }
    }
}
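The demo application adds two pairs of matrices using the add() method.
Because a barrier can be reused once it has tripped, we can call add() multiple times without having to reset the synchronizer.
Note that the demo never shuts down the ExecutorService, so the pool's threads keep the JVM alive after main() returns; a real program should call shutdown() on it when it is done.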
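As a sketch of how the same idea might be packaged for any fixed number of rows, the variant below sizes the barrier as rows + 1 (one party per row worker plus the caller) and gives the pool one thread per row, so that all workers can wait at the barrier at the same time. The class ReusableRowAdder and its methods are assumptions made for illustration, not the code from this post. It also shuts the pool down at the end.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical variant of the Matrix class, for illustration only.
class ReusableRowAdder {
    private final int rows;
    private final CyclicBarrier barrier;
    private final ExecutorService pool;

    ReusableRowAdder(int rows) {
        this.rows = rows;
        this.barrier = new CyclicBarrier(rows + 1);     // row workers + caller
        this.pool = Executors.newFixedThreadPool(rows); // one thread per row
    }

    int[][] add(int[][] a, int[][] b) {
        int[][] result = new int[rows][a[0].length];
        for (int r = 0; r < rows; r++) {
            final int row = r;
            pool.execute(() -> {
                for (int c = 0; c < a[row].length; c++) {
                    result[row][c] = a[row][c] + b[row][c];
                }
                awaitBarrier(); // this row is done
            });
        }
        awaitBarrier(); // caller waits here until every row is done
        return result;
    }

    private void awaitBarrier() {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    // Runs two additions on the same barrier and returns both results as text.
    static String demo() {
        ReusableRowAdder adder = new ReusableRowAdder(2);
        try {
            int[][] first = adder.add(new int[][] { { 1, 1 }, { 1, 1 } },
                                      new int[][] { { 2, 2 }, { 3, 2 } });
            int[][] second = adder.add(new int[][] { { 10, 10 }, { 12, 11 } },
                                       new int[][] { { 22, 22 }, { 13, 12 } });
            return Arrays.deepToString(first) + " " + Arrays.deepToString(second);
        } finally {
            adder.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[3, 3], [4, 3]] [[32, 32], [25, 23]]
    }
}
```

The second call to add() goes through the same barrier object as the first. Once all parties have been released, the barrier is ready for the next round on its own.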

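Output of the matrix operations (the threads run concurrently, so the order of the "Waiting" and "Completed" lines can vary between runs):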


Waiting for Matrix Addition
Completed for Row  0
Completed for Row  1
3, 3,  
4, 3,  
Waiting for Matrix Addition
Completed for Row  0
Completed for Row  1
32, 32,  
25, 23,  
