Asynchronous example programming: reconstructing java.util.concurrent.CompletableFuture methods

  • Tutorial

What is the need for reconstruction if the source code of this class is open?


If only because there is a highly optimized, hard-to-read code under the hood, the study of which does little in pedagogical terms.


Therefore, we will recreate the semantics of operations according to their specifications, and write functionally equivalent, understandable and readable code, although perhaps not the most economical in terms of memory and processor time.


Let's start with a relatively simple method:


publicstatic <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
Type Parameters:
U - the function's return type
Parameters:
supplier - a function returning the value to be used to complete the returned CompletableFuture
executor - the executor to use for asynchronous execution
Returns:
the new CompletableFuture

Read carefully the specification:


Returns a new CompletableFuture

That is, an object of type CompletableFutureor its subclass is created and returned as a result.


that is asynchronously completed by a task running in the given executor`

In addition, a task is created that is executed on Executor'e.
As we know, Executoraccepts only objects of type Runnable.
Runnable is an interface, and the first object can easily implement it - so we combine two functions in one object.


 completed ... with the value obtained by calling the given Supplier.

This Runnableshould cause this Supplierand the resulting value to complete the created one CompletableFuture.


Supplier - this is a function without parameters, so encoding it all is very simple:


classCompletableFutureForSupplyAsync<U> extendsCompletableFuture<U> implementsRunnable{
        Supplier<U> supplier;
        publicCompletableFutureForSupplyAsync(Supplier<U> supplier){
            this.supplier = supplier;
        }
        publicvoidrun(){
            try {
                U result = supplier.get();
                super.complete(result);
            } catch (Throwable e) {
                super.completeExceptionally(e);
            }
        }
    }
    publicstatic <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor){
        CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier);
        executor.execute(task);
        return task;
    }

The following example is somewhat more complicated:


public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
                                               Executor executor)
Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor,
  with this stage's result as the argument to the supplied function. 
  See the CompletionStage documentation for rules covering exceptional completion.
Specified by:
thenApplyAsync in interface CompletionStage<T>
Type Parameters:
U - the function's return type
Parameters:
fn - the function to use to compute the value of the returned CompletionStage
executor - the executor to use for asynchronous execution
Returns:
the new CompletionStage

Returns a new CompletionStage that... is executed using the supplied Executor


Here we are directly offered to arrange the created object to issue in the form Runnable.


... with this stage's result as the argument to the supplied function.


but this is more interesting. The function passed to us has a parameter, and the value of this parameter is the value that completes the current one CompletionStage. At the time of the call, thenApplyAsyncthis value may not be known, so Executorwe cannot immediately start the task on . Instead, we must agree with the current CompletionStageone
so that at the time of its completion it transfers its value to the task. Among the many methods CompletionStagethere are one that is exactly suitable for this purpose whenComplete:


public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.

That is, in the newly created object-object, it is enough to implement another interface BiConsumerfor receiving the argument:


classCompletableFutureForApplyAsync<T, U> extendsCompletableFuture<U>
            implementsRunnable, BiConsumer<T,Throwable>
    {
        Function<? super T,? extends U> fn;
        Executor executor;
        T arg;
        Throwable throwable;
        publicCompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor){
            this.fn = fn;
            this.executor = executor;
        }
        @Override// implementation of BiConsumer interfacepublicvoidaccept(T argument, Throwable throwable){
            if (throwable != null) {
                this.throwable = throwable;
            } else {
                this.arg = argument;
            }
            executor.execute(this);
        }
        @Overridepublicvoidrun(){
            if (throwable == null) {
                try {
                    U result = fn.apply(arg);
                    super.complete(result);
                } catch (Throwable e) {
                    super.completeExceptionally(e);
                }
            } else {
                super.completeExceptionally(throwable);
            }
        }
    }
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
                                                   Executor executor
    ){
        CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);
        this.whenComplete(task);
        return task;
    }
}

This example is very important for understanding the nature of asynchronous programming, so once again we list its main steps:


1) an asynchronous procedure is created:


     CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);

2) it is not yet ready for execution, so we ask the supplier of the missing argument to pass this argument to us in the future, calling the method we submitted:


this.whenComplete(task);

3) in this method we not only save the received argument, but also run the task for execution (see method accept()).


4) the execution of the task is reduced to the execution of the function submitted to us and the saving of the result.
This result can also be requested by other procedures using the whenComplete() method already applied to our newly constructed object, so that we can construct a chain of asynchronous procedures of arbitrary length. But this chain will be executed in a strictly sequential manner, without any parallelism.


But how to depict a more complex calculation diagram containing parallel branches?
For this is the method thenCombineAsync.


If in the previous example we ran an asynchronous procedure with one argument, then in this one - with two.


In this case, the calculation of both arguments can occur in parallel.


ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
                                                   BiFunction<? super T,? super U,? extends V> fn,
                                                   Executor executor)
Description copied from interface: CompletionStage
Returns a new CompletionStage that, when this and the other given stage complete normally,
is executed using the supplied executor, with the two results as arguments to the supplied function. 

Everything here is the same as in the previous example thenApplyAsync, but the function parameter is already from two arguments, and a parameter CompletionStage<? extends U> otheris added that is the asynchronous provider of the second argument.


How do we handle the second argument?


Well, first, instead of one variable, T argdescribe two:, T arg1; U arg2;a instead of one public method, void accept(T argument, Throwable throwable)describe two - accept1and accept2,
each of which works with its own argument.


At the same time, our object under construction no longer implements the interface BiConsumer<T,Throwable>and we can no longer write a key sentence for connecting nodes of the asynchronous computing graph.


this.whenComplete(task);

Fortunately, an object of a functional interface can be represented by a reference to a method, without enclosing it in a separate class:


this.whenComplete(task::accept1);
        other.whenComplete(task::accept2);

That is, the current object thisdelivers the first argument, and the object the othersecond.


Here, method codes will have to be changed so that they do not start the task immediately upon the arrival of their argument, but also check the arrival of the second one:


publicsynchronizedvoidaccept1(T argument, Throwable throwable){
            if (throwable != null) {
                this.throwable = throwable;
                executor.execute(this);
            } else {
                this.arg1 = argument;
                if (arg2 != null) {
                    executor.execute(this);
                }
            }
        }

The accept2 method is described similarly.


Note that:


  • methods become synchronized (working with common data)
  • in the case of an error, it is not necessary to wait for the second argument.
  • checking the arrival of an argument by comparing to nullis not the best way; maybe you need to have a boolean for each argument.

In this way, you can make asynchronous procedures from a larger number of arguments than two, but the thought comes right away - can you still make a separate class for the parameters, so as not to write your method to accept each parameter, but get by dynamically creating the parameters?


    Parameter<Integer> arg1 = new Parameter<>();
    Parameter<Float> arg2 = new Parameter<>();
    ...
    future1.whenComplete(arg1);
    future2.whenComplete(arg2);

Yes, such a class can be created, but more on that next time.


A brief summary of the above:


  • An asynchronous program is a network of interconnected asynchronous procedures,
    just as a multi-threaded program is a network of interconnected execution threads (threads).

But the means of communication flows and asynchronous procedures are fundamentally different.


Threads are connected using semaphores, blocking queues and other similar objects
that block the receiver's stream if information has not yet arrived, but the stream is already trying to extract it using a pull-based operation.


Asynchronous procedures - recipients simply do not enter execution until all the information they need is ready.
They passively wait until the information providers themselves pass it through push-based operations.
Because of this, they do not spend memory on the stack while waiting, and, therefore, take up much less memory than execution threads.


  • building a network of asynchronous procedures is reduced to creating objects and linking them together, more precisely, to linking their subobjects - parameters: the output parameter of the information provider is passed the address of the input parameter of the recipient.

A set of methods CompletableFuturedoes exactly that, and in principle, these methods can be dispensed with by creating objects explicitly, as shown in the examples above.
But for this you need to have classes similar to those described in these examples.
For some reason, the creators java.util.concurrentchose not to give users access to these classes and hid them in the back of the code CompletableFuture.


Those who want to have a visual representation of the asynchronous network being created can reconstruct these classes by continuing the examples given. The source code of the examples is available on Github .


Also popular now: