CompletableFuture in Java 8 or later

Introduction

This tutorial discusses CompletableFuture, a class introduced in Java 8. With the CompletableFuture API you can complete asynchronous tasks in an ad hoc manner.

A Future represents the pending result of an asynchronous computation. It offers a method — get() — that returns the result of the computation when it’s done.

The problem is that a call to get() blocks until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.

CompletableFuture<T> implements Future<T> (as well as CompletionStage<T>) and makes it completable in an ad hoc manner. This is a big deal, considering how limited Future objects were before Java 8.
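To make "completable in an ad hoc manner" concrete, here is a minimal sketch. The class and method names (AdHocCompletionSketch, greetWhenCompleted) are made up for illustration and are not part of the tutorial's example. A callback is registered first, and some other thread supplies the value later.

```java
import java.util.concurrent.CompletableFuture;

public class AdHocCompletionSketch {

    // Registers a callback first, then completes the future by hand from another thread.
    static String greetWhenCompleted(String name) {
        CompletableFuture<String> pending = new CompletableFuture<>();

        // Nothing blocks here: the callback is only registered, it has not run yet.
        CompletableFuture<String> greeting = pending.thenApply(n -> "Hello, " + n);

        // Any thread may supply the value later (hypothetical producer thread).
        new Thread(() -> pending.complete(name)).start();

        // join() is used only so that this sketch can return the final value.
        return greeting.join();
    }

    public static void main(String[] args) {
        System.out.println(greetWhenCompleted("Roy"));
    }
}
```

In real code you would normally keep attaching callbacks instead of calling join(), which blocks just like get().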

Benefits of CompletableFuture

CompletableFuture has two main benefits:

  • It can be explicitly completed by calling the complete() method, without waiting for the computation to finish. This lets you supply a default or intermediate value when the computation has not completed yet.
  • It also lets you build a data-processing pipeline as a series of actions. Common patterns include creating a CompletableFuture from a task and building a chain of CompletableFutures.
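The first benefit can be sketched as follows. The names DefaultValueSketch, completeEarly and sleepQuietly are hypothetical, and the 500 ms sleep merely stands in for slow work. The sketch relies on complete() taking effect only when the future is not already completed.

```java
import java.util.concurrent.CompletableFuture;

public class DefaultValueSketch {

    // Starts a slow computation, then completes the future early with a fallback value.
    static double completeEarly(double fallback) {
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
            sleepQuietly(500); // stand-in for slow work
            return Math.random();
        });
        cf.complete(fallback); // takes effect only if cf is not completed yet
        return cf.join();      // does not wait for the slow task any more
    }

    // Only the first completion counts; a later attempt returns false and changes nothing.
    static boolean secondCompleteIsIgnored() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        boolean first = cf.complete("first");
        boolean second = cf.complete("second");
        return first && !second && "first".equals(cf.join());
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Value : " + completeEarly(434345.8765));
        System.out.println("Second complete ignored : " + secondCompleteIsIgnored());
    }
}
```

Note that the slow task keeps running in the background after the early completion; its result is simply discarded.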

Interface Implementation

CompletableFuture implements interface CompletionStage with the following policies:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.
  • All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
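The first two policies can be observed with a small sketch. The class and method names are illustrative assumptions. It uses an already completed future, so the non-async callback runs right away in the calling thread, while the async variant is handed to the default asynchronous executor.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackThreadSketch {

    // Non-async callback on an already completed future: runs in the calling thread.
    static String threadOfSyncCallback() {
        return CompletableFuture.completedFuture("x")
                .thenApply(v -> Thread.currentThread().getName())
                .join();
    }

    // Async callback without an Executor: runs as a separate task,
    // normally on a ForkJoinPool.commonPool() worker thread.
    static String threadOfAsyncCallback() {
        return CompletableFuture.completedFuture("x")
                .thenApplyAsync(v -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller : " + Thread.currentThread().getName());
        System.out.println("sync   : " + threadOfSyncCallback());
        System.out.println("async  : " + threadOfAsyncCallback());
    }
}
```

The exact name of the async thread depends on the JVM and on the parallelism of the common pool, so treat the printed names as indicative only.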

CompletableFuture also implements Future with the following policies:

  • Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally() can be used to determine if a CompletableFuture completed in any exceptional fashion.
  • In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage in most contexts, this class also defines methods join() and getNow(T) that instead throw the CompletionException directly in these cases.
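A short sketch of these two policies, with hypothetical class and method names: cancel() shows up as exceptional completion, get() wraps the failure in an ExecutionException, and join() throws a CompletionException with the same cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class ExceptionalCompletionSketch {

    // cancel() is treated as just another form of exceptional completion.
    static boolean cancelCountsAsExceptional() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.cancel(true);
        return cf.isCancelled() && cf.isCompletedExceptionally();
    }

    // get() reports a failed computation as a checked ExecutionException.
    static String causeSeenByGet() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            cf.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() reports the same failure as an unchecked CompletionException.
    static String causeSeenByJoin() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            cf.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelCountsAsExceptional());
        System.out.println(causeSeenByGet());
        System.out.println(causeSeenByJoin());
    }
}
```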

CompletableFuture Examples

A CompletionStage is a promise that the computation will eventually be done. Its great strength is the wide selection of methods that let you attach callbacks to be executed on completion. This way you can build systems in a non-blocking fashion.

CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);

supplyAsync() takes a Supplier containing the code you want to execute asynchronously, here the createRandomNumber() method (or createId() in the next example).

If you have worked with Future, you may wonder where the Executor has gone. You can still pass an Executor as the second argument to supplyAsync(). If you do not pass one, the task is submitted to the default ForkJoinPool.commonPool().
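Passing your own Executor might look like the sketch below. The pool size and the thread name "my-pool-thread" are arbitrary choices for illustration, and the class name is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomExecutorSketch {

    // Runs the supplier on a caller-provided pool instead of ForkJoinPool.commonPool().
    static String threadUsedBySupplier() {
        // Hypothetical pool whose threads carry a recognizable name.
        ExecutorService es = Executors.newFixedThreadPool(2, r -> new Thread(r, "my-pool-thread"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), es)
                    .join();
        } finally {
            es.shutdown(); // a pool you create yourself must also be shut down by you
        }
    }

    public static void main(String[] args) {
        System.out.println("Supplier ran on : " + threadUsedBySupplier());
    }
}
```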

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
                .thenAccept(this::store);

thenAccept() is one of many ways to add a callback. It takes a Consumer, in our case a method called store, which handles the result of the preceding computation when it is done and does not return a value.

If you want to transform a value and pass the result on to the next callback, use thenApply(). In our example, thenApply() takes the method convert as its argument.
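The difference between the two callbacks is visible in the types of the stages they return. The following sketch uses hypothetical names and lambdas in place of the createId, convert and store methods.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class ChainSketch {

    // thenApply transforms the value, so each stage keeps carrying a result.
    static int lengthOfGeneratedId() {
        CompletableFuture<Integer> length = CompletableFuture
                .supplyAsync(() -> UUID.randomUUID())   // CompletableFuture<UUID>
                .thenApply(id -> id.toString())         // CompletableFuture<String>
                .thenApply(text -> text.length());      // CompletableFuture<Integer>
        return length.join();
    }

    // thenAccept consumes the value, so the resulting stage holds no result.
    static Object resultAfterThenAccept() {
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "payload")
                .thenAccept(s -> System.out.println("storing " + s));
        return done.join(); // null, because the stage is a CompletableFuture<Void>
    }

    public static void main(String[] args) {
        System.out.println("length : " + lengthOfGeneratedId());
        System.out.println("after thenAccept : " + resultAfterThenAccept());
    }
}
```

This is also why calling get() on a chain that ends in thenAccept() yields null.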

In the example above, the callbacks are not submitted as separate tasks: thenApply() and thenAccept() typically run on the thread that completed the preceding stage (or on the calling thread if that stage is already complete). As a result, each callback waits for the previous one to finish.

Now look at the example below, where each message is submitted as a separate task to ForkJoinPool.commonPool(). Both sendURL() callbacks are executed once the preceding computation (findURL) is done.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
cf.thenAccept(System.out::println);
resp1.thenAccept(System.out::println);
resp2.thenAccept(System.out::println);

The key point: the asynchronous versions are convenient when you have several callbacks that depend on the same computation.

Things can go wrong, as you may have experienced when working with Future. CompletableFuture handles such situations neatly with exceptionally().

exceptionally() gives us a chance to recover by taking an alternative function that is executed if the preceding computation fails with an exception. This way, succeeding callbacks can continue with the alternative result as input. Note that in the complete example further down, notify(Throwable) itself throws a RuntimeException, so that particular chain still fails and cf.get() throws an ExecutionException; to recover, the function must return a value instead.

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
                .thenAccept(this::notify);
cf.get();
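A minimal sketch of actual recovery, where the function passed to exceptionally() returns a fallback value so that later callbacks keep running. The class name, the method loadOrFallback and the messages are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class RecoverySketch {

    // Fails on demand, recovers with a fallback, then continues the chain.
    static String loadOrFallback(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("lookup failed");
                    }
                    return "live value";
                })
                // Invoked only when the preceding stage failed; its return value replaces the failure.
                .exceptionally(ex -> "fallback value")
                // Runs in both cases, with either the live or the fallback value.
                .thenApply(v -> "Result: " + v)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(loadOrFallback(false));
        System.out.println(loadOrFallback(true));
    }
}
```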

Sometimes you need to create a callback that is dependent on the result of two computations. This is where thenCombine() comes into the picture.

CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);

First, two asynchronous jobs are started: creating a random number and finding a URL. Then thenCombine() defines what to do with the results of both computations, here by calling our notify method.

So far the callback depended on the results of two computations. But what if you only need the result of whichever one finishes first? This is where acceptEither() comes in.

CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
cfName.acceptEither(cfUrl, this::sendMsg);
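When only the first available result matters, acceptEither() (used in acceptEitherOp() in the complete example) can be sketched as follows. The names are hypothetical, the 500 ms sleep stands in for a slow source, and the second future is already completed so that the outcome is predictable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class EitherSketch {

    // Consumes whichever of the two results is available first.
    static String firstAvailable() {
        AtomicReference<String> seen = new AtomicReference<>();

        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            sleepQuietly(500); // stand-in for a slow source
            return "slow source";
        });
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast source");

        // The consumer runs once, with the result of the stage that is done first.
        slow.acceptEither(fast, seen::set).join();
        return seen.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("The message : " + firstAvailable());
    }
}
```

With two genuinely concurrent tasks, which result wins is not deterministic.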

The complete example of CompletableFuture is given below:

package com.roytuts.java.completablefuture;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureApi {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFutureApi example = new CompletableFutureApi();

		// chain example
		example.exampleOp();

		// explicitly complete
		example.completeOp();

		// return known value
		example.knownValueOp();

		// separate task using the async suffix
		example.separateTasksOp();

		// exceptionally
		// example.exceptionallyOp();
		// thenCombine
		example.combineOp();

		// acceptEither
		example.acceptEitherOp();
	}

	public void exampleOp() throws InterruptedException, ExecutionException {
		// block and wait for the result
		// CompletableFuture allows you to build pipeline executed
		// asynchronously within the ForkJoinPool
		CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
				.thenAccept(this::store);
		System.out.println("null : " + cf.get());
	}

	public void completeOp() throws InterruptedException, ExecutionException {
		CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);
		// if you want to control concurrency using ExecutorService, i.e., if
		// you do not want to submit the task to default
		// ForkJoinPool.commonPool(), then pass es as second argument to the
		// supplyAsync method
		/*
		 * ExecutorService es = Executors.newFixedThreadPool(2);
		 * CompletableFuture<Double> cf =
		 * CompletableFuture.supplyAsync(this::createRandomNumber, es);
		 */
		// explicitly complete and return default value if you do not want to
		// wait for the task to complete
		cf.complete(434345.8765);
		System.out.println("Random Number : " + cf.get());
	}

	public void knownValueOp() throws InterruptedException, ExecutionException {
		// create a completed CompletableFuture in advance that returns a known
		// value
		// this might come in handy in testing environment, in case you will
		// want to combine that known value with one that needs to be computed
		CompletableFuture<String> cf = CompletableFuture.completedFuture("I'm done");
		cf.isDone(); // return true
		cf.join(); // return "I'm done"
		System.out.println("Known value : " + cf.get());
	}

	public void separateTasksOp() throws InterruptedException, ExecutionException {
		CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
		// execute the below task on separate thread on completion of previous
		// task cf
		CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
		// execute the below task on separate thread on completion of previous
		// task cf
		CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
		cf.thenAccept(System.out::println);
		// cf.thenAcceptAsync(System.out::println);
		resp1.thenAccept(System.out::println);
		// resp1.thenAcceptAsync(System.out::println);
		resp2.thenAccept(System.out::println);
		// resp2.thenAcceptAsync(System.out::println);
	}

	public void exceptionallyOp() throws InterruptedException, ExecutionException {
		CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
				.thenAccept(this::notify);
		cf.get();
	}

	public void combineOp() throws InterruptedException, ExecutionException {
		// what we want to do with the result of these two computations
		CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
		CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
		CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
		System.out.println("Combine : " + resp.get());
	}

	public void acceptEitherOp() throws InterruptedException, ExecutionException {
		// what we want to do with the result of any of the two computations
		CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
		CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
		cfName.acceptEither(cfUrl, this::sendMsg);
	}

	private Double createRandomNumber() {
		return Math.random();
	}

	private UUID createId() {
		return UUID.randomUUID();
	}

	private String convert(UUID input) {
		return input.toString();
	}

	private String findURL() {
		return "roytuts.com";
	}

	private String findName() {
		return "Roy Tutorials";
	}

	private String sendURL(String url) {
		return "Sending " + url + " to destination";
	}

	private String failureMsg() {
		throw new RuntimeException("Failed due to exception");
	}

	private String notify(Throwable t) {
		throw new RuntimeException(t.getMessage());
	}

	private void notify(String msg) {
		System.out.println("The message : " + msg);
	}

	private void sendMsg(String msg) {
		System.out.println("The message : " + msg);
	}

	private String notify(Double num, String msg) {
		return msg + "," + num;
	}

	private void store(String message) {
		System.out.println("message : " + message);
	}

}

The output of the above code is given below:

message : 2ac0786b-8768-4368-b9cb-a2c3e407036b
null : null
Random Number : 434345.8765
Known value : I'm done
roytuts.com
Sending roytuts.com to destination
Sending roytuts.com to destination
Combine : roytuts.com,0.8346270693364803
The message : Roy Tutorials
