How Fork Join works in Java

Introduction

This article explains how the fork/join framework works in Java and how to use it. Fork/join was introduced in Java 7 (JDK 1.7), alongside other features such as strings in switch statements, multi-catch blocks, and automatic resource management (try-with-resources).

Fork/join was introduced to improve the performance of Java programs in which a problem is solved by (recursively) splitting it into subtasks that are solved in parallel, waiting for them to complete, and then composing their results. The main implementation techniques concern the efficient construction and management of task queues and worker threads.

Fork/join parallelism is among the simplest and most effective design techniques for obtaining good parallel performance. Fork/join algorithms are parallel versions of familiar divide-and-conquer algorithms.

The fork operation starts a new parallel fork/join subtask. The join operation causes the current task not to proceed until the forked subtask has completed.
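As a minimal sketch of these two operations, the task below sums an array by forking one half, computing the other half in the current thread, and joining the forked half. The class name ArraySumTask, the THRESHOLD value, and the sum helper are illustrative assumptions, not something prescribed by the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch: sums a range of a long[] with fork/join.
class ArraySumTask extends RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;

    // Assumed cut-off below which a range is summed sequentially.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    ArraySumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        ArraySumTask left = new ArraySumTask(data, from, mid);
        ArraySumTask right = new ArraySumTask(data, mid, to);
        left.fork();                     // fork: schedule the left half asynchronously
        long rightSum = right.compute(); // meanwhile, work on the right half here
        return left.join() + rightSum;   // join: wait for the left half, then combine
    }

    // Convenience helper: runs the task on a fresh pool and returns the total.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ArraySumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // sum of 1..10000, i.e. 50005000
    }
}
```

Computing one half directly instead of forking both halves keeps the current thread busy and avoids creating one task more than necessary.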

Design

Fork/join programs can be run using any framework that supports the construction of subtasks that are executed in parallel, along with a mechanism for waiting for their completion. However, the java.lang.Thread class is not an optimal basis for supporting fork/join programs.

Fork/join tasks have simple and regular synchronization and management requirements. The computation graphs produced by fork/join tasks admit much more efficient scheduling tactics than needed for general-purpose threads. For example, fork/join tasks never need to block except to wait out subtasks. Thus, the overhead and bookkeeping necessary for tracking blocked general-purpose threads are wasted.

Given reasonable base task granularities, the cost of constructing and managing a thread can be greater than the computation time of the task itself. While granularities can and should be subject to tuning when running programs on particular platforms, the extremely coarse granularities necessary to outweigh thread overhead limit the opportunities for exploiting parallelism.

The main advantage of creating such a Java lightweight execution framework is to enable fork/join programs to be written in a more portable fashion and to run on the wide range of systems supporting JVMs.

The fork/join design achieves this in the following way:

A pool of worker threads is established. Each worker thread is a standard (“heavy”) thread that processes tasks held in queues. Normally, there are as many worker threads as there are CPUs on a system. These are mapped to kernel threads or lightweight processes, and in turn to CPUs.

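All fork/join tasks are instances of a lightweight executable class, not instances of threads. In Java, a generic independently executable task implements the Runnable interface and defines a run method; in the JDK's fork/join framework, tasks instead extend ForkJoinTask (usually through RecursiveTask or RecursiveAction) and define a compute method.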

A special purpose queuing and scheduling discipline is used to manage tasks and execute them via the worker threads.
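To illustrate the difference between tasks and threads, the following sketch pushes a thousand small tasks through a pool with a parallelism of four and records which threads actually ran them. The class and method names are made up for illustration, and the exact number of distinct threads depends on the JDK and the machine, so no specific output is claimed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class ManyTasksFewThreads {

    // Runs taskCount tiny tasks and returns {completedTasks, distinctThreadsUsed}.
    static int[] runTasks(int taskCount, int parallelism) {
        // Names of the threads that executed at least one task.
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    return 1; // each task just reports that it ran
                });
            }
            int completed = 0;
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                completed += future.get();
            }
            return new int[] { completed, threadNames.size() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = runTasks(1_000, 4);
        System.out.println("Tasks completed:  " + result[0]);
        System.out.println("Distinct threads: " + result[1]);
    }
}
```

The point of the sketch is only that the number of tasks is independent of, and typically far larger than, the number of threads that run them.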

Work Stealing:

The heart of a fork/join framework lies in its lightweight scheduling mechanics:

  • Each worker thread maintains runnable tasks in its own scheduling queue.
  • Queues are maintained as double-ended queues (i.e., deques), supporting both LIFO (last in, first out) push and pop operations and a FIFO (first in, first out) take operation.
  • Subtasks generated in tasks run by a given worker thread are pushed onto that worker's own deque.
  • Worker threads process their own deques in LIFO order, by popping tasks.
  • When a worker thread has no local tasks to run, it attempts to take (“steal”) a task from another randomly chosen worker, using a FIFO rule.
  • When a worker thread encounters a join operation, it processes other tasks, if available, until the target task is noticed to have completed. All tasks otherwise run to completion without blocking.
  • When a worker thread has no work and fails to steal any from others, it backs off (via yields, sleeps, and/or priority adjustment) and tries again later unless all workers are known to be similarly idle, in which case they all block until another task is invoked from the top level.

This scheme reduces contention because stealers operate on the opposite end of the deque from the owner.
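The LIFO and FIFO rules above can be pictured with a toy, single-threaded model of one worker's deque. This is only a sketch built on java.util.ArrayDeque with made-up names; the JDK's real work-stealing queues are a specialised concurrent structure and look quite different.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy, single-threaded model of one worker's deque; not the JDK implementation.
class WorkerDequeSketch {

    private final Deque<String> tasks = new ArrayDeque<>();

    // Owner: push a newly forked subtask onto the top of the deque.
    void push(String task) {
        tasks.addFirst(task);
    }

    // Owner: pop the most recently pushed task (LIFO). Returns null if empty.
    String pop() {
        return tasks.pollFirst();
    }

    // Thief: take the oldest task from the opposite end (FIFO). Returns null if empty.
    String steal() {
        return tasks.pollLast();
    }

    public static void main(String[] args) {
        WorkerDequeSketch deque = new WorkerDequeSketch();
        deque.push("task-1");
        deque.push("task-2");
        deque.push("task-3");
        System.out.println("owner pops:   " + deque.pop());   // task-3, the newest
        System.out.println("thief steals: " + deque.steal()); // task-1, the oldest
    }
}
```

Because the owner and the thief touch different ends, they only compete for the same element when the deque is nearly empty.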

More details on the fork/join design and its performance can be found in the paper "A Java Fork/Join Framework" by Professor Doug Lea.

How did fork/join come into existence?

Multicore processors are now becoming widespread across server, desktop, and laptop hardware. They are also making their way into smaller devices, such as smartphones and tablets. They open new possibilities for concurrent programming because the threads of a process can be executed on several cores in parallel. One important technique for achieving maximal performance in applications is the ability to split intensive tasks into chunks that can be performed in parallel to maximize the use of computational power.

Dealing with concurrent or parallel programming has traditionally been difficult, because you have to deal with thread synchronization and the pitfalls of shared data.

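The Executor framework in Java made the management of concurrent tasks much easier: you hand tasks (Runnable or Callable objects) to an executor, which runs them on a pool of threads and returns the results through Future objects. It can also be used to implement divide-and-conquer algorithms, in which a task creates subtasks and waits for them to complete.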

The problem with the Executor framework is that a Callable is free to submit a new subtask to its executor and wait for its result, synchronously or asynchronously. This is where parallelism suffers: when a Callable waits for the result of another Callable, its thread is put into a waiting state, wasting an opportunity to run another Callable queued for execution.

To solve this kind of problem, Java 7 added the fork/join framework, in which a worker thread that is waiting for a subtask can run other queued tasks in the meantime.
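The difference can be made visible with a deliberately extreme setup: a pool with a single thread. In the sketch below (all names are illustrative, and the 200 ms timeout is an arbitrary choice), an outer task on a one-thread executor waits for an inner task that can never start, while the same nesting on a fork/join pool with a parallelism of one completes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NestedWaitDemo {

    // Returns true if the inner task finished while the outer task was waiting for it.
    static boolean innerCompletesOnSingleThreadExecutor() {
        ExecutorService single = Executors.newFixedThreadPool(1);
        try {
            Future<Boolean> outer = single.submit(() -> {
                // The only pool thread is busy running this outer task,
                // so the inner task stays in the queue.
                Future<Integer> inner = single.submit(() -> 42);
                try {
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false;
                }
            });
            return outer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    // The same nesting with fork/join: join() lets the worker run the forked task itself.
    static int innerResultOnForkJoinPool() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override
                protected Integer compute() {
                    RecursiveTask<Integer> inner = new RecursiveTask<Integer>() {
                        @Override
                        protected Integer compute() {
                            return 42;
                        }
                    };
                    inner.fork();
                    return inner.join();
                }
            });
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Executor, inner completed: " + innerCompletesOnSingleThreadExecutor()); // false
        System.out.println("Fork/join, inner result:   " + innerResultOnForkJoinPool());            // 42
    }
}
```

With larger pools the executor version usually works, but every waiting task still occupies a thread that could have been running queued work.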

Now let's move on to an example.

Example

Let's have a look at a simple use of Executor and Callable. We have a list of employees with their two half-yearly performance ratings for a year. We need to calculate and print the average (final) rating for each employee.

Let's assume the rating scale runs from 1 to 5, where 1 is the lowest and 5 is the highest value.

Without using threads, we can simply iterate over the list of employees and calculate their average ratings one after another. But Java already provides concurrency frameworks, so we will use one to solve our problem.

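The program below creates a separate task (a Callable) for each employee; each task calculates and returns the final yearly appraisal rating, that is, the average of the two half-yearly ratings, for its employee. The tasks are executed by a pool of three threads.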

package com.roytuts.java.fork.join;

import java.util.concurrent.Callable;

public class EmployeeAppraisal implements Callable<String> {

	private String name;
	private float halfYear1;
	private float halfYear2;

	public EmployeeAppraisal(String name, float halfYear1, float halfYear2) {
		this.name = name;
		this.halfYear1 = halfYear1;
		this.halfYear2 = halfYear2;
	}

	@Override
	public String call() throws Exception {
		return "Yearly appraisal's final rating for " + name + ": " + ((halfYear1 + halfYear2) / 2);
	}

}

package com.roytuts.java.fork.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EmployeeAppraisalApp {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		List<EmployeeAppraisal> appraisals = new ArrayList<EmployeeAppraisal>();

		appraisals.add(new EmployeeAppraisal("Rajiv", 3.0f, 3.5f));
		appraisals.add(new EmployeeAppraisal("Alinaksha", 2.0f, 2.5f));
		appraisals.add(new EmployeeAppraisal("Rajat", 2.5f, 3.0f));
		appraisals.add(new EmployeeAppraisal("Sushil", 3.0f, 3.1f));
		appraisals.add(new EmployeeAppraisal("Shelshi", 3.2f, 3.5f));
		appraisals.add(new EmployeeAppraisal("Pavni", 2.2f, 2.5f));
		appraisals.add(new EmployeeAppraisal("Suprabhat", 1.5f, 2.5f));
		appraisals.add(new EmployeeAppraisal("Anindya", 1.0f, 1.5f));
		appraisals.add(new EmployeeAppraisal("Saugata", 2.2f, 2.5f));

		ExecutorService service = Executors.newFixedThreadPool(3);

		List<Future<String>> results = service.invokeAll(appraisals);

		for (Future<String> result : results) {
			System.out.println(result.get());
		}

		service.shutdown();
	}

}

The above program produces the following output:

Yearly appraisal's final rating for Rajiv: 3.25
Yearly appraisal's final rating for Alinaksha: 2.25
Yearly appraisal's final rating for Rajat: 2.75
Yearly appraisal's final rating for Sushil: 3.05
Yearly appraisal's final rating for Shelshi: 3.35
Yearly appraisal's final rating for Pavni: 2.35
Yearly appraisal's final rating for Suprabhat: 2.0
Yearly appraisal's final rating for Anindya: 1.25
Yearly appraisal's final rating for Saugata: 2.35

Notice how easy it is to write a multi-threaded program using Java's concurrency framework. We just passed a list of Callable objects, one per employee, to the ExecutorService's invokeAll() method. The ExecutorService distributes the tasks among the threads of a pool of size 3.

The invokeAll() method returns a list of Future objects. Each Future can be checked individually to see whether its task was cancelled before it completed normally (isCancelled()) or whether it has completed (isDone()); you can also attempt to cancel a task using cancel(). Cancelling an operation running on a plain thread is considerably more complicated.
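As a small sketch of these Future methods, the example below submits a task that would sleep for a minute and cancels it straight away. The class and method names are illustrative, and the sleeping task merely stands in for slow work.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureStateDemo {

    // Returns {cancel() result, isCancelled(), isDone()} for a cancelled task.
    static boolean[] cancelLongRunningTask() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = service.submit(() -> {
                Thread.sleep(60_000); // stands in for slow work
                return "finished";
            });
            // true asks the executor to interrupt the thread if the task is already running.
            boolean cancelRequested = future.cancel(true);
            return new boolean[] { cancelRequested, future.isCancelled(), future.isDone() };
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelLongRunningTask())); // [true, true, true]
    }
}
```

Note that isDone() also returns true for a cancelled task: "done" means the task will not run any further, not that it produced a result.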

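Note that invokeAll() itself blocks until all of the submitted tasks have completed (or the calling thread is interrupted), so every Future it returns is already done. If you want to start working on results as soon as individual tasks finish, submit the tasks one by one with submit(), which returns immediately, and then check or wait on each Future as needed.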
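According to the ExecutorService documentation, invokeAll() returns only once all tasks have completed, whereas submit() returns immediately. The sketch below contrasts the two; the class name, the helper names, and the 100 ms sleep are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllVersusSubmit {

    // Four slow tasks that return 1, 4, 9 and 16.
    private static List<Callable<Integer>> slowSquares() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            final int n = i;
            tasks.add(() -> {
                Thread.sleep(100);
                return n * n;
            });
        }
        return tasks;
    }

    // invokeAll(): by the time it returns, every Future is already done.
    static boolean allDoneAfterInvokeAll() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            for (Future<Integer> future : service.invokeAll(slowSquares())) {
                if (!future.isDone()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    // submit(): returns at once; get() waits for each result when it is needed.
    static int sumViaSubmit() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Callable<Integer> task : slowSquares()) {
                futures.add(service.submit(task));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDoneAfterInvokeAll()); // true
        System.out.println(sumViaSubmit());          // 30
    }
}
```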

As said earlier, executors are a big step forward from plain threads because they ease the management of concurrent tasks. They are often used to implement divide-and-conquer algorithms: a bigger task is divided into smaller subtasks, the subtasks are executed concurrently, and finally their results are combined to get the final result. Identifying parallel chunks and dividing the task is called fork, and combining the results of the subtasks into a final result is called join.

Let's say we want to modify our problem to calculate the average of the first half-yearly ratings for all employees. The logic is to iterate through the list of employees, total the first half-year ratings, and then divide the total by the number of employees.

But calculating the average for an entire organization is a big task. Let's think of dividing it into multiple individual tasks. While iterating through the list of employees, we will form chunks of a few (let's say 5) employees each. That means for every 5 employees we will create a separate Callable and give it the ratings of those 5 employees.

Let's say we have 25 employees in an organization. We will then have 5 tasks, each calculating the average of its own chunk of employees. We can simply iterate over the resulting collection of Future objects, add up the averages, and divide the total by the number of chunks (5 in our case). Averaging the averages this way gives the overall average only because every chunk has the same size. For large inputs, dividing the work like this can improve performance over a single-threaded approach, although for a list this small the threading overhead will likely outweigh any gain.
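Dividing the sum of the chunk averages by the number of chunks is only correct when all chunks contain the same number of employees. The following sketch, with invented ratings and illustrative method names, compares that shortcut with a size-weighted average for chunks of unequal size.

```java
import java.util.Arrays;
import java.util.List;

class ChunkAverages {

    // Shortcut: mean of the per-chunk means (correct only for equal-sized chunks).
    static double averageOfAverages(List<double[]> chunks) {
        double total = 0;
        for (double[] chunk : chunks) {
            double sum = 0;
            for (double rating : chunk) {
                sum += rating;
            }
            total += sum / chunk.length;
        }
        return total / chunks.size();
    }

    // General case: total of all ratings divided by the total number of ratings.
    static double weightedAverage(List<double[]> chunks) {
        double sum = 0;
        int count = 0;
        for (double[] chunk : chunks) {
            for (double rating : chunk) {
                sum += rating;
            }
            count += chunk.length;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // Invented ratings: one chunk of five employees and one chunk of a single employee.
        List<double[]> uneven = Arrays.asList(new double[] { 3, 3, 3, 3, 3 }, new double[] { 1 });
        System.out.println(averageOfAverages(uneven)); // 2.0
        System.out.println(weightedAverage(uneven));   // 16 / 6, roughly 2.67
    }
}
```

If the number of employees is not a multiple of the chunk size, it is therefore safer for each task to return its sum (and count) rather than its average.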

Here is the example. Remember that these examples are for demonstration purposes only; a real application may have to handle more complex situations.

package com.roytuts.java.fork.join;

public class Employee {

	private String name;
	private float halfYear;

	public Employee(String name, float halfYear) {
		this.name = name;
		this.halfYear = halfYear;
	}

	public String getName() {
		return name;
	}

	public float getHalfYear() {
		return halfYear;
	}

}

package com.roytuts.java.fork.join;

import java.util.List;
import java.util.concurrent.Callable;

public class EmployeeHalfYearlyAppraisal implements Callable<Float> {

	private List<Employee> employees;

	public EmployeeHalfYearlyAppraisal(List<Employee> employees) {
		this.employees = employees;
	}

	@Override
	public Float call() throws Exception {
		float sum = 0;
		for (Employee employee : employees) {
			sum += employee.getHalfYear();
		}
		return (sum / employees.size());
	}

}

package com.roytuts.java.fork.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EmployeeHalfYearlyAppraisalApp {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		List<Employee> employees1 = new ArrayList<>();
		employees1.add(new Employee("Rajiv", 3.0f));
		employees1.add(new Employee("Alinaksha", 2.0f));
		employees1.add(new Employee("Rajat", 2.5f));
		employees1.add(new Employee("Sushil", 3.0f));
		employees1.add(new Employee("Shelshi", 3.2f));

		List<Employee> employees2 = new ArrayList<>();
		employees2.add(new Employee("Pavni", 3.0f));
		employees2.add(new Employee("Suprabhat", 2.0f));
		employees2.add(new Employee("Anindya", 2.5f));
		employees2.add(new Employee("Saugata", 3.0f));
		employees2.add(new Employee("Rajesh", 3.2f));

		List<Employee> employees3 = new ArrayList<>();
		employees3.add(new Employee("Shirisha", 3.0f));
		employees3.add(new Employee("Satish", 2.0f));
		employees3.add(new Employee("Luke", 2.5f));
		employees3.add(new Employee("Seham", 3.0f));
		employees3.add(new Employee("Vandana", 3.2f));

		List<Employee> employees4 = new ArrayList<>();
		employees4.add(new Employee("Abhisekh", 3.0f));
		employees4.add(new Employee("Rania", 2.0f));
		employees4.add(new Employee("Prakhar", 2.5f));
		employees4.add(new Employee("Princy", 3.0f));
		employees4.add(new Employee("Bharti", 3.2f));

		List<Employee> employees5 = new ArrayList<>();
		employees5.add(new Employee("Ahmed", 3.0f));
		employees5.add(new Employee("Mohamed", 2.0f));
		employees5.add(new Employee("Aditi", 2.5f));
		employees5.add(new Employee("Zainab", 3.0f));
		employees5.add(new Employee("Rashmi", 3.2f));

		EmployeeHalfYearlyAppraisal halfYearlyAppraisal1 = new EmployeeHalfYearlyAppraisal(employees1);
		EmployeeHalfYearlyAppraisal halfYearlyAppraisal2 = new EmployeeHalfYearlyAppraisal(employees2);
		EmployeeHalfYearlyAppraisal halfYearlyAppraisal3 = new EmployeeHalfYearlyAppraisal(employees3);
		EmployeeHalfYearlyAppraisal halfYearlyAppraisal4 = new EmployeeHalfYearlyAppraisal(employees4);
		EmployeeHalfYearlyAppraisal halfYearlyAppraisal5 = new EmployeeHalfYearlyAppraisal(employees5);

		List<EmployeeHalfYearlyAppraisal> appraisals = new ArrayList<EmployeeHalfYearlyAppraisal>();

		appraisals.add(halfYearlyAppraisal1);
		appraisals.add(halfYearlyAppraisal2);
		appraisals.add(halfYearlyAppraisal3);
		appraisals.add(halfYearlyAppraisal4);
		appraisals.add(halfYearlyAppraisal5);

		ExecutorService service = Executors.newFixedThreadPool(5);

		List<Future<Float>> results = service.invokeAll(appraisals);

		float sum = 0;
		for (Future<Float> result : results) {
			sum += result.get();
		}

		System.out.println("Average half-yearly rating for all employees: " + (sum / results.size()));

		service.shutdown();
	}

}

The output you will get is:

Average half-yearly rating for all employees: 2.74

Let’s implement the same example using fork join.

The ForkJoinPool executor is dedicated to running instances implementing ForkJoinTask. ForkJoinTask objects support the creation of subtasks plus waiting for the subtasks to complete.

ForkJoinTask objects feature two specific methods:

  • The fork() method schedules a ForkJoinTask for asynchronous execution. This allows a new ForkJoinTask to be launched from an existing one.
  • In turn, the join() method allows a ForkJoinTask to wait for the completion of another one.

There are two types of ForkJoinTask specializations:

  • Instances of RecursiveAction represent executions that do not yield a return value.
  • In contrast, instances of RecursiveTask yield return values.

In general, RecursiveTask is preferred because most divide-and-conquer algorithms return a value from a computation over a data set.
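For completeness, here is a minimal sketch of the other specialization, RecursiveAction, which changes data in place instead of returning a value. The class name, the THRESHOLD value, and the doubleAll helper are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative sketch: doubles every element of an int[] in place.
class DoubleValuesAction extends RecursiveAction {

    private static final long serialVersionUID = 1L;

    // Assumed cut-off below which a range is processed sequentially.
    private static final int THRESHOLD = 4;

    private final int[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleValuesAction(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                values[i] *= 2;
            }
            return;
        }
        int mid = from + (to - from) / 2;
        // invokeAll forks the subtasks and waits until both have completed.
        invokeAll(new DoubleValuesAction(values, from, mid),
                  new DoubleValuesAction(values, mid, to));
    }

    // Convenience helper: runs the action on a fresh pool.
    static void doubleAll(int[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleValuesAction(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        doubleAll(values);
        System.out.println(Arrays.toString(values)); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```

The two halves write to disjoint index ranges of the array, so no further synchronization is needed.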

If you instantiate ForkJoinPool using the no-argument constructor, it sets the parallelism to the number of available processors and uses the default thread factory. ForkJoinPool provides other constructors that let you specify the parallelism, the thread factory, and an uncaught exception handler.

Here we get the number of available processors from the runtime and use that as the parallelism level. This is the number of logical processors visible to the JVM, which may differ from the number of physical cores. On Windows, you can also press Ctrl+Shift+Esc and check the Performance tab of Task Manager to see the core and logical processor counts for your system.
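The constructors mentioned above can be compared with a short sketch. The class name, the helper methods, and the error-printing handler are illustrative choices; the printed numbers depend on the machine, so no output is shown.

```java
import java.util.concurrent.ForkJoinPool;

class PoolParallelism {

    // Reads the parallelism level of a pool and shuts the pool down again.
    static int parallelismOf(ForkJoinPool pool) {
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Four-argument constructor: parallelism, thread factory, exception handler, async mode.
    static ForkJoinPool customPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                (thread, error) -> System.err.println(thread.getName() + " failed: " + error),
                false); // false keeps the default LIFO processing of a worker's own tasks
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("No-argument pool:     " + parallelismOf(new ForkJoinPool()));
        System.out.println("Explicit pool (2):    " + parallelismOf(new ForkJoinPool(2)));
        System.out.println("Custom pool (3):      " + parallelismOf(customPool(3)));
    }
}
```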

Here we create a RecursiveTask and submit it to the ForkJoinPool. The compute() method of a RecursiveTask returns the result of the task.

package com.roytuts.java.fork.join;

import java.util.List;
import java.util.concurrent.RecursiveTask;

public class EmployeeHalfYearlyAverageRatingTask extends RecursiveTask<Float> {

	private static final long serialVersionUID = 1L;

	private List<Employee> employees;

	public EmployeeHalfYearlyAverageRatingTask(List<Employee> employees) {
		this.employees = employees;
	}

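	@Override
	protected Float compute() {
		// Base cases: an empty list contributes nothing, a single employee contributes its rating
		if (employees == null || employees.size() < 1) {
			return 0f;
		} else if (employees.size() == 1) {
			return getRating(employees.get(0));
		}

		// Split the list into two halves (subList returns views, so nothing is copied)
		List<Employee> empList1 = employees.subList(0, employees.size() / 2);
		List<Employee> empList2 = employees.subList(employees.size() / 2, employees.size());

		// Fork the first half so that it can run asynchronously
		EmployeeHalfYearlyAverageRatingTask cTaskOne = new EmployeeHalfYearlyAverageRatingTask(empList1);
		cTaskOne.fork();

		EmployeeHalfYearlyAverageRatingTask cTaskTwo = new EmployeeHalfYearlyAverageRatingTask(empList2);

		// Compute the second half in the current thread, then join the first half.
		// The task returns the sum of the ratings; the caller divides it by the number of employees.
		return cTaskTwo.compute() + cTaskOne.join();
	}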

	private Float getRating(Employee employee) {
		return employee.getHalfYear();
	}

}

package com.roytuts.java.fork.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class EmployeeHalfYearlyAverageRatingTaskApp {

	public static void main(String[] args) {
		int threads = Runtime.getRuntime().availableProcessors();

		ForkJoinPool forkJoinPool = new ForkJoinPool(threads);

		List<Employee> employees = new ArrayList<>();
		employees.add(new Employee("Rajiv", 3.0f));
		employees.add(new Employee("Alinaksha", 2.0f));
		employees.add(new Employee("Rajat", 2.5f));
		employees.add(new Employee("Sushil", 3.0f));
		employees.add(new Employee("Shelshi", 3.2f));

		employees.add(new Employee("Pavni", 3.0f));
		employees.add(new Employee("Suprabhat", 2.0f));
		employees.add(new Employee("Anindya", 2.5f));
		employees.add(new Employee("Saugata", 3.0f));
		employees.add(new Employee("Rajesh", 3.2f));

		employees.add(new Employee("Shirisha", 3.0f));
		employees.add(new Employee("Satish", 2.0f));
		employees.add(new Employee("Luke", 2.5f));
		employees.add(new Employee("Seham", 3.0f));
		employees.add(new Employee("Vandana", 3.2f));

		employees.add(new Employee("Abhisekh", 3.0f));
		employees.add(new Employee("Rania", 2.0f));
		employees.add(new Employee("Prakhar", 2.5f));
		employees.add(new Employee("Princy", 3.0f));
		employees.add(new Employee("Bharti", 3.2f));

		employees.add(new Employee("Ahmed", 3.0f));
		employees.add(new Employee("Mohamed", 2.0f));
		employees.add(new Employee("Aditi", 2.5f));
		employees.add(new Employee("Zainab", 3.0f));
		employees.add(new Employee("Rashmi", 3.2f));

		EmployeeHalfYearlyAverageRatingTask etask = new EmployeeHalfYearlyAverageRatingTask(employees);

		float avgRating = forkJoinPool.invoke(etask) / employees.size();

		System.out.println("Average rating: " + avgRating);
	}

}

The above class prints Average rating: 2.74.

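Alternatively, on Java 8 and later you can use the parallel stream API to compute the average. In the example below, the stream pipeline is submitted as a task to the ForkJoinPool so that the parallel stream runs on that pool's threads instead of the common pool. This relies on an implementation detail of current JDKs rather than on documented behaviour.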

package com.roytuts.java.fork.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class EmployeeHalfYearlyAverageRating {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		int threads = Runtime.getRuntime().availableProcessors();

		ForkJoinPool forkJoinPool = new ForkJoinPool(threads);

		List<Employee> employees = new ArrayList<>();
		employees.add(new Employee("Rajiv", 3.0f));
		employees.add(new Employee("Alinaksha", 2.0f));
		employees.add(new Employee("Rajat", 2.5f));
		employees.add(new Employee("Sushil", 3.0f));
		employees.add(new Employee("Shelshi", 3.2f));

		employees.add(new Employee("Pavni", 3.0f));
		employees.add(new Employee("Suprabhat", 2.0f));
		employees.add(new Employee("Anindya", 2.5f));
		employees.add(new Employee("Saugata", 3.0f));
		employees.add(new Employee("Rajesh", 3.2f));

		employees.add(new Employee("Shirisha", 3.0f));
		employees.add(new Employee("Satish", 2.0f));
		employees.add(new Employee("Luke", 2.5f));
		employees.add(new Employee("Seham", 3.0f));
		employees.add(new Employee("Vandana", 3.2f));

		employees.add(new Employee("Abhisekh", 3.0f));
		employees.add(new Employee("Rania", 2.0f));
		employees.add(new Employee("Prakhar", 2.5f));
		employees.add(new Employee("Princy", 3.0f));
		employees.add(new Employee("Bharti", 3.2f));

		employees.add(new Employee("Ahmed", 3.0f));
		employees.add(new Employee("Mohamed", 2.0f));
		employees.add(new Employee("Aditi", 2.5f));
		employees.add(new Employee("Zainab", 3.0f));
		employees.add(new Employee("Rashmi", 3.2f));

		Double totalRating = forkJoinPool.submit(() -> {
			return employees.stream().parallel().mapToDouble(e -> e.getHalfYear()).sum();
		}).get();

		System.out.println("Average rating: " + (totalRating.floatValue() / employees.size()));
	}

}

The above class prints Average rating: 2.74.
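If a dedicated pool is not required, a parallel stream can also compute the average directly; it then runs on the common fork/join pool. The sketch below assumes the ratings are available as a plain List<Float> (a simplification of the Employee list used above) and uses illustrative names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ParallelStreamAverage {

    // Averages the ratings with a parallel stream; returns 0.0 for an empty list.
    static double average(List<Float> ratings) {
        return ratings.parallelStream()
                .mapToDouble(Float::doubleValue)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) {
        List<Float> ratings = Arrays.asList(3.0f, 2.0f, 2.5f, 3.0f, 3.2f);
        // Prints a value very close to 2.74 (float-to-double conversion adds tiny rounding noise).
        System.out.println("Average rating: " + average(ratings));
        System.out.println("Empty list:     " + average(Collections.<Float>emptyList()));
    }
}
```

Using average() avoids the separate division step and handles the empty list explicitly through the returned OptionalDouble.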

Thanks for reading.
