Java 8 Stream API

Introduction

This tutorial explains the Stream API, which was introduced in Java 8 and lives in the package java.util.stream.

The Stream API supports functional-style operations on streams of elements, such as the elements of a collection.

For example, if you only want to print the even numbers from a list of integers, you can use the Stream API as follows:

List<Integer> integers = Arrays.asList(1,2,3,5,8,11);
integers.stream().filter(i -> i % 2 == 0).forEach(i -> System.out.println(i));

The interfaces Stream, IntStream, LongStream, and DoubleStream represent streams over objects and over the primitive int, long, and double types, respectively.

Difference Between Streams and Collections

  1. A stream has no storage: unlike a collection, it is not a data structure that stores elements. Instead, a stream conveys elements from a source such as a data structure, an array, a generator function, or an I/O channel, through a pipeline of computational operations.
  2. A stream is functional in nature because an operation on it produces a result rather than modifying its source. In the example above, filtering the integers collection produces a new Stream that contains only the even numbers; nothing is removed from the source collection.
  3. Many stream operations, such as filtering, mapping, or duplicate removal, can be implemented lazily, exposing opportunities for optimization.
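  4. A stream is possibly unbounded: a collection has a finite size, but a stream need not. Short-circuiting operations such as limit(n) or findFirst() allow computations on infinite streams to complete in finite time.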
  5. The elements of a stream are only visited once during the life of a stream. Like an Iterator, a new stream must be generated to revisit the same elements of the source.
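
The points above can be illustrated with a minimal sketch. The class and method names (StreamVsCollectionDemo, evens, secondTraversalFails, firstPowersOfTwo) are made up for this example and are not part of the Stream API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamVsCollectionDemo {

    // Returns the even numbers as a new list; the source list is left untouched.
    static List<Integer> evens(List<Integer> source) {
        return source.stream()
                     .filter(i -> i % 2 == 0)
                     .collect(Collectors.toList());
    }

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTraversalFails(List<Integer> source) {
        Stream<Integer> stream = source.stream();
        stream.count();                 // first terminal operation consumes the stream
        try {
            stream.count();             // the stream has already been operated upon
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // An infinite stream made finite by the short-circuiting limit().
    static List<Integer> firstPowersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2)
                     .limit(n)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 5, 8, 11);
        System.out.println(evens(integers));                // [2, 8]
        System.out.println(integers);                       // [1, 2, 3, 5, 8, 11]
        System.out.println(secondTraversalFails(integers)); // true
        System.out.println(firstPowersOfTwo(5));            // [1, 2, 4, 8, 16]
    }
}
```

To visit the elements again, call stream() on the source a second time rather than holding on to the old Stream reference.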

Stream Operations And Pipelines

Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines.

Intermediate operations such as filter() and map() are always lazy and return a new stream. For example, calling filter() does not actually perform any filtering; it creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate.

Intermediate operations are further divided into stateless and stateful operations.

Stateless operations, such as filter and map, retain no state from previously seen elements when processing a new element; each element can be processed independently of operations on other elements.

Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.
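
As a rough illustration of laziness and of a stateful pipeline, consider the sketch below. The names LazyPipelineDemo, predicateCallsBeforeAndAfter, and distinctSorted are hypothetical, and the AtomicInteger counter is there only to make the laziness observable; it is not something you would normally put in a predicate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineDemo {

    // Counts how often the predicate has run before and after the terminal operation.
    static int[] predicateCallsBeforeAndAfter(List<Integer> source) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> evens = source.stream()
                .filter(i -> { calls.incrementAndGet(); return i % 2 == 0; });
        int before = calls.get();            // filter() has only been declared, not executed
        evens.collect(Collectors.toList());  // the terminal operation triggers the traversal
        int after = calls.get();
        return new int[] { before, after };
    }

    // distinct() and sorted() are stateful: they must remember earlier elements.
    static List<Integer> distinctSorted(List<Integer> source) {
        return source.stream()
                     .distinct()
                     .sorted()
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int[] calls = predicateCallsBeforeAndAfter(Arrays.asList(1, 2, 3, 5, 8, 11));
        System.out.println(calls[0] + " then " + calls[1]);              // 0 then 6
        System.out.println(distinctSorted(Arrays.asList(3, 1, 3, 2, 1))); // [1, 2, 3]
    }
}
```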

Terminal operations, such as Stream.forEach() or IntStream.sum(), may traverse the stream to produce a result or a side-effect.

Once a terminal operation has been performed, the stream pipeline is considered consumed and cannot be used again. To run another terminal operation, you must return to the data source and obtain a new stream.

In most of the cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning.

Only the terminal operations iterator() and spliterator() are not; these are provided as an “escape hatch” to enable arbitrary client-controlled pipeline traversals.
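
One way to see the "escape hatch" in action is to pull elements one at a time from an infinite pipeline. This is only a sketch; IteratorEscapeHatchDemo and firstEvens are illustrative names, not JDK classes or methods.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class IteratorEscapeHatchDemo {

    // Pulls the first n even numbers from an infinite stream, one element at a time.
    static List<Integer> firstEvens(int n) {
        Iterator<Integer> it = Stream.iterate(1, x -> x + 1)
                                     .filter(x -> x % 2 == 0)
                                     .iterator();   // terminal, but does not traverse eagerly
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < n && it.hasNext(); i++) {
            result.add(it.next());                  // the caller decides when to advance
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(firstEvens(3)); // [2, 4, 6]
    }
}
```

An eager terminal operation such as forEach() would never return on this infinite stream, whereas the iterator only does as much work as the caller asks for.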

Nature of Stream API

Parallelism

All stream operations can execute either serially or in parallel. The stream implementations in the JDK create serial streams unless parallelism is explicitly requested. For example, to compute the sum of the even numbers in the integers list in parallel, you would write:

int sumOfEvenNumbers = integers.parallelStream().filter(i -> i % 2 == 0).mapToInt(i -> i).sum();

The only difference between the serial and parallel versions of this example is the creation of the initial stream, using parallelStream() instead of stream().

Non-Interference

For most data sources, non-interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception is streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.

Accordingly, behavioral parameters in stream pipelines whose source might not be concurrent should never modify the stream’s data source.

A behavioral parameter is said to interfere with a non-concurrent data source if it modifies, or causes to be modified, the stream’s data source. Unless the stream source is concurrent, modifying a stream’s data source during execution of a stream pipeline can cause exceptions, incorrect answers, or nonconformant behavior.
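
The following sketch shows how you might check whether a source is concurrent, and what interference can look like on an ArrayList. NonInterferenceDemo and its methods are hypothetical names. The ConcurrentModificationException is the usual outcome on current JDKs, but detection is best-effort, so treat it as an observation rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

class NonInterferenceDemo {

    // True if the collection's spliterator reports the CONCURRENT characteristic.
    static boolean isConcurrentSource(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.CONCURRENT);
    }

    // Modifies an ArrayList from inside forEach(); returns true if the JDK detected it.
    static boolean interferenceDetected() {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            // The lambda interferes with the stream's own (non-concurrent) source.
            list.stream().forEach(x -> { if (x == 1) list.add(99); });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isConcurrentSource(new ArrayList<Integer>()));             // false
        System.out.println(isConcurrentSource(new ConcurrentLinkedQueue<Integer>())); // true
        System.out.println(interferenceDetected());
    }
}
```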

For well-behaved stream sources, the source can be modified before the terminal operation commences and those modifications will be reflected in the covered elements.

For example, consider the following code:

List<Integer> l = new ArrayList<>(Arrays.asList(1, 2));
Stream<Integer> sl = l.stream();
l.add(3);
int s = sl.collect(Collectors.summingInt(n -> n));
System.out.println(s);

First a list is created consisting of two integers, 1 and 2. Then a stream is created from that list. Next the list is modified by adding a third integer, 3. Finally the elements of the stream are collected and summed. Since the list was modified before the terminal collect operation commenced, the result is the sum of 1, 2, and 3, which is 6.

Stateless Behaviors

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful.

A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline. An example of a stateful lambda is the parameter to map() in:

Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...

Here, if the mapping operation is performed in parallel, the results for the same input could vary from run to run, due to thread scheduling differences, whereas, with a stateless lambda expression the results would always be the same.

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from.

The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.
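
As one possible restructuring (an assumed example, not the only approach), a filter that tracks already-seen elements in shared state can often be replaced by the built-in distinct() operation. StatelessDedupDemo and its two methods are illustrative names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class StatelessDedupDemo {

    // Stateful version, shown for contrast: the predicate depends on the shared set,
    // so in parallel the surviving occurrence (and thus the order) can vary per run.
    static List<Integer> dedupStateful(List<Integer> source) {
        Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
        return source.parallelStream()
                     .filter(seen::add)
                     .collect(Collectors.toList());
    }

    // Stateless behavioral parameters: the library handles duplicate removal itself.
    // On an ordered source the first occurrence of each element is kept.
    static List<Integer> dedupStateless(List<Integer> source) {
        return source.parallelStream()
                     .distinct()
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 3, 2, 1);
        System.out.println(dedupStateless(input)); // [3, 1, 2]
        System.out.println(dedupStateful(input));  // same elements, order not guaranteed
    }
}
```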

Side Effects

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.

ArrayList<Integer> results = new ArrayList<>();
integers.stream().filter(i -> i % 2 == 0)
        .forEach(i -> results.add(i));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

List<Integer> results =
        integers.stream().filter(i -> i % 2 == 0)
                .collect(Collectors.toList());  // No side-effects!

Ordering

Streams may or may not have a defined order and it depends on the source and the intermediate operations.

Certain stream sources, such as List and arrays, are intrinsically ordered, whereas others, like HashSet, are not.

Likewise, some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream.

Further, some terminal operations may ignore encounter order, such as forEach().

For sequential streams, the presence or absence of an encounter order does not affect performance, only determinism. If a stream is ordered, repeated execution of identical stream pipelines on an identical source will produce an identical result; if it is not ordered, repeated execution might produce different results.

For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant.

Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism.

In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations.
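
The ordering rules can be sketched as follows. EncounterOrderDemo and its methods are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class EncounterOrderDemo {

    // collect(toList()) respects the encounter order of a List source, even in parallel.
    static List<Integer> timesTenInParallel(List<Integer> source) {
        return source.parallelStream()
                     .map(x -> x * 10)
                     .collect(Collectors.toList());
    }

    // forEachOrdered() processes elements one at a time in encounter order,
    // unlike forEach(), which may process them in any order on a parallel stream.
    static String joinOrdered(List<Integer> source) {
        StringBuilder sb = new StringBuilder();
        source.parallelStream().forEachOrdered(x -> sb.append(x));
        return sb.toString();
    }

    // unordered() drops the ordering constraint; the set of results stays the same.
    static Set<Integer> distinctUnordered(List<Integer> source) {
        return source.parallelStream()
                     .unordered()
                     .distinct()
                     .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(5, 3, 1, 4, 2);
        System.out.println(timesTenInParallel(input)); // [50, 30, 10, 40, 20]
        System.out.println(joinOrdered(input));        // 53142
        System.out.println(distinctUnordered(Arrays.asList(1, 2, 2, 3, 1)));
    }
}
```

Whether unordered() actually speeds anything up depends on the pipeline and the data, so it is worth measuring before relying on it.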

Reduction Operations

A reduction operation (also called a fold) takes a sequence of input elements and combines them into a single summary result by repeated application of a combining operation, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list.

The streams classes have multiple forms of general reduction operations, called reduce() and collect(), as well as multiple specialized reduction forms such as sum(), max(), or count().
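
A short sketch of the specialized forms next to a general collect() may help here. The class ReductionFormsDemo and its method names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;

class ReductionFormsDemo {

    // Specialized reduction on an IntStream.
    static int sum(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).sum();
    }

    // max() returns an OptionalInt because the stream may be empty.
    static OptionalInt max(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).max();
    }

    // count() reduces the stream to the number of elements it contains.
    static long countEvens(List<Integer> numbers) {
        return numbers.stream().filter(i -> i % 2 == 0).count();
    }

    // collect() is the general mutable reduction; here it builds a String.
    static String joined(List<Integer> numbers) {
        return numbers.stream()
                      .map(String::valueOf)
                      .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 5, 8, 11);
        System.out.println(sum(integers));            // 30
        System.out.println(max(integers).getAsInt()); // 11
        System.out.println(countEvens(integers));     // 2
        System.out.println(joined(integers));         // 1, 2, 3, 5, 8, 11
    }
}
```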

Of course, such operations can be readily implemented as simple sequential loops, as in:

int sum = 0;
for (int x : numbers) {
    sum += x;
}

However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction “more abstract” — it operates on the stream as a whole rather than individual elements — but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless.

For example, given a stream of numbers for which we want to find the sum, we can write:

int sum = numbers.stream().reduce(0, (x,y) -> x+y);

or:

int sum = numbers.stream().reduce(0, Integer::sum);

These reduction operations can run safely in parallel with almost no modification:

int sum = numbers.parallelStream().reduce(0, Integer::sum);

Reduction parallelizes well because the implementation can operate on subsets of the data in parallel, and then combine the intermediate results to get the final correct answer.

Associativity

An operator or function op is associative if the following holds:

(a op b) op c == a op (b op c)

The importance of this to parallel evaluation can be seen if we expand this to four terms:

a op b op c op d == (a op b) op (c op d)

So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results.

Examples of associative operations include numeric addition, min, and max, and string concatenation.
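
To make the definition concrete, the sketch below checks the associativity equation on sample values and runs a parallel reduction with an associative operator. AssociativityDemo, associativeFor, and concatInParallel are illustrative names. Note that passing the check for one sample does not prove associativity, although a single failing sample (as with subtraction) is enough to disprove it.

```java
import java.util.function.IntBinaryOperator;
import java.util.stream.Stream;

class AssociativityDemo {

    // Evaluates (a op b) op c and a op (b op c) and compares the two results.
    static boolean associativeFor(IntBinaryOperator op, int a, int b, int c) {
        int left = op.applyAsInt(op.applyAsInt(a, b), c);
        int right = op.applyAsInt(a, op.applyAsInt(b, c));
        return left == right;
    }

    // String concatenation is associative, so a parallel reduce over an ordered
    // stream yields the same result as a sequential one.
    static String concatInParallel(String... parts) {
        return Stream.of(parts).parallel().reduce("", String::concat);
    }

    public static void main(String[] args) {
        System.out.println(associativeFor((x, y) -> x + y, 1, 2, 3)); // true
        System.out.println(associativeFor((x, y) -> x - y, 1, 2, 3)); // false: -4 vs 2
        System.out.println(concatInParallel("a", "b", "c", "d"));     // abcd
    }
}
```

Because subtraction fails the check, using it as the accumulator of a parallel reduce() could give results that depend on how the data happens to be split.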

That concludes this brief overview of the Stream API in Java 8 and later.
