Introduction
Here I will discuss about Java 8 or later version’s new feature CompletableFuture
in Java programming language. Using Java 8 or later version’s CompletableFuture
API you can complete the tasks in an ad hoc manner.
A Future
represents the pending result of an asynchronous computation. It offers a method — get()
— that returns the result of the computation when it’s done.
The problem is that a call to get is blocking until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.
CompletableFuture<T>
extends Future<T>
and makes it completable in an ad hoc manner. This is a big deal, considering that Future objects were limited before Java 8.
Benefits of CompletableFuture
This new and improved CompletableFuture
has mainly two benefits:
- It can be explicitly completed by calling the
complete()
method without any synchronous wait. It allows values of any type to be available in the future with default return values, even if the computation did not complete, using default/intermediate results. - It also allows you to build a pipeline data process in a series of actions. You can find a number of patterns for
CompletableFuture
s such as creating aCompletableFuture
from a task, or building aCompletableFuture
chain.
Interface Implementation
CompletableFuture implements interface CompletionStage
with the following policies:
- Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method. - All async methods without an explicit Executor argument are performed using the
ForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interfaceCompletableFuture.AsynchronousCompletionTask
. - All
CompletionStage
methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
CompletableFuture
also implements Future with the following policies:
- Since (unlike
FutureTask
) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect ascompleteExceptionally(new CancellationException())
. MethodisCompletedExceptionally()
can be used to determine if aCompletableFuture
completed in any exceptional fashion. - In case of exceptional completion with a
CompletionException
, methodsget()
andget(long, TimeUnit)
throw anExecutionException
with the same cause as held in the correspondingCompletionException
. To simplify usage in most contexts, this class also defines methods join() andgetNow(T)
that instead throw theCompletionException
directly in these cases.
CompletableFuture Examples
A CompletionStage
is a promise that ensures the computation will eventually be done and the great thing about the CompletionStage
is, it offers a vast selection of methods that let you attach callbacks that will be executed on completion. This way you can build systems in a non-blocking fashion.
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);
supplyAsync()
takes a Supplier
containing the code you want to execute asynchronously — in your case the createRandomNumber()
or createId()
method.
If you had worked with Future
then you may surprise where the Executor
gone. If you want, you can still pass Executor
as a second argument to the supplyAsync()
method. However, if you do not pass any Executor
as a second argument then it will be submitted to the default ForkJoinPool.commonPool()
.
CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
.thenAccept(this::store);
thenAccept()
is one of many ways to add a callback. It takes a Consumer
— in our case a method called store — which handles the result of the preceding computation when it is done but it does not return any value.
If you want to continue passing values from one callback to another and also if you want to return value and pass that to next callback then you can use thenApply()
callback. In our example, thenApply()
takes an argument a method called convert.
In the example above, everything will be executed on the same thread. This results in the last message waiting for the first message to complete.
Now look at the below example each message is submitted as a separate task to the ForkJoinPool.commonPool()
. This results in both the sendURL()
callbacks being executed when the preceding calculation(findURL)
is done.
CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
cf.thenAccept(System.out::println);
resp1.thenAccept(System.out::println);
resp2.thenAccept(System.out::println);
The key is — the asynchronous version can be convenient when you have several callbacks dependent on the same computation.
As you know, sometimes you may face things go wrong as you have already faced working with Future
but CompletableFuture
has a feature to handle such situation in a nice way, using execptionally()
.
exceptionally()
gives us a chance to recover by taking an alternative function that will be executed if preceding calculation fails with an exception. This way succeeding callbacks can continue with the alternative result as input.
CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
.thenAccept(this::notify);
cf.get();
Sometimes you need to create a callback that is dependent on the result of two computations. This is where thenCombine()
comes into the picture.
CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
First, I have started two asynchronous jobs — creating random number and finding a URL. Then I use thenCombine()
to do with the result of these two computations by defining our method notify.
I have covered the scenario where you were dependent on two computations for further process. But, what about when you just need the result of one of them?
CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
The complete example of CompletableFuture
is given below:
package com.roytuts.java.completablefuture;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureApi {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFutureApi example = new CompletableFutureApi();
// chain example
example.exampleOp();
// explicitly complete
example.completeOp();
// return known value
example.knownValueOp();
// separate task using the async suffix
example.separateTasksOp();
// exceptionally
// example.exceptionallyOp();
// thenCombine
example.combineOp();
// acceptEither
example.acceptEitherOp();
}
public void exampleOp() throws InterruptedException, ExecutionException {
// block and wait for the result
// CompletableFuture allows you to build pipeline executed
// asynchronously within the ForkJoinPool
CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
.thenAccept(this::store);
System.out.println("null : " + cf.get());
}
public void completeOp() throws InterruptedException, ExecutionException {
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);
// if you want to control concurrency using ExecutorService, i.e., if
// you do not want to submit the task to default
// ForkJoinPool.commonPool(), then pass es as second argument to the
// supplyAsync method
/*
* ExecutorService es = Executors.newFixedThreadPool(2);
* CompletableFuture<Double> cf =
* CompletableFuture.supplyAsync(this::createRandomNumber, es);
*/
// explicitly complete and return default value if you do not want to
// wait for the task to complete
cf.complete(434345.8765);
System.out.println("Random Number : " + cf.get());
}
public void knownValueOp() throws InterruptedException, ExecutionException {
// create a completed CompletableFuture in advance that returns a known
// value
// this might come in handy in testing environment, in case you will
// want to combine that known value with one that needs to be computed
CompletableFuture<String> cf = CompletableFuture.completedFuture("I'm done");
cf.isDone(); // return true
cf.join(); // return "I'm done"
System.out.println("Known value : " + cf.get());
}
public void separateTasksOp() throws InterruptedException, ExecutionException {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
// execute the below task on separate thread on completion of previous
// task cf
CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
// execute the below task on separate thread on completion of previous
// task cf
CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
cf.thenAccept(System.out::println);
// cf.thenAcceptAsync(System.out::println);
resp1.thenAccept(System.out::println);
// resp1.thenAcceptAsync(System.out::println);
resp2.thenAccept(System.out::println);
// resp2.thenAcceptAsync(System.out::println);
}
public void exceptionallyOp() throws InterruptedException, ExecutionException {
CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
.thenAccept(this::notify);
cf.get();
}
public void combineOp() throws InterruptedException, ExecutionException {
// what we want to do with the result of these two computations
CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
System.out.println("Combine : " + resp.get());
}
public void acceptEitherOp() throws InterruptedException, ExecutionException {
// what we want to do with the result of any of the two computations
CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
cfName.acceptEither(cfUrl, this::sendMsg);
}
private Double createRandomNumber() {
return Math.random();
}
private UUID createId() {
return UUID.randomUUID();
}
private String convert(UUID input) {
return input.toString();
}
private String findURL() {
return "roytuts.com";
}
private String findName() {
return "Roy Tutorials";
}
private String sendURL(String url) {
return "Sending " + url + " to destination";
}
private String failureMsg() {
throw new RuntimeException("Failured due to Exception");
}
private String notify(Throwable t) {
throw new RuntimeException(t.getMessage());
}
private void notify(String msg) {
System.out.println("The message : " + msg);
}
private void sendMsg(String msg) {
System.out.println("The message : " + msg);
}
private String notify(Double num, String msg) {
return msg + "," + num;
}
private void store(String message) {
System.out.println("message : " + message);
}
}
The output of the above code is given below:
message : 2ac0786b-8768-4368-b9cb-a2c3e407036b
null : null
Random Number : 434345.8765
Known value : I'm done
roytuts.com
Sending roytuts.com to destination
Sending roytuts.com to destination
Combine : roytuts.com,0.8346270693364803
The message : Roy Tutorials