10 tips for using ExecutorService

I present to Habrahabr readers a translation of the article "ExecutorService - 10 tips and tricks".



The ExecutorService abstraction was introduced back in Java 5, in 2004. Just think: Java 5 and 6 are no longer supported, and Java 7 is about to join them. Yet many Java programmers still do not fully understand how ExecutorService works. There are plenty of sources out there, but here I would like to talk about lesser-known subtleties and practices for working with it.

1. Name the thread pools


I can't not mention this one. When taking a thread dump or debugging, you will notice that the default thread naming scheme is pool-N-thread-M, where N is the sequential pool number (every time you create a new pool, a global counter N is incremented) and M is the ordinal number of the thread within the pool. For example, pool-2-thread-3 is the third thread in the second pool created during the JVM's lifetime. See Executors.defaultThreadFactory(). Not very informative, is it? The JDK makes it slightly awkward to name threads properly, because the naming strategy is hidden inside ThreadFactory. Fortunately, Google Guava has a built-in class for this:
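import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Orders-%d")   // threads are named Orders-0, Orders-1, ...
        .setDaemon(true)
        .build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);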

By default, pools create non-daemon threads; decide for yourself which is more appropriate in each case.
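If you would rather not pull in Guava just for this, ThreadFactory is a single-method interface, so a naming factory takes only a few lines of plain JDK code. The sketch below is our own alternative, not Guava's implementation; the class name NamedThreadFactoryDemo and the helper named() are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {

    // Hypothetical helper: a plain-JDK ThreadFactory naming threads prefix-0, prefix-1, ...
    static ThreadFactory named(String prefix, boolean daemon) {
        final AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(daemon);
            return thread;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, named("Orders", true));
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // Orders-0
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

The counter lives inside the factory, so each pool gets its own numbering instead of sharing the global pool-N counter.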

2. Change thread names depending on the context


I learned this trick from the article "Supercharged jstack: How to Debug Your Servers at 100mph". Since we know about thread names, we can change them at runtime whenever we want! This makes sense, because a thread dump shows class and method names, but not parameters or local variables. By putting some key information into the thread name, we can easily track which messages/records/requests, etc. are slowing the system down or causing a deadlock.
private void process(String messageId) {
    executorService.submit(() -> {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + messageId);
        try {
            // main logic...
        } finally {
            currentThread.setName(oldName);
        }
    });
}

Inside the try-finally block the current thread is named Processing-<current message ID>, which can be useful when tracing the flow of messages through the system.
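The rename-and-restore dance is easy to get wrong when it is copied into many tasks, so it can be factored out. A minimal sketch, assuming a hypothetical withThreadName() helper that takes the work as a Supplier:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ThreadRenaming {

    // Hypothetical helper: runs `work` with the current thread temporarily renamed,
    // restoring the previous name afterwards even if `work` throws.
    static <T> T withThreadName(String name, Supplier<T> work) {
        final Thread current = Thread.currentThread();
        final String oldName = current.getName();
        current.setName(name);
        try {
            return work.get();
        } finally {
            current.setName(oldName);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        String messageId = "42";  // illustrative message ID
        Callable<String> task = () ->
                withThreadName("Processing-" + messageId, () -> Thread.currentThread().getName());
        Future<String> nameDuringWork = executorService.submit(task);
        System.out.println(nameDuringWork.get()); // Processing-42
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```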

3. Shut down explicitly and safely


Between the client threads and the thread pool sits a queue of tasks. When your application shuts down, you have to take care of two things: what happens to the tasks still waiting in the queue, and how the tasks that are already running behave (more on that later). Surprisingly, many developers do not shut down thread pools properly. There are two options: either let all queued tasks run (shutdown()) or discard them (shutdownNow()), depending on your use case. For example, if we have submitted a batch of tasks and want to return as soon as all of them are done, we use shutdown():
// Email and sendEmail() are assumed to be defined elsewhere
private void sendAllEmails(List<Email> emails) throws InterruptedException {
    emails.forEach(email ->
            executorService.submit(() ->
                    sendEmail(email)));
    executorService.shutdown();
    final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
    log.debug("Were all emails sent? {}", done);
}

In this example we send a batch of emails, each as a separate task in the thread pool. After submitting these tasks we shut the pool down so that it no longer accepts new ones. Then we wait at most one minute for all tasks to complete. If some tasks are still unfinished by then, awaitTermination() simply returns false, and those tasks keep running in the background. I know the hipsters would go for:
emails.parallelStream().forEach(this::sendEmail);

Call me old-fashioned, but I like to control the number of concurrent threads. The alternative to the graceful shutdown() is shutdownNow():
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());

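This time all tasks waiting in the queue are discarded and returned to the caller. Tasks that are already running are not stopped forcibly: the pool interrupts their threads, and whether they actually stop depends on how they react to interruption (see the next tip).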
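The two methods combine naturally into a two-phase shutdown, a pattern also shown in the ExecutorService Javadoc. A sketch, where the helper name shutdownGracefully() and the timeout policy are our own choices:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {

    // Phase 1: stop accepting tasks and give queued ones `timeout` to finish.
    // Phase 2: discard the queue and interrupt running tasks.
    // Returns the tasks that never started (empty if phase 1 was enough).
    static List<Runnable> shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return Collections.emptyList();
            }
            List<Runnable> neverStarted = pool.shutdownNow();
            if (!pool.awaitTermination(timeout, unit)) {
                System.err.println("Pool did not terminate; some task ignores interruption");
            }
            return neverStarted;
        } catch (InterruptedException e) {
            // Interrupted while waiting: force the shutdown and keep the flag set
            List<Runnable> neverStarted = pool.shutdownNow();
            Thread.currentThread().interrupt();
            return neverStarted;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {                      // occupies the only thread
            TimeUnit.SECONDS.sleep(10);
            return null;
        });
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> System.out.println("never printed"));
        }
        List<Runnable> notStarted = shutdownGracefully(pool, 200, TimeUnit.MILLISECONDS);
        System.out.println(notStarted.size() + " tasks were never started"); // 3 tasks were never started
    }
}
```

The long-running task only stops here because sleep() reacts to the interrupt sent by shutdownNow(); a task that ignores interruption would keep the pool alive past the second timeout.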

4. Handle interruption with care.


A lesser-known feature of the Future interface is cancellation. What follows is adapted from one of my earlier articles, "InterruptedException and interrupting threads explained".
Since InterruptedException is a checked exception, hardly anyone has stopped to think how many bugs it has masked over all these years. And since it has to be handled somehow, many people handle it incorrectly or without much thought. Let's look at a simple example of a thread that periodically does some cleanup and sleeps most of the time in between.
class Cleaner implements Runnable {
  Cleaner() {
    final Thread cleanerThread = new Thread(this, "Cleaner");
    cleanerThread.start();
  }
  @Override
  public void run() {
    while(true) {
      cleanUp();
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
  private void cleanUp() {
    //...
  }
}

This code is wrong on so many levels!
  1. Starting a thread from a constructor is often a bad idea. For example, some frameworks, such as Spring, like to create dynamic subclasses to support method interception. We end up with two threads started from two instances.
  2. InterruptedException is swallowed and not handled properly.
  3. This class starts a new thread for every instance. Instead, it should use a ScheduledThreadPoolExecutor, which shares the same threads among many objects; that is more robust and more efficient.
  4. In addition, with ScheduledThreadPoolExecutor we can avoid hand-written sleep/work loops and get true scheduling.
  5. Last but not least: there is no way to get rid of this thread, even when nobody references the Cleaner instance any more.
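Points 3 to 5 can be addressed with a shared ScheduledExecutorService. The sketch below is one way to do it, assuming the scheduler is created and shut down elsewhere by the application; ScheduledCleaner, start() and the runs counter are hypothetical names. Note that scheduleWithFixedDelay() suppresses all further runs once an execution throws, so cleanUp() should not let exceptions escape.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledCleaner implements AutoCloseable {
    private final ScheduledExecutorService scheduler;  // shared, owned by the application
    private ScheduledFuture<?> schedule;
    final AtomicInteger runs = new AtomicInteger();     // illustration only: counts cleanUp() calls

    public ScheduledCleaner(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;                     // no thread is started in the constructor
    }

    public synchronized void start() {
        // Run cleanUp() now, then 1 second after each run finishes: no hand-written sleep loop
        schedule = scheduler.scheduleWithFixedDelay(this::cleanUp, 0, 1, TimeUnit.SECONDS);
    }

    private void cleanUp() {
        runs.incrementAndGet();
        // ... actual cleanup
    }

    @Override
    public synchronized void close() {
        if (schedule != null) {
            schedule.cancel(true);                      // no more runs; interrupt a run in progress
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try (ScheduledCleaner cleaner = new ScheduledCleaner(scheduler)) {
            cleaner.start();
            Thread.sleep(2_500);
            System.out.println("cleanUp() ran " + cleaner.runs.get() + " times");
        }
        scheduler.shutdown();
    }
}
```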

All of these issues matter, but swallowing InterruptedException is the biggest sin. Before we see why, let's think about what this exception is for and how we can use it to interrupt threads gracefully. Many blocking operations in the JDK force you to handle InterruptedException, for example:
  • Object.wait()
  • Thread.sleep()
  • Process.waitFor()
  • Lots of blocking methods in java.util.concurrent.*, such as ExecutorService.awaitTermination(), Future.get(), BlockingQueue.take(), Semaphore.acquire(), Condition.await() and many, many others
  • SwingUtilities.invokeAndWait()

Note that blocking I/O does not throw InterruptedException (which is a shame). If all these classes declare InterruptedException, you may wonder when these exceptions are actually thrown:
  • When a thread is blocked in a method that declares InterruptedException and you call Thread.interrupt() on that thread, the blocking method will most likely throw InterruptedException immediately.
  • If you submitted a task to a pool (ExecutorService.submit()) and call Future.cancel(true) while it is still running, the thread pool will try to interrupt the thread executing that task, which usually ends the task early.
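The second case is easy to observe. The following sketch (class and method names are hypothetical) submits a task that blocks in sleep(), cancels it with cancel(true) and checks whether the task saw the interruption:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {

    // Submits a task that blocks in sleep(), cancels it with cancel(true) and
    // reports whether the task actually observed the interruption.
    static boolean cancelInterruptsSleepingTask() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch sawInterrupt = new CountDownLatch(1);
        Future<?> future = pool.submit(() -> {
            started.countDown();
            try {
                TimeUnit.SECONDS.sleep(10);      // a blocking call that declares InterruptedException
            } catch (InterruptedException e) {
                sawInterrupt.countDown();        // the pool interrupted us on cancel(true)
            }
        });
        started.await();                         // the task is running, not just queued
        future.cancel(true);                     // true = interrupt the thread running the task
        boolean interrupted = sawInterrupt.await(1, TimeUnit.SECONDS);
        pool.shutdown();
        return interrupted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelInterruptsSleepingTask()); // true
    }
}
```

With cancel(false) the running task would simply be left alone; only the Future would report it as cancelled.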

Knowing what InterruptedException really means, we can handle it properly. If someone tries to interrupt our thread and we find out by catching InterruptedException, the most sensible thing is to let the thread finish right away, for example:
class Cleaner implements Runnable, AutoCloseable {
  private final Thread cleanerThread;
  Cleaner() {
    cleanerThread = new Thread(this, "Cleaner");
    cleanerThread.start();
  }
  @Override
  public void run() {
    try {
      while (true) {
        cleanUp();
        TimeUnit.SECONDS.sleep(1);
      }
    } catch (InterruptedException ignored) {
      log.debug("Interrupted, closing");
    }
  }
  //...   
  @Override
  public void close() {
    cleanerThread.interrupt();
  }
}

Note that in this example the try-catch block surrounds the whole while loop. So if sleep() throws InterruptedException, we leave the loop. You could argue that we should log the stack trace of the InterruptedException. It depends on the situation: here, interrupting the thread is expected behavior, not a failure, so it is up to you. In most cases the thread will be interrupted during sleep(), and we will quickly leave run(). If you are very careful, you will probably ask: what happens if the thread is interrupted while it is inside cleanUp()? A common solution you will come across is a manually set flag, like this:
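private volatile boolean stop = false;
@Override
public void run() {
  while (!stop) {
    cleanUp();
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt and leave the loop
      break;
    }
  }
}
@Override
public void close() {
  stop = true;  // only seen at the next loop check; does not cut sleep() short
}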

Remember that the stop flag (it must be volatile!) will not interrupt blocking operations; we have to wait until sleep() returns. On the other hand, an explicit flag gives us better control, because we can check it at any point. It turns out that thread interruption works exactly the same way. If someone interrupts a thread while it is doing non-blocking computation (for example, cleanUp()), that computation is not stopped immediately. However, the thread is already marked as interrupted, so the next blocking operation, such as sleep(), fails immediately with InterruptedException, and we do not lose the signal.
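The "we do not lose the signal" part can be checked directly. In this sketch (names are hypothetical) the interrupt flag is set first, as if the interrupt had arrived during cleanUp(), and only then does the thread reach a blocking call:

```java
import java.util.concurrent.TimeUnit;

public class PendingInterruptDemo {

    // Simulates an interrupt that arrives during non-blocking work: the flag is set
    // first, and only then does the thread reach a blocking call.
    static long millisBeforeSleepFails() {
        long start = System.nanoTime();
        Thread.currentThread().interrupt();       // "someone interrupted us during cleanUp()"
        try {
            TimeUnit.SECONDS.sleep(5);            // fails at once because the flag is already set
        } catch (InterruptedException e) {
            // sleep() clears the interrupt flag when it throws
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("sleep(5 s) gave up after " + millisBeforeSleepFails() + " ms");
        System.out.println("flag still set: " + Thread.currentThread().isInterrupted()); // flag still set: false
    }
}
```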

We can also use this fact when implementing a non-blocking thread that still wants to benefit from the interruption mechanism. Instead of relying on InterruptedException, we simply check Thread.isInterrupted() periodically:
public void run() {
  while (!Thread.currentThread().isInterrupted()) {
    someHeavyComputations();
  }
}

As you can see, if someone interrupts our thread, we stop computing as soon as the current iteration of someHeavyComputations() returns. If that iteration runs for a very long time or forever, we never get to check the interrupt flag. Note that the flag is not a one-off signal: it stays set until someone clears it. We can call Thread.interrupted() instead of isInterrupted(), which clears the flag so that we can carry on. Sometimes you may want to ignore the interrupt flag and keep running; in that case interrupted() can come in handy.
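A runnable version of the polling loop, with a hypothetical someHeavyComputations() that does one bounded chunk of arithmetic, plus a short demonstration of the difference between isInterrupted() and Thread.interrupted():

```java
import java.util.concurrent.atomic.AtomicLong;

public class InterruptibleComputation {

    // A CPU-bound worker that never blocks, so it polls the interrupt flag
    // between iterations instead of waiting for an InterruptedException.
    static Thread startWorker(AtomicLong iterations) {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                someHeavyComputations();
                iterations.incrementAndGet();
            }
        }, "Worker");
        worker.start();
        return worker;
    }

    // Hypothetical stand-in for one bounded chunk of real work
    static double someHeavyComputations() {
        double sum = 0;
        for (int i = 0; i < 100_000; i++) {
            sum += Math.sqrt(i);
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong iterations = new AtomicLong();
        Thread worker = startWorker(iterations);
        Thread.sleep(50);
        worker.interrupt();          // only sets the flag; the current iteration still completes
        worker.join(1_000);
        System.out.println("worker alive: " + worker.isAlive() + " after " + iterations.get() + " iterations");

        // isInterrupted() only reads the flag; Thread.interrupted() reads and clears it
        Thread.currentThread().interrupt();
        System.out.println(Thread.currentThread().isInterrupted()); // true
        System.out.println(Thread.interrupted());                   // true (flag now cleared)
        System.out.println(Thread.currentThread().isInterrupted()); // false
    }
}
```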

If you are an old-school programmer, you probably remember the Thread.stop() method, which was deprecated back in Java 1.2. There were plans to de-implement it in Java 8, but as of 1.8u5 it is still with us. Nevertheless, do not use it, and refactor any code that uses it to rely on Thread.interrupt() instead.

Sometimes you may want to ignore InterruptedException altogether. In that case, have a look at the Uninterruptibles class from Guava. It contains many methods such as sleepUninterruptibly() or awaitUninterruptibly(CountDownLatch). Just be careful with them: they do not throw InterruptedException, and they keep waiting even when the thread is interrupted (they only restore the interrupt flag once they return), which is rather unusual.
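The pattern behind such methods is simple: catch the interruption, remember it, keep waiting for the remaining time, and re-assert the flag before returning. The hand-rolled sketch below illustrates that pattern for sleeping; it is not Guava's source, and the class name is hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class UninterruptibleSleep {

    // Sleeps for the whole duration even if interrupted along the way, then
    // re-asserts the interrupt flag so that callers can still see the request.
    static void sleepUninterruptibly(long duration, TimeUnit unit) {
        boolean interrupted = false;
        long remainingNanos = unit.toNanos(duration);
        final long end = System.nanoTime() + remainingNanos;
        try {
            while (true) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remainingNanos); // returns at once if remaining <= 0
                    return;
                } catch (InterruptedException e) {
                    interrupted = true;                         // remember it, keep sleeping
                    remainingNanos = end - System.nanoTime();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();             // restore the flag on the way out
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();                     // interrupt arrives before the sleep
        long start = System.nanoTime();
        sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("slept ~" + elapsedMs + " ms, interrupted: " + Thread.currentThread().isInterrupted());
    }
}
```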

So now you understand why some methods throw InterruptedException. The key takeaways:
  • A thrown InterruptedException should be handled properly in most cases.
  • Swallowing InterruptedException is usually a bad idea.
  • If the thread may be interrupted during non-blocking computation, use isInterrupted().


5. Monitor the queue length and keep it bounded


Incorrectly sized thread pools can cause slowness, instability and memory leaks. If you configure too few threads, the queue grows and consumes a lot of memory. Too many threads, on the other hand, slow the whole system down through excessive context switching, which leads to the same symptoms. It is important to watch the queue depth and keep it bounded, so that an overloaded pool simply rejects new tasks for a while:
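final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
// core = max = n threads, no keep-alive: the same shape as Executors.newFixedThreadPool(n)
executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);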

This code is equivalent to Executors.newFixedThreadPool(n), except that instead of the default unbounded LinkedBlockingQueue we use an ArrayBlockingQueue with a fixed capacity of 100. This means that if 100 tasks are already waiting in the queue (and all n threads are busy), the next task will be rejected with RejectedExecutionException. Also, since we now hold a reference to the queue, we can periodically query its size to log it, expose it via JMX, etc.
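To see the bound in action, the sketch below (class name and numbers are illustrative) keeps every worker busy and counts rejections. With 2 threads and a queue of 3, the first two tasks go straight to newly created worker threads, the next three wait in the queue, and the remaining ones are rejected:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    // Submits `tasks` blocked tasks to a pool of `threads` workers with a queue bounded
    // to `capacity`, and returns how many were rejected because the queue was full.
    static int countRejections(int threads, int capacity, int tasks) throws InterruptedException {
        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
        final ThreadPoolExecutor executorService =
                new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue);
        final CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executorService.execute(() -> {
                    try {
                        release.await();               // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;                            // queue full and no spare thread
            }
        }
        // The queue is our own object, so its depth can be logged or exported (e.g. via JMX)
        System.out.println("queue depth: " + queue.size() + ", rejected: " + rejected);
        release.countDown();
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        countRejections(2, 3, 10);   // prints "queue depth: 3, rejected: 5"
    }
}
```

Rejection is the default AbortPolicy; ThreadPoolExecutor also accepts other RejectedExecutionHandler implementations if throwing is not what you want.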

6. Remember to handle exceptions.


What is the result of executing the following code?
executorService.submit(() -> {
    System.out.println(1 / 0);
});

I have been bitten by this too many times: it prints nothing. No sign of java.lang.ArithmeticException: / by zero, nothing at all. The thread pool simply swallows the exception, as if it had never been thrown. With a thread created from scratch, without a pool around it, an UncaughtExceptionHandler could kick in. With a thread pool you have to be more careful, because submit() stores the exception in the returned Future instead. If you submit a Runnable (which has no result, as above), you must wrap the whole body of the method in try-catch. If you submit a Callable, make sure you always retrieve its result with a blocking get() so that the exception is rethrown:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
// get() below throws ExecutionException caused by ArithmeticException
division.get();

Interestingly, even the Spring framework made this mistake in @Async; see SPR-8995 and SPR-12090.
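Wrapping every Runnable body in try-catch by hand is tedious, so the wrapping can be done once. A minimal sketch, assuming a hypothetical reporting() decorator and a Consumer that stands in for your logger:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ReportingTasks {

    // Wraps a Runnable so that any exception is reported before the pool's
    // Future captures it. `reporter` stands in for a real logger.
    static Runnable reporting(Runnable task, Consumer<Throwable> reporter) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException | Error e) {
                reporter.accept(e);
                throw e;       // still rethrow so the Future completes exceptionally
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> future = pool.submit(reporting(() -> System.out.println(1 / 0),
                e -> System.err.println("Task failed: " + e)));   // reported from the pool thread
        try {
            future.get();
        } catch (ExecutionException e) {
            System.err.println("get() rethrew: " + e.getCause());
        }
        pool.shutdown();
    }
}
```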

7. Monitor how long tasks wait in the queue


Monitoring the depth of the work queue is one side of the story. When troubleshooting a single transaction/task, it is worth seeing how much time passed between submitting the task and the start of its execution. Ideally this time should be close to zero (when there is an idle thread in the pool), but it grows as tasks pile up in the queue. Moreover, if the pool does not have a fixed number of threads, running a new task may require spawning a new thread, which also takes some time. To measure this metric precisely, wrap the original ExecutorService in something like this:
public class WaitTimeMonitoringExecutorService implements ExecutorService {
    private final ExecutorService target;
    public WaitTimeMonitoringExecutorService(ExecutorService target) {
        this.target = target;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        final long startTime = System.currentTimeMillis();   // recorded in the submitting thread
        return target.submit(() -> {
                    // executed in a pool thread once the task leaves the queue
                    final long queueDuration = System.currentTimeMillis() - startTime;
                    log.debug("Task {} spent {} ms in the queue", task, queueDuration);
                    return task.call();
                }
        );
    }
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return submit(() -> {
            task.run();
            return result;
        });
    }
    @Override
    public Future<?> submit(Runnable task) {
        return submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                task.run();
                return null;
            }
        });
    }
    // ... remaining ExecutorService methods delegate to target
}

This is not a complete implementation, but the idea is clear. The moment a task is submitted to the thread pool, we record the time. We stop the stopwatch as soon as the task is taken off the queue and starts executing. Don't be fooled by how close startTime and queueDuration are in the source code: these two lines actually run in different threads, milliseconds or even seconds apart.
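The wrapper above has to delegate every ExecutorService method by hand. A shorter variant, sketched below with hypothetical names (QueueTimingExecutor and its LongConsumer callback), extends AbstractExecutorService instead: its submit(), invokeAll() and invokeAny() implementations all funnel through execute(), so timing execute() alone covers every entry point. It uses System.nanoTime(), which is intended for measuring elapsed time.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

public class QueueTimingExecutor extends AbstractExecutorService {
    private final ExecutorService target;
    private final LongConsumer onQueueTimeNanos;   // e.g. log it or push it to a metrics registry

    public QueueTimingExecutor(ExecutorService target, LongConsumer onQueueTimeNanos) {
        this.target = target;
        this.onQueueTimeNanos = onQueueTimeNanos;
    }

    @Override
    public void execute(Runnable command) {
        final long submittedAt = System.nanoTime();                      // client thread
        target.execute(() -> {
            onQueueTimeNanos.accept(System.nanoTime() - submittedAt);   // pool thread
            command.run();
        });
    }

    @Override public void shutdown() { target.shutdown(); }
    @Override public List<Runnable> shutdownNow() { return target.shutdownNow(); }
    @Override public boolean isShutdown() { return target.isShutdown(); }
    @Override public boolean isTerminated() { return target.isTerminated(); }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return target.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new QueueTimingExecutor(Executors.newSingleThreadExecutor(),
                nanos -> System.out.println("queued for " + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms"));
        pool.submit(() -> {                 // occupies the only thread for ~100 ms
            try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        pool.submit(() -> { });            // waits behind it, so reports roughly 100 ms
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

One trade-off of this design: shutdownNow() returns the wrapping Runnables rather than the tasks originally submitted.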

8. Preserve the client stack trace


Reactive programming gets a lot of attention these days: the Reactive Manifesto, Reactive Streams, RxJava (already at 1.0!), Clojure agents, scala.rx... All of this looks great, but the stack trace is no longer your friend; it is mostly useless. Consider, for example, the following exception thrown by a task running in a thread pool:
java.lang.NullPointerException: null
    at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
    at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
    at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

We can easily see that MyTask threw an NPE at line 76. But we have no idea who submitted this task, because the stack trace only goes back to Thread and ThreadPoolExecutor. Technically, we could browse the source code hoping to find exactly one place where MyTask is submitted. Without threads (not to mention event-driven, reactive and similar styles of programming), we would see the whole picture at once. What if we could preserve the stack trace of the client code (the code that submitted the task) and show it, for example, when a failure occurs? The idea is not new: Hazelcast, for example, propagates exceptions from the owner node back to the client code. Here is a straightforward way of doing it:
public class ExecutorServiceWithClientTrace implements ExecutorService {
    protected final ExecutorService target;
    public ExecutorServiceWithClientTrace(ExecutorService target) {
        this.target = target;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }
    private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                log.error("Exception {} in task from thread {}:", e, clientThreadName, clientStack);
                throw e;
            }
        };
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        final List<Future<T>> futures = tasks.stream().map(this::submit).collect(toList());
        // invokeAll() is specified to block until every task has completed
        for (Future<T> future : futures) {
            try {
                future.get();
            } catch (ExecutionException | CancellationException ignored) {
                // the failure stays visible through the returned Future
            }
        }
        return futures;
    }
    //...
}

This time, in case of failure, we get the full stack trace and the name of the thread from which the task was submitted. That is much more valuable than the standard exception we saw earlier:
Exception java.lang.NullPointerException in task from thread main:
java.lang.Exception: Client stack trace
    at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
    at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
    at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
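A variation on the same idea, sketched below with hypothetical names, attaches the client stack trace to the task's own exception via Throwable.addSuppressed() instead of logging it. That way the trace travels inside the ExecutionException to whoever calls get(). This is our own twist on the technique, not the approach from the wrapper above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ClientTraceDemo {

    // Captures the submitter's stack trace at wrapping time (in the client thread) and,
    // if the task fails, attaches it to the task's exception as a suppressed exception.
    static <T> Callable<T> withClientTrace(Callable<T> task) {
        final Exception clientTrace = new Exception(
                "Client stack trace (submitted from thread " + Thread.currentThread().getName() + ")");
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                e.addSuppressed(clientTrace);
                throw e;
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> future = pool.submit(withClientTrace(() -> {
            String missing = null;
            return missing.length();            // NullPointerException inside the pool thread
        }));
        try {
            future.get();
        } catch (ExecutionException e) {
            e.getCause().printStackTrace();     // the NPE followed by "Suppressed: ... Client stack trace"
        }
        pool.shutdown();
    }
}
```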


9. Prefer CompletableFuture


Java 8 introduced the more powerful CompletableFuture class. Please use it wherever possible. ExecutorService was not extended to support this abstraction, so you have to take care of it yourself. Instead of:
final Future future = executorService.submit(this::calculate);


Use:
final CompletableFuture future = CompletableFuture.supplyAsync(this::calculate, executorService);


CompletableFuture implements Future, so everything works as before. But more advanced users of your API will really appreciate the extended functionality that CompletableFuture provides.
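As a small illustration (calculate() here is a hypothetical stand-in that returns a constant), the sketch below runs the computation on our own pool and then transforms the result and recovers from failure without blocking, which a plain Future cannot do:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {

    // Hypothetical stand-in for the article's calculate()
    static int calculate() {
        return 6 * 7;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(CompletableFutureDemo::calculate, executorService);

        // Compose, transform and recover without blocking a thread on get()
        CompletableFuture<String> described = future
                .thenApply(x -> "answer=" + x)
                .exceptionally(t -> "failed: " + t.getMessage());

        System.out.println(described.join()); // answer=42
        executorService.shutdown();
    }
}
```

Passing executorService as the second argument matters: without it, supplyAsync() runs on the common ForkJoinPool instead of the pool you configured.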

10. Synchronous Queues


SynchronousQueue is an interesting kind of BlockingQueue that is not really a queue. It is not even a data structure per se. It is best described as a queue with zero capacity.
Here is what the JavaDoc says:
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. [...]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.

How does all this relate to thread pools? Let's try using SynchronousQueue with ThreadPoolExecutor:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);

We created a thread pool with two threads and a SynchronousQueue in front of them. Since SynchronousQueue is essentially a queue with zero capacity, such an ExecutorService accepts a new task only if an idle thread is available to take it. If all threads are busy, the new task is rejected immediately and never waits. This mode can be useful when background processing should start right away or not at all.
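The immediate rejection is easy to observe. In this sketch (names are hypothetical) both workers are kept busy, so every further submission finds no idle thread to hand off to:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {

    // Submits `tasks` blocked tasks to a pool of `threads` workers that receives work
    // through a SynchronousQueue, and counts how many submissions were rejected.
    static int countRejections(int threads, int tasks) throws InterruptedException {
        BlockingQueue<Runnable> queue = new SynchronousQueue<>();
        ExecutorService executorService =
                new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executorService.execute(() -> {
                    try {
                        release.await();          // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;                       // no idle thread to hand the task to
            }
        }
        release.countDown();
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countRejections(2, 5)); // 3
    }
}
```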

That's all. I hope you have discovered at least one interesting feature!
