The problem of using CompletableFuture in multiple threads and its solution

    Java 8 introduced a new class, CompletableFuture, that makes it convenient to write asynchronous code.
    While using a CompletableFuture from several threads, I ran into non-obvious behavior: callbacks registered on it may run in a thread other than the one you expect. In this article I describe the problem and how I solved it.

    I was developing an asynchronous, non-blocking, single-threaded client for a server; the client used data structures that are not thread-safe. Tests passed without problems, but benchmarks sometimes failed with a ConcurrentModificationException on the internal structures of the single-threaded client.

    Asynchrony in the client was implemented with CompletableFuture, and all operations inside the client were performed in a single thread (called singleThreadExecutor in the code below).

    A fragment of the client code with the get method that is available to users:

    // requests awaiting completion
    private final Set pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
    public CompletableFuture get(String key) {
        CompletableFuture future = new CompletableFuture<>();
        // hand the task over to the client thread
        singleThreadExecutor.execute(() -> {
            // add the future to the set of pending requests
            pendingFutures.add(future);
            future.whenComplete((v, e) -> {
                // once the future completes, remove it from the pending set
                pendingFutures.remove(future);
            });
            // the code that sends the request to the server and receives the response was here;
            // eventually it called future.complete(data) in the thread of this singleThreadExecutor
        });
        return future;
    }
    

    It turned out that this should not be done.

    Perhaps I would have learned about this earlier if I had carefully read the javadoc for CompletableFuture.

    From the javadoc for CompletableFuture:
    Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

    With this architecture, all callbacks on a CompletableFuture must be called in the same thread that calls CompletableFuture.complete.

    Judging by the code above, that seems to be the case. But benchmarks sometimes ended with a ConcurrentModificationException in code that iterated over pendingFutures in the same client thread (singleThreadExecutor).

    The reason is that the callback passed to future.whenComplete (the one that calls pendingFutures.remove) is sometimes executed in a completely different thread. More precisely, it runs in a thread of the application that uses my client:

    Client client = new Client("127.0.0.1", 8080);
    CompletableFuture result = client.get(key);
    result.thenAccept(data -> {
        System.out.println(data);
    });
    

    In such an application, the call to result.thenAccept sometimes triggers the remaining callbacks on the future, including those added inside the client code itself.

    Let's look at the problem with simple examples.


    Thread mainThread = Thread.currentThread();
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.thenRun(() -> {
        // true when the callback runs in the thread that called complete
        System.out.println(Thread.currentThread() == mainThread);
    });
    future.complete(null);

    This code always prints true, because the callback is executed in the same thread that calls complete.

    But if the CompletableFuture is accessed from another thread even once, the behavior may change:

    // main thread
    Thread mainThread = Thread.currentThread();
    // create a second thread
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.thenRun(() -> {
        System.out.println(Thread.currentThread() == mainThread);
    });
    // simply add a callback to the same future from the other thread
    executor.execute(() -> {
        future.thenRun(() -> {
            // nop
        });
    });
    // complete the future
    future.complete(null);
    

    This code can sometimes print false.

    The reason is that a thenRun call on the same future in the second thread can trigger the callback registered by the first thenRun. In that case, the first callback is executed in the second thread.

    This happens when future.complete(null) has started executing but has not yet invoked the callbacks, and at that moment thenRun is called in the second thread. That thenRun call then executes all the other callbacks on this future, but in its own thread.
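
    Because the race window is narrow, a single run rarely shows it. The following is a minimal sketch, not code from the client, that repeats the experiment many times and counts how often the first callback ran outside the completing thread. The class name CallbackThreadRace, the method countForeignCallbacks and the number of rounds are assumptions made for illustration. The count varies from run to run and may be zero on some machines.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: repeats the racy example and counts the "false" outcomes.
class CallbackThreadRace {

    static int countForeignCallbacks(int rounds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger foreign = new AtomicInteger();
        try {
            for (int i = 0; i < rounds; i++) {
                Thread completer = Thread.currentThread();
                CompletableFuture<Void> future = new CompletableFuture<>();
                // Signals used only to wait for both threads before the next round.
                CompletableFuture<Void> callbackRan = new CompletableFuture<>();
                CompletableFuture<Void> registered = new CompletableFuture<>();
                future.thenRun(() -> {
                    if (Thread.currentThread() != completer) {
                        foreign.incrementAndGet();
                    }
                    callbackRan.complete(null);
                });
                // Touch the same future from the second thread.
                executor.execute(() -> {
                    future.thenRun(() -> { });
                    registered.complete(null);
                });
                future.complete(null);
                // The first callback may still be running in the other thread.
                callbackRan.join();
                registered.join();
            }
        } finally {
            executor.shutdown();
        }
        return foreign.get();
    }

    public static void main(String[] args) {
        int rounds = 100_000;
        System.out.println(countForeignCallbacks(rounds) + " of " + rounds
                + " callbacks ran outside the completing thread");
    }
}
```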

    The problem has a simple solution:

    // main thread
    Thread mainThread = Thread.currentThread();
    // create a second thread
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = new CompletableFuture<>();
    CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
        System.out.println(Thread.currentThread() == mainThread);
    });
    // add a callback from the other thread, but on the dependent future
    executor.execute(() -> {
        secondThreadFuture.thenRun(() -> {
            // nop
        });
    });
    // complete the future
    future.complete(null);
    

    We just added secondThreadFuture, which depends on the result of the original future. Calling thenRun on it in the second thread cannot trigger the callbacks on the original future.
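
    The fix can be checked the same way as the race, by repeating it many times. This is a minimal sketch under the same assumptions as the example it mirrors; the class name DependentFutureCheck and the method countForeignCallbacks are hypothetical. Since only the main thread ever touches the original future, the counter should stay at zero.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: repeats the fixed example and counts callbacks on the
// original future that ran outside the completing thread.
class DependentFutureCheck {

    static int countForeignCallbacks(int rounds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger foreign = new AtomicInteger();
        try {
            for (int i = 0; i < rounds; i++) {
                Thread completer = Thread.currentThread();
                CompletableFuture<Void> future = new CompletableFuture<>();
                CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
                    if (Thread.currentThread() != completer) {
                        foreign.incrementAndGet();
                    }
                });
                CompletableFuture<Void> registered = new CompletableFuture<>();
                // The second thread only ever sees the dependent future.
                executor.execute(() -> {
                    secondThreadFuture.thenRun(() -> { });
                    registered.complete(null);
                });
                future.complete(null);
                registered.join();
            }
        } finally {
            executor.shutdown();
        }
        return foreign.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks outside the completing thread: "
                + countForeignCallbacks(100_000));
    }
}
```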

    To guarantee that callbacks run in threads of your choosing, CompletableFuture provides async variants of its methods, for example thenRunAsync, to which you pass an Executor. But the async variants can be slower than the regular ones, so I did not want to use them unless necessary.
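
    For comparison, here is a minimal sketch of the async variant. The class name AsyncCallbackDemo, the method callbackThreadName and the thread name "callback-thread" are assumptions made for illustration. With thenRunAsync and an explicit Executor, the callback is submitted to that executor regardless of which thread completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: pins a callback to a chosen executor with thenRunAsync.
class AsyncCallbackDemo {

    // Returns the name of the thread that ran the callback.
    static String callbackThreadName() {
        ExecutorService callbackExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            CompletableFuture<Void> future = new CompletableFuture<>();
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture<Void> done = future.thenRunAsync(
                    () -> seen.set(Thread.currentThread().getName()),
                    callbackExecutor);
            // Completed from the calling thread; the callback still goes to the executor.
            future.complete(null);
            done.join();
            return seen.get();
        } finally {
            callbackExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "callback-thread"
    }
}
```

    The price is an extra task submission per callback, which is the overhead mentioned above.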

    Conclusion


    The conclusion I drew for myself: do not use a single CompletableFuture object from several threads if you need to be sure that all callbacks on it are executed in a particular thread. And if you do need several threads to work with one CompletableFuture, pass the other thread not the original CompletableFuture but a new one that depends on the original. For example, like this:

    CompletableFuture secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
        //nop
    });
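
    Applied to the client from the beginning of the article, the idea could look roughly like the sketch below. It is not the real client: the class SketchClient, the String value type, the fake response "value-for-" + key and the foreignCallbacks counter are assumptions made for illustration. One detail matters here: the dependent future is created before the task is submitted, so the calling thread touches the internal future only while it cannot yet be completed.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client; it has no networking at all.
class SketchClient {
    private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    // Touched only from the client thread, so it does not need to be thread-safe.
    private final Set<CompletableFuture<String>> pendingFutures =
            Collections.newSetFromMap(new IdentityHashMap<>());
    // Counts internal callbacks that ran outside the client thread.
    final AtomicInteger foreignCallbacks = new AtomicInteger();

    CompletableFuture<String> get(String key) {
        CompletableFuture<String> internal = new CompletableFuture<>();
        // Created before the task is submitted: internal cannot be complete yet.
        CompletableFuture<String> forUser = internal.whenComplete((v, e) -> { });
        singleThreadExecutor.execute(() -> {
            Thread clientThread = Thread.currentThread();
            pendingFutures.add(internal);
            internal.whenComplete((v, e) -> {
                if (Thread.currentThread() != clientThread) {
                    foreignCallbacks.incrementAndGet();
                }
                pendingFutures.remove(internal);
            });
            // Stands in for sending the request and receiving the response.
            internal.complete("value-for-" + key);
        });
        // Users only ever see the dependent future.
        return forUser;
    }

    void shutdown() {
        singleThreadExecutor.shutdown();
        try {
            singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        for (int i = 0; i < 10_000; i++) {
            client.get("key" + i).thenAccept(data -> { });
        }
        client.shutdown();
        System.out.println("internal callbacks outside the client thread: "
                + client.foreignCallbacks.get());
    }
}
```

    Callbacks that users attach to the returned future may still run in either thread, but they no longer drag the client's internal callbacks along with them.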
    
