Parallelism Example in Java Stream API

I am going to show you how to work with parallel streams in Java 8 or later. In parallel computing, a task is divided into sub-tasks, each sub-task is computed in a separate thread, and the results of the sub-tasks are finally combined into one result. One way to implement parallel computing is Java’s fork/join framework, where you need to specify how the task is partitioned into sub-tasks.
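To make the partitioning idea concrete, here is a minimal sketch of a fork/join task that sums an array. The class name SumTask and the THRESHOLD value are my own assumptions for illustration, not part of any library.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums a slice of an int array by splitting it in half
// until the slice is small enough to add up directly.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off, tune for real workloads
    private final int[] data;
    private final int from;
    private final int to; // exclusive

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                     // run the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // combine the partial results
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println("Sum: " + sum(data)); // prints Sum: 50005000
    }
}
```

With streams you do not write this splitting logic yourself, which is the main convenience of parallelStream().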

One difficulty with parallel computing in applications is that the standard collections are not thread-safe, which means that multiple threads cannot safely manipulate them at the same time. Java provides synchronization wrappers (synchronizedCollection(), synchronizedSet(), synchronizedList(), synchronizedMap(), etc.) that make collections thread-safe, but such collections still cannot be accessed in parallel: when a thread manipulates a wrapped collection, it acquires a lock on the entire collection, and no other thread can access it until that lock is released.
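The following sketch shows a synchronization wrapper in action. It is only an illustration under my own assumptions: the class SynchronizedWrapperDemo and the method fill() are hypothetical names. Several threads append to one wrapped list, and because every add() goes through the same lock, no element is lost, but the threads also take turns instead of running side by side.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: several threads append to one synchronized list.
class SynchronizedWrapperDemo {

    // Starts threadCount threads that each add perThread numbers,
    // then returns the final size of the shared list.
    static int fill(int threadCount, int perThread) {
        List<Integer> target = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    target.add(i); // every add() takes the wrapper's single lock
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // wait until this writer has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return target.size();
    }

    public static void main(String[] args) {
        System.out.println("Size: " + fill(4, 10_000)); // prints Size: 40000
    }
}
```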

Therefore parallelism is not automatically faster than performing the same operations sequentially, although it can be if you have enough data and your system has multiple processor cores. So it is your responsibility to check whether your application is suited to parallelism.
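A quick first check is how many cores the JVM can actually use. The sketch below prints that number together with the parallelism of the common fork/join pool, which parallel streams use by default. The class name ParallelismCheck is my own.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that reports how much hardware parallelism is available.
class ParallelismCheck {

    // Number of processor cores visible to the JVM.
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Cores: " + cores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

On a machine with a single core, a parallel stream has little room to beat a sequential one, so measuring both variants with realistic data remains the safest approach.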

When a stream executes in parallel, the Java runtime partitions the stream into multiple sub-streams. Aggregate operations then iterate over and process these sub-streams in parallel, and finally the results are combined.
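The three-argument form of reduce() makes the two steps visible: an accumulator that works inside one sub-stream and a combiner that merges the results of two sub-streams. This is a small sketch of my own (the class name PartitionCombineDemo is hypothetical) that sums the squares of the numbers.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo: accumulate per sub-stream, then combine the partial sums.
class PartitionCombineDemo {

    static int sumOfSquares(List<Integer> numbers) {
        return numbers.parallelStream()
                .reduce(0,                               // identity for an empty sub-stream
                        (partial, n) -> partial + n * n, // accumulate within one sub-stream
                        Integer::sum);                   // combine two partial results
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        System.out.println("Sum of squares: " + sumOfSquares(integers)); // prints Sum of squares: 506
    }
}
```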

Parallel Execution

Let’s consider the following example that calculates the average of the given numbers.

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);

double avg = integers.parallelStream().mapToInt(i -> i).average().getAsDouble();
System.out.println("Average: " + avg);

The above code snippet prints Average: 6.0. The average is calculated in parallel because I created the stream with the method parallelStream(). If you use the stream() method instead, the result is computed sequentially.
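One detail worth knowing: getAsDouble() throws NoSuchElementException when the list is empty. A slightly more defensive variant, sketched here with a hypothetical AverageDemo class, falls back to a default value through orElse(). Returning Double.NaN for an empty list is my own choice for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: parallel average with a fallback for empty input.
class AverageDemo {

    static double average(List<Integer> numbers) {
        return numbers.parallelStream()
                .mapToInt(Integer::intValue)
                .average()            // OptionalDouble, empty if there are no elements
                .orElse(Double.NaN);  // assumed fallback value for this sketch
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        System.out.println("Average: " + average(integers));                 // prints Average: 6.0
        System.out.println("Empty: " + average(Collections.emptyList()));    // prints Empty: NaN
    }
}
```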

Reduction Operation

A reduction operation combines the elements of a stream into a single result, and it is an important building block of parallel computing. Let’s say you want to group all employees by their designation. The following code snippet groups the employees by designation and collects the list of employees into a map.

List<Employee> employees = Arrays.asList(new Employee("John", "Developer"), new Employee("Michel", "Sr Developer"), new Employee("Harris", "Developer"),
 new Employee("Kamla", "Sr Developer"), new Employee("Jerome", "Manager"));

Map<String, List<Employee>> byDesignation = employees.stream()
		.collect(Collectors.groupingBy(Employee::getDesignation));

Printing each entry of the resulting map gives you the following output:

Sr Developer => [Employee [name=Michel, designation=Sr Developer], Employee [name=Kamla, designation=Sr Developer]]
Developer => [Employee [name=John, designation=Developer], Employee [name=Harris, designation=Developer]]
Manager => [Employee [name=Jerome, designation=Manager]]
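The Employee class is not shown in this tutorial, so here is a minimal sketch of what it could look like, together with a runnable version of the grouping. The field names and the toString() format are assumptions derived from the printed output, and GroupingDemo is a hypothetical wrapper class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Assumed shape of the Employee class; only getDesignation() and toString() matter here.
class Employee {
    private final String name;
    private final String designation;

    Employee(String name, String designation) {
        this.name = name;
        this.designation = designation;
    }

    String getName() {
        return name;
    }

    String getDesignation() {
        return designation;
    }

    @Override
    public String toString() {
        return "Employee [name=" + name + ", designation=" + designation + "]";
    }
}

// Hypothetical wrapper that builds the sample data and groups it.
class GroupingDemo {

    static List<Employee> sampleEmployees() {
        return Arrays.asList(new Employee("John", "Developer"), new Employee("Michel", "Sr Developer"),
                new Employee("Harris", "Developer"), new Employee("Kamla", "Sr Developer"),
                new Employee("Jerome", "Manager"));
    }

    static Map<String, List<Employee>> byDesignation(List<Employee> employees) {
        return employees.stream().collect(Collectors.groupingBy(Employee::getDesignation));
    }

    public static void main(String[] args) {
        // The iteration order of the map keys is not guaranteed.
        byDesignation(sampleEmployees())
                .forEach((designation, group) -> System.out.println(designation + " => " + group));
    }
}
```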

The following code snippet is similar to the above and produces the same groups, but it uses parallelStream() together with ConcurrentMap and groupingByConcurrent() for parallel execution. Note that with groupingByConcurrent() the order of the employees within each list is not guaranteed.

ConcurrentMap<String, List<Employee>> byDesignationConcurrentMap = employees.parallelStream().collect(Collectors.groupingByConcurrent(Employee::getDesignation));

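This is an example of a concurrent reduction. The Java runtime performs a concurrent reduction only if all of the following conditions are met: the stream is parallel; the collector passed to collect() has the characteristic Collector.Characteristics.CONCURRENT; and either the stream is unordered or the collector has the characteristic Collector.Characteristics.UNORDERED.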
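Whether a collector is eligible for a concurrent reduction can be read from its characteristics. The sketch below, with a hypothetical class CollectorCharacteristicsDemo and a simple grouping of strings by length, compares groupingBy() with groupingByConcurrent().

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical demo: inspect the characteristics that collectors advertise.
class CollectorCharacteristicsDemo {

    static Collector<String, ?, Map<Integer, List<String>>> plainGrouping() {
        return Collectors.groupingBy(String::length);
    }

    static Collector<String, ?, ConcurrentMap<Integer, List<String>>> concurrentGrouping() {
        return Collectors.groupingByConcurrent(String::length);
    }

    // True if the collector reports the given characteristic.
    static boolean has(Collector<?, ?, ?> collector, Collector.Characteristics characteristic) {
        return collector.characteristics().contains(characteristic);
    }

    public static void main(String[] args) {
        System.out.println("groupingBy: " + plainGrouping().characteristics());
        System.out.println("groupingByConcurrent: " + concurrentGrouping().characteristics());
    }
}
```

Only the collector returned by groupingByConcurrent() reports CONCURRENT and UNORDERED, which is why it is the one to pair with parallelStream() here.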

Ordering

The order in which the elements of a stream are processed depends on whether you use a sequential or a parallel stream. Consider the following example that prints the integer elements of a list with the forEach() operation several times.

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);

The following forEach() operation prints the elements in the order that they were inserted into the list.

integers.stream().forEach(i -> System.out.print(i + " "));

Output: 1 2 3 4 5 6 7 8 9 10 11

The following code snippet prints the elements in an unpredictable order that can change from run to run:

integers.parallelStream().forEach(i -> System.out.print(i + " "));

Output (from one run): 7 8 6 10 11 9 3 5 4 2 1

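If you want the elements to be processed in the same order in which they appear in the list, even with a parallel stream, you can use forEachOrdered(). Keep in mind that this may give up some of the benefit of parallelism: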

integers.parallelStream().forEachOrdered(i -> System.out.print(i + " "));

Output: 1 2 3 4 5 6 7 8 9 10 11
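Ordering also matters when you collect results instead of printing them. For an ordered source such as a List, collecting with Collectors.toList() keeps the encounter order even when the stream is parallel. Here is a small sketch (the class name OrderedCollectDemo is hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo: a parallel map() followed by toList() keeps the list order.
class OrderedCollectDemo {

    static List<Integer> doubled(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * 2)               // may run on several threads
                .collect(Collectors.toList()); // result follows the encounter order
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        System.out.println(doubled(integers)); // prints [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]
    }
}
```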

Side Effects

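A method or an expression has a side effect if, in addition to returning or producing a value, it also modifies the state of the program. The JDK handles certain side effects in pipelines well; in particular, the collect() method is designed to perform the most common stream operations that have side effects in a parallel-safe manner. Operations like forEach() and peek() are designed for side effects. A lambda expression that calls System.out.println() also has a side effect.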

All intermediate operations in streams are lazy: they do not start processing the contents of the stream until the terminal operation starts.
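Laziness is easy to observe with a sequential stream. In this sketch (LazyDemo is a hypothetical class name) the peek() step is declared first, yet nothing is logged for it until forEach() starts pulling elements through the pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo: records the order in which pipeline steps actually run.
class LazyDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Building the pipeline does not process any element yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3).peek(i -> log.add("peek " + i));
        log.add("pipeline built");
        // The terminal operation triggers peek() for each element in turn.
        pipeline.forEach(i -> log.add("forEach " + i));
        return log;
    }

    public static void main(String[] args) {
        // prints [pipeline built, peek 1, forEach 1, peek 2, forEach 2, peek 3, forEach 3]
        System.out.println(run());
    }
}
```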

Ideally, stream operations should not interfere. Interference occurs when the source of a stream is modified while a pipeline processes the stream.

Let’s take a look at the following code. It tries to add another string to the list names while the stream over that list is being processed, and as a result it throws ConcurrentModificationException.

List<String> names = new ArrayList<>();
names.add("John");
names.add("Michel");

try {
	String addAnotherName = names.stream().peek(s -> names.add("Adrish")).reduce((s1, s2) -> s1 + " " + s2)
			.get();

	System.out.println("Additional Name: " + addAnotherName);
} catch (Exception e) {
	System.out.println("Exception: " + e);
}

The code above attempts to concatenate the strings in names into an Optional<String> value with the terminal operation reduce(). However, the pipeline also invokes the lazy intermediate operation peek(), whose argument attempts to add a new string to names. This modifies the stream source during the execution of the pipeline, which causes the Java runtime to throw a ConcurrentModificationException.
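If the goal is simply to include one more name in the result, one way to avoid interference is to leave the source list alone and append the extra element as a second stream. This is only a sketch of that idea. The class NonInterferenceDemo and the method joinWithExtra() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: add an element to the result without touching the source list.
class NonInterferenceDemo {

    static String joinWithExtra(List<String> names, String extra) {
        return Stream.concat(names.stream(), Stream.of(extra)) // source list stays unmodified
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("John");
        names.add("Michel");
        System.out.println("Names: " + joinWithExtra(names, "Adrish")); // prints Names: John Michel Adrish
        System.out.println("Source size: " + names.size());             // prints Source size: 2
    }
}
```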

Avoid using stateful lambda expressions as parameters in stream operations. A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline.
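The sketch below contrasts a stateful lambda with a stateless alternative. StatefulLambdaDemo and its method names are hypothetical. In the first method the lambda passed to map() writes to a shared list, so the order of that list depends on how the threads happen to be scheduled. The second method lets collect() build the result and always returns the elements in list order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo: shared mutable state inside a lambda versus collect().
class StatefulLambdaDemo {

    // Stateful: the lambda records every element in a shared list.
    static List<Integer> recordWithStatefulLambda(List<Integer> source) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream()
                .map(n -> { seen.add(n); return n; }) // written from several threads
                .forEachOrdered(n -> { });            // terminal operation that drives the pipeline
        return seen;                                  // all elements, in an unpredictable order
    }

    // Stateless: no shared state, the collector assembles the result.
    static List<Integer> copyWithCollect(List<Integer> source) {
        return source.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        System.out.println("Stateful:  " + recordWithStatefulLambda(integers)); // order may vary per run
        System.out.println("Stateless: " + copyWithCollect(integers));
    }
}
```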

The Java stream API itself provides stateful intermediate operations, such as distinct(), sorted(), limit() and skip(), which maintain state internally in order to perform their work.

In contrast, when your own lambda expressions are stateful, the results of a parallel pipeline can vary every time the code is run.

That’s all about parallel stream examples in Java stream API.

