Java CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared state before any of the parties continue. More information can be found in the Oracle documentation for CyclicBarrier.
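The barrier action can be illustrated with a minimal sketch. The class name BarrierActionDemo and the method sumWithBarrierAction are hypothetical names chosen for this illustration and are not part of the tutorial's project. Each worker adds its own contribution to a shared counter, and the barrier action copies the finished sum into a result holder before any worker is released.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierActionDemo {

    // Starts `parties` workers; worker i contributes i + 1 and then waits on the barrier.
    // Returns the value captured by the barrier action.
    public static int sumWithBarrierAction(int parties) {
        AtomicInteger partialSum = new AtomicInteger();
        AtomicInteger total = new AtomicInteger(-1);

        // The action runs once per barrier point, in the last thread to arrive,
        // before any of the waiting threads is released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> total.set(partialSum.get()));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int contribution = i + 1;
            workers[i] = new Thread(() -> {
                partialSum.addAndGet(contribution);
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        joinAll(workers);
        return total.get();
    }

    // Waits for every worker to finish.
    private static void joinAll(Thread[] threads) {
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        // With 3 parties the contributions are 1 + 2 + 3.
        System.out.println("Total seen by barrier action: " + sumWithBarrierAction(3));
    }
}
```

Because the action runs after the last arrival, it sees every contribution made before the await() calls, which is why it is a convenient place for a merge step.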

The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
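The breakage model can be observed with a small sketch. BrokenBarrierDemo and outcomeForPatientWaiter are hypothetical names used only for this illustration. The barrier expects three parties, but only two threads ever arrive: one gives up after a timeout, which breaks the barrier, and the other then leaves with a BrokenBarrierException.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class BrokenBarrierDemo {

    // Returns a description of how the thread without a timeout left the barrier.
    public static String outcomeForPatientWaiter() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        AtomicReference<String> patientOutcome = new AtomicReference<>("released normally");

        Thread impatient = new Thread(() -> {
            try {
                barrier.await(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Timing out breaks the barrier for every other waiting party.
            } catch (BrokenBarrierException e) {
                // Not expected here: no other thread breaks the barrier first.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread patient = new Thread(() -> {
            try {
                barrier.await(); // no timeout of its own
            } catch (BrokenBarrierException e) {
                patientOutcome.set("BrokenBarrierException");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                patientOutcome.set("InterruptedException");
            }
        });

        impatient.start();
        patient.start();
        try {
            impatient.join();
            patient.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return patientOutcome.get() + ", broken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(outcomeForPatientWaiter());
    }
}
```

A broken barrier stays broken until reset() is called, so any later await() on it also fails with BrokenBarrierException.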

CyclicBarrier fits naturally into concurrent programs in which a final step can run only after several independent tasks have completed. The threads that wait for each other at the barrier are called parties. A CyclicBarrier is initialized with the number of parties, and each thread announces that it has arrived by calling CyclicBarrier.await(), which blocks until all parties have called await(). Although await() is a blocking call, it can time out (when the timed overload is used) or be interrupted by another thread.
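As a small sketch of how await() behaves, the example below collects the value each call returns. await() returns the arrival index of the calling thread: getParties() - 1 for the first thread to arrive and 0 for the last. AwaitIndexDemo and arrivalIndexes are hypothetical names for this illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AwaitIndexDemo {

    // Runs `parties` tasks that only wait on the barrier and
    // returns the arrival indexes they received, sorted ascending.
    public static List<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        // The pool needs one thread per party, otherwise the barrier can never trip.
        ExecutorService pool = Executors.newFixedThreadPool(parties);
        try {
            // A Callable may throw the checked exceptions declared by await().
            Callable<Integer> waitAtBarrier = () -> barrier.await();

            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                futures.add(pool.submit(waitAtBarrier));
            }

            List<Integer> indexes = new ArrayList<>();
            for (Future<Integer> future : futures) {
                indexes.add(future.get());
            }
            Collections.sort(indexes);
            return indexes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Arrival indexes: " + arrivalIndexes(3));
    }
}
```

Which thread receives which index depends on scheduling, so the sketch sorts the indexes before returning them.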

The difference between CountDownLatch and CyclicBarrier: we have seen how CountDownLatch can be used to make threads wait for other threads. The CyclicBarrier serves a similar purpose, but a CountDownLatch cannot be reused once its count reaches zero, while a CyclicBarrier can be reused: it returns to its initial state automatically after the waiting threads are released, and it can also be reset explicitly by calling the reset() method. Therefore a CountDownLatch is a good fit for one-time events such as application start-up, while a CyclicBarrier suits recurring events, e.g. concurrently computing the parts of a large problem in repeated phases.
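A minimal sketch of this difference follows. ReuseDemo, barrierTrips and latchCountAfterExtraCountDown are hypothetical names for this illustration. The same CyclicBarrier instance is crossed in several consecutive rounds without any call to reset(), whereas a CountDownLatch that has reached zero stays at zero.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class ReuseDemo {

    // Every worker waits on the same barrier once per round.
    // Returns how many times the barrier tripped.
    public static int barrierTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action counts each barrier point that is reached.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // same barrier instance in every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    // A latch cannot be re-armed: further countDown() calls have no effect.
    public static long latchCountAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("Barrier trips: " + barrierTrips(3, 4));
        System.out.println("Latch count: " + latchCountAfterExtraCountDown());
    }
}
```

For a repeated phase-by-phase computation the barrier therefore needs to be created only once, while a latch would have to be replaced by a new instance for each phase.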

CyclicBarrier Implementation

Create a DTO class that simply maps field values.

package com.roytuts.java.thread.cyclicbarrier;

public class Person {

    private String id;
    private String name;
    private String email;
    private String phone;
    private String city;
    private String state;
    private String country;

    //Getters and setters
}

Create a class that holds a list of Person objects.

package com.roytuts.java.thread.cyclicbarrier;

import java.util.List;

public class Job {

    private List<Person> persons;

    public List<Person> getPersons() {
        return persons;
    }

    public void setPersons(List<Person> persons) {
        this.persons = persons;
    }

}

Create the following three service classes, each of which fetches Person data from a different source.

Fetch data from file:

package com.roytuts.java.thread.cyclicbarrier;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

public class PersonFileService implements Callable<Job> {
    private CyclicBarrier barrier;

    public PersonFileService(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Job call() throws Exception {
        // Dummy set of persons
        // the actual data for Person should come from File
        List<Person> persons = PersonRepo.buildPersonData();

        Job job = new Job();
        job.setPersons(persons);

        System.out.println(this.getClass().getName() + " is waiting on the barrier");

        // await
        barrier.await();

        System.out.println(this.getClass().getName() + " has crossed the barrier");

        return job;
    }

}

Fetch data from database:

package com.roytuts.java.thread.cyclicbarrier;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

public class PersonDBService implements Callable<Job> {
    private CyclicBarrier barrier;

    public PersonDBService(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Job call() throws Exception {
        // Dummy set of persons
        // the actual data should come from Database
        List<Person> persons = PersonRepo.buildPersonData();

        Job job = new Job();
        job.setPersons(persons);

        System.out.println(this.getClass().getName() + " is waiting on the barrier");

        // await
        barrier.await();

        System.out.println(this.getClass().getName() + " has crossed the barrier");

        return job;
    }

}

Fetch data from REST service:

package com.roytuts.java.thread.cyclicbarrier;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

public class PersonRestService implements Callable<Job> {
    private CyclicBarrier barrier;

    public PersonRestService(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Job call() throws Exception {
        // Dummy set of persons
        // the actual data for person should come from REST webservice
        List<Person> persons = PersonRepo.buildPersonData();

        Job job = new Job();
        job.setPersons(persons);

        System.out.println(this.getClass().getName() + " is waiting on the barrier");

        // await
        barrier.await();

        System.out.println(this.getClass().getName() + " has crossed the barrier");
        return job;
    }

}

Hard-coded sample data used in the above classes:

package com.roytuts.java.thread.cyclicbarrier;

import java.util.ArrayList;
import java.util.List;

public final class PersonRepo {

    private PersonRepo() {
    }

    public static List<Person> buildPersonData() {
        Person p1 = new Person();
        p1.setId("1000");
        p1.setName("Debabrata");
        p1.setEmail("debabrata@gmail.com");
        p1.setPhone("1234567890");
        // set other fields for p1

        Person p2 = new Person();
        p2.setId("1000");
        p2.setName("Debina");
        p2.setEmail("debina@gmail.com");
        p2.setPhone("1234567890");
        // set other fields for p2

        Person p3 = new Person();
        p3.setId("1000");
        p3.setName("Baishali");
        p3.setEmail("baishali@gmail.com");
        p3.setPhone("1234567890");
        // set other fields for p3

        Person p4 = new Person();
        p4.setId("1000");
        p4.setName("Liton");
        p4.setEmail("liton@gmail.com");
        p4.setPhone("1234567890");
        // set other fields for p4

        List<Person> persons = new ArrayList<>();

        persons.add(p1);
        persons.add(p2);
        persons.add(p3);
        persons.add(p4);

        return persons;
    }

}

Create the class below to start the services on separate threads. Each service fetches its data and then waits at the barrier; once all three have arrived they are released, and the main thread aggregates their results.

package com.roytuts.java.thread.cyclicbarrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CyclicBarrierApp {

    public static void main(String[] args) {
        int noOfThreads = 3;

        CyclicBarrier barrier = new CyclicBarrier(noOfThreads);
        ExecutorService executorService = Executors.newFixedThreadPool(noOfThreads);

        PersonFileService fileService = new PersonFileService(barrier);
        PersonDBService personDBService = new PersonDBService(barrier);
        PersonRestService personRestService = new PersonRestService(barrier);

        List<Future<Job>> futures = new ArrayList<>();
        futures.add(executorService.submit(fileService));
        futures.add(executorService.submit(personDBService));
        futures.add(executorService.submit(personRestService));

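        try {
            List<Person> aggregatedList = new ArrayList<>();

            // get() blocks until the corresponding service has crossed the barrier
            for (Future<Job> future : futures) {
                List<Person> persons = future.get().getPersons();
                aggregatedList.addAll(persons);
            }

            System.out.println("Aggregated List size : " + aggregatedList.size());

            // do something with aggregatedList
            // main tasks start here
        } catch (InterruptedException e) {
            // restore the interrupt flag so that callers can see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // always stop the pool, otherwise its threads keep the JVM alive
            executorService.shutdown();
        }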
    }

}

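Run the above class and you will get output similar to the following (the order of the services within the "waiting" and "crossed" groups may vary between runs):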

com.roytuts.java.thread.cyclicbarrier.PersonDBService is waiting on the barrier
com.roytuts.java.thread.cyclicbarrier.PersonFileService is waiting on the barrier
com.roytuts.java.thread.cyclicbarrier.PersonRestService is waiting on the barrier
com.roytuts.java.thread.cyclicbarrier.PersonFileService has crossed the barrier
com.roytuts.java.thread.cyclicbarrier.PersonDBService has crossed the barrier
com.roytuts.java.thread.cyclicbarrier.PersonRestService has crossed the barrier
Aggregated List size : 12

