JDK concurrent package

Original author: Alexey Kutuzov
  • Transfer
The memory model that currently exists in Java guarantees the expected execution order of multi-threaded code, in the absence of thread racing in this code. And in order to protect your code from racing, various ways of synchronizing and exchanging data between them have been invented.

The package java.util.concurrentincluded with HotSpot JDK provides the following tools for writing multi-threaded code:
  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators _jdk 1.8_


Atomic

The child package java.util.concurrent.atomiccontains a set of classes for atomic work with primitive types. The contract of these classes guarantees the execution of the operation compare-and-setfor "1 unit of processor time." When setting a new value for this variable, you also pass its old value (optimistic locking approach). If, from the moment the method is called, the variable value differs from the expected one, the result of execution will be false.

For example, take two arrays of longvariables [1,2,3,4,5]and [-1,-2,-3,-4,-5]. Each of the threads will be sequentially iterated over the array and summarize the elements into a single variable. The code ( groovy ) with a pessimistic lock looks like this:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}
Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")


The result of the execution will be expected:

pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0


However, this approach has significant performance disadvantages. In this case, more useless work takes us more resources than useful:
  • attempt to lock the monitor
  • flow blocking
  • monitor unlock
  • thread unlock


Consider the use AtomicLongof optimistic blocking for the implementation of the calculation of the same amount:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break;
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
        }
    }
}
Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")


As you can see from the results of the “erroneous” attempts, there were not so many:

[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0


When deciding to use optimistic locking, it is important that the action with the variable being modified does not take much time. The longer this action is, the more erroneous will happen compare-and-set, and the more often you will have to perform this action again.

Based on compare-and-setthis, a non-blocking read lock may also be implemented. In this case, the atomic variable will store the version of the object being processed. Having received the version value before calculations, we can verify it after calculation. Regular read-writelocks take effect only if the version check fails.

class Transaction {
    long debit
}
class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List transactions = new ArrayList()
}
long  balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked
    while(true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each {balance += it.debit}
        //volatile write for JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) {locked.unlock()}
            return balance
        } else {
            locked = account.readWriteLock.readLock()
        }
    }
}
void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    account.version.incrementAndGet()
    account.transactions[position].debit = newDebit
    writeLock.unlock()
}


Locks

Reentrantlock

Unlike syncronized locks, ReentrantLock it allows you to more flexibly select the moments of removal and receipt of locks since Uses regular Java calls. It also ReentrantLockallows you to get information about the current state of the lock, allows you to "wait" for a lock for a certain time. Supports the correct recursive retrieval and release of locks for a single thread. If you need honest locks (observing the sequence when capturing the monitor) - it is ReentrantLockalso equipped with this mechanism.

Despite the fact that syncronizedand ReentrantLocklock are very similar - implementation on the JVM level differs quite strongly.
Without going into JMM details: useReentrantLockinstead of the syncronized lock provided by the JVM, it is only necessary if you very often have a battle of threads for the monitor. In the case when only one thread gets into the syncronized method, the performance is ReentrantLockinferior to the JVM locking mechanism.

ReentrantReadWriteLock

Complementing the properties with the ReentrantLockability to capture many read locks and write locks. A write lock can be “lowered” before a read lock, if necessary.

StampedLock _jdk 1.8_

It implements optimistic and pessimistic read-write locks with the possibility of their further increase or decrease. Optimistic locking is implemented through the “stamp” of the lock ( javadoc ):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 }
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
     return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {
   sl.unlockRead(stamp);
 }
}


Collections

ArrayBlockingQueue

An honest queue for sending a message from one thread to another. Supports blocking ( put()take()) and non-blocking ( offer()pool()) methods. Forbids null values. Queue capacity must be specified at creation.

Concurrenthashmap

Key-value structure based on hashfunction. There are no read locks. When recording, only part of the card (segment) is blocked. The number of segments is limited to the closest to concurrencyLeveldegree 2.

ConcurrentSkipListMap

Balanced multithreaded key-value structure (O (log n)). The search is based on a skipped list. The card should be able to compare keys.

ConcurrentSkipListSet

ConcurrentSkipListMap without values.

CopyOnWriteArrayList

A write-block, non-read-block list. Any modification creates a new instance of the array in memory.

CopyOnWriteArraySet

CopyOnWriteArrayList without values.

Delayquueue

PriorityBlockingQueueallowing to receive an element only after a certain delay (the delay is declared through the Delayedinterface of the object). DelayQueuecan be used to implement the scheduler. Queue capacity is not fixed.

LinkedBlockingDeque

Bidirectional BlockingQueue, based on coherence (cache-miss & cache coherence overhead). Queue capacity is not fixed.

LinkedBlockingQueue

Unidirectional BlockingQueue, based on coherence (cache-miss & cache coherence overhead). Queue capacity is not fixed.

LinkedTransferQueue

Unidirectional `BlockingQueue` based on connectivity (cache-miss & cache coherence overhead). Queue capacity is not fixed. This queue allows you to wait for the handler to pick up the element.

PriorityBlockingQueue

Unidirectional `BlockingQueue`, allowing to prioritize messages (through comparison of elements). Forbids null values.

SynchronousQueue

Unidirectional `BlockingQueue` that implements transfer()logic for put()methods.

Synchronization points

CountDownLatch

A barrier ( await()) waiting for a specific (or more) number of calls countDown(). The state of the barrier cannot be reset.

CyclicBarrier

A barrier ( await()) waiting for a specific number of calls by await()other threads. When the number of threads reaches the specified, an optional callback will be called and the lock will be released. The barrier resets its state to the initial state upon release of pending flows and can be reused.

Exchanger

Barrier (`exchange ()`) to synchronize two threads. At the time of synchronization, volatile transfer of objects between threads is possible.

Phaser

Extension `CyclicBarrier`, which allows to register and remove participants for each cycle of the barrier.

Semaphore

A barrier allowing only the specified number of threads to capture the monitor. Essentially expands the functionality of `Lock` the ability to be in the block to multiple threads.

Executors


ExecutorServicecame to replace new Thread(runnable)to simplify work with streams. ExecutorServiceIt helps to reuse freed threads, organize queues from tasks for a thread pool, and subscribe to the result of a task. Instead of an interface, the Runnablepool uses an interface Callable(it can return a result and throw errors).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())
try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()


The method invokeAllgives control to the calling thread only upon completion of all tasks. The method invokeAnyreturns the result of the first successfully completed task, canceling all subsequent ones.

ThreadPoolExecutor

A pool of threads with the ability to specify the working and maximum number of threads in the pool, a queue for tasks.

ScheduledThreadPoolExecutor

Extends functionality with the ThreadPoolExecutorability to perform tasks deferred or regularly.

ThreadPoolExecutor

Lighter thread pool for self-replicating tasks. The pool expects calls to the `fork ()` and `join ()` methods of the child tasks in the parent.

class LNode {
    List childs = []
    def object
}
class Finder extends RecursiveTask {
    LNode  node
    Object expect
    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))
print("${invoke?.object}")


Accumulators _jdk 1.8_

Batteries allow you to perform primitive operations (sum / search for the maximum value) on numeric elements in a multi-threaded environment without using CAS.

Also popular now: