Threads, locks, and condition variables in C++11 [Part 2]
For a fuller understanding of this article, it is recommended that you read the first part, which focused on threads and locks; it explains many points (terms, functions, and so on) that are used here without explanation.
This article covers condition variables.
Condition variables
In addition to the synchronization methods described earlier, C++11 supports condition variables, which block one or more threads until either a notification arrives from another thread or the mythical spurious wakeup (a "false" or "random" wakeup) occurs.
There are two implementations of condition variables available in the <condition_variable> header:
- condition_variable: requires any thread that wants to wait to first acquire a std::unique_lock<std::mutex>
- condition_variable_any: a more general implementation that works with any lockable type. It may be more expensive to use (in terms of resources and performance), so use it only when you need the additional flexibility it provides
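To make the difference between the two classes concrete, here is a minimal sketch that waits on a std::timed_mutex, a lock type that condition_variable does not accept but condition_variable_any does. The helper name receiveWithTimedMutex and its local variables are hypothetical, chosen only for this illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: hands `value` from a producer thread to the caller.
// The shared state is guarded by a std::timed_mutex rather than a std::mutex.
int receiveWithTimedMutex(int value)
{
    std::timed_mutex lock;               // not a std::mutex
    std::condition_variable_any ready;   // works with any lockable type
    bool hasValue = false;
    int slot = 0;

    std::thread producer([&]() {
        std::unique_lock<std::timed_mutex> guard(lock);
        slot = value;
        hasValue = true;
        ready.notify_one();
    });

    int result = 0;
    {
        std::unique_lock<std::timed_mutex> guard(lock);
        // the predicate protects against spurious wakeups and against
        // the producer having finished before we started waiting
        ready.wait(guard, [&]() { return hasValue; });
        result = slot;
    }
    producer.join();
    return result;
}
```

Calling receiveWithTimedMutex(42) from main() returns 42. Swapping condition_variable_any for condition_variable here should fail to compile, because condition_variable::wait() only takes a std::unique_lock<std::mutex>.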
Here is how condition variables work:
- There must be at least one thread waiting for a condition to become true. The waiting thread must first acquire a unique_lock. This lock is passed to the wait() method, which releases the mutex and suspends the thread until the condition variable is signaled. When that happens, the thread wakes up and the lock is reacquired.
- There must be at least one thread signaling that the condition has become true. The signal can be sent with notify_one(), which unblocks one (any) of the waiting threads, or with notify_all(), which unblocks all waiting threads.
- Because it is difficult to make wakeups fully predictable on multiprocessor systems, spurious wakeups can occur. This means a thread may be woken even though nobody signaled the condition variable. So after waking up, the thread still has to check whether the condition is true. Since spurious wakeups can happen repeatedly, this check must be done in a loop.
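The notify_all() case from the list above can be sketched as follows: several threads wait on the same condition, and a single notify_all() releases them all. The helper releaseAllWaiters and its variable names are hypothetical, introduced only for this example.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts `count` waiting threads, releases them with a
// single notify_all(), and returns how many of them got past the wait.
int releaseAllWaiters(int count)
{
    std::mutex lock;
    std::condition_variable signal;
    bool go = false;   // the condition the waiters are waiting for
    int awake = 0;     // protected by `lock`

    std::vector<std::thread> waiters;
    for (int i = 0; i < count; ++i)
    {
        waiters.emplace_back([&]() {
            std::unique_lock<std::mutex> guard(lock);
            while (!go)              // re-check the condition after every wakeup
                signal.wait(guard);
            ++awake;                 // wait() has reacquired the mutex here
        });
    }

    {
        std::unique_lock<std::mutex> guard(lock);
        go = true;                   // change the condition under the mutex
    }
    signal.notify_all();             // wake every thread blocked in wait()

    for (auto &t : waiters)
        t.join();
    return awake;
}
```

releaseAllWaiters(4) returns 4. With notify_one() in place of notify_all(), only one of the threads already blocked in wait() is guaranteed to be woken, so the others could stay blocked and the joins might never return.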
The code below shows an example of using a condition variable to synchronize threads: while some threads (let's call them "workers") are running, errors may occur, and these are placed in a queue. The "logger" thread processes the errors (taking them from the queue) and prints them. The workers signal the logger when an error occurs. The logger waits for the condition variable to be signaled. To guard against spurious wakeups, the wait happens in a loop that checks a Boolean condition.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>

std::mutex g_lockprint;
std::mutex g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int> g_codes;
bool g_done = false;
bool g_notified = false;

void workerFunc(int id, std::mt19937 &generator)
{
    int delay;
    // startup message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
        // the generator is shared between workers, so draw from it under the lock
        delay = static_cast<int>(1 + generator() % 5);
    }
    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(delay));
    // simulate an error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }
    // report the error
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_notified = true;
        g_queuecheck.notify_one();
    }
}

void loggerFunc()
{
    // startup message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }
    // keep going until shutdown is requested; g_done is read under the lock
    std::unique_lock<std::mutex> locker(g_lockqueue);
    do
    {
        while(!g_notified) // guards against spurious wakeups
            g_queuecheck.wait(locker);
        // if there are errors in the queue, process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> printLocker(g_lockprint);
            std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl;
            g_codes.pop();
        }
        g_notified = false;
    } while(!g_done);
}

int main()
{
    // seed the pseudo-random number generator
    std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
    // start the logger
    std::thread loggerThread(loggerFunc);
    // start the workers
    std::vector<std::thread> threads;
    for(int i = 0; i < 5; ++i)
        threads.push_back(std::thread(workerFunc, i+1, std::ref(generator)));
    for(auto &t: threads)
        t.join();
    // tell the logger we are done (under the queue lock, with a wakeup) and wait for it
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_done = true;
        g_notified = true;
        g_queuecheck.notify_one();
    }
    loggerThread.join();
    return 0;
}
Running this code produces output similar to the following (it differs from run to run, because the worker threads work, or rather sleep, for random intervals):
[logger] running...
[worker 1] running...
[worker 2] running...
[worker 3] running...
[worker 4] running...
[worker 5] running...
[worker 1] an error occurred: 101
[worker 2] an error occurred: 201
[logger] processing error: 101
[logger] processing error: 201
[worker 5] an error occurred: 501
[logger] processing error: 501
[worker 3] an error occurred: 301
[worker 4] an error occurred: 401
[logger] processing error: 301
[logger] processing error: 401
The wait() method shown above has two overloads:
- one that takes only a unique_lock; it blocks the thread and adds it to the queue of threads waiting on this condition variable; the thread wakes up when the condition variable is signaled or on a spurious wakeup.
- one that, in addition to the unique_lock, takes a predicate and keeps waiting in a loop for as long as the predicate returns false; this overload can be used to guard against spurious wakeups. It is equivalent to a loop like this: while(!predicate()) wait(lock);
Thus, with the second overload you can drop the Boolean flag g_notified from the example above:
void workerFunc(int id, std::mt19937 &generator)
{
    int delay;
    // startup message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
        // the generator is shared between workers, so draw from it under the lock
        delay = static_cast<int>(1 + generator() % 5);
    }
    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(delay));
    // simulate an error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }
    // report the error
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_queuecheck.notify_one();
    }
}

void loggerFunc()
{
    // startup message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }
    // keep going until shutdown is requested
    std::unique_lock<std::mutex> locker(g_lockqueue);
    do
    {
        // wake up for new errors or for shutdown; g_done must be set under
        // g_lockqueue and followed by a notify for the logger to see it
        g_queuecheck.wait(locker, [](){ return !g_codes.empty() || g_done; });
        // if there are errors in the queue, process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> printLocker(g_lockprint);
            std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl;
            g_codes.pop();
        }
    } while(!g_done);
}
In addition to wait(), there are two similar methods, each with the same kind of predicate overload:
- wait_for: blocks the thread until the condition variable is signaled or the specified timeout elapses
- wait_until: blocks the thread until the condition variable is signaled or a certain point in time is reached
The overloads of these methods without a predicate return a cv_status that tells you whether the timeout expired (cv_status::timeout) or the thread woke up for another reason (cv_status::no_timeout), which may be either a signal from the condition variable or a spurious wakeup. The overloads with a predicate return the value of the predicate at the moment the wait ends.
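A minimal sketch of both styles of timed wait is given below. The Flag struct and the helper functions are hypothetical names introduced for this example; only wait_for, wait_until, and cv_status come from the standard library.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical bundle: a Boolean flag together with what protects it.
struct Flag
{
    std::mutex lock;
    std::condition_variable signal;
    bool ready = false;
};

// Predicate overload: returns true if the flag was set before the timeout.
bool waitWithPredicate(Flag &f, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> guard(f.lock);
    return f.signal.wait_for(guard, timeout, [&f]() { return f.ready; });
}

// Overload without a predicate: we loop ourselves and inspect cv_status.
bool waitWithStatus(Flag &f, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> guard(f.lock);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!f.ready)
    {
        // no_timeout may be a real signal or a spurious wakeup, so the loop
        // re-checks the flag; a fixed deadline keeps the total wait bounded
        if (f.signal.wait_until(guard, deadline) == std::cv_status::timeout)
            return f.ready;
    }
    return true;
}

// Sets the flag under the mutex and wakes one waiter.
void setReady(Flag &f)
{
    {
        std::unique_lock<std::mutex> guard(f.lock);
        f.ready = true;
    }
    f.signal.notify_one();
}

// Demo: another thread sets the flag while the caller waits for it.
bool signalledBeforeTimeout()
{
    Flag f;
    std::thread setter([&f]() { setReady(f); });
    const bool ok = waitWithPredicate(f, std::chrono::seconds(5));
    setter.join();
    return ok;
}
```

On a Flag that nobody sets, both wait functions return false once the timeout has passed, while signalledBeforeTimeout() returns true. The manual version uses wait_until with a deadline rather than repeated wait_for calls, so that spurious wakeups do not restart the timeout.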
The std namespace also provides the notify_all_at_thread_exit function, which implements a mechanism for notifying other threads that a given thread has finished its work, including the destruction of all its thread_local objects. Waiting for a thread with any mechanism other than join can lead to incorrect behavior when thread_local objects have been used, because their destructors may be called after the waiting thread has already been woken or has even finished (see N3070 and N2880). As a rule, this function should be called immediately before the thread exits. The following example shows how notify_all_at_thread_exit can be used with condition variables to synchronize two threads:
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

std::mutex g_lockprint;
std::mutex g_lock;
std::condition_variable g_signal;
bool g_done = false;

void workerFunc(std::mt19937 &generator)
{
    int delay;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "worker running..." << std::endl;
        // the generator is shared with main(), so draw from it under the lock
        delay = static_cast<int>(1 + generator() % 5);
    }
    std::this_thread::sleep_for(std::chrono::seconds(delay));
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "worker finished..." << std::endl;
    }
    std::unique_lock<std::mutex> lock(g_lock);
    g_done = true;
    // the notification is sent when this thread exits, after its thread_local objects are destroyed
    std::notify_all_at_thread_exit(g_signal, std::move(lock));
}

int main()
{
    // seed the pseudo-random number generator
    std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
    std::cout << "main running..." << std::endl;
    std::thread worker(workerFunc, std::ref(generator));
    worker.detach();
    int delay;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "main crunching..." << std::endl;
        delay = static_cast<int>(1 + generator() % 5);
    }
    std::this_thread::sleep_for(std::chrono::seconds(delay));
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "main waiting for worker..." << std::endl;
    }
    std::unique_lock<std::mutex> lock(g_lock);
    while(!g_done) // guards against spurious wakeups
        g_signal.wait(lock);
    std::cout << "main finished..." << std::endl;
    return 0;
}
If the worker finishes its work before the main thread, then the result will be like this:
main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...
If the main thread finishes its work before the worker thread, then the result will be like this:
main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...
In conclusion
The C++11 standard lets C++ developers write multithreaded code in a standard, platform-independent way. This article is just a quick tour of the threads and synchronization mechanisms in std. The <thread> header provides a class of the same name (and many additional functions) that represents threads. The <mutex> header provides several mutex implementations and "wrappers" for synchronizing thread access. The <condition_variable> header provides two implementations of condition variables, which let you block one or more threads until a notification arrives from another thread or a spurious wakeup occurs. For more detail and a deeper understanding, it is of course worth reading further literature :)