AsyncCollections: the story of one reinvented wheel

I have long been a big fan of System.Collections.Concurrent, and of BlockingCollection in particular. I have lost count of how many times this miracle of engineering has come to the rescue in all sorts of situations.

Async/await came into use somewhat more recently. Life would seem to be beautiful, but there is one catch: I really don't want to mix asynchronous code with blocking code. And BlockingCollection, as you might guess (at least from the name), blocks the thread in some cases.

A false lead: Nito.AsyncEx


One day I came across a mention of Stephen Cleary's Nito.AsyncEx library, which contains a class with the intriguing name AsyncCollection. However, after looking under the hood I was left somewhat puzzled: every operation on the wrapped IProducerConsumerCollection is guarded by an AsyncLock from the same library. AsyncLock, in turn, relies on the most ordinary locks plus a thin layer of magic, which I quickly got tired of unraveling. Even if this implementation does what it claims, it looks rather convoluted, heavyweight and possibly not very efficient. Is it really impossible to solve this problem more neatly?

We all know where such thoughts lead. Visual Studio, New Project...

AsyncQueue


To begin with, let's decide what we actually want from our asynchronous collection. The following interface will do as a starting point:

	public interface IAsyncCollection<T>: IEnumerable<T>
	{
		int Count { get; }
		void Add( T item );
		Task<T> TakeAsync();
	}

In addition, to keep things simple, let's settle on our collection being a queue. Why a queue? For roughly the same reason that BlockingCollection uses a queue by default.

Next comes some hard thinking about which states our collection can be in. At first glance there are three:

1. The collection has no items, but TakeAsync() has been called, so there are tasks that must be completed once items appear (for brevity I will call them awaiters). In this case:
  • The awaiters obviously have to be stored somewhere. A queue suggests itself, or more specifically a ConcurrentQueue.
  • If TakeAsync() is called, we get a new awaiter and put it into the awaiter queue.
  • If Add() is called, we get a new item, with which we can immediately dequeue one of the awaiters and complete it.

2. There are no awaiters, but Add() has been called. The situation is completely symmetrical to the previous one:
  • The items have to be stored somewhere. Where? In a ConcurrentQueue, where else.
  • If Add() is called, we get a new item and put it into the item queue.
  • If TakeAsync() is called, we get a new awaiter, which can be completed immediately by taking the first item from the queue.

3. Both queues, the awaiter queue and the item queue, are empty. Depending on what happens next, we move either to state 1 or to state 2:
  • If Add() is called, a new item appears; we try to take an awaiter for it from the queue, find the queue empty and go on to add the item to the item queue...
  • At that very moment TakeAsync() is called and a new awaiter appears; we try to take an item for it from the queue, find it still empty and go on to add the awaiter to the awaiter queue...
  • Oops. We have broken everything: the awaiter and the item sit in different queues, waiting for each other. What now?

I don't feel like adding locks: getting away from the lock-stuffed implementation in Nito.AsyncEx was the whole point. What do the likes of ConcurrentQueue do in such cases? They recognize that an operation is in flight on a neighboring thread, that it is about to finish and that something useful can be done once it has, so they create a SpinWait and spin while they wait. Let's try to reproduce this idea. We need to:

  • figure out which state we are in (1 or 2)
  • at the same time announce that we have started our operation, whether it is adding an awaiter or adding an item
  • depending on the state, either add the awaiter/item to its queue, or spin until an item/awaiter is added to the opposite queue and then immediately take it

The first two requirements look very much like a job for the Interlocked class. To store the state we can use something like a queue balance: TakeAsync() atomically decrements the balance by one, Add() atomically increments it. From the balance value returned by Interlocked.Increment/Interlocked.Decrement we can tell that a new item/awaiter is on its way even before it appears in the corresponding queue. Enough talk, let's code all of the above:

	public class AsyncQueue<T>: IAsyncCollection<T>
	{
		private ConcurrentQueue<T> _itemQueue = new ConcurrentQueue<T>();
		private ConcurrentQueue<TaskCompletionSource<T>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
		//	_queueBalance < 0 means there are free awaiters and not enough items.
		//	_queueBalance > 0 means the opposite is true.
		private long _queueBalance = 0;
		//	Count and GetEnumerator() are omitted for brevity.
		public void Add( T item )
		{
			long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );
			if ( balanceAfterCurrentItem > 0 )
			{
				//	Items are dominating, so we can safely add a new item to the queue.
				_itemQueue.Enqueue( item );
			}
			else
			{
				//	There's at least one awaiter available or being added as we're speaking, so we're giving the item to it.
				TaskCompletionSource<T> awaiter;
				SpinWait spin = new SpinWait();
				while ( !_awaiterQueue.TryDequeue( out awaiter ) )
					spin.SpinOnce();
				awaiter.SetResult( item );
			}
		}
		public Task<T> TakeAsync()
		{
			long balanceAfterCurrentAwaiter = Interlocked.Decrement( ref _queueBalance );
			if ( balanceAfterCurrentAwaiter < 0 )
			{
				//	Awaiters are dominating, so we can safely add a new awaiter to the queue.
				var taskSource = new TaskCompletionSource<T>();
				_awaiterQueue.Enqueue( taskSource );
				return taskSource.Task;
			}
			else
			{
				//	There's at least one item available or being added, so we're returning it directly.
				//	ConcurrentQueue<T> implements TryTake explicitly, so TryDequeue is used here.
				T item;
				SpinWait spin = new SpinWait();
				while ( !_itemQueue.TryDequeue( out item ) )
					spin.SpinOnce();
				return Task.FromResult( item );
			}
		}
	}

We test it and are surprised to find that it actually seems to work. Victory? On the one hand, yes; on the other hand, a creative impulse that has picked up speed is not so easily stopped...
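
A quick sanity check of the two basic states can be sketched as follows. This is a trimmed-down standalone copy of the queue above; the names MiniAsyncQueue and MiniAsyncQueueDemo are made up for this illustration, and the check is single-threaded, so it says nothing about the racy state 3.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Trimmed-down copy of the AsyncQueue above (hypothetical name, no Count, no enumeration).
public class MiniAsyncQueue<T>
{
	private readonly ConcurrentQueue<T> _itemQueue = new ConcurrentQueue<T>();
	private readonly ConcurrentQueue<TaskCompletionSource<T>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
	private long _queueBalance = 0;

	public void Add( T item )
	{
		if ( Interlocked.Increment( ref _queueBalance ) > 0 )
		{
			_itemQueue.Enqueue( item );
			return;
		}
		TaskCompletionSource<T> awaiter;
		SpinWait spin = new SpinWait();
		while ( !_awaiterQueue.TryDequeue( out awaiter ) )
			spin.SpinOnce();
		awaiter.SetResult( item );
	}

	public Task<T> TakeAsync()
	{
		if ( Interlocked.Decrement( ref _queueBalance ) < 0 )
		{
			var taskSource = new TaskCompletionSource<T>();
			_awaiterQueue.Enqueue( taskSource );
			return taskSource.Task;
		}
		T item;
		SpinWait spin = new SpinWait();
		while ( !_itemQueue.TryDequeue( out item ) )
			spin.SpinOnce();
		return Task.FromResult( item );
	}
}

public static class MiniAsyncQueueDemo
{
	// Returns { 1 if the early take was already completed, else 0; first taken value; second taken value }.
	public static int[] Run()
	{
		var queue = new MiniAsyncQueue<int>();

		// State 1: take first, so an awaiter is queued and the task stays pending.
		Task<int> pendingTake = queue.TakeAsync();
		bool completedEarly = pendingTake.IsCompleted;
		queue.Add( 1 );	// hands the item to the pending awaiter

		// State 2: add first, so the item is queued and the take completes at once.
		queue.Add( 2 );
		Task<int> immediateTake = queue.TakeAsync();

		return new[] { completedEarly ? 1 : 0, pendingTake.Result, immediateTake.Result };
	}

	public static void Main()
	{
		Console.WriteLine( string.Join( ", ", Run() ) );	// prints "0, 1, 2"
	}
}
```

The first take is still pending when it is returned and is completed by the following Add(); the second take finds an item already waiting and returns a completed task.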

Useful (and not so useful) goodies


Let's take a careful look at what we have built. A synchronous Add(), an asynchronous TakeAsync()... Wait, an asynchronous method that cannot be canceled? That won't do. Let's fix it.

First, when the CancellationToken is canceled, the corresponding task must be canceled immediately:

		public Task<T> TakeAsync( CancellationToken cancellationToken )
		{
			//	...
			if ( balanceAfterCurrentAwaiter < 0 )
			{
				var taskSource = new TaskCompletionSource<T>();
				_awaiterQueue.Enqueue( taskSource );
				cancellationToken.Register(
					state =>
					{
						TaskCompletionSource<T> awaiter = state as TaskCompletionSource<T>;
						awaiter.TrySetCanceled();
					},
					taskSource,
					useSynchronizationContext : false );
				return taskSource.Task;
			}
			else
			{
				//	...
			}
		}

Second, we obviously cannot pluck a canceled awaiter out of the middle of the queue, so Add() has to learn to skip canceled awaiters. The balance takes care of itself, as if by magic:

		private bool TryAdd( T item )
		{
			long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );
			if ( balanceAfterCurrentItem > 0 )
			{
				_itemQueue.Enqueue( item );
				return true;
			}
			else
			{
				TaskCompletionSource<T> awaiter;
				SpinWait spin = new SpinWait();
				while ( !_awaiterQueue.TryDequeue( out awaiter ) )
					spin.SpinOnce();
				//	Returns false if the cancellation occurred earlier.
				return awaiter.TrySetResult( item );
			}
		}
		public void Add( T item )
		{
			while ( !TryAdd( item ) ) ;
		}
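
To see why the balance looks after itself, trace one canceled take followed by one Add(). TakeAsync() brings the balance to -1 and enqueues an awaiter. Cancellation completes the task but touches neither the awaiter queue nor the balance. The first TryAdd() brings the balance to 0, dequeues the canceled awaiter and gets false from TrySetResult(). The retry brings the balance to 1 and enqueues the item: one stored item, balance 1. The snippet below is a small standalone sketch of the key step, the rejected TrySetResult(); the class name CanceledAwaiterDemo is made up for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CanceledAwaiterDemo
{
	// Returns true if an already canceled awaiter rejects the item offered to it.
	public static bool Run()
	{
		var cancelSource = new CancellationTokenSource();
		var awaiter = new TaskCompletionSource<int>();

		// Same kind of registration as in TakeAsync(): cancel the task when the token fires.
		cancelSource.Token.Register(
			state => ( (TaskCompletionSource<int>) state ).TrySetCanceled(),
			awaiter,
			useSynchronizationContext : false );

		// Registered callbacks run synchronously inside Cancel().
		cancelSource.Cancel();

		// This is what TryAdd() does with a dequeued awaiter.
		bool accepted = awaiter.TrySetResult( 42 );
		return !accepted && awaiter.Task.IsCanceled;
	}

	public static void Main()
	{
		Console.WriteLine( Run() );	// prints "True"
	}
}
```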

Third, the old TakeAsync() overload (the one without a CancellationToken) can be moved out into an extension method on the IAsyncCollection interface:

	public interface IAsyncCollection<T>: IEnumerable<T>
	{
		int Count { get; }
		void Add( T item );
		Task<T> TakeAsync( CancellationToken cancellationToken );
	}
	public static class AsyncCollectionExtensions
	{
		public static Task<T> TakeAsync<T>( this IAsyncCollection<T> collection )
		{
			return collection.TakeAsync( CancellationToken.None );
		}
	}

Speaking of IAsyncCollection: if you look closely, our AsyncQueue implementation does not have to be nailed to ConcurrentQueue; any thread-safe IProducerConsumerCollection is suitable for storing the items. ConcurrentStack, for example. So we can do this:

	public class AsyncCollection<TItem, TItemQueue>: IAsyncCollection<TItem>
		where TItemQueue: IProducerConsumerCollection<TItem>, new()
	{
		private TItemQueue _itemQueue = new TItemQueue();
		private ConcurrentQueue<TaskCompletionSource<TItem>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<TItem>>();
		//	...
	}
	public class AsyncQueue<T>: AsyncCollection<T, ConcurrentQueue<T>>
	{
	}
	public class AsyncStack<T>: AsyncCollection<T, ConcurrentStack<T>>
	{
	}

On the one hand, I would rather not multiply type parameters and simply accept an IProducerConsumerCollection in the constructor. The trouble is that we could be handed a collection that is also referenced from outside, into which someone can push items (or, even worse, from which someone can take some of our items), destroying the synchronization between the actual state of the collection and the stored balance. A factory method has the same problem, so the collection has to create its item store itself.

Benchmarks!


It is time to measure how fast our reinvented wheel is. For running benchmarks there is the BenchmarkDotNet package, which takes care of a bunch of small details that should be accounted for when benchmarking, so that is what we use. The general idea of the benchmark is as follows:

	class AsyncQueueBenchmark
	{
		private const int _consumerThreadCount = 3;
		private const int _producerThreadCount = 3;
		private const int _itemsAddedPerThread = 10000;
		private const int _itemsAddedTotal = _producerThreadCount * _itemsAddedPerThread;
		private IAsyncCollection<int> _currentQueue;
		private CancellationTokenSource _cancelSource;
		private int _itemsTaken;
		private Task[] _consumerTasks;
		private Task[] _producerTasks;
		//	The execution of this method is what gets measured
		private void DdosCurrentQueue()
		{
			_consumerTasks = Enumerable.Range( 0, _consumerThreadCount )
				.Select( _ => Task.Run( () => RunConsumerAsync() ) )
				.ToArray();
			_producerTasks = Enumerable.Range( 0, _producerThreadCount )
				.Select( _ => Task.Run( () => RunProducer() ) )
				.ToArray();
			Task.WaitAll( _producerTasks );
			Task.WaitAll( _consumerTasks );
		}
		private async Task RunConsumerAsync()
		{
			try
			{
				CancellationToken cancelToken = _cancelSource.Token;
				while ( _itemsTaken < _itemsAddedTotal && !cancelToken.IsCancellationRequested )
				{
					int item = await _currentQueue.TakeAsync( cancelToken );
					int itemsTakenLocal = Interlocked.Increment( ref _itemsTaken );
					if ( itemsTakenLocal >= _itemsAddedTotal )
					{
						_cancelSource.Cancel();
						break;
					}
				}
			}
			catch ( OperationCanceledException )
			{
			}
		}
		private void RunProducer()
		{
			for ( int i = 0; i < _itemsAddedPerThread; i++ )
			{
				int item = 42;
				_currentQueue.Add( item );
			}
		}
		//	...
	}

In other words, we take a fixed batch of items, push them into the queue from several threads, drain the queue from several other threads in parallel, and measure how long it all takes. We plug in different IAsyncCollection implementations and compare. The contestants are:

1. The freshly built AsyncQueue
2. Nito.AsyncEx.AsyncCollection, wrapped as follows:

	class NitoAsyncCollectionAdapter<T>: IAsyncCollection<T>
	{
		private Nito.AsyncEx.AsyncCollection<T> _collection;
		public NitoAsyncCollectionAdapter()
		{
			_collection = new Nito.AsyncEx.AsyncCollection<T>();
		}
		#region IAsyncCollection<T> Members
		public void Add( T item )
		{
			_collection.Add( item );
		}
		public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
		{
			return _collection.TakeAsync( cancellationToken );
		}
		#endregion
	}

3. BlockingCollection (how could we not compare against it), wrapped as follows:

	class BlockingCollectionAdapter<T>: IAsyncCollection<T>
	{
		private BlockingCollection<T> _collection;
		public BlockingCollectionAdapter()
		{
			_collection = new BlockingCollection<T>();
		}
		#region IAsyncCollection<T> Members
		public void Add( T item )
		{
			_collection.Add( item );
		}
		public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
		{
			T item = _collection.Take( cancellationToken );
			return Task.FromResult( item );
		}
		#endregion
	}

Results:
HellBrick.AsyncCollections.AsyncQueue: 1ms | Stats: MedianTicks = 3368, MedianMs = 1, Error = 06.34%
Nito.AsyncEx.AsyncCollection: 12ms | Stats: MedianTicks = 40503, MedianMs = 12, Error = 31.36%
System.Concurrent.BlockingCollection: 2ms | Stats: MedianTicks = 7222, MedianMs = 2, Error = 38.82%

The intuitive assessment of Nito.AsyncEx.AsyncCollection was not wrong: in this benchmark it really is a slow monster. But the most interesting part is that we managed to outperform BlockingCollection while doing without blocking threads. Win! Grab a cookie or any other tasty reward and move on.

AsyncBatchQueue


From time to time I have had to use a small wrapper over BlockingCollection that accepted single items as input and handed them out in batches of a given size. If the required number of items did not accumulate within a certain time, a timer fired and forced a flush of whatever had been collected so far. Who wants an asynchronous version of such a thing? I do.

To start with, let's do without the timer and the manual flush. It makes sense to store and hand out the collected batches using our new AsyncQueue:

	public class AsyncBatchQueue<T>
	{
		private int _batchSize;
		private Batch _currentBatch;
		private AsyncQueue<IReadOnlyList<T>> _batchQueue = new AsyncQueue<IReadOnlyList<T>>();
		public AsyncBatchQueue( int batchSize )
		{
			_batchSize = batchSize;
			_currentBatch = new Batch( this );
		}
		public void Add( T item )
		{
			SpinWait spin = new SpinWait();
			while ( !_currentBatch.TryAdd( item ) )
				spin.SpinOnce();
		}
		public Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken )
		{
			return _batchQueue.TakeAsync( cancellationToken );
		}
		private class Batch: IReadOnlyList<T>
		{
			private AsyncBatchQueue<T> _queue;
			//	?
			public Batch( AsyncBatchQueue<T> queue )
			{
				_queue = queue;
			}
			public bool TryAdd( T item )
			{
				//	?
			}
		}
	}

What happens here: the Add method has to try to add the item to the current batch and, if that fills the batch, flush it into _batchQueue. It is quite possible that another thread has beaten us to it and is in the middle of an add/flush, but has not yet written the reference to the new (empty) batch into _currentBatch. Hence the good old SpinWait.

The main magic lives in the nested Batch class, whose idea is shamelessly borrowed from the implementation of ConcurrentQueue (by the way, if you have not read its source code, I recommend taking a look: there is a lot of interesting stuff in there). The idea is as follows:

  • We store the items in a regular array, since the size is known in advance
  • Concurrency is handled by calling Interlocked.Increment on a field that stores the index of the last reserved slot
  • If a thread has grabbed the last slot of the array, then it (the thread, not the slot) is responsible for flushing the current batch
  • If a thread has grabbed a slot beyond the bounds of the array, it is out of luck: this batch is already full and the thread has to spin while waiting for a new one

It looks something like this. (Caution: this code is not viable yet! I will explain why a bit later.)

		private class Batch: IReadOnlyList<T>
		{
			private AsyncBatchQueue<T> _queue;
			private T[] _items;
			private int _lastReservationIndex = -1;
			private int _count = -1;
			public Batch( AsyncBatchQueue<T> queue )
			{
				_queue = queue;
				_items = new T[ _queue._batchSize ];
			}
			public bool TryAdd( T item )
			{
				int index = Interlocked.Increment( ref _lastReservationIndex );
				//	The following is true if someone has beaten us to the last slot and we have to wait until the next batch comes along.
				if ( index >= _queue._batchSize )
					return false;
				//	The following is true if we've taken the last slot, which means we're obligated to flush the current batch and create a new one.
				if ( index == _queue._batchSize - 1 )
					FlushInternal( _queue._batchSize );
				_items[ index ] = item;
				return true;
			}
			private void FlushInternal( int count )
			{
				_count = count;
				_queue._currentBatch = new Batch( _queue );
				_queue._batchQueue.Add( this );
			}
		}

Next, it would be nice to actually implement IReadOnlyList. One nuance pops up here: nobody guarantees that by the time a batch has been flushed and is being read, every slot of the array already holds real data. The thread that grabbed the last slot may simply have been faster than the others. The solution suggests itself: for each slot of the array, store a flag indicating whether the corresponding value can be read.

		private class Batch: IReadOnlyList<T>
		{
			//	...
			private bool[] _finalizationFlags;
			public Batch( AsyncBatchQueue<T> queue )
			{
				//	...
				_finalizationFlags = new bool[ _queue._batchSize ];
			}
			public bool TryAdd( T item )
			{
				//	...
				_items[ index ] = item;
				_finalizationFlags[ index ] = true;
				return true;
			}
			public T this[ int index ]
			{
				get
				{
					if ( index >= _count )
						throw new IndexOutOfRangeException();
					return GetItemWithoutValidation( index );
				}
			}
			private T GetItemWithoutValidation( int index )
			{
				SpinWait spin = new SpinWait();
				while ( !_finalizationFlags[ index ] )
					spin.SpinOnce();
				return _items[ index ];
			}
			//	... the remaining members are implemented via GetItemWithoutValidation
		}


And now the real magic begins. The problem is that the code is full of places where the compiler and the processor can ruin everything by reordering instructions or by caching values that must never be cached.

1. In AsyncBatchQueue.Add(), the value of _currentBatch can be read once and cached, so that if the batch is full, the thread will spin forever. volatile to the rescue:

	public class AsyncBatchQueue<T>
	{
		//	...
		private volatile Batch _currentBatch;
		//	...
	}

2. In the FlushInternal() method, the batch can be added to the output queue before the _count field is set. We insert a full fence:

	private void FlushInternal( int count )
	{
		_count = count;
		_queue._currentBatch = new Batch( _queue );
		//	The full fence ensures that the current batch will never be added to the queue before _count is set.
		Thread.MemoryBarrier();
		_queue._batchQueue.Add( this );
	}

3. In the TryAdd method, the writes to _items[ index ] and _finalizationFlags[ index ] can be reordered. We insert a full fence again:

	public bool TryAdd( T item )
	{
		//	...
		//	The full fence prevents setting finalization flag before the actual item value is written.
		_items[ index ] = item;
		Thread.MemoryBarrier();
		_finalizationFlags[ index ] = true;
		return true;
	}

4. The inverse problem (reading the item before the flag) can occur in GetItemWithoutValidation. We insert you-know-what:

	private T GetItemWithoutValidation( int index )
	{
		SpinWait spin = new SpinWait();
		while ( !_finalizationFlags[ index ] )
			spin.SpinOnce();
		//	The full fence prevents reading item value before finalization flag is set.
		Thread.MemoryBarrier();
		return _items[ index ];
	}

5. In the same method, the value of _finalizationFlags[ index ] can be cached, which would make the thread spin forever. Usually this is solved by putting the volatile modifier on the field, but that cannot be done for an array element, so, well, you get the idea:

	private T GetItemWithoutValidation( int index )
	{
		SpinWait spin = new SpinWait();
		while ( !_finalizationFlags[ index ] )
		{
			spin.SpinOnce();
			//	The full fence prevents caching any part of _finalizationFlags[ index ] expression.
			Thread.MemoryBarrier();
		}
		//	...
	}

A small digression is in order here. ConcurrentQueue solves a similar problem in a rather unusual way:
	internal volatile VolatileBool[] m_state;
	struct VolatileBool
	{
		public VolatileBool(bool value)
		{
			m_value = value;
		}
		public volatile bool m_value;
	}

If VolatileBool were a class instead of a struct, everything would be extremely simple: even if the reference to a VolatileBool instance is cached somewhere, a read of the volatile m_value is guaranteed to return the current value of the field. Why this trick works with a struct, which one would expect to be copied when m_state[ index ] is evaluated, I still do not understand.
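
A plausible explanation is that in C# an array element is a variable rather than a value: an expression like m_state[ index ].m_value addresses the field inside the array itself instead of going through a copy of the struct, so the volatile read hits the real memory location. The sketch below only illustrates the "no copy" part, under that assumption; Flag and StructArrayDemo are made-up names, and List<T> is used purely as a contrast, because its indexer does return a copy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for VolatileBool (without the volatile modifier).
public struct Flag
{
	public bool Value;
}

public static class StructArrayDemo
{
	// Returns { value seen through the array, value seen through the list }.
	public static bool[] Run()
	{
		var array = new Flag[ 1 ];
		// An array element is a variable: this writes the field inside the array itself.
		array[ 0 ].Value = true;

		var list = new List<Flag> { new Flag() };
		// The List<T> indexer returns a copy, so this change never reaches the list.
		Flag copy = list[ 0 ];
		copy.Value = true;

		return new[] { array[ 0 ].Value, list[ 0 ].Value };
	}

	public static void Main()
	{
		Console.WriteLine( string.Join( ", ", Run() ) );	// prints "True, False"
	}
}
```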


It seems the dangerous places end here and the basic functionality should work (at least I sincerely want to believe so).

Now let's bring the timer back


Everything seems great, but there is one nuance (no longer related to multithreading): if the number of items added to the collection is not a multiple of batchSize, we will never see the remainder. We need a way to flush manually or, better, on a timer. The easiest way is to have a call to Flush() immediately grab the last slot of the array, thus marking the batch as full. When doing so, we must remember the last real value of _lastReservationIndex, otherwise we will not be able to tell how many slots are actually occupied (spoiler: this is where Interlocked.CompareExchange() comes to the rescue). There are five possible scenarios in total:

  1. _lastReservationIndex < 0. There is nothing to flush.
  2. _lastReservationIndex >= _queue._batchSize. FlushInternal() will be executed by the thread that grabbed the last slot; nothing needs to be done.
  3. _lastReservationIndex is valid and we managed to atomically set it to _queue._batchSize. We know the actual number of items in the array and can call FlushInternal().
  4. Between reading the previous value of _lastReservationIndex and writing the new one, another thread slipped in and grabbed the last slot. In effect this repeats scenario 2: we do nothing.
  5. The same as scenario 4, but the batch has not been filled. Spin and try again.

	public class AsyncBatchQueue<T>: IEnumerable<IReadOnlyList<T>>
	{
		//	...
		public void Flush()
		{
			SpinWait spin = new SpinWait();
			while ( !_currentBatch.TryFlush() )
				spin.SpinOnce();
		}
		//	...
		private class Batch: IReadOnlyList<T>
		{
			//	...			
			public bool TryFlush()
			{
				int expectedPreviousReservation = Volatile.Read( ref _lastReservationIndex );
				//	We don't flush if the batch doesn't have any items or if another thread is about to flush it
				//	However, we report success to avoid unnecessary spinning.
				if ( expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue._batchSize )
					return true;
				int previousReservation = Interlocked.CompareExchange( ref _lastReservationIndex, _queue._batchSize, expectedPreviousReservation );
				//	Flush reservation has succeeded.
				if ( expectedPreviousReservation == previousReservation )
				{
					FlushInternal( previousReservation + 1 );
					return true;
				}
				//	The following is true if someone has completed the batch by the time we tried to flush it.
				//	Therefore the batch will be flushed anyway even if we don't do anything.
				//	The opposite means someone has slipped in an update and we have to spin.
				return previousReservation >= _queue._batchSize;
			}
			//	...
		}
	}

Done! All that remains is to put a timer on top. That process is so devoid of magic that I will spare you the copy-paste of the related code. There will be no benchmarks either, because I do not know what to compare the performance against.
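
For readers who want a starting point anyway, here is one way the timer part could be sketched. It is not the code from the library: PeriodicFlusher is a hypothetical helper that knows nothing about batches and simply invokes whatever flush action it is given at a fixed interval, using System.Threading.Timer.

```csharp
using System;
using System.Threading;

// Hypothetical helper: invokes the supplied flush action at a fixed interval.
public sealed class PeriodicFlusher : IDisposable
{
	private readonly Timer _timer;

	public PeriodicFlusher( Action flush, TimeSpan interval )
	{
		// System.Threading.Timer runs the callback on a thread pool thread,
		// so the flush action must be safe to call concurrently with Add().
		_timer = new Timer( _ => flush(), null, interval, interval );
	}

	public void Dispose()
	{
		_timer.Dispose();
	}
}

public static class PeriodicFlusherDemo
{
	// Counts how many times the "flush" action fired in about half a second.
	public static int Run()
	{
		int flushCount = 0;
		using ( new PeriodicFlusher( () => Interlocked.Increment( ref flushCount ), TimeSpan.FromMilliseconds( 20 ) ) )
		{
			Thread.Sleep( 500 );
		}
		return Volatile.Read( ref flushCount );
	}

	public static void Main()
	{
		Console.WriteLine( Run() > 0 );
	}
}
```

With the batch queue above, the action would presumably be its Flush() method, e.g. new PeriodicFlusher( batchQueue.Flush, TimeSpan.FromSeconds( 1 ) ), where batchQueue is an AsyncBatchQueue instance. Since TryFlush() reports success on an empty batch, a tick that finds nothing to flush is harmless.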

What's next?


First, both collections suffer from one subtle flaw. If someone calls Thread.Abort(), a ThreadAbortException may be thrown at the most unexpected moment and destroy the carefully maintained consistency of the collections' state. In the aforementioned ConcurrentQueue (and in a bunch of other places) this problem is solved in a rather extravagant way:

	try
	{
	}
	finally
	{
		//	Insert Thread.Abort()-safe code here
	}

The case is quite rare, but it would still be nice to guard against it. Maybe someday I will get around to it.

Second, for complete happiness at least one more asynchronous collection is missing: a priority queue. And, unlike with BlockingCollection, a trivial implementation based on TakeFromAny() is nowhere in sight. To be continued?..

P.S.


For those who heroically read to the end:

Nuget package: www.nuget.org/packages/AsyncCollections
Source code: github.com/HellBrick/AsyncCollections

If you have criticism, bug reports, wishes or just thoughts to share, write to me; I will be glad to discuss them.
