Analysis of the main concepts of parallelism

Original author: Igor Sorokin, Alex Miller
  • Translation
Coffee for everyone!

Tomorrow we launch the sixth cohort of the “Java Developer” course since last April, which makes it almost an anniversary. As before, we have selected and translated some of the most interesting material to share with you.

Go!

This guide will help Java developers working with multi-threaded programs understand the core concepts of concurrency and how to apply them. It gives an overview of key aspects of the Java language, with references to the standard library.

SECTION 1

Introduction

Since its inception, Java has supported key concurrency concepts such as threads and locks.

SECTION 2

Concepts

Concept | Description
Atomicity | An atomic operation is performed either completely or not at all; partial execution is impossible.
Visibility | The conditions under which one thread sees changes made by another thread.

Table 1: Concurrency Concepts



Race condition

A race condition occurs when the same resource is used by multiple threads at the same time and, depending on the order in which each thread acts, several outcomes are possible. The code below is not thread-safe: the value field can be initialized more than once, because the check-then-act sequence (check for null, then initialize) that lazily initializes the field is not atomic:

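abstract class Lazy<T> {
 private volatile T value;
 // Supplied by a subclass; may run more than once under contention
 protected abstract T initialize();
 T get() {
   // Check-then-act: two threads can both see null here
   if (value == null)
     value = initialize();
   return value;
 }
}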
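One common way to close this race is double-checked locking on the volatile field. The sketch below assumes that the value is produced by a Supplier passed to the constructor. SafeLazy is a hypothetical name, not a library class.

```java
import java.util.function.Supplier;

// Hypothetical class: a thread-safe variant of Lazy using double-checked locking.
final class SafeLazy<T> {
    private final Supplier<? extends T> initializer;
    private volatile T value; // volatile: publishes the fully built value safely

    SafeLazy(Supplier<? extends T> initializer) {
        this.initializer = initializer;
    }

    T get() {
        T result = value;              // first check, without locking
        if (result == null) {
            synchronized (this) {
                result = value;        // second check, under the lock
                if (result == null) {
                    // Runs once, assuming the initializer never returns null
                    result = initializer.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
```

After initialization, the fast path reads only the volatile field. The lock is taken only while the value is still missing.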

Data race

A data race occurs when two or more threads access the same non-final variable without synchronization and at least one of them writes to it. Without synchronization, changes may not be visible to other threads. A thread can then read stale data, which in turn can lead to infinite loops, corrupted data structures, or inaccurate computations. The following code may loop forever, because the reading thread may never observe the change made by the writing thread:

class Waiter implements Runnable {
 private boolean shouldFinish;
 void finish() { shouldFinish = true; }
 public void run() {
   long iteration = 0;
   while (!shouldFinish) {
     iteration++;
   }
   System.out.println("Finished after: " + iteration);
 }
}
class DataRace {
 public static void main(String[] args) throws InterruptedException {
   Waiter waiter = new Waiter();
   Thread waiterThread = new Thread(waiter);
   waiterThread.start();
   waiter.finish();
   waiterThread.join();
 }
}

SECTION 3

Java memory model: happens-before relationship

The Java memory model is defined in terms of actions such as reads and writes of fields and synchronization on monitors. These actions are ordered by the happens-before relation. That relation explains when a thread is guaranteed to see the results of another thread's actions, and what a correctly synchronized program is.

HAPPENS-BEFORE RELATIONS HAVE THE FOLLOWING PROPERTIES:

  • A call to Thread#start happens before any action in the started thread.
  • Releasing a monitor happens before any subsequent acquisition of the same monitor.
  • A write to a volatile variable happens before any subsequent read of that volatile variable.
  • A write to a final field happens before the publication of the object's reference.
  • All actions in a thread happen before any other thread returns from Thread#join on that thread.

In Image 1, Action X happens before Action Y. Therefore, in Thread 2, all operations to the right of Action Y will see all operations to the left of Action X in Thread 1.


Image 1: Happens-before example
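The Thread#start and Thread#join rules from the list above are enough to share plain (non-volatile) fields safely in a simple hand-off. The following is a minimal sketch; StartJoinVisibility is a hypothetical name.

```java
// Hypothetical class: visibility guaranteed only by the start/join rules.
final class StartJoinVisibility {
    private int input;  // written before start(), read by the worker
    private int output; // written by the worker, read after join()

    int compute(int value) throws InterruptedException {
        input = value;                                  // happens before worker.start()
        Thread worker = new Thread(() -> output = input * 2);
        worker.start();                                 // the worker is guaranteed to see input
        worker.join();                                  // the worker's writes happen before join() returns
        return output;                                  // guaranteed to see the worker's write
    }
}
```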


SECTION 4

Standard synchronization functions

The synchronized keyword

The synchronized keyword prevents different threads from executing the same block of code at the same time. Once a thread holds the lock (by entering the synchronized block), it has exclusive access to the data protected by that lock, so the operation can be considered atomic. The keyword also guarantees that other threads will see the result of the operation once they acquire the same lock.

class AtomicOperation {
 private int counter0;
 private int counter1;
 void increment() {
   synchronized (this) {
     counter0++;
     counter1++;
   }
 }
}

The synchronized keyword can also be applied at the method level.

Type of method | Reference used as the monitor
static | Reference to the Class object
non-static | The this reference

Table 2: Monitors used when an entire method is synchronized
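Thread.holdsLock can be used to check which monitor a synchronized method acquires, as described in Table 2. The following is a small sketch; MethodMonitors is a hypothetical name.

```java
// Hypothetical class: shows which monitor each kind of synchronized method holds.
final class MethodMonitors {
    // Non-static: equivalent to synchronized (this) { ... }
    synchronized boolean instanceMethodHoldsThis() {
        return Thread.holdsLock(this);
    }

    // Static: equivalent to synchronized (MethodMonitors.class) { ... }
    static synchronized boolean staticMethodHoldsClass() {
        return Thread.holdsLock(MethodMonitors.class);
    }

    // Not synchronized: the this monitor is not held here
    boolean holdsThisOutside() {
        return Thread.holdsLock(this);
    }
}
```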

The lock is reentrant, so if a thread already holds the lock, it can successfully acquire it again.

class Reentrancy {
 synchronized void doAll() {
   doFirst();
   doSecond();
 }
 synchronized void doFirst() {
   System.out.println("First operation is successful.");
 }
 synchronized void doSecond() {
   System.out.println("Second operation is successful.");
 }
}

The level of contention affects how the monitor is acquired:

State | Description
init | Just created; it has never been acquired yet.
biased | There is no contention, and the code protected by the lock is executed by only one thread. Cheapest to acquire. (HotSpot disabled biased locking by default in JDK 15.)
thin | The monitor is acquired by several threads without contention. A relatively cheap CAS is used for locking.
fat | There is contention. The JVM requests an OS mutex and lets the OS scheduler handle thread parking and wake-ups.

Table 3: Monitor states

wait/notify

The wait/notify/notifyAll methods are declared in the Object class. wait forces a thread into the WAITING state, or into TIMED_WAITING if a timeout value is passed. A waiting thread is woken up when any of the following happens:

  • Another thread calls notify, which wakes up an arbitrary thread waiting on the monitor.
  • Another thread calls notifyAll, which wakes up all threads waiting on the monitor.
  • Thread#interrupt is called on the waiting thread. In this case, an InterruptedException is thrown.

The most common pattern is a condition loop:

class ConditionLoop {
 private boolean condition;
 synchronized void waitForCondition() throws InterruptedException {
   while (!condition) {
     wait();
   }
 }
 synchronized void satisfyCondition() {
   condition = true;
   notifyAll();
 }
}

  • Keep in mind that to call wait/notify/notifyAll on an object, you must first hold the lock on that object.
  • Always wait inside a loop that checks the condition you are waiting for. This handles the timing problem where another thread satisfies the condition before the wait begins. It also protects your code from spurious wake-ups, which can (and do) occur.
  • Always make sure the waiting condition is satisfied before calling notify/notifyAll. Otherwise, the notification wakes the thread, but the thread cannot exit its wait loop.
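The rules above can be combined in a one-slot mailbox. The following is a minimal sketch; Mailbox is a hypothetical name, and null is reserved to mean "empty", so null messages are not supported. Each method holds the lock, waits in a loop, and changes the state before calling notifyAll.

```java
// Hypothetical class: a one-slot mailbox built on wait/notifyAll.
final class Mailbox<T> {
    private T message; // null means "empty"

    synchronized void put(T value) throws InterruptedException {
        while (message != null) { // re-check after every wake-up
            wait();
        }
        message = value;          // make the consumer's condition true first...
        notifyAll();              // ...then wake up waiting threads
    }

    synchronized T take() throws InterruptedException {
        while (message == null) { // re-check after every wake-up
            wait();
        }
        T value = message;
        message = null;
        notifyAll();              // a producer may be waiting for the free slot
        return value;
    }
}
```

notifyAll is used instead of notify because producers and consumers wait on the same monitor. With notify, a wake-up could reach a thread of the wrong kind.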

The volatile keyword

volatile solves the visibility problem and makes individual reads and writes of the value atomic (including long and double fields). It does not make compound operations such as count++ atomic. Visibility comes from a happens-before relationship: a write to a volatile variable happens before any subsequent read of that variable. This guarantees that the next read of the field sees the value set by the most recent write.

class VolatileFlag implements Runnable {
 private volatile boolean shouldStop;
 public void run() {
   while (!shouldStop) {
     //do smth
   }
   System.out.println("Stopped.");
 }
 void stop() {
   shouldStop = true;
 }
 public static void main(String[] args) throws InterruptedException {
   VolatileFlag flag = new VolatileFlag();
   Thread thread = new Thread(flag);
   thread.start();
   flag.stop();
   thread.join();
 }
}

Atomicity

The java.util.concurrent.atomic package contains a set of classes that support compound atomic actions on a single value without locking. They give the same visibility guarantees as volatile.

Using the AtomicXXX classes, you can implement an atomic check-then-act operation:

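import java.util.concurrent.atomic.AtomicReference;

class CheckThenAct {
 private final AtomicReference<String> value = new AtomicReference<>();
 void initialize() {
   // compareAndSet succeeds only for the first caller that still sees null
   if (value.compareAndSet(null, "Initialized value")) {
     System.out.println("Initialized only once.");
   }
 }
}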

Both AtomicInteger and AtomicLong provide atomic increment/decrement operations:

class Increment {
 private final AtomicInteger state = new AtomicInteger();
 void advance() {
   int oldState = state.getAndIncrement();
   System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
 }
}
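The following sketch runs the same number of increments from several threads into an AtomicInteger and a LongAdder (described in the next paragraph), so the two can be compared. CounterDemo is a hypothetical name, and the thread and iteration counts are arbitrary.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical class: concurrent increments with AtomicInteger and LongAdder.
final class CounterDemo {
    static long[] count(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger atomic = new AtomicInteger();
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    atomic.incrementAndGet(); // all threads update one shared variable
                    adder.increment();        // updates are spread over internal cells
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // sum() is exact here only because every writer has finished
        return new long[] {atomic.get(), adder.sum()};
    }
}
```

Both counters reach the same total. LongAdder#sum is not an atomic snapshot while updates are still in progress, which is why the next paragraph recommends it only when the value does not need to be read atomically.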

If you need a counter and do not need to read its value atomically, consider using LongAdder instead of AtomicLong/AtomicInteger. LongAdder spreads the value across several cells and adds cells when needed, so it performs better under high contention.

ThreadLocal

One way to keep data within a thread and make locking unnecessary is to use ThreadLocal storage. Conceptually, ThreadLocal acts as if each thread had its own copy of the variable. ThreadLocal is commonly used to hold per-thread values, such as the “current transaction”, or other resources. It is also used to maintain per-thread counters, statistics, or ID generators.

class TransactionManager {
 // Transaction, NullTransaction and TransactionImpl are application types (not shown)
 private final ThreadLocal<Transaction> currentTransaction
     = ThreadLocal.withInitial(NullTransaction::new);
 Transaction currentTransaction() {
   // Each thread sees only the transaction it stored itself
   Transaction current = currentTransaction.get();
   if (current.isNull()) {
     current = new TransactionImpl();
     currentTransaction.set(current);
   }
   return current;
 }
}
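A self-contained per-thread counter shows the same idea without the application types assumed above. The following is a minimal sketch; PerThreadCounter is a hypothetical name. Each thread gets its own int[] from withInitial, so increments need no locking and never interfere with each other.

```java
// Hypothetical class: a per-thread counter backed by ThreadLocal.
final class PerThreadCounter {
    private static final ThreadLocal<int[]> COUNT = ThreadLocal.withInitial(() -> new int[1]);

    static int incrementAndGet() {
        return ++COUNT.get()[0]; // touches only the calling thread's array
    }

    // Starts `threads` threads; each increments its own counter `times` times.
    static int[] run(int threads, int times) throws InterruptedException {
        int[] lastSeen = new int[threads];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int index = t;
            workers[t] = new Thread(() -> {
                int value = 0;
                for (int i = 0; i < times; i++) {
                    value = incrementAndGet();
                }
                lastSeen[index] = value; // each thread writes only its own slot
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join(); // makes the lastSeen writes visible to this thread
        }
        return lastSeen;
    }
}
```

Every thread ends at exactly `times`, regardless of how the threads interleave. In thread pools, where threads are reused across tasks, it is usually wise to call ThreadLocal#remove when a task finishes so that values do not leak into the next task.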

SECTION 5

Safe Publishing

Publishing an object makes its reference available outside the current scope (for example, by returning the reference from a getter). Ensuring that an object is published safely (only once it is fully constructed) may require synchronization. Safe publication can be achieved using:

  • Static initializers. Only one thread can initialize static variables, because class initialization is performed under an exclusive lock.

import java.time.Year;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class StaticInitializer {
 // Publishing an immutable object without additional initialization
 public static final Year year = Year.of(2017);
 public static final Set<String> keywords;
 // Using a static initializer to build a complex object
 static {
   // Create a mutable set
   Set<String> keywordsSet = new HashSet<>();
   // Initialize its state
   keywordsSet.add("java");
   keywordsSet.add("concurrency");
   // Make the set unmodifiable
   keywords = Collections.unmodifiableSet(keywordsSet);
 }
}

  • Volatile fields. A reading thread always reads the most recent value, because a write to a volatile variable happens before any subsequent read.

class Volatile {
 private volatile String state;
 void setState(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}

  • Atomics. For example, AtomicInteger stores its value in a volatile field, so the rule for volatile variables applies here as well.

class Atomics {
 private final AtomicInteger state = new AtomicInteger();
 void initializeState(int state) {
   this.state.compareAndSet(0, state);
 }
 int getState() {
   return state.get();
 }
}

  • Final fields.

class Final {
 private final String state;
 Final(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}

Make sure that the this reference does not escape during construction.

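import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThisEscapes {
private final String name;
ThisEscapes(String name) {
  // BAD: 'this' is published before the constructor has finished
  Cache.putIntoCache(this);
  this.name = name;
}
String getName() { return name; }
}
class Cache {
private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
static void putIntoCache(ThisEscapes thisEscapes) {
  // The 'this' reference escaped before the object was fully constructed:
  // getName() still returns null here, and ConcurrentHashMap rejects null keys,
  // so this call throws NullPointerException.
  CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
}
}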
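One way to avoid the escape is to let the constructor only initialize state and to publish the object from a static factory method once it is fully built. SafeCacheEntry and create are hypothetical names, used only for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class: publishes itself only after construction has completed.
final class SafeCacheEntry {
    private static final Map<String, SafeCacheEntry> CACHE = new ConcurrentHashMap<>();

    private final String name;

    private SafeCacheEntry(String name) {
        this.name = name; // the constructor only initializes state; 'this' does not leak
    }

    static SafeCacheEntry create(String name) {
        SafeCacheEntry entry = new SafeCacheEntry(name); // fully constructed here
        SafeCacheEntry existing = CACHE.putIfAbsent(name, entry);
        return existing != null ? existing : entry;      // reuse the cached instance if present
    }

    String getName() { return name; }
}
```

Making the constructor private means every instance has to go through create, so no caller can observe a half-built object.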

  • Correctly synchronized fields.

class Synchronization {
 private String state;
 synchronized String getState() {
   if (state == null)
     state = "Initial";
   return state;
 }
}

SECTION 6

Immutable objects

One of the most valuable properties of immutable objects is that they are thread-safe, so they need no synchronization. Requirements for an immutable object:

  • All fields are final.
  • All fields must be either immutable objects or mutable objects that do not escape the object, so that the object's state cannot change after construction.
  • The this reference does not escape during construction.
  • The class is final, so its behavior cannot be overridden in subclasses.

An example of an immutable object:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Marked final: subclasses are not allowed
public final class Artist {
 // Immutable object, final field
 private final String name;
 // Collection of immutable objects, final field
 private final List<Track> tracks;
 public Artist(String name, List<Track> tracks) {
   this.name = name;
   // Defensive copy
   List<Track> copy = new ArrayList<>(tracks);
   // Turn the mutable collection into an unmodifiable one
   this.tracks = Collections.unmodifiableList(copy);
   // 'this' is not passed anywhere during construction
 }
 // Getters, equals, hashCode, toString
}
// Marked final: inheritance is not allowed
public final class Track {
 // Immutable object, final field
 private final String title;
 public Track(String title) {
   this.title = title;
 }
 // Getters, equals, hashCode, toString
}

SECTION 7

Threads

The java.lang.Thread class represents an application or JVM thread. Code always executes in the context of some Thread (you can get the current thread with Thread#currentThread()).

State | Description
NEW | Not yet started.
RUNNABLE | Up and running.
BLOCKED | Waiting on a monitor: trying to acquire the lock and enter a critical section.
WAITING | Waiting for another thread to perform a particular action (notify/notifyAll, LockSupport#unpark).
TIMED_WAITING | Same as WAITING, but with a timeout.
TERMINATED | Stopped.

Table 4: Thread states

Thread method | Description
start | Starts a Thread instance and executes its run() method.
join | Blocks until the thread finishes.
interrupt | Interrupts the thread. If the thread is blocked in a method that responds to interrupts, an InterruptedException is thrown in that thread; otherwise, its interrupt status is set.
stop, suspend, resume, destroy | All of these methods are deprecated. Depending on the state of the thread in question, they perform dangerous operations. Instead, use Thread#interrupt() or a volatile flag to tell the thread what it should do.

Table 5: Thread coordination methods
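Several of the states in Table 4 can be observed with Thread#getState. The following sketch holds a monitor so that a freshly started thread has to block on it. ThreadStates is a hypothetical name.

```java
// Hypothetical class: observes NEW, BLOCKED and TERMINATED for one thread.
final class ThreadStates {
    static Thread.State[] observe() throws InterruptedException {
        Object lock = new Object();
        Thread thread = new Thread(() -> {
            synchronized (lock) {
                // nothing to do: acquiring the monitor is the point
            }
        });
        Thread.State beforeStart = thread.getState(); // NEW
        Thread.State blocked;
        synchronized (lock) {
            thread.start();
            // Poll until the thread blocks on the monitor this thread still holds
            while ((blocked = thread.getState()) != Thread.State.BLOCKED) {
                Thread.yield();
            }
        }
        thread.join();                                 // the monitor is free, so the thread finishes
        return new Thread.State[] {beforeStart, blocked, thread.getState()};
    }
}
```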

How to handle InterruptedException?

  • Clean up all resources and terminate the thread, if that is possible at the current level.
  • Declare that the current method throws InterruptedException.
  • If the method cannot throw InterruptedException, restore the interrupted flag by calling Thread.currentThread().interrupt() and throw an exception that is more appropriate at this level. Restoring the flag is very important, because it lets the interrupt be handled at a higher level.
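The third option can be sketched as follows. The method restores the flag and rethrows an unchecked exception, and the caller can still see the interrupt. InterruptHandling and TaskCancelledException are hypothetical names for this example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class: restores the interrupt flag before rethrowing.
final class InterruptHandling {
    // Hypothetical exception type that is more appropriate at this level
    static final class TaskCancelledException extends RuntimeException {
        TaskCancelledException(Throwable cause) { super(cause); }
    }

    static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for higher levels
            throw new TaskCancelledException(e);
        }
    }

    // Interrupts a sleeping worker and reports whether the flag was still set
    // when the worker's higher-level code caught the exception.
    static boolean flagSurvives() throws InterruptedException {
        boolean[] flagSeen = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                sleepQuietly(60_000);
            } catch (TaskCancelledException e) {
                flagSeen[0] = Thread.currentThread().isInterrupted();
            }
        });
        worker.start();
        worker.interrupt(); // if this arrives before sleep() starts, sleep() throws at once
        worker.join();      // makes flagSeen visible to this thread
        return flagSeen[0];
    }
}
```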

Handling unexpected exceptions

You can set an UncaughtExceptionHandler on a thread. The handler is notified of any uncaught exception that terminates the thread.

Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((failedThread, exception) -> {
 logger.error("Caught unexpected exception in thread '{}'.",
     failedThread.getName(), exception);
});
thread.start();

SECTION 8

Liveness

Deadlock

A deadlock occurs when there are several threads, each waiting for a resource held by another thread, so that the resources and the threads waiting for them form a cycle. The most obvious kind of resource is an object monitor, but any resource that causes blocking (for example, wait/notify) can also qualify. An example of a potential deadlock:

class Account {
 private long amount;
 void plus(long amount) { this.amount += amount; }
 void minus(long amount) {
   if (this.amount < amount)
     throw new IllegalArgumentException();
   else
     this.amount -= amount;
 }
 static void transferWithDeadlock(long amount, Account first, Account second){
   synchronized (first) {
     synchronized (second) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}

A deadlock occurs if, at the same time:

  • One thread is transferring money from the first account to the second and has already locked the first account.
  • Another thread is transferring money from the second account to the first and has already locked the second account.

Ways to prevent deadlock:

  • Lock ordering: always acquire locks in the same order.

class Account {
 private long id;
 private long amount;
 // Some methods omitted
 static void transferWithLockOrdering(long amount, Account first, Account second){
   boolean lockOnFirstAccountFirst = first.id < second.id;
   Account firstLock = lockOnFirstAccountFirst  ? first  : second;
   Account secondLock = lockOnFirstAccountFirst ? second : first;
   synchronized (firstLock) {
     synchronized (secondLock) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}

  • Locking with a timeout: do not block indefinitely when acquiring a lock. Instead, release all locks as quickly as possible and try again.

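import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Account {
 private final Lock lock = new ReentrantLock();
 private long amount;
// Some methods omitted
 // Returns true if the transfer happened, false if every attempt timed out
 static boolean transferWithTimeout(
     long amount, Account first, Account second, int retries, long timeoutMillis
 ) throws InterruptedException {
   for (int attempt = 0; attempt < retries; attempt++) {
     if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
     {
       try {
         if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
         {
           try {
             first.minus(amount);
             second.plus(amount);
             return true; // done: do not repeat the transfer
           }
           finally {
             second.lock.unlock();
           }
         }
       }
       finally {
         // Releasing the first lock lets a competing thread make progress
         first.lock.unlock();
       }
     }
   }
   return false;
 }
}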

The JVM can detect deadlocks on monitors and reports them in thread dumps.
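The same detection is available programmatically through java.lang.management.ThreadMXBean#findDeadlockedThreads. The following sketch deliberately deadlocks two daemon threads on two monitors, using a latch so that each thread holds its first lock before it requests the second. DeadlockDetection is a hypothetical name, and the roughly 5-second polling limit is an arbitrary choice.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical class: creates a deliberate deadlock and asks the JVM to find it.
final class DeadlockDetection {
    static int detect() throws InterruptedException {
        Object a = new Object();
        Object b = new Object();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        startDaemon(a, b, bothHoldFirstLock);
        startDaemon(b, a, bothHoldFirstLock); // opposite order: a lock-ordering violation

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) { // give up after about 5 seconds
            long[] ids = threads.findDeadlockedThreads();   // null when no deadlock is found
            if (ids != null) {
                return ids.length;
            }
            Thread.sleep(50);
        }
        return 0;
    }

    private static void startDaemon(Object first, Object second, CountDownLatch latch) {
        Thread thread = new Thread(() -> {
            synchronized (first) {
                latch.countDown();
                try {
                    latch.await(); // wait until the other thread holds its first lock
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // never reached: each thread waits for the other's monitor
                }
            }
        });
        thread.setDaemon(true); // lets the JVM exit despite the stuck threads
        thread.start();
    }
}
```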

Livelock and thread starvation

A livelock occurs when threads spend all their time negotiating access to a resource, or detecting and avoiding deadlock, so that no thread actually makes progress. Starvation occurs when some threads hold a lock for long periods, so that other threads “starve” without making progress.

SECTION 9

Thread Pools

The core interface for thread pools in java.util.concurrent is ExecutorService. java.util.concurrent also provides the static factory class Executors, which contains factory methods for creating thread pools with the most common configurations.

Method | Description
newSingleThreadExecutor | Returns an ExecutorService with exactly one thread.
newFixedThreadPool | Returns an ExecutorService with a fixed number of threads.
newCachedThreadPool | Returns an ExecutorService with a thread pool that grows and shrinks as needed.
newSingleThreadScheduledExecutor | Returns a ScheduledExecutorService with a single thread.
newScheduledThreadPool | Returns a ScheduledExecutorService with a core set of threads.
newWorkStealingPool | Returns a work-stealing ExecutorService.

Table 6: Static Factory Methods

When sizing thread pools, it is often useful to know the number of logical cores on the machine running the application. In Java, you can get this value by calling Runtime.getRuntime().availableProcessors().
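The following is a minimal sketch of sizing a fixed pool for CPU-bound work by the number of logical cores. The workload (summing squares in strided chunks) is only a placeholder, and PoolSizing is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class: one CPU-bound chunk per logical core.
final class PoolSizing {
    static long sumOfSquares(int n) throws InterruptedException, ExecutionException {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        try {
            List<Future<Long>> parts = new ArrayList<>();
            for (int c = 0; c < cores; c++) {
                final int start = c + 1;
                parts.add(pool.submit(() -> {
                    long sum = 0;
                    for (long i = start; i <= n; i += cores) { // stride by the core count
                        sum += i * i;
                    }
                    return sum;
                }));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get(); // blocks until that chunk is done
            }
            return total;
        } finally {
            pool.shutdown(); // accept no new tasks; idle pool threads can exit
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

For I/O-bound tasks, a pool larger than the core count is often used instead, because those threads spend much of their time waiting.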

Implementation | Description
ThreadPoolExecutor | The default implementation, with an optionally resizing thread pool, a single work queue, and configurable policies for rejected tasks (via RejectedExecutionHandler) and thread creation (via ThreadFactory).
ScheduledThreadPoolExecutor | An extension of ThreadPoolExecutor that adds the ability to schedule periodic tasks.
ForkJoinPool | A work-stealing pool: all threads in the pool try to find and run either submitted tasks or tasks created by other active tasks.

Table 7: Thread Pool Implementations
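The work-stealing behavior of ForkJoinPool pays off with divide-and-conquer tasks that fork subtasks. The following sketch sums an array with RecursiveTask on the common pool. ForkJoinSum is a hypothetical name, and the split threshold is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical class: divide-and-conquer sum of numbers[from, to).
final class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // below this size, sum sequentially
    private final long[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(numbers, from, mid);
        ForkJoinSum right = new ForkJoinSum(numbers, mid, to);
        left.fork();                     // schedule the left half; an idle worker may steal it
        long rightSum = right.compute(); // work on the right half in the current thread
        return left.join() + rightSum;   // wait for (or help complete) the left half
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(numbers, 0, numbers.length));
    }
}
```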

Tasks are submitted using ExecutorService#submit, ExecutorService#invokeAll, or ExecutorService#invokeAny, which have several overloads for different kinds of tasks.

Interface | Description
Runnable | Represents a task without a return value.
Callable | Represents a computation with a return value. Its call() method is declared to throw Exception, so checked exceptions do not need a wrapper.

Table 8: Functional task interfaces

Future

Future is an abstraction for asynchronous computation. It represents the result of a computation that may become available at some point: either a computed value or an exception. Most ExecutorService methods use Future as their return type. It provides methods to examine the current state of the future or to block until the result is available.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
 String result = future.get(1L, TimeUnit.SECONDS);
 System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new RuntimeException(e);
}
catch (ExecutionException e) {
 throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
 throw new RuntimeException(e);
}
finally {
 // Release the pool's thread once the result is no longer needed
 executorService.shutdown();
}
assert future.isDone();

Lock

The java.util.concurrent.locks package has a standard Lock interface. The ReentrantLock implementation duplicates the functionality of the synchronized keyword but also provides additional capabilities, such as querying the state of the lock, non-blocking tryLock(), and interruptible lock acquisition. An example of using an explicit ReentrantLock instance:

class Counter {
 private final Lock lock = new ReentrantLock();
 private int value;
 int increment() {
   lock.lock();
   try {
     return ++value;
   } finally {
     lock.unlock();
   }
 }
}
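The extra capabilities mentioned above include hold counts (reentrancy), lock-state queries, and a non-blocking tryLock from another thread. The following sketch exercises them. ReentrantLockFeatures is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: hold counts, isLocked() and tryLock() on a ReentrantLock.
final class ReentrantLockFeatures {
    // Returns {hold count was 2, lock reported as locked, other thread acquired it}
    static boolean[] demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] otherThreadGotLock = new boolean[1];
        lock.lock();
        lock.lock(); // reentrant: the owning thread acquires it again
        try {
            int holds = lock.getHoldCount();
            Thread other = new Thread(() -> {
                boolean acquired = lock.tryLock(); // returns immediately instead of blocking
                if (acquired) {
                    lock.unlock();
                }
                otherThreadGotLock[0] = acquired;
            });
            other.start();
            other.join();
            return new boolean[] {holds == 2, lock.isLocked(), otherThreadGotLock[0]};
        } finally {
            lock.unlock();
            lock.unlock(); // one unlock() per lock()
        }
    }
}
```

A reentrant lock is fully released only after unlock() has been called as many times as lock(). Until then, tryLock from another thread returns false.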

ReadWriteLock

The java.util.concurrent.locks package also contains the ReadWriteLock interface (and its ReentrantReadWriteLock implementation). It is defined by a pair of locks for reading and writing, and typically allows several concurrent readers but only one writer.

class Statistic {
 private final ReadWriteLock lock = new ReentrantReadWriteLock();
 private int value;
 void increment() {
   lock.writeLock().lock();
   try {
     value++;
   } finally {
     lock.writeLock().unlock();
   }
 }
 int current() {
   lock.readLock().lock();
   try {
     return value;
   } finally {
     lock.readLock().unlock();
   }
 }
}

CountDownLatch

A CountDownLatch is initialized with a count. Threads can call await() to wait until the count reaches 0. Other threads (or the same thread) can call countDown() to decrement the count. Once the count has reached 0, the latch cannot be reused. A latch is used to release an unknown set of threads once a certain number of actions have occurred.

CompletableFuture

CompletableFuture is an abstraction for asynchronous computation. With a plain Future, the only way to get the result is to block. CompletableFuture instead encourages registering callbacks that form a pipeline of tasks to run once either a result or an exception is available. You can specify the executor on which the computation should run either at creation time (via CompletableFuture#supplyAsync/runAsync) or when adding callbacks (the *Async family of methods). If no executor is specified, the standard global ForkJoinPool#commonPool is used.

Note that if the CompletableFuture is already completed, callbacks registered with non-*Async methods are executed in the calling thread.

If you have several futures, you can use CompletableFuture#allOf to get a future that completes when all of them complete. Alternatively, CompletableFuture#anyOf gives a future that completes as soon as any of them completes.

ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
// Completes when both futures have completed
CompletableFuture<String> waitingForAll = CompletableFuture
   .allOf(
       CompletableFuture.supplyAsync(() -> "first"),
       CompletableFuture.supplyAsync(() -> "second", executor1)
   )
   .thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
   // Same executor (the thread that completed the previous stage)
   .thenApply(result -> "Java " + result)
   // Uses a different executor
   .thenApplyAsync(result -> "Dzone " + result, executor1)
   // Completes when both this future and the other one have completed
   .thenCombine(waitingForAll, (first, second) -> first + second)
  // Implicitly uses ForkJoinPool#commonPool as the executor
   .thenAcceptAsync(result -> {
     System.out.println("Result is '" + result + "'.");
   })
  // Common handler
   .whenComplete((ignored, exception) -> {
     if (exception != null)
       exception.printStackTrace();
   });
// First blocking call: blocks until the future is completed.
future.join();
future
  // Runs in the current thread (main), because the future is already completed.
   .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
  // Implicitly uses ForkJoinPool#commonPool as the executor
   .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."));
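Besides whenComplete, a pipeline can recover from a failure with exceptionally. That callback runs only if an earlier stage completed exceptionally; otherwise the value passes through unchanged. The following is a minimal sketch; CompletableFutureRecovery and parseOrDefault are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical class: replaces a failed result with a fallback value.
final class CompletableFutureRecovery {
    static String parseOrDefault(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))  // may fail with NumberFormatException
                .thenApply(number -> "parsed " + number)     // skipped if the previous stage failed
                .exceptionally(failure -> {
                    // Dependent stages usually see the failure wrapped in CompletionException
                    Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                            ? failure.getCause() : failure;
                    return "fallback (" + cause.getClass().getSimpleName() + ")";
                })
                .join();
    }
}
```

For example, parseOrDefault("42") yields "parsed 42", and parseOrDefault("x") yields "fallback (NumberFormatException)".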

Parallel Collections

The easiest way to make a collection thread-safe is to use the Collections#synchronized* family of methods. Because this approach performs poorly under high contention, java.util.concurrent provides many data structures optimized for concurrent use.

Lists

Implementation | Description
CopyOnWriteArrayList | Provides copy-on-write semantics: every modification of the data structure creates a new internal copy of the data, so writes are very expensive while reads are cheap. Iterators always see a snapshot of the data as of the moment the iterator was created.

Table 9: Lists in java.util.concurrent

Maps

Implementation | Description
ConcurrentHashMap | Generally acts as a bucketed hash table. Read operations usually do not block and reflect the results of the most recently completed write. Inserting the first node into an empty bin is done with a simple CAS (compare-and-set), while other write operations require locks (the first node in a bin is used as the lock).
ConcurrentSkipListMap | Provides concurrent access along with sorted Map functionality similar to TreeMap. Performance bounds are similar to those of TreeMap, although several threads can generally read and write the map without contention as long as they do not modify the same part of the map.

Table 10: Maps in java.util.concurrent
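With ConcurrentHashMap, per-key read-modify-write operations such as merge are performed atomically, so counting from several threads needs no external locking. The following is a minimal sketch; WordCount is a hypothetical name, and the input is assumed to arrive pre-split into one chunk per thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class: counts words from several threads into one shared map.
final class WordCount {
    static Map<String, Integer> count(List<List<String>> chunks) throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[chunks.size()];
        for (int i = 0; i < workers.length; i++) {
            List<String> chunk = chunks.get(i);
            workers[i] = new Thread(() -> {
                for (String word : chunk) {
                    counts.merge(word, 1, Integer::sum); // atomic per key
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counts;
    }
}
```

A separate get followed by put would not be safe here, because another thread could update the same key between the two calls.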

Sets

Implementation | Description
CopyOnWriteArraySet | Like CopyOnWriteArrayList, it uses copy-on-write semantics, here to implement the Set interface.
ConcurrentSkipListSet | Like ConcurrentSkipListMap, but implements the Set interface.

Table 11: Sets in java.util.concurrent

Another approach to creating a concurrent set is to wrap a concurrent Map:

Set<String> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

Queues

Queues act as pipes between “producers” and “consumers”. Items are put in at one end of the pipe and come out at the other end in first-in, first-out (FIFO) order. The BlockingQueue interface extends Queue with additional choices for two situations: the queue may be full (when a producer adds an item) or empty (when a consumer reads or removes an item). In these cases, BlockingQueue provides methods that either block forever or block for a specified period of time, waiting for another thread's actions to change the condition.

Implementation | Description
ConcurrentLinkedQueue | An unbounded non-blocking queue backed by a linked list.
LinkedBlockingQueue | An optionally bounded blocking queue backed by a linked list.
PriorityBlockingQueue | An unbounded blocking queue backed by a min-heap. Items are removed in an order based on the Comparator associated with the queue (instead of FIFO order).
DelayQueue | An unbounded blocking queue of elements, each of which has a delay value. Elements can be removed only after their delay has passed, and they are removed in order of the oldest expired element.
SynchronousQueue | A zero-length queue in which the producer and the consumer block until the other arrives. When both threads arrive, the value is handed directly from the producer to the consumer. Useful for passing data between threads.

Table 12: Queues in java.util.concurrent
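A bounded LinkedBlockingQueue is enough for a simple producer/consumer pipeline: put() blocks while the queue is full and take() blocks while it is empty. The following is a minimal sketch. ProducerConsumer is a hypothetical name, and both the capacity of 4 and the negative "poison pill" end-of-stream marker are assumptions of this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class: one producer (the caller) and one consumer thread.
final class ProducerConsumer {
    private static final int POISON_PILL = -1; // assumed end-of-stream marker

    static long run(int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4); // bounded capacity
        long[] total = new long[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int value = queue.take(); value != POISON_PILL; value = queue.take()) {
                    total[0] += value; // take() blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 1; i <= items; i++) {
            queue.put(i); // blocks while 4 items are already waiting
        }
        queue.put(POISON_PILL); // tells the consumer to stop
        consumer.join();        // makes total[0] visible to this thread
        return total[0];
    }
}
```

The bound provides back-pressure: a fast producer is slowed down to the consumer's pace instead of filling memory.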

THE END.

As always, we await your wishes and questions.

Thanks.
