
The problem of using CompletableFuture in multiple threads and its solution

CompletableFuture
that allows you to conveniently write asynchronous code. When using
CompletableFuture
from several threads, I came across its non-obvious behavior, namely, the fact that callbacks on it can be executed not at all in those threads, as expected. About this and how I managed to solve the problem - I will tell in this article. I developed an asynchronous, non-blocking, single-threaded client to a server that used thread-safe data structures. Tests passed without problems, but benchmarks sometimes c dropped
ConcurrentModificationException
on the internal structures of a single-threaded client. Asynchrony in the client was implemented using
CompletableFuture
all operations inside the client were performed in one thread (hereinafter referred to as code singleThreadExecutor
).A fragment of client code with a method
get
that is available to users://ожидающие завершения запросы
private final Set pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
public CompletableFuture get(String key) {
CompletableFuture future = new CompletableFuture<>();
//передаём задачу на выполнение в поток клиента
singleThreadExecutor.execute(() -> {
//добавляем future в список ожидающих завершения
pendingFutures.add(future);
future.whenComplete((v, e) -> {
//когда future завершится удаляем его из списка ожидающих
pendingFutures.remove(future);
});
//тут был код передающий запрос на сервер и получающий ответ от сервера
//в конечном итоге код вызвал future.complete(data); в потоке этого singleThreadExecutor
});
return future;
}
It turned out that this should not be done.
Perhaps I would have learned about this earlier if I had carefully read the javadoc for
CompletableFuture
.View javadoc
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
When using this architecture, it is necessary that all callbacks are
CompletableFuture
called in the same thread that it does CompletableFuture.complete
. According to the code above, it seems to be happening. But benchmarks sometimes ended with
ConcurrentModificationException
in code that iterated over pendingFutures
in the same client thread ( singleThreadExecutor
). The fact is that the callback passed to
future.whenComplete
(which calls pendingFutures.remove
) is sometimes executed in a completely different thread. Or rather, in the thread of the application that my client uses:Client client = new Client("127.0.0.1", 8080);
CompletableFuture result = client.get(key);
result.thenAccept(data -> {
System.out.println(data);
});
A call
result.thenAccept
in this application sometimes leads to the call of the remaining callbacks on the future, which were added inside the client code itself.Let's look at the problem with simple examples.
Thread mainThread = Thread.currentThread();
CompletableFuture future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
future.complete(null);
Such code always displays
true
, since callback is executed in the same thread as the complete method. But if there
CompletableFuture
is at least one call from another thread, then the behavior may change://основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread)
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
future.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
Such code can sometimes produce
false
. The fact is that a call
thenRun
from the same future, but in the second thread, can lead to a callback in the first thenRun
. In this case, the callback of the first thenRun
will be called in the second thread. This happens at the moment when it
future.complete(null)
started to execute, but had not yet managed to call callbacks, and in the second thread it was called thenRun
, which will execute all other callbacks on this future but already in its own thread. The problems are solved simply:
//основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture future = new CompletableFuture<>();
CompletableFuture secondThreadFuture = future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
secondThreadFuture.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
We just added secondThreadFuture, which depends on the result of the original future. And a call on it
thenRun
in the second thread does not lead to the possible triggering of callbacks on the original future. To guarantee callbacks in user-defined threads,
CompletableFuture
there are async implementations of methods, for example thenRunAsync
, to which you need to pass Executor. But async versions of methods may run slower than regular versions. Therefore, I did not want to use them again.Conclusion
The conclusion I made for myself: not to use one object
CompletableFuture
in several threads, if you need to be sure that all callbacks on it are executed in the given thread. And if you need to use several streams with one CompletableFuture it is necessary to transfer to another stream not the original CompletableFuture
, but the new one, which will depend on the original one. For example, like this:CompletableFuture secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
//nop
});