JDK concurrent package
The Java memory model guarantees the expected execution order of multi-threaded code as long as that code is free of data races. To protect code from races, various mechanisms for synchronizing threads and exchanging data between them have been devised.
The package `java.util.concurrent`, included with the HotSpot JDK, provides the following tools for writing multi-threaded code:
- Atomic
- Locks
- Collections
- Synchronization points
- Executors
- Accumulators _jdk 1.8_
Atomic
The child package `java.util.concurrent.atomic` contains a set of classes for atomic operations on primitive types. The contract of these classes guarantees that a `compare-and-set` operation executes as a single atomic step ("one unit of processor time"). When setting a new value for the variable you also pass the old value you expect (the optimistic locking approach). If the variable has changed since you read it and no longer matches the expected value, the operation returns `false`.

For example, take two arrays of `long` values, `[1,2,3,4,5]` and `[-1,-2,-3,-4,-5]`. Each thread iterates over its array sequentially and adds the elements to a single shared variable. The code (Groovy) with a pessimistic lock looks like this:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}

class Summer implements Callable {
    long[] data

    Object call() throws Exception {
        data.each {
            // every addition happens while holding the shared monitor
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
// invokeAll returns only after both tasks have finished
pool.invokeAll([
    new Summer(data: [1, 2, 3, 4, 5]),
    new Summer(data: [-1, -2, -3, -4, -5])
])
pool.shutdown()
print("Sum: ${Sum.sum}")
```
The result is as expected:

```
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
```
However, this approach has a significant performance cost. Here the bookkeeping consumes more resources than the useful work:
- attempting to lock the monitor
- blocking the thread
- unlocking the monitor
- waking the thread up
Now consider using `AtomicLong` and optimistic locking to compute the same sum:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

class Sum {
    // the reference never changes, so it is final rather than volatile
    static final AtomicLong sum = new AtomicLong(0)
}

class Summer implements Callable {
    long[] data

    Object call() throws Exception {
        data.each {
            while (true) {
                long localSum = Sum.sum.get()
                // succeeds only if nobody changed the value since we read it
                if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                    println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    break
                } else {
                    println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                }
            }
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
pool.invokeAll([
    new Summer(data: [1, 2, 3, 4, 5]),
    new Summer(data: [-1, -2, -3, -4, -5])
])
pool.shutdown()
print("Sum: ${Sum.sum}")
```
As the output shows, there were not many failed ("missed") attempts:

```
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
```
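The compare-and-set contract that the retry loop relies on can also be seen in isolation. The following Java sketch is only an illustration: the class `CasDemo` and the helper `addWithCas` are hypothetical names, not part of the JDK or of the original example.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasDemo {
    // Hypothetical helper: adds delta using an explicit CAS retry loop
    // and returns how many attempts failed before one succeeded.
    static int addWithCas(AtomicLong target, long delta) {
        int misses = 0;
        while (true) {
            long seen = target.get();
            if (target.compareAndSet(seen, seen + delta)) {
                return misses;
            }
            misses++; // another thread changed the value in between, so retry
        }
    }

    public static void main(String[] args) {
        AtomicLong sum = new AtomicLong(10);
        System.out.println(sum.compareAndSet(10, 15)); // expected value matches
        System.out.println(sum.compareAndSet(10, 20)); // stale expected value
        System.out.println(sum.get());
        System.out.println("misses: " + addWithCas(sum, 5));
    }
}
```

For a plain addition, `AtomicLong.addAndGet` does the same job without a hand-written loop; the explicit loop is useful when the new value depends on the old one in a less trivial way.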
When deciding to use optimistic locking, it is important that the operation on the modified variable does not take long. The longer the operation, the more often `compare-and-set` will fail and the more often the operation has to be repeated.

A non-blocking read lock can also be built on top of `compare-and-set`. In this case the atomic variable stores the version of the object being processed. Having read the version before the calculation, we can verify it after the calculation. The regular read-write lock is used only if the version check fails.

```groovy
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReadWriteLock
import java.util.concurrent.locks.ReentrantReadWriteLock

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List transactions = new ArrayList()
}

long balance(Account account) {
    Lock readLock = null
    try {
        while (true) {
            long balance = 0
            long version = account.version.get()
            account.transactions.each { balance += it.debit }
            // volatile write for JMM
            if (account.version.compareAndSet(version, version)) {
                return balance
            }
            // the version changed during the calculation: retry under a real read lock
            if (readLock == null) {
                readLock = account.readWriteLock.readLock()
                readLock.lock()
            }
        }
    } finally {
        readLock?.unlock()
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    writeLock.lock()
    try {
        account.version.incrementAndGet()
        account.transactions[position].debit = newDebit
    } finally {
        writeLock.unlock()
    }
}
```
Locks
ReentrantLock
Unlike `synchronized` blocks, `ReentrantLock` lets you choose more flexibly when to acquire and release a lock, because it uses ordinary Java method calls. `ReentrantLock` also lets you query the current state of the lock and wait for a lock for a limited time. It correctly supports recursive (reentrant) acquisition and release by a single thread. If you need fair locks (which respect arrival order when granting the lock), `ReentrantLock` provides that mode as well.
Although `synchronized` and `ReentrantLock` look very similar, their JVM-level implementations differ considerably. Without going into JMM details: using `ReentrantLock` instead of the JVM-provided `synchronized` pays off only when threads contend for the monitor very often. When only one thread enters the `synchronized` method, `ReentrantLock` performs worse than the JVM locking mechanism.
ReentrantReadWriteLock
Extends the properties of `ReentrantLock` with separate read and write locks: many threads may hold the read lock at once, while the write lock is exclusive. A write lock can be downgraded to a read lock if necessary.
StampedLock _jdk 1.8_
Implements optimistic and pessimistic read-write locks that can later be upgraded or downgraded. Optimistic locking is implemented through the "stamp" of the lock (example from the javadoc, where `sl` is a `StampedLock` field and `x`, `y` are fields it guards):

```java
double distanceFromOriginV1() { // A read-only method
    long stamp;
    if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
        double currentX = x;
        double currentY = y;
        if (sl.validate(stamp))
            return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    stamp = sl.readLock(); // fall back to read lock
    try {
        double currentX = x;
        double currentY = y;
        return Math.sqrt(currentX * currentX + currentY * currentY);
    } finally {
        sl.unlockRead(stamp);
    }
}
```
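The ability to upgrade a lock, mentioned above, can be sketched with `tryConvertToWriteLock`. This is a minimal illustration modeled on the pattern in the `StampedLock` javadoc; the `Point` class, its fields and the boolean return value are assumptions made for this example.

```java
import java.util.concurrent.locks.StampedLock;

class Point {
    private final StampedLock sl = new StampedLock();
    private double x, y;

    // Moves the point only if it is still at the origin.
    // Starts with a read lock and tries to upgrade it to a write lock.
    boolean moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {          // upgrade succeeded
                    stamp = ws;
                    x = newX;
                    y = newY;
                    return true;
                }
                sl.unlockRead(stamp);    // upgrade failed: take the write lock directly
                stamp = sl.writeLock();  // and re-check the condition in the loop
            }
            return false;
        } finally {
            sl.unlock(stamp);            // releases whichever mode the stamp represents
        }
    }

    double getX() {
        long stamp = sl.readLock();
        try {
            return x;
        } finally {
            sl.unlockRead(stamp);
        }
    }

    public static void main(String[] args) {
        Point p = new Point();
        System.out.println(p.moveIfAtOrigin(3, 4)); // point was at the origin
        System.out.println(p.moveIfAtOrigin(1, 1)); // point has already moved
    }
}
```

Note that `StampedLock` is not reentrant, so a thread must not try to acquire it again while already holding it.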
Collections
ArrayBlockingQueue
A bounded queue with an optional fairness policy for passing messages from one thread to another. Supports blocking (`put()`, `take()`) and non-blocking (`offer()`, `poll()`) methods. Forbids null values. Queue capacity must be specified at creation.
ConcurrentHashMap
Key-value structure based on a hash function. Reads do not lock. On writes, only part of the map (a segment) is locked. The number of segments is `concurrencyLevel` rounded up to the nearest power of two. (This describes the implementation before JDK 8; since JDK 8 locking is done per hash bin and `concurrencyLevel` is only a sizing hint.)
ConcurrentSkipListMap
Sorted concurrent key-value structure with O(log n) operations, based on a skip list. The map must be able to compare keys, either through `Comparable` keys or a `Comparator`.
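Key ordering is what distinguishes this map from `ConcurrentHashMap`. A minimal sketch, assuming `Integer` keys with natural ordering and a second map with a reversed `Comparator` (the class and method names are made up for the example):

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

class SkipListDemo {
    // Keys are kept sorted by their natural ordering regardless of insertion order.
    static ConcurrentSkipListMap<Integer, String> sample() {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(30, "c");
        map.put(10, "a");
        map.put(20, "b");
        return map;
    }

    // The same idea with an explicit comparator: largest key first.
    static ConcurrentSkipListMap<Integer, String> reversed() {
        ConcurrentSkipListMap<Integer, String> map =
                new ConcurrentSkipListMap<>(Comparator.reverseOrder());
        map.putAll(sample());
        return map;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = sample();
        System.out.println(map.keySet());        // keys in ascending order
        System.out.println(map.ceilingKey(15));  // smallest key >= 15
        System.out.println(map.headMap(20));     // entries with keys < 20
        System.out.println(reversed().firstKey());
    }
}
```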
ConcurrentSkipListSet
`ConcurrentSkipListMap` without values.
CopyOnWriteArrayList
A list that locks on writes but never on reads. Every modification creates a new copy of the underlying array in memory.
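One visible consequence of copying on every write is that an iterator works on a snapshot of the array. A small sketch of that behavior (the class and method names are illustrative only):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {
    // Returns how many elements an iterator sees when the list
    // is modified after the iterator was created.
    static int elementsSeenByOldIterator() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> snapshot = list.iterator(); // bound to the current array
        list.add("c");                               // writes go to a fresh copy
        int seen = 0;
        while (snapshot.hasNext()) {                 // no ConcurrentModificationException
            snapshot.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(elementsSeenByOldIterator());
    }
}
```

This makes the list a good fit for data that is read far more often than it is written, such as listener lists.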
CopyOnWriteArraySet
A set backed by `CopyOnWriteArrayList`.
DelayQueue
A `PriorityBlockingQueue` that releases an element only after a certain delay (the delay is declared through the element's `Delayed` interface). `DelayQueue` can be used to implement a scheduler. Queue capacity is not fixed.
LinkedBlockingDeque
Double-ended `BlockingQueue` based on linked nodes (with the associated cache-miss and cache-coherence overhead). Queue capacity is not fixed.
LinkedBlockingQueue
Single-ended `BlockingQueue` based on linked nodes (with the associated cache-miss and cache-coherence overhead). Queue capacity is not fixed.
LinkedTransferQueue
Single-ended `BlockingQueue` based on linked nodes (with the associated cache-miss and cache-coherence overhead). Queue capacity is not fixed. This queue lets the producer wait until a consumer picks up the element.
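Waiting for the consumer is done with `transfer()`, while `tryTransfer()` hands the element over only if a consumer is already waiting. A minimal sketch with one consumer thread (the class and method names are hypothetical):

```java
import java.util.concurrent.LinkedTransferQueue;

class TransferDemo {
    // Hands a message to a consumer thread and returns what the consumer received.
    static String handOff(String message) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(message); // blocks until the consumer has taken the element
            consumer.join();         // make the consumer's write to received[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    // With no consumer waiting, tryTransfer gives up immediately and enqueues nothing.
    static boolean tryWithoutConsumer() {
        return new LinkedTransferQueue<String>().tryTransfer("nobody listens");
    }

    public static void main(String[] args) {
        System.out.println(handOff("ping"));
        System.out.println(tryWithoutConsumer());
    }
}
```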
PriorityBlockingQueue
Single-ended `BlockingQueue` that prioritizes messages by comparing elements. Forbids null values.
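As an illustration of prioritizing by comparison, the sketch below orders string messages by length, so the shortest message is delivered first. The comparator and the messages are arbitrary choices for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityDemo {
    // Puts messages in arbitrary order and polls them back in priority order.
    static List<String> drainInPriorityOrder() {
        PriorityBlockingQueue<String> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(String::length));
        queue.put("medium");
        queue.put("low-priority-message");
        queue.put("hi");

        List<String> result = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) { // poll returns null when the queue is empty
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder());
    }
}
```

Because the queue is unbounded, `put()` never blocks here; only `take()` blocks, and only when the queue is empty.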
SynchronousQueue
Single-ended `BlockingQueue` whose `put()` follows `transfer()` logic: it blocks until another thread takes the element.
Synchronization points
CountDownLatch
A barrier (`await()`) that waits for a given number (or more) of `countDown()` calls. The state of the barrier cannot be reset.
CyclicBarrier
A barrier (`await()`) that waits until a given number of threads have called `await()`. When the number of waiting threads reaches the specified count, an optional callback is invoked and the threads are released. After releasing the waiting threads the barrier resets to its initial state and can be reused.
Exchanger
A barrier (`exchange()`) for synchronizing two threads. At the synchronization point the threads can hand objects to each other with the same visibility guarantees as a volatile write and read.
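A minimal sketch of such a two-thread exchange, assuming string payloads and hypothetical class and method names:

```java
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    // Returns {what the main thread received, what the worker received}.
    static String[] swap() {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread worker = new Thread(() -> {
            try {
                received[1] = exchanger.exchange("from-worker");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // blocks until the worker also calls exchange()
            received[0] = exchanger.exchange("from-main");
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] result = swap();
        System.out.println(result[0] + " / " + result[1]);
    }
}
```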
Phaser
An extended `CyclicBarrier` that allows participants to be registered and deregistered for each cycle (phase) of the barrier.
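The following sketch shows a participant that joins for a single phase and then leaves, so the next phase needs only the remaining party. The class name and the shape of the returned array are assumptions made for the example.

```java
import java.util.concurrent.Phaser;

class PhaserDemo {
    // Returns {phase after the first cycle, parties left, phase after the second cycle}.
    static int[] run() {
        Phaser phaser = new Phaser(1);   // the calling thread is the first party
        phaser.register();               // a second party joins for one cycle

        Thread worker = new Thread(() -> {
            phaser.arriveAndDeregister(); // arrive at the barrier and leave for good
        });
        worker.start();

        phaser.arriveAndAwaitAdvance();  // waits until the worker has arrived
        int phaseAfterFirst = phaser.getPhase();
        int partiesLeft = phaser.getRegisteredParties();

        phaser.arriveAndAwaitAdvance();  // only one party left, so no waiting
        int phaseAfterSecond = phaser.getPhase();

        return new int[] {phaseAfterFirst, partiesLeft, phaseAfterSecond};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```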
Semaphore
A barrier that lets only a specified number of threads hold a permit at the same time. In essence it extends the idea of `Lock` so that several threads can be inside the guarded block simultaneously.
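To illustrate, the sketch below starts more workers than there are permits and records the highest number of workers that were inside the guarded block at once. All names and the 20 ms of simulated work are arbitrary choices for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {
    // Runs `tasks` workers and returns the peak number inside the guarded block.
    static int maxConcurrent(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();          // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(20);             // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();          // always give the permit back
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 6)); // never exceeds the number of permits
    }
}
```

Unlike a `Lock`, a semaphore has no owner: a permit may be released by a thread other than the one that acquired it.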
Executors
`ExecutorService` replaced `new Thread(runnable)` to simplify working with threads. `ExecutorService` helps to reuse freed threads, organize a queue of tasks for a thread pool, and subscribe to the result of a task. Besides the `Runnable` interface, the pool accepts the `Callable` interface, which can return a result and throw exceptions.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
// get() blocks until the task has produced its result
println(future.get())
try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {
    // the exception thrown inside the task arrives wrapped as the cause
    println("Got it: ${e.cause}")
}
pool.shutdown()
```
The `invokeAll` method returns control to the calling thread only once all tasks have completed. The `invokeAny` method returns the result of the first task that completes successfully and cancels the remaining ones.
ThreadPoolExecutor
A thread pool that lets you specify the core and maximum number of threads in the pool and the queue for tasks.
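These parameters map directly onto the `ThreadPoolExecutor` constructor. The sizes, keep-alive time and queue capacity below are arbitrary values chosen for the sketch, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolDemo {
    static int run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                             // core pool size
                4,                             // maximum pool size
                30, TimeUnit.SECONDS,          // idle time before extra threads are stopped
                new ArrayBlockingQueue<>(10)); // bounded task queue
        try {
            Future<Integer> answer = pool.submit(() -> 6 * 7);
            return answer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Threads beyond the core size are created only when the queue is full, so with an unbounded queue the maximum pool size has no effect.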
ScheduledThreadPoolExecutor
Extends `ThreadPoolExecutor` with the ability to run tasks after a delay or periodically.
ForkJoinPool
A lighter thread pool for tasks that spawn subtasks of themselves. The pool expects the parent task to call `fork()` and `join()` on its child tasks.

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

class LNode {
    List children = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        // fork a subtask per child, then join them and return the first match
        node?.children?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
    node: new LNode(
        children: [
            new LNode(object: "invalid"),
            new LNode(
                object: "invalid",
                children: [new LNode(object: "test")]
            )
        ]
    ),
    expect: "test"
))
print("${invoke?.object}")
```
Accumulators _jdk 1.8_
Accumulators let you perform primitive operations (sum, maximum) on numeric values in a multi-threaded environment without all threads contending on a single CAS-updated variable.
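A minimal sketch using `LongAdder` for the sum and `LongAccumulator` for the maximum, both from `java.util.concurrent.atomic`. The thread count, the value range and the class name are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class AccumulatorDemo {
    // Each thread adds the numbers 1..perThread; returns {total sum, maximum seen}.
    static long[] sumAndMax(int threads, int perThread) {
        LongAdder sum = new LongAdder();
        LongAccumulator max = new LongAccumulator(Math::max, Long.MIN_VALUE);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (long i = 1; i <= perThread; i++) {
                    sum.add(i);         // updates are spread over internal cells
                    max.accumulate(i);  // applies Math.max to the running value
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // sum() and get() are reliable here because all writers have finished
        return new long[] {sum.sum(), max.get()};
    }

    public static void main(String[] args) {
        long[] r = sumAndMax(4, 1000);
        System.out.println(r[0] + " " + r[1]);
    }
}
```

The trade-off compared with `AtomicLong` is that reading the total while updates are still running gives only a moment-in-time estimate rather than an atomic snapshot.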