Creating a synchronization barrier using C++11

Introduction

Comparing two parallel programming technologies, POSIX threads and C++11 threads, you may notice that the latter lacks an analog of the pthread_barrier_t type from the pthread library.

Oddly enough, such an important synchronization primitive is missing from the standard library. This article discusses how to build a barrier using only the facilities included in the C++11 standard.

Definition

A barrier is one of the synchronization primitives. It is created for a given number of threads. When a thread finishes its portion of work, it stops at the barrier and waits until the remaining threads finish theirs.
As soon as exactly as many threads accumulate at the barrier as it was created for, all the waiting threads continue their work.

Let's start creating our own barrier, with blackjack and ...

First of all, we need to include the following headers from the C++11 standard library:
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>


Now you probably ask: why do we need all these headers? Well, the first one is not needed for the barrier itself, but I don't think you can test your code without it.

But first things first!

What is the most important field for the barrier? Obviously, the number of threads.
What else does the barrier need to know? The number of threads that are currently waiting at it.

The hand reaches for the keyboard by itself:
class barrier {
 const unsigned int threadCount;
 unsigned int threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
};


However, let's think a little. A barrier already slows the application down, because synchronization takes time, so we would like to keep the overhead of the barrier itself as low as possible.
Therefore, atomic operations are better suited for changing the number of threads waiting at the barrier.

So, our class now looks like this:

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
};


Well, we have written the class skeleton. We can create an object of this class; there is a constructor, there is a copy constructor...
Excuse me, what did I just say? When combining object-oriented and parallel programming, it is better to get rid of the copy constructor to protect yourself from unpleasant consequences.

Fortunately, C++11 allows you to explicitly delete this constructor.

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
};


So, we figured that out. It remains only to write the method for which we started all this: waiting at the barrier.

The following idea comes to mind: we add a boolean flag that is responsible for either waiting at the barrier or passing through it, and implement the waiting with a condition variable on this very flag.

So let's extend our class with the new fields:
class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
};


Now let's figure out the method. If not all the threads have arrived yet, a thread reaching the barrier should sleep on the condition variable, i.e. execute the following code:

std::unique_lock<std::mutex> lock(mutex);
waitVariable.wait(lock, [&]{ return isNotWaiting; });


If all the threads have arrived, we must notify the waiting threads that there is no longer any need to wait at the barrier. Note that the flag has to be changed while holding the mutex; otherwise a waiting thread could check the flag and fall asleep right between our write and our notification. The following code does this:
{
 std::lock_guard<std::mutex> lock(mutex);
 isNotWaiting = true;
}
waitVariable.notify_all();
threadsWaiting.store(0);


The last call atomically writes 0 to the threadsWaiting variable, resetting the counter.

Now one simple question remains: how do we combine these two cases? How do we know how many threads are already waiting at the barrier?

Recall how the barrier works. For a thread to wait at the barrier, every thread must call the wait method. Thus, as soon as wait is called, we must immediately increase our threadsWaiting variable by 1.
To do this, we use the fetch_add function. This is one of the so-called RMW (read-modify-write) operations: it atomically reads the value of the variable, adds the argument to it, writes the new value back, and returns the old one.

Thus, the two cases described above are combined by a conditional operator, and our class looks as follows:

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   {
    std::lock_guard<std::mutex> lock(mutex);
    isNotWaiting = true;
   }
   waitVariable.notify_all();
   threadsWaiting.store(0);
  }
  else {
   std::unique_lock<std::mutex> lock(mutex);
   waitVariable.wait(lock, [&]{ return isNotWaiting; });
  }
 }
};


Now it remains only to set the initial value of the variable isNotWaiting, which, obviously, is false.

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
  isNotWaiting = false;
 }
 barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   {
    std::lock_guard<std::mutex> lock(mutex);
    isNotWaiting = true;
   }
   waitVariable.notify_all();
   threadsWaiting.store(0);
  }
  else {
   std::unique_lock<std::mutex> lock(mutex);
   waitVariable.wait(lock, [&]{ return isNotWaiting; });
  }
 }
};


So, we have written a barrier class using only the C++11 standard, without any third-party libraries.

Now you may object: fine, I wrote some code, but where is the proof that it works?

So the most important part: demonstrating the barrier

#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <new>
class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
  isNotWaiting = false;
 }
 barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   {
    std::lock_guard<std::mutex> lock(mutex);
    isNotWaiting = true;
   }
   waitVariable.notify_all();
   threadsWaiting.store(0);
  }
  else {
   std::unique_lock<std::mutex> lock(mutex);
   waitVariable.wait(lock, [&]{ return isNotWaiting; });
  }
 }
};
barrier *myBarrier;
class Thread {
private:
	std::thread* cppthread;
	static void threadFunction(Thread* arg) {
		arg->run();
	}
public:
	Thread() : cppthread(nullptr) {}
	Thread(const Thread&) = delete;
	virtual ~Thread() { delete cppthread; }
	virtual void run() = 0;
	void start() {
		cppthread = new std::thread(Thread::threadFunction, this);
	}
	void wait() {
		cppthread->join();
	}
};
class BarrierDemo : public Thread {
 int id;
public:
 BarrierDemo(int i) {
  id = i;
 }
 void run() {
  std::cout << "Thread " << id << " runs before barrier" << std::endl;
  myBarrier->wait();
  std::cout << "Thread " << id << " runs after barrier" << std::endl;
 }
};
int main() {
	int threads;
	std::cin >> threads;
	myBarrier = new barrier(threads);
	BarrierDemo* bardemos = static_cast<BarrierDemo*>(::operator new(sizeof(BarrierDemo) * threads));
	for (int i = 0; i < threads; i++) {
		new (&bardemos[i]) BarrierDemo(i);
		bardemos[i].start();
	}
	for (int i = 0; i < threads; i++) {
		bardemos[i].wait();
	}
	for (int i = 0; i < threads; i++) {
		bardemos[i].~BarrierDemo();
	}
	::operator delete(bardemos);
	delete myBarrier;
	return 0;
}


You can copy the above code into any compiler with C++11 support to check that it works. This concludes the article.

P.S. It is easy to see from the code that this is a "one-shot" barrier: once all the threads have passed through it, the same instance of the class cannot be reused as a barrier.
