Concurrency structures in .net. ConcurrentQueue from the inside

    ConcurrentQueue can be classified as lock-free competitive data structures. There are no locks in its implementation (lock, Mutex ...) and it is implemented using:
    - the classic function CompareExchange;
    - SpinWait
    - volatile (used as a memory-barrier)
    based ConcurrentQueue laid ring-buffer structure ( ring buffer ).


    The ring buffer is ideal for implementing a queue data structure (FIFO).

    It is based on an array of data and 2 pointers - start (start) and end (end).

    There are two main operations:
    1. Push - add to the end. When adding new elements to the buffer, the end counter is incremented by 1 and a new element is written in its place. If we "rested" on the upper boundary of the array, then the end value is reset ("goes" to the beginning of the array) and the elements begin to be written to the beginning of the array. Writing is possible until the end index reaches the start index.
    2. Pop - select items first. The selection of elements occurs from the start element, sequentially increasing its value until it reaches end. Sampling is possible until the start index reaches the end index.

    Block ring buffer

    ConcurrentQueue is a bit more complex than a classic ring buffer. In its implementation, the concept of a segment (Segment) is used. ConcurrentQueue consists of a linked list of (unidirectional) segments. The segment size is 32.
    private class Segment {
        volatile VolatileBool[] m_state;
        volatile T[] m_array;
        volatile int m_low;
        volatile int m_high;
        volatile Segment m_next;

    Initially, 1 segment is created in ConcurrentQueue.

    As necessary, new segments are added to it on the right.

    As a result, a unidirectional linked list is obtained. The beginning of the linked list is m_head, the end is m_tail. Limitations:
    • m_head segment can have empty cells only on the left
    • m_tail segment can have empty cells only on the right
    • if m_head = m_tail then empty cells can be either left or right.
    • In segments, there cannot be empty cells between m_head and m_tail.

    Adding an item (Enqueue)

    Below is an example algorithm for adding elements to a segment.
    • M_high increases by 1
    • A new value is written to the m_array array with index m_high.

    index = Interlocked.Increment(ref this.m_high);
    if (index <= 31)
         m_array[index] = value;
         m_state[index].m_value = true;

    m_state - an array of the state of the cells, if true - the element is written to the cell, if false - not yet. In fact, this is a kind of “Commit” record. It is needed so that between the operations of increasing the Interlocked.Increment index and writing the value m_array [index] = value, the element should not be read by another thread. Then the data will be read after execution:
    while (!this.m_state[index].m_value)

    Adding a new segment (Segment.Grow)

    As soon as the m_high of the current segment becomes 31, writing to the current segment is terminated and a new segment is created (the current segments continue to live their lives).
    m_next = new ConcurrentQueue.Segment(this.m_index + 1L, this.m_source);
    m_source.m_tail = this.m_next;

    m_next - link to the next segment
    m_source.m_tail - link to the last segment of the list of segments.

    Element Selection (TryDequeue)

    The selection of elements from the queue is based on two basic functionalities:
    • Interlocked.CompareExchange is an atomic operation that writes the value of a variable if its value is equal to the compared value.
      public static extern int CompareExchange(ref int location1, int value, int comparand);

    • SpinWait, from MSDN
      System.Threading.SpinWait is a lightweight synchronization type that you can use in low-level scenarios to avoid the expensive context switches and kernel transitions that are required for kernel events. On multicore computers, when a resource is not expected to be held for long periods of time, it can be more efficient for a waiting thread to spin in user mode for a few dozen or a few hundred cycles, and then retry to acquire the resource . If the resource is available after spinning, then you have saved several thousand cycles. If the resource is still not available, then you have spent only a few cycles and can still enter a kernel-based wait. This spinning-then-waiting combination is sometimes referred to as a two-phase wait operation.

    An example algorithm for the operation of the sample:
    1. Get m_low
    2. Increase m_low by 1 using CompareExchange
    3. If m_low is greater than 31 - go to the next segment
    4. Wait for the commit (m_state [low] .m_value) of the element with index m_low.

    SpinWait spinWait1 = new SpinWait();
    int low = this.Low;
    if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
       SpinWait spinWait2 = new SpinWait();
       while (!this.m_state[low].m_value)
       result = this.m_array[low];

    Count vs IsEmpty

    IsEmpty Code:
    ConcurrentQueue.Segment segment = this.m_head;
    if (!segment.IsEmpty)
       return false;
    if (segment.Next == null)
       return true;
    SpinWait spinWait = new SpinWait();
    for (; segment.IsEmpty; segment = this.m_head)
       if (segment.Next == null)
          return true;
    return false;

    Those. in fact, this is to find the first non-empty segment. If it is found, the queue is not empty.

    Count Code:
    ConcurrentQueue.Segment head;
    ConcurrentQueue.Segment tail;
    int headLow;
    int tailHigh;
    this.GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
    if (head == tail)
       return tailHigh - headLow + 1;
    return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);

    In fact, it searches for the first and last segment and calculates the number of elements based on these two segments.
    Conclusion - the Count operation will take more processor time than IsEmpty.

    Snapshot & GetEnumerator

    The ConcurrentQueue framework supports snapshot technology to provide a complete set of elements.
    Integer data returns:
    • Toarray
    • ICollection.CopyTo
    • Getenumerator

    The above operators also work without locks, and integrity is achieved by introducing a counter
    volatile int m_numSnapshotTakers
    within the entire queue - the number of operations that are working with snapshots at the current time. Those. Each operation that wants to get a complete picture must implement the following code:
    Interlocked.Increment(ref this.m_numSnapshotTakers);
    ...//Итератор по всем сегментам
    Interlocked.Decrement(ref this.m_numSnapshotTakers);

    In addition to this, only the Dequeue operation “writes” the changes for us, therefore, only it checks the need to remove the link to the queue element:
    if (this.m_source.m_numSnapshotTakers <= 0)
       this.m_array[low] = default (T);

    Also popular now: