Overview of java.util.concurrent.*

In everyday work, you rarely have to deal with the java.util.concurrent multithreading package. Sometimes a project is restricted to Java 1.4.2, which does not have this package, but most often ordinary synchronization is enough and nothing supernatural is required. Fortunately, tasks come up from time to time that make you use your brain a little and either reinvent the wheel or dig through the javadocs to find something more suitable. Reinventing the wheel is not a problem: you just sit down and write it, since there is nothing super complicated in multithreading. On the other hand, less code means fewer bugs. Moreover, no one in their right mind writes unit tests for multithreaded code, because those are already full-blown integration tests with all the ensuing consequences.

What should you choose for a specific case? In the middle of a crunch with deadlines looming, it is rather hard to take in all of java.util.concurrent. You pick something that looks similar and off you go! So, gradually, ArrayBlockingQueue, ConcurrentHashMap, AtomicInteger, Collections.synchronizedList(new LinkedList()) and other interesting things appear in the code. Sometimes rightly, sometimes not. At some point you begin to realize that more than 95% of the standard Java classes are not used at all in product development. Collections, primitives, shuffling bytes from one place to another, Hibernate, Spring or EJB, some other library and, voila, the application is ready.

To bring at least some order to this knowledge and make the topic easier to get into, below is an overview of the classes for working with multithreading. I am writing it primarily as a cheat sheet for myself. If it is useful to anyone else, wonderful.

For starters


I’ll give you a couple of interesting links right away. The first is for those who are a bit shaky on multithreading. The second is for "advanced" programmers: maybe they will find something useful there.


A bit about the author of the java.util.concurrent package


Anyone who has ever opened the source code of the java.util.concurrent classes could not help but notice Doug Lea among the authors, a professor at the State University of New York at Oswego. His best-known work includes the Java collections and util.concurrent, which in one form or another made their way into the existing JDKs. He also wrote dlmalloc, an implementation of dynamic memory allocation. He is also the author of a book on multithreading, Concurrent Programming in Java: Design Principles and Patterns, 2nd Edition. More details can be found on his home page.


Doug Lea's talk at the JVM Language Summit in 2010.

Down to business


Many people have probably felt a sense of chaos when taking a quick look at java.util.concurrent. Classes with completely different functionality are mixed together in one package, which makes it somewhat difficult to understand what relates to what and how it works. So we can roughly divide the classes and interfaces by function, and then go over the specific parts.



Concurrent Collections - a set of collections that work more efficiently in a multithreaded environment than the standard general-purpose collections from the java.util package. Instead of the basic Collections.synchronizedList wrapper, which locks access to the entire collection, they use locks on data segments or optimize for parallel reads using wait-free algorithms.

Queues - non-blocking and blocking queues with multithreading support. Non-blocking queues are tuned for speed and work without blocking threads. Blocking queues are used when you need to "slow down" the Producer or Consumer threads if some condition is not met, for example, the queue is empty or full, or there is no free Consumer.

Synchronizers - helper utilities for thread synchronization. They are a powerful weapon in "parallel" computing.

Executors - contains excellent frameworks for creating thread pools and scheduling asynchronous tasks with retrieval of their results.

Locks - alternative and more flexible thread synchronization mechanisms compared to the basic synchronized, wait, notify, notifyAll.

Atomics - classes with support for atomic operations on primitives and references.



1. Concurrent Collections



CopyOnWrite Collection




The name speaks for itself. All operations that modify the collection (add, set, remove) create a new copy of the internal array. This guarantees that iterating over the collection will not throw ConcurrentModificationException. Keep in mind that copying the array copies only the references to the objects (a shallow copy), so access to the fields of the elements is not thread-safe. CopyOnWrite collections are convenient when write operations are quite rare, for example, when implementing a listener subscription mechanism and iterating over the listeners.

CopyOnWriteArrayList - Thread-safe analogue of ArrayList, implemented with the CopyOnWrite algorithm.
Additional methods and constructor
CopyOnWriteArrayList(E[] toCopyIn) - A constructor that takes an array as input.
int indexOf(E e, int index) - Returns the index of the first matching element, searching forward from the given index.
int lastIndexOf(E e, int index) - Returns the index of the first matching element found when searching backward from the given index.
boolean addIfAbsent(E e) - Adds the element if it is not already in the collection. Elements are compared using equals.
int addAllAbsent(Collection<? extends E> c) - Adds the elements that are not already in the collection. Returns the number of elements added.
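
The listener scenario mentioned above can be sketched roughly as follows. ListenerRegistry and its methods are hypothetical names invented for this illustration, and the lambda syntax assumes Java 8 or later; only CopyOnWriteArrayList and addIfAbsent come from the JDK.

```java
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener registry, used only to illustrate the idea.
class ListenerRegistry {
    private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>();

    // addIfAbsent returns false when an equal listener is already registered.
    boolean subscribe(Runnable listener) {
        return listeners.addIfAbsent(listener);
    }

    boolean unsubscribe(Runnable listener) {
        return listeners.remove(listener);
    }

    // The for-each loop iterates over a snapshot of the internal array, so a
    // listener may unsubscribe itself during the loop without causing
    // ConcurrentModificationException.
    int fire() {
        int called = 0;
        for (Runnable listener : listeners) {
            listener.run();
            called++;
        }
        return called;
    }

    int size() {
        return listeners.size();
    }
}
```

A listener that removes itself while being notified is still counted in the current pass, because the loop keeps working on the old array; it is simply absent from the next one.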

CopyOnWriteArraySet - Implementation of the Set interface that uses CopyOnWriteArrayList as its basis. Unlike CopyOnWriteArrayList, it has no additional methods.

Scalable maps




Improved counterparts of HashMap and TreeMap with better support for multithreading and scalability.

ConcurrentMap - An interface that extends Map with several additional atomic operations.
Additional methods
V putIfAbsent(K key, V value) - Adds a new key-value pair only if the key is not already in the collection. Returns the previous value for the given key, or null if there was none.
boolean remove(Object key, Object value) - Removes the key-value pair only if the given key is currently mapped to the given value. Returns true if the entry was removed.
boolean replace(K key, V oldValue, V newValue) - Replaces the old value with the new one only if the key is currently mapped to the given old value. Returns true if the value was replaced.
V replace(K key, V value) - Replaces the value for the key only if the key is currently mapped to some value. Returns the previous value for the given key.
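
As a rough illustration of how these atomic operations combine, here is a minimal sketch of a counter built on a retry loop. HitCounter and its method names are made up for this example; only the ConcurrentMap methods listed above are taken from the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-page hit counter, used only for illustration.
class HitCounter {
    private final ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();

    // Retry loop: every single call is atomic, and the loop repeats
    // if another thread changed the value between the two calls.
    int increment(String page) {
        while (true) {
            Integer old = hits.putIfAbsent(page, 1);
            if (old == null) {
                return 1;                      // key was absent, 1 is now stored
            }
            if (hits.replace(page, old, old + 1)) {
                return old + 1;                // nobody interfered
            }
        }
    }

    // Removes the entry only while it still holds the expected value.
    boolean resetIfEquals(String page, int expected) {
        return hits.remove(page, expected);
    }
}
```

The point of the design is that no explicit lock is needed: a failed replace simply means another thread won the race, and the loop tries again with the fresh value.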

ConcurrentHashMap - Unlike Hashtable and synchronized blocks around a HashMap, the data is divided into segments by key hash. As a result, access is locked per segment rather than on a single object. In addition, iterators reflect the data as of a certain point in time and do not throw ConcurrentModificationException. ConcurrentHashMap is described in more detail in a Habr topic here.
Additional constructor
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) - The third constructor parameter is the expected number of concurrently writing threads. The default value is 16. It affects the memory footprint of the collection and its performance.

ConcurrentNavigableMap - Extends the NavigableMap interface and requires ConcurrentNavigableMap objects as the return values of its sub-map methods. All iterators are declared safe to use and do not throw ConcurrentModificationException.

ConcurrentSkipListMap - An analogue of TreeMap with multithreading support. Data is likewise sorted by key, and average log(N) performance is guaranteed for containsKey, get, put, remove and other similar operations. The skip list algorithm is described on the Wiki and on Habr.

ConcurrentSkipListSet - Implementation of the Set interface, based on ConcurrentSkipListMap.



2. Queues


Non-blocking queues




Thread-safe and non-blocking Queue implementations on linked nodes.

ConcurrentLinkedQueue - The implementation uses the non-blocking algorithm by Michael & Scott, adapted to work with a garbage collector. This algorithm is quite efficient and, most importantly, very fast, because it is built on CAS. The size() method can take a long time, so it is better not to call it all the time. A detailed description of the algorithm can be found here.

ConcurrentLinkedDeque - Deque stands for double-ended queue and is pronounced "deck". This means that data can be added and removed at both ends. Accordingly, the class supports both modes of operation: FIFO (First In First Out) and LIFO (Last In First Out). In practice, ConcurrentLinkedDeque should be used only if you really need LIFO, because the bidirectional links between nodes make this class lose 40% in performance compared to ConcurrentLinkedQueue.

Blocking queues




BlockingQueue - When processing large data streams through queues, ConcurrentLinkedQueue alone is clearly not enough. If the threads draining the queue can no longer cope with the influx of data, you can quickly run out of memory or overload IO / Net so badly that performance drops several times over, until the system fails on timeouts or runs out of free descriptors. For such cases you need a queue with a configurable size or with blocking on conditions. This is where the BlockingQueue interface comes in, paving the way for a whole set of useful classes. In addition to the ability to set the size of the queue, new methods have been added that react differently to an empty or a full queue. For example, when adding an element to a full queue, one method throws IllegalStateException, another returns false, a third blocks the thread until space appears, and a fourth blocks the thread with a timeout and returns false if space still has not appeared. It is also worth noting that blocking queues do not support null values, because this value is used by the poll method as a timeout indicator.
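
The four reactions to a full queue can be seen in a small sketch like the one below. FullQueueDemo and probe are hypothetical names; the queue methods add, offer and put are the standard BlockingQueue ones, and ArrayBlockingQueue is used only as a convenient bounded implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class FullQueueDemo {
    // Reports how each insertion method reacts to a full queue.
    static String[] probe() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first");                    // capacity is 1, so the queue is now full

        String[] result = new String[3];
        try {
            queue.add("second");
            result[0] = "added";
        } catch (IllegalStateException e) {
            result[0] = "exception";           // add() throws on a full queue
        }
        // offer() gives up immediately and returns false
        result[1] = String.valueOf(queue.offer("second"));
        // timed offer() waits up to 50 ms, then returns false
        result[2] = String.valueOf(queue.offer("second", 50, TimeUnit.MILLISECONDS));
        // queue.put("second") would block here until a consumer takes "first"
        return result;
    }
}
```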

ArrayBlockingQueue - A blocking queue class built on a classic ring buffer. In addition to the size of the queue, you can control the fairness of its locks. If fair = false (the default), the order in which waiting threads are served is not guaranteed. More details about fairness can be found in the description of ReentrantLock.

DelayQueue - A rather specific class that lets you take items out of the queue only after a certain delay, defined in each item through the getDelay method of the Delayed interface.
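
A minimal sketch of an element for DelayQueue might look like this. DelayedMessage, its fields and the demo method are assumptions made for the example; the only requirements that come from the JDK are the getDelay and compareTo methods of Delayed.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical queue element, used only for illustration.
class DelayedMessage implements Delayed {
    final String text;
    private final long readyAtNanos;

    DelayedMessage(String text, long delayMillis) {
        this.text = text;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    // Remaining delay; zero or negative means the element may be taken.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // The queue keeps the element with the smallest remaining delay at the head.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    // Inserts two messages in "wrong" order and returns them as they expire.
    static String[] demo() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("late", 200));
        queue.put(new DelayedMessage("early", 50));
        String first = queue.take().text;    // blocks until the shortest delay expires
        String second = queue.take().text;
        return new String[] { first, second };
    }
}
```

A non-blocking poll() on the same queue returns null for as long as no element has expired yet.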

LinkedBlockingQueue - A blocking queue on linked nodes, implemented with the "two lock queue" algorithm: one lock for adding an element, another for removing it. Thanks to the two locks, this class typically shows higher throughput than ArrayBlockingQueue, but its memory consumption is higher. The queue size is set through the constructor and defaults to Integer.MAX_VALUE.

PriorityBlockingQueue - A thread-safe counterpart of PriorityQueue. When an element is inserted into the queue, its position is determined by the Comparator or by the elements' implementation of the Comparable interface. The smallest element leaves the queue first.

SynchronousQueue - This queue works on the principle of one in, one out. Each insert operation blocks the Producer thread until a Consumer thread takes the item out of the queue, and vice versa: the Consumer waits until the Producer inserts an item.

BlockingDeque - An interface that describes additional methods for a double-ended blocking queue. Data can be inserted and removed at both ends of the queue.

LinkedBlockingDeque - A double-ended blocking queue on linked nodes, implemented as a simple doubly linked list with a single lock. The queue size is set through the constructor and defaults to Integer.MAX_VALUE.

TransferQueue - This interface is interesting in that, when adding an item to the queue, the inserting Producer thread can be blocked until a Consumer thread takes the item out of the queue. The blocking can have a timeout, or it can be replaced with a check for waiting Consumers. This makes it possible to implement a message-passing mechanism that supports both synchronous and asynchronous messages.

LinkedTransferQueue - Implementation of TransferQueue based on the Dual Queues with Slack algorithm. It makes heavy use of CAS and parks threads while they are idle.



3. Synchronizers




This section provides classes for actively managing threads.

Semaphore - Semaphores are most often used to limit the number of threads working with hardware resources or the file system. Access to the shared resource is controlled by a counter. If it is greater than zero, access is allowed and the counter is decremented. If the counter is zero, the current thread blocks until another thread releases the resource. The number of permits and the fairness of thread release are set through the constructor. The tricky part of using semaphores is choosing the number of permits, because it often has to be tuned to the capacity of the hardware.
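
A rough sketch of limiting concurrent access with a semaphore is shown below. LimitedResource, use and demo are hypothetical names, the 10 ms sleep merely stands in for real work, and accumulateAndGet assumes Java 8 or later.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical resource guarded by a semaphore, used only for illustration.
class LimitedResource {
    private final Semaphore permits;
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxActive = new AtomicInteger();

    LimitedResource(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent, true);   // true = fair ordering
    }

    void use() throws InterruptedException {
        permits.acquire();                     // blocks while no permit is available
        try {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);   // remember the peak
            Thread.sleep(10);                  // stand-in for real work
        } finally {
            active.decrementAndGet();
            permits.release();                 // always give the permit back
        }
    }

    // Starts the given number of threads and returns the observed peak concurrency.
    static int demo(int threads, int limit) throws InterruptedException {
        LimitedResource resource = new LimitedResource(limit);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    resource.use();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return resource.maxActive.get();
    }
}
```

Releasing the permit in a finally block matters: a permit lost to an exception permanently reduces the available capacity.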

CountDownLatch - Allows one or more threads to wait until a certain number of operations have completed in other threads. The classic driver example describes the logic of the class pretty well: threads that call the driver hang in the await method (with or without a timeout) until the thread with the driver finishes initializing and calls the countDown method. This method decrements the counter by one. As soon as the counter reaches zero, all threads waiting in await continue, and all subsequent await calls return without waiting. The counter is one-shot and cannot be reset to its original state.
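
The waiting logic can be sketched as follows, assuming a hypothetical LatchDemo class in which each worker thread does nothing except count down; the method reference syntax needs Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class LatchDemo {
    static boolean awaitWorkers(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(done::countDown).start();   // each worker counts down once
        }
        // Wait with a timeout; returns true if the count reached zero in time.
        boolean finished = done.await(5, TimeUnit.SECONDS);
        // The latch is one-shot: once it is open, await() returns immediately.
        done.await();
        return finished && done.getCount() == 0;
    }
}
```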

CyclicBarrier - Can be used to synchronize a given number of threads at a single point. The barrier is reached when N threads have called the await(...) method and blocked. After that, the counter is reset to its original value and the waiting threads are released. Additionally, if needed, custom code can be run before the threads are released and the counter is reset. To do this, an object implementing the Runnable interface is passed through the constructor.
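
Here is a minimal sketch of a reusable barrier with a barrier action. BarrierDemo and run are hypothetical names, and the barrier action here only counts how many times the barrier was tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, used only for illustration.
class BarrierDemo {
    // Runs `parties` threads through `rounds` barrier cycles and returns
    // how many times the barrier action was executed.
    static int run(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The Runnable passed to the constructor runs once per cycle,
        // after the last thread arrives and before the threads are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await();       // blocks until all parties have arrived
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return trips.get();
    }
}
```

Unlike CountDownLatch, the same barrier object serves every round, because the counter resets itself each time the barrier is reached.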

Exchanger - As the name implies, the main purpose of this class is to exchange objects between two threads. Null values are also supported, which lets you use this class to pass an object in one direction only, or simply as a synchronizer of two threads. The first thread that calls the exchange(...) method blocks until the second thread calls the same method. As soon as this happens, the threads exchange values and continue their work.

Phaser - An improved barrier implementation for thread synchronization that combines the functionality of CyclicBarrier and CountDownLatch, taking the best of both. The number of threads is not fixed and can change dynamically. The class can be reused, and a thread can report its readiness without blocking. More details can be found in a Habr topic here.



4. Executors


So we have reached the largest part of the package. It describes the interfaces for running asynchronous tasks with the ability to obtain results through the Future and Callable interfaces, as well as the services and factories for creating thread pools: ThreadPoolExecutor, ScheduledThreadPoolExecutor, ForkJoinPool. For a better understanding, let us break the interfaces and classes down into small groups.

Future and Callable




Future - A wonderful interface for obtaining the results of an asynchronous operation. The key method here is get, which blocks the current thread (with or without a timeout) until the asynchronous operation in the other thread completes. There are also additional methods for cancelling the operation and checking its current status. The FutureTask class is often used as the implementation.

RunnableFuture - If Future is the interface for the client API, then RunnableFuture is the interface used to run the asynchronous part. Successful completion of the run() method completes the asynchronous operation and makes the results available through the get method.

Callable - An extended analogue of the Runnable interface for asynchronous operations. It lets you return a typed value and throw a checked exception. Although this interface has no run() method, many java.util.concurrent classes support it alongside Runnable.

FutureTask - Implementation of the Future / RunnableFuture interfaces. The asynchronous operation is passed to one of the constructors as a Runnable or Callable object. The FutureTask class itself is designed to be run in a worker thread, for example, through new Thread(task).start(), or through ThreadPoolExecutor. The results of the asynchronous operation are retrieved through the get(...) method.
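
The relationship between Callable, FutureTask and get can be sketched in a few lines. FutureTaskDemo and the trivial computation are made up for the example; the lambda assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, used only for illustration.
class FutureTaskDemo {
    static int compute() throws InterruptedException, ExecutionException {
        Callable<Integer> work = () -> 6 * 7;          // returns a typed value
        FutureTask<Integer> task = new FutureTask<>(work);
        new Thread(task).start();                      // FutureTask is also a Runnable
        int result = task.get();                       // blocks until run() has completed
        return task.isDone() ? result : -1;
    }
}
```

If the Callable throws, get rethrows the failure wrapped in an ExecutionException, which is why that exception appears in the signature.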

Delayed - Used for asynchronous tasks that should start in the future, as well as in DelayQueue. It lets you specify the time remaining before the asynchronous operation starts.

ScheduledFuture - A marker interface combining the Future and Delayed interfaces.

RunnableScheduledFuture - An interface that combines RunnableFuture and ScheduledFuture. Additionally, it lets you indicate whether the task is one-off or should be run periodically.

Executor services




Executor - The basic interface for classes that execute Runnable tasks. It decouples submitting a task from the way the task is run.

ExecutorService - An interface that describes a service for running Runnable or Callable tasks. The submit methods accept a task as a Callable or Runnable and return a Future through which you can get the result. The invokeAll methods work with lists of tasks and block the calling thread until all tasks in the given list have completed or the specified timeout expires. The invokeAny methods block the calling thread until any one of the given tasks completes. On top of that, the interface contains methods for graceful shutdown. After the shutdown method is called, the service no longer accepts tasks and throws RejectedExecutionException on any attempt to submit one.
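
A small sketch of invokeAll and of the behaviour after shutdown is given below. ExecutorServiceDemo and both method names are hypothetical; the pool sizes are arbitrary, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, used only for illustration.
class ExecutorServiceDemo {
    static int sumOfSquares(int... values) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int value : values) {
                tasks.add(() -> value * value);
            }
            int sum = 0;
            // invokeAll blocks until every task in the list has completed
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } finally {
            pool.shutdown();                   // graceful: queued tasks still finish
        }
    }

    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.submit(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;                       // new tasks are refused after shutdown
        }
    }
}
```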

ScheduledExecutorService - In addition to the ExecutorService methods, this interface adds the ability to run deferred (scheduled) tasks.

AbstractExecutorService - An abstract class for building an ExecutorService. It contains a basic implementation of the submit, invokeAll and invokeAny methods. ThreadPoolExecutor, ScheduledThreadPoolExecutor and ForkJoinPool inherit from this class.

ThreadPoolExecutor & Factory




Executors - Factory class for creating ThreadPoolExecutor and ScheduledThreadPoolExecutor. If you need to create one of these pools, this factory is exactly what you need. It also contains various adapters: Runnable-Callable, PrivilegedAction-Callable, PrivilegedExceptionAction-Callable and others.

ThreadPoolExecutor - A very powerful and important class. It is used to run asynchronous tasks in a thread pool, so there is almost no overhead for starting and stopping threads. And thanks to the fixed maximum number of threads in the pool, application performance stays predictable. As mentioned above, it is preferable to create this pool through one of the Executors factory methods. If the standard configurations are not enough, all the basic parameters of the pool can be set through the constructors or setters. More details can be found in this topic.

ScheduledThreadPoolExecutor - In addition to the ThreadPoolExecutor methods, it lets you start tasks after a certain delay, as well as periodically, which makes it possible to implement a Timer Service on top of this class.

ThreadFactory - By default, ThreadPoolExecutor uses the standard thread factory obtained through Executors.defaultThreadFactory(). If you need something more, for example, to set a thread priority or name, you can create a class that implements this interface and pass it to ThreadPoolExecutor.

RejectedExecutionHandler - Lets you define a handler for tasks that for some reason cannot be executed by ThreadPoolExecutor. This can happen when there are no free threads or when the service is shutting down or already shut down. Several standard implementations live inside the ThreadPoolExecutor class: CallerRunsPolicy - runs the task in the calling thread; AbortPolicy - throws an exception; DiscardPolicy - ignores the task; DiscardOldestPolicy - removes the oldest task that has not started yet from the queue, then tries to add the new task again.
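
To show how a custom ThreadFactory and a rejection policy plug into the constructor, here is a sketch under deliberately tight settings: one thread and a queue of one element, so that the third task overflows. CustomPoolDemo, the "worker-" naming scheme and all sizes are assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, used only for illustration.
class CustomPoolDemo {
    // Returns the name of the thread that ended up running the overflowing task.
    static String runOverflowingTask() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory named = runnable -> {
            Thread thread = new Thread(runnable, "worker-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,           // exactly one worker thread
                new ArrayBlockingQueue<>(1),               // room for one waiting task
                named,
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);                 // occupies the single worker
        pool.execute(blocker);                 // fills the queue

        String[] ranOn = new String[1];
        // No free thread and no queue space: the task is rejected,
        // and CallerRunsPolicy runs it right here in the calling thread.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }
}
```

With AbortPolicy in place of CallerRunsPolicy, the third execute call would throw RejectedExecutionException instead.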

Fork/Join




Java 1.7 introduced the new Fork/Join framework for recursive tasks based on divide-and-conquer or Map Reduce algorithms. To make it clearer, take the quicksort algorithm as an illustration: by splitting the data into parts, you can have the parts processed in parallel in different threads. You could use an ordinary ThreadPoolExecutor for this, but because of frequent context switches and the overhead of tracking execution, it does not work very efficiently. This is where the Fork/Join framework comes to the rescue, which is based on the work-stealing algorithm. It shows its best on systems with a large number of processors. More information can be found on the blog here or in Doug Lea's publications. You can read about performance and scalability here.

ForkJoinPool - The entry point for starting root (main) ForkJoinTask tasks. Subtasks are launched through the methods of the task you are forking from. By default, the pool is created with the number of threads equal to the number of cores available to the JVM.

ForkJoinTask - Base class for all Fork/Join tasks. The key methods are: fork() - adds the task to the queue of the current ForkJoinWorkerThread for asynchronous execution; invoke() - runs the task in the current thread; join() - waits for the subtask to complete and returns its result; invokeAll(...) - combines all three previous operations, executing two or more tasks in one go; adapt(...) - creates a new ForkJoinTask from a Runnable or Callable object.

RecursiveTask - An abstract subclass of ForkJoinTask that declares the compute method, in which the subclass performs the asynchronous operation.

RecursiveAction - Differs from RecursiveTask in that it does not return a result.
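
The fork / compute / join pattern can be sketched with a simple array sum. SumTask and the threshold of 1,000 elements are arbitrary choices for this example, not recommendations.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task that sums a slice of an array, used only for illustration.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // arbitrary cut-off for this sketch
    private final long[] data;
    private final int from;
    private final int to;

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {              // small enough: sum directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                               // queue the left half for other workers
        long rightSum = right.compute();           // process the right half in this thread
        return left.join() + rightSum;             // wait for the left half and combine
    }

    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();    // parallelism = available processors
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Forking one half and computing the other directly keeps the current worker busy instead of leaving it idle in join.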

ForkJoinWorkerThread - Used as the default thread implementation in ForkJoinPool. If desired, you can subclass it and override the methods for initializing and terminating the worker thread.

Completion service




CompletionService - A service interface that decouples launching asynchronous tasks from obtaining their results. Tasks are added with the submit methods, and the results of completed tasks are retrieved with the blocking take method or the non-blocking poll.

ExecutorCompletionService - Essentially a wrapper over any class that implements the Executor interface, such as ThreadPoolExecutor or ForkJoinPool. It is used mainly when you want to abstract away from how tasks are started and how their execution is tracked. If there are completed tasks, take them; if not, wait in take until something finishes. By default the service is built on LinkedBlockingQueue, but any other BlockingQueue implementation can be passed in.
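
A minimal sketch of retrieving results in completion order rather than submission order follows. CompletionDemo is a hypothetical name, and the 300 ms sleep only simulates a slow task.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, used only for illustration.
class CompletionDemo {
    static String firstFinished() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> {
                Thread.sleep(300);             // simulated slow task, submitted first
                return "slow";
            });
            service.submit(() -> "fast");
            // take() blocks until some task is done and returns its Future,
            // regardless of the order in which the tasks were submitted.
            return service.take().get();
        } finally {
            pool.shutdownNow();                // interrupt whatever is still running
        }
    }
}
```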



5. Locks




Condition - An interface that describes alternatives to the standard wait / notify / notifyAll methods. A condition object is most often obtained from a lock via the lock.newCondition() method. This way you can get several wait / notify sets for a single object.

Lock - The basic interface of the lock framework, which provides a more flexible approach to restricting access to resources / blocks than synchronized does. For example, when several locks are used, they can be released in any order. In addition, you can follow an alternative scenario if the lock is already held by someone else.

ReentrantLock - A reentrant lock. Only one thread at a time can enter the protected block. The class supports fair and non-fair thread unlocking. With fair unlocking, threads are released in the order in which they called lock(). With non-fair unlocking, the order of release is not guaranteed but, as a bonus, it works faster. Non-fair unlocking is used by default.
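
The three interfaces above fit together in the classic bounded buffer, sketched here with two conditions on one lock. BoundedBuffer and offerIfUncontended are hypothetical names introduced for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer, used only for illustration.
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();   // non-fair by default
    private final Condition notFull = lock.newCondition();    // two wait sets
    private final Condition notEmpty = lock.newCondition();   // on the same lock
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();               // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();                 // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();                  // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // The "alternative scenario": give up instead of waiting for a busy lock.
    boolean offerIfUncontended(T item) {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            if (items.size() == capacity) {
                return false;
            }
            items.addLast(item);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

With plain wait / notify, producers and consumers would share a single wait set and notifyAll would have to wake everyone; separate conditions let each side wake only the other.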

ReadWriteLock - An additional interface for creating read / write locks. Such locks are extremely useful when the system has many read operations and few write operations.

ReentrantReadWriteLock - Very often used in multithreaded services and caches, where it shows a very good performance gain compared to synchronized blocks. In fact, the class works in two mutually exclusive modes: either many readers read the data in parallel, or a single writer writes the data.

ReentrantReadWriteLock.ReadLock - Read lock for readers, obtained through readWriteLock.readLock().

ReentrantReadWriteLock.WriteLock - Write lock for writers, obtained through readWriteLock.writeLock().
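
The cache use case might be sketched like this, assuming a hypothetical RwCache class that wraps a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical read-mostly cache, used only for illustration.
class RwCache<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock rw = new ReentrantReadWriteLock();

    // Any number of readers may hold the read lock at the same time.
    V get(K key) {
        rw.readLock().lock();
        try {
            return map.get(key);
        } finally {
            rw.readLock().unlock();
        }
    }

    // The write lock is exclusive: it waits for all readers to leave
    // and keeps new readers and writers out until it is released.
    V put(K key, V value) {
        rw.writeLock().lock();
        try {
            return map.put(key, value);
        } finally {
            rw.writeLock().unlock();
        }
    }
}
```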

LockSupport - Designed for building classes with locks. It contains methods for parking threads, to be used instead of the deprecated Thread.suspend() and Thread.resume() methods.



AbstractOwnableSynchronizer - The base class for building synchronization mechanisms. It contains just one getter / setter pair for storing and reading the thread that exclusively owns the synchronizer.

AbstractQueuedSynchronizer - Used as the base class for the synchronization mechanism in FutureTask, CountDownLatch, Semaphore, ReentrantLock, ReentrantReadWriteLock. It can be used to create new synchronization mechanisms that rely on a single atomic int value.

AbstractQueuedLongSynchronizer - A variant of AbstractQueuedSynchronizer that works with an atomic long value.



6. Atomics




AtomicBoolean, AtomicInteger, AtomicLong, AtomicIntegerArray, AtomicLongArray - What if a class needs to synchronize access to one simple variable of type int? You can use synchronized constructs, and if only atomic set / get operations are needed, volatile will also do. But you can do even better by using the Atomic* classes. Thanks to CAS, operations on these classes are typically faster than the same operations guarded by synchronized. Plus, there are methods for atomically adding a given value, as well as for increment / decrement.
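
Both the ready-made increment and a hand-written CAS loop can be sketched as follows. AtomicDemo, updateMax and countInParallel are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, used only for illustration.
class AtomicDemo {
    // CAS retry loop: atomically raises `max` to `candidate` if it is larger.
    static int updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();
            if (candidate <= current) {
                return current;                // nothing to do
            }
            if (max.compareAndSet(current, candidate)) {
                return candidate;              // our value was installed
            }
            // another thread changed the value in between: retry
        }
    }

    // Several threads increment one counter without any locks.
    static int countInParallel(int threads, int perThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }
}
```

With a plain int field and counter++ in place of incrementAndGet, the final count would usually come out lower, because increments from different threads overwrite each other.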

AtomicReference - A class for atomic operations with a reference to an object.

AtomicMarkableReference - Class for atomic operations on the following pair of fields: an object reference and a boolean flag (true / false).

AtomicStampedReference - Class for atomic operations with the following pair of fields: object reference and int value.

AtomicReferenceArray - An array of object references that can be atomically updated.

AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater - Classes for atomically updating fields by name through reflection. The field offset for CAS is determined in the constructor and cached, so reflection does not cause a significant drop in performance.

Instead of a conclusion


Thank you for reading to the end, or at least for scrolling to the end of the article. I just want to emphasize that the class descriptions here are deliberately brief, so as not to clutter the article with excessive code. The main idea is to give a quick overview of the classes so that you know which direction to move in and what can be used. Further examples of using the classes can easily be found on the Internet or in the source code of the classes themselves. I hope this post helps you solve interesting multithreading tasks faster and more efficiently.