Java Concurrency (Multithreading) – CompletableFuture Explained

Starting from Java 8 we finally have such an elegant and intuitive way to deal with concurrency with help of CompletableFuture. But the plenty of different CompletableFuture methods a bit confusing. So let’s bring some order to that.

First, let’s figure out what is the difference between runAsync and suplyAsync:

runAsync implements Runnable interface, creates new thread and not allows to return a value.

suplyAsync implements Supplier interface, creates new thread in the thread pool and returns a value of a parameterized type.

This is the list of CompletableFuture methods with their characteristics:

Method Async method Arguments Returns
thenRun() thenRunAsync()             –             –
thenAccept() thenAcceptAsync() Result of previous stage             –
thenApply() thenApplyAsync() Result of previous stage Result of current stage
thenCompose() thenComposeAsync() Result of previous stage Future result of current stage
thenCombine() thenCombineAsync() Result of two previous stages Result of current stage
whenComplete() whenCompleteAsync() Result or exception from previous stage             –

 

Let’s see some code samples. I believe learning by practical example is the best way to learn something new.
I created several different methods in order to demonstrate CompletableFuture abilities:

    public String method1() {
        System.out.println("Running method1... thread id: " + Thread.currentThread().getId());
        String returnValue = "Hello from method1!";
        System.out.println("Return value: " + returnValue);
        return returnValue;
    }

    public void method2(String arg) {
        System.out.println("Running method2... thread id: " + Thread.currentThread().getId());
        System.out.println("Incoming argument: " + arg);
    }

    public String method3(String arg) {
        System.out.println("Running method3... thread id: " + Thread.currentThread().getId());
        System.out.println("Incoming argument: " + arg);
        String returnValue = "Hello from method3!";
        System.out.println("Return value: " + returnValue);
        return returnValue;
    }


    public CompletableFuture<String> method4(String arg) {
        return CompletableFuture.supplyAsync(()->{
            System.out.println("Running method4... thread id: " + Thread.currentThread().getId());
            return arg + "  - Well, hello to you too!";
        });
    }

    public CompletableFuture<String> method5(String arg) {
        return CompletableFuture.supplyAsync(()->{
            System.out.println("Running method5... thread id: " + Thread.currentThread().getId());
            return arg + arg + arg;
        });
    }

 

suplyAsync and thenAccept

Now let’s run the following code:

System.out.println("Main thread running... thread id: " + Thread.currentThread().getId());

CompletableFuture.supplyAsync(c::method1).thenAccept(c::method2);

System.out.println("Main thread finished");
    

Once we run the code this is what we’ll get:

Main thread running... thread id: 1
Running method1... thread id: 12
Return value: Hello from method1!
Running method2... thread id: 1
Incoming argument: Hello from method1!
Main thread finished

As you can see method1 running in a separate thread, after its completion the result has been passed to method2.

thenApply

thenApply used for passing values from one callback to another. Thus we can chain multiple methods. Let’s change our previous line of code by following:

CompletableFuture.supplyAsync(c::method1).thenApply(c::method3).thenAccept(c::method2);

The Program Output:
Main thread running... thread id: 1
Running method1... thread id: 12
Return value: Hello from method1!
Running method3... thread id: 1
Incoming argument: Hello from method1!
Return value: Hello from method3!
Running method2... thread id: 1
Incoming argument: Hello from method3!
Main thread finished

 

thenCompose

thenCompose used for scenarios where there is a need to call different methods that return CompletableFuture and combine the result.

        System.out.println("Main thread running... thread id: " + Thread.currentThread().getId());

        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(this::method1).thenCompose(this::method4);

        System.out.println(finalResult.get());
        
        System.out.println("Main thread finished");

Result:

Main thread running... thread id: 1
Running method1... thread id: 14
Return value: Hello from method1!
Running method4... thread id: 14
Hello from method1!  - Well, hello to you too!
Main thread finished

So here method4 waits for a result from method1 and takes it as an input parameter.

Now let’s replace thenCompose(this::method4) with thenComposeAsync(this::method4):

Main thread running... thread id: 1
Running method1... thread id: 14
Return value: Hello from method1!
Running method4... thread id: 15
Hello from method1!  - Well, hello to you too!
Main thread finished

Notice that method4 runs in a new thread.

 

allOf

Sometimes you need to wait for all CompletableFuture calls to complete and only then to run some other task. This is how you do it.

        System.out.println("Main thread running... thread id: " + Thread.currentThread().getId());

        CompletableFuture<Void> all = CompletableFuture.allOf(method4("Hello!"), method5("Hi!"));
        CompletableFuture cf = all.thenRun(() ->
                System.out.println("Running this after all ... thread id: " + Thread.currentThread().getId()));

        cf.get();

        System.out.println("Main thread finished");

The program output:

Main thread running... thread id: 1
Running method4... thread id: 14
Running method5... thread id: 15
Running this after all ... thread id: 14
Main thread finished

 

exceptionally

exceptionally is used for error handling. In this crazy world of multi-threading exceptions definitely will happen. This is how you deal with them:

         CompletableFuture.supplyAsync(() -> {
            try {
                int i = 1 / 0;  //  Do not try it at home :)))
            } catch (Exception e) {
                throw new RuntimeException("Division by zero!!!", e);
            }
            return "Hello World!";
        })
                .thenAcceptAsync(s -> {
                    System.out.println("Result: " + s);
                })
                .exceptionally(e -> {
                    System.err.println("Error! " + e.getMessage());
                    return null;
                });

The program output:

Error! java.lang.RuntimeException: Division by zero!!!

 

join / get

These methods are used to get the result from CompletableFuture stage. Usually it’s bad practice to call any of these methods because it will actually force the main thread to wait for the result. The difference between join and get is that join returns the result when a task completed or throws unchecked exception if a task completed exceptionally while get waits if necessary for the future to complete and if future is not completed with specified time (can be passed as parameter to the method) throws an exception.

A practical example of using join:

    public CompletableFuture<String> printMsg(String arg) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(arg);
            return arg;
        });
    }

    String joinedFuture = Stream.of(printMsg("Hello"), printMsg("Java"), printMsg("Concurrency!"))
                .map(CompletableFuture::join)
                .collect(Collectors.joining(" "));

    System.out.println(joinedFuture);

The program output:

Hello
Java
Concurrency!
Hello Java Concurrency!

2 COMMENTS

Leave a Reply to Abdo Cancel reply

Please enter your comment!
Please enter your name here

This site uses Akismet to reduce spam. Learn how your comment data is processed.