Simple Lock-Free Objects for Two Threads

There have been many articles on universal lock-free objects; for some special cases, however, they are unnecessarily cumbersome. Mine was exactly such a case: I needed one-way transfer of information from one thread to another. The main thread starts the worker thread, after which it can only request that the worker stop and can no longer control it. The worker thread, in turn, can notify the main thread of its current state (execution progress) and send intermediate results. So data only ever needs to flow from the worker thread to the main thread.

Of course, I may have reinvented the wheel or, worse, a wheel with bugs. Comments and criticism are therefore very welcome!

State object


The state of the worker thread is represented as a class. The main thread is not obliged to pick up every value stored in the state object: for example, it does not matter if the main thread skips an intermediate progress value, as long as it gets the latest one.

To implement lock-free state transfer, we need three instances of it (different objects of the same class):

var
  ReadItem: TLockFreeWorkState;
  CurrentItem: TLockFreeWorkState;
  WriteItem: TLockFreeWorkState;

The idea is this: the worker thread has unrestricted access to the WriteItem object. Once all the data has been stored, it performs InterlockedExchange between that object and CurrentItem, and then notifies the main thread in some way that a new state is ready (in my example, an ordinary PostMessage is used). In the notification handler, the main thread performs InterlockedExchange between CurrentItem and ReadItem, after which it can freely read data from ReadItem.

The result is a kind of "bubble": state data appears in WriteItem and then "floats up" through CurrentItem into ReadItem. By the way, I could not come up with a proper name for the base class of such a structure, so I simply called it TLockFreeWorkState (maybe someone has a better idea).

There is one caveat: the main thread can request the current state at any time. If it performed InterlockedExchange every time, it would alternately return the current and the previous state.

An ordinary Newest flag in the class prevents this. When writing the state, the worker thread always sets WriteItem.Newest := True, and after InterlockedExchange this flag ends up in CurrentItem. The main thread first checks CurrentItem.Newest and, only if it is True, performs InterlockedExchange and then immediately resets ReadItem.Newest to False. I considered it safe to read CurrentItem.Newest from the main thread, but correct me if I am wrong.

Now all of this as simplified code (type casts are omitted for clarity):

type
  TLockFreeWorkState = class
  public
    Newest: Boolean;
  end;
function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean;
begin
  if CurrentItem.Newest then begin
    ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem);
    ReadItem.Newest := False;
    Result := True;
  end else
    Result := False;
end;
procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState);
begin
  WriteItem.Newest := True;
  WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem);
end;
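
To see the "bubble" in action, here is a minimal single-threaded sketch, assuming Delphi on Windows (InterlockedExchangePointer from the Windows unit). The names StateBubbleDemo, TProgressState, Publish and Fetch are hypothetical; Publish and Fetch simply mirror the Write and Read routines above with the casts spelled out. It shows that an unread intermediate state is overwritten and that a second read without a new write reports nothing new.

```pascal
program StateBubbleDemo;
{$APPTYPE CONSOLE}
uses
  Windows;

type
  // Hypothetical state class: the Newest flag plus one payload field
  TProgressState = class
  public
    Newest: Boolean;
    Progress: Integer;
  end;

var
  ReadItem, CurrentItem, WriteItem: TProgressState;

// Worker side: mark the filled object as new and swap it into CurrentItem
procedure Publish;
begin
  WriteItem.Newest := True;
  Pointer(WriteItem) := InterlockedExchangePointer(Pointer(CurrentItem), Pointer(WriteItem));
end;

// Main side: swap only when CurrentItem holds a state that was not read yet
function Fetch: Boolean;
begin
  Result := CurrentItem.Newest;
  if Result then begin
    Pointer(ReadItem) := InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem));
    ReadItem.Newest := False;
  end;
end;

begin
  ReadItem := TProgressState.Create;
  CurrentItem := TProgressState.Create;
  WriteItem := TProgressState.Create;

  WriteItem.Progress := 1;
  Publish;
  WriteItem.Progress := 2;
  Publish; // replaces the unread state 1 in CurrentItem

  if Fetch then
    Writeln(ReadItem.Progress); // prints 2
  if not Fetch then
    Writeln('no new state');    // second read: nothing new was published

  ReadItem.Free;
  CurrentItem.Free;
  WriteItem.Free;
end.
```

Note that after a swap the worker gets back an object with stale contents, so every field has to be filled in again before the next Publish.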

Queue Object


The approach is similar in some ways, but here we initially need only one object with two references to it:

var
  ReadQueue: TLockFreeWorkQueue;
  WriteQueue: TLockFreeWorkQueue;

Initially, a single instance of TLockFreeWorkQueue is created and written to the ReadQueue and WriteQueue variables. The class is a circular buffer and has the following description:

  TLockFreeWorkQueue = class    
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
  end;

where QueueCapacity is some constant (greater than zero) that determines the length of the ring buffer.

To add an item to the queue, the worker thread executes InterlockedCompareExchangePointer on the WriteQueue.Items[Tail] element: the element is compared with Nil and, if it is Nil, the item being added is written to it. If the operation succeeds, Tail is increased by 1 and reset to 0 when it reaches QueueCapacity. We can work with Tail freely, since only the worker thread (the writer) accesses this variable. After that, the worker thread should notify the main thread that items have appeared in the queue. If the operation fails, the queue is full, but more on that later.

On notification from the worker, the main thread starts a loop that reads items from the queue (in fact, reading can begin at any time). To retrieve an item, it calls InterlockedExchangePointer on the ReadQueue.Items[Head] element, writing Nil into it. If the extracted item is not Nil, Head is increased by 1 and reset to 0 when it reaches QueueCapacity.

Now let's deal with buffer overflow. For new items we can simply create a new queue object and continue writing to it. So that the reader thread can find this object, we must store a reference to it in the current, full queue object. To do this, add a NextQueue field to the class:

  TLockFreeWorkQueue = class    
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
    NextQueue: TLockFreeWorkQueue;
  end;

Now, if InterlockedCompareExchangePointer returns a non-Nil value when writing an item (the queue is full), we create a new queue object NewWriteQueue: TLockFreeWorkQueue, write the item being added to it, store it in WriteQueue.NextQueue with InterlockedExchangePointer, and finally save NewWriteQueue in the WriteQueue variable. After this operation, ReadQueue and WriteQueue refer to different objects.

In the main thread, we need to add handling of an empty queue. If InterlockedExchangePointer on the ReadQueue.Items[Head] element returns Nil, we additionally check the NextQueue field, for which we also call InterlockedExchangePointer(ReadQueue.NextQueue, Nil). If it returns non-Nil, we save the object in NewReadQueue, destroy the current ReadQueue, and set that variable to NewReadQueue.

Here is the simplified code for adding an item to a queue:

procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail], Item, Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
      NewWriteQueue.Tail := 0;
    InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;

and extracting an item from the queue:

function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  repeat
    Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil);
    if Result = Nil then begin
      // No new items in this queue. Check next queue is available
      NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;

In this code I may be overly cautious. I am not sure that InterlockedExchangePointer is needed at all for operations on the NextQueue field; a plain read and write may be safe.
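
On the NextQueue question, one interleaving seems worth checking regardless of how the field is accessed: the reader finds Items[Head] empty, the writer then fills the whole buffer and chains a new queue, and the reader, now seeing NextQueue set, frees the old queue while it still holds unread items. Below is a minimal sketch of one possible guard, assuming Delphi on Windows: NextQueue is read before the slot is probed, so an empty slot seen after a non-Nil link should mean the old queue is really drained (the writer never touches a queue again once it has chained the next one). ExtractChecked and the demo program are hypothetical names. The demo is single-threaded, so it only shows the draining order across two chained queues and does not reproduce the race itself.

```pascal
program QueueDrainSketch;
{$APPTYPE CONSOLE}
uses
  Windows;

const
  QueueCapacity = 4;

type
  TLockFreeWorkQueue = class
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
    NextQueue: TLockFreeWorkQueue;
  end;

// Hypothetical variant of ExtractQueueItem with the two reads reordered
function ExtractChecked(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  Next: TLockFreeWorkQueue;
begin
  repeat
    // Snapshot the link BEFORE probing the slot
    Next := ReadQueue.NextQueue;
    Result := TObject(InterlockedExchangePointer(Pointer(ReadQueue.Items[ReadQueue.Head]), nil));
    if Result = nil then begin
      if not Assigned(Next) then
        Exit; // empty and nothing chained yet
      // The link was already set when the slot was found empty: switch queues
      ReadQueue.NextQueue := nil;
      ReadQueue.Free;
      ReadQueue := Next;
    end;
  until Result <> nil;
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;

var
  Q1, Q2, ReadQueue: TLockFreeWorkQueue;
  Obj: TObject;
  I, Count: Integer;
begin
  // Emulate what the writer leaves behind: Q1 full, Q2 chained with one item
  Q1 := TLockFreeWorkQueue.Create;
  Q2 := TLockFreeWorkQueue.Create;
  for I := 0 to QueueCapacity - 1 do
    Q1.Items[I] := TObject.Create;
  Q2.Items[0] := TObject.Create;
  Q2.Tail := 1;
  Q1.NextQueue := Q2;

  ReadQueue := Q1;
  Count := 0;
  repeat
    Obj := ExtractChecked(ReadQueue);
    if Assigned(Obj) then begin
      Inc(Count);
      Obj.Free;
    end;
  until not Assigned(Obj);
  Writeln('Extracted items: ', Count); // prints "Extracted items: 5"
  ReadQueue.Free;
end.
```

Whether the writer's store to NextQueue needs an interlocked call is a separate matter; keeping InterlockedExchangePointer there is the conservative choice, since it also acts as a full memory barrier.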

Test case


The working, cleaned-up code, along with a simple console example, is given below.

program LockFreeTest;
{$APPTYPE CONSOLE}
{$R *.res}
uses
  SysUtils, Classes, Windows, Messages;
// Lock-free work thread state ////////////////////////////////////////////////
type
  TLockFreeWorkState = class
  protected
    FNewest: Boolean;
  public
    class function Read(var CurrentItem, ReadItem): Boolean;
    class procedure Write(var CurrentItem, WriteItem);
    property Newest: Boolean read FNewest write FNewest;
  end;
class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean;
begin
  if TLockFreeWorkState(CurrentItem).Newest then begin
    pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem));
    TLockFreeWorkState(ReadItem).Newest := False;
    Result := True;
  end else
    Result := False;
end;
class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem);
begin
  TLockFreeWorkState(WriteItem).Newest := True;
  pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem));
end;
// Lock-free work thread queue ////////////////////////////////////////////////
type
  TLockFreeWorkQueue = class
  public const
    QueueCapacity = 4; // Small value for test purposes
  public type
    TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject;
  public
    Head: Integer; // Access from main thread only
    Tail: Integer; // Access from work thread only
    NextQueue: TLockFreeWorkQueue;
    Items: TLockFreeWorkQueueItems;
  public
    destructor Destroy; override;
    class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static;
    class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static;
  end;
destructor TLockFreeWorkQueue.Destroy;
var
  i: Integer;
begin
  // Free non-extracted items
  for i := 0 to QueueCapacity - 1 do
    Items[i].Free;
  // Free NextQueue if exists
  NextQueue.Free;
  inherited;
end;
class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  // Check item assigned (can't add empty items)
  if not Assigned(Item) or not Assigned(WriteQueue) then
    Exit;
  if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
      NewWriteQueue.Tail := 0;
    InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;
class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  if not Assigned(ReadQueue) then
    Exit;
  repeat
    Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil);
    if Result = Nil then begin
      // No new items in this queue. Check next queue is available
      NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;
// Test work thread ///////////////////////////////////////////////////////////
const
  WM_MAINNOTIFY = WM_USER + 1;
type
  TWorkThreadState = class(TLockFreeWorkState)
  public
    Progress: Integer;
  end;
  TWorkThreadQueueItem = class
  public
    ItemData: Integer;
  end;
  TWorkThread = class(TThread)
  protected
    FMainHandle: THandle;
    FMainNotified: Integer;
    // State fields
    FStateRead: TWorkThreadState;
    FStateCurrent: TWorkThreadState;
    FStateWrite: TWorkThreadState;
    // Queue fields
    FQueueRead: TLockFreeWorkQueue;
    FQueueWrite: TLockFreeWorkQueue;
    // Debug (test) fields
    FDebugReadQueue: Boolean;
    procedure Execute; override;
    procedure SetState;
    procedure AddQueueItem(Item: TWorkThreadQueueItem);
    procedure NotifyMain;
  public
    constructor Create(CreateSuspended: Boolean);
    destructor Destroy; override;
    function GetState: TWorkThreadState;
    function ExtractQueueItem: TWorkThreadQueueItem;
    procedure NotificationProcessed;
    property MainHandle: THandle read FMainHandle;
  end;
constructor TWorkThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  // State objects
  FStateRead := TWorkThreadState.Create;
  FStateCurrent := TWorkThreadState.Create;
  FStateWrite := TWorkThreadState.Create;
  // Queue objects
  FQueueRead := TLockFreeWorkQueue.Create;
  FQueueWrite := FQueueRead;
end;
destructor TWorkThread.Destroy;
begin
  inherited; // TThread.Destroy terminates the thread and waits for it before the objects below are freed
  FStateRead.Free;
  FStateCurrent.Free;
  FStateWrite.Free;
  // Always destroy read queue only: only read queue may have NextQueue reference
  FQueueRead.Free;
end;
procedure TWorkThread.NotifyMain;
begin
  if InterlockedExchange(FMainNotified, 1) = 0 then
    PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0);
end;
procedure TWorkThread.NotificationProcessed;
begin
  InterlockedExchange(FMainNotified, 0);
end;
function TWorkThread.GetState: TWorkThreadState;
begin
  TLockFreeWorkState.Read(FStateCurrent, FStateRead);
  Result := FStateRead;
end;
procedure TWorkThread.SetState;
begin
  TLockFreeWorkState.Write(FStateCurrent, FStateWrite);
end;
procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem);
begin
  TLockFreeWorkQueue.Add(FQueueWrite, Item);
end;
function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem;
begin
  Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead));
end;
procedure TWorkThread.Execute;
const
  TestQueueCountToFlush = 10;
var
  ProgressIndex: Integer;
  TestQueueCount: Integer;
  Item: TWorkThreadQueueItem;
begin
  TestQueueCount := 0;
  ProgressIndex := 0;
  while not Terminated do begin
    // Send current progress
    if FStateWrite.Progress <> ProgressIndex then begin
      // All state object fields initialization required
      FStateWrite.Progress := ProgressIndex;
      SetState;
      NotifyMain;
    end;
    // Emulate calculation
    Sleep(500);
    Inc(ProgressIndex);
    // Put intermediate result in queue
    Item := TWorkThreadQueueItem.Create;
    Item.ItemData := ProgressIndex;
    AddQueueItem(Item);
    Inc(TestQueueCount);
    if TestQueueCount = TestQueueCountToFlush then begin
      TestQueueCount := 0;
      // Allow queue reading from main thread
      FDebugReadQueue := True;
      NotifyMain;
    end;
  end;
end;
// Test application ///////////////////////////////////////////////////////////
type
  TMain = class
  protected
    FHandle: THandle;
    FThread: TWorkThread;
    procedure WndProc(var Message: TMessage);
  public
    constructor Create;
    destructor Destroy; override;
    function Run: Boolean;
    property Handle: THandle read FHandle;
  end;
var
  Main: TMain;
constructor TMain.Create;
begin
  FHandle := AllocateHWnd(WndProc);
  FThread := TWorkThread.Create(True);
  FThread.FMainHandle := Handle;
  FThread.Start;
  writeln('Work thread started');
end;
destructor TMain.Destroy;
begin
  writeln('Stopping work thread...');
  FThread.Free;
  writeln('Work thread stopped');
  DeallocateHWnd(FHandle);
  inherited;
end;
procedure TMain.WndProc(var Message: TMessage);
var
  State: TWorkThreadState;
  Item: TWorkThreadQueueItem;
begin
  if Message.Msg = WM_MAINNOTIFY then begin
    FThread.NotificationProcessed;
    State := FThread.GetState;
    // Show current progress
    writeln('Work progress ', State.Progress);
    // Check queue reading allowed
    if FThread.FDebugReadQueue then begin
      writeln('Read queue...');
      repeat
        Item := FThread.ExtractQueueItem;
        try
          if Assigned(Item) then
            writeln('Queue item: ', Item.ItemData);
        finally
          Item.Free;
        end;
      until not Assigned(Item);
      FThread.FDebugReadQueue := False;
    end;
  end else
    Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam);
end;
function TMain.Run: Boolean;
var
  Msg: TMsg;
begin
  writeln('Start message loop (Ctrl+C to break)');
  Result := True;
  while Result do
    case Integer(GetMessage(Msg, Handle, 0, 0)) of
      0:
        Break;
      -1:
        Result := False;
      else
        begin
          TranslateMessage(Msg);
          DispatchMessage(Msg);
        end;
    end;
end;
// Console event handler //////////////////////////////////////////////////////
function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall;
begin
  Result := False;
  case CtrlType of
    CTRL_CLOSE_EVENT,
    CTRL_C_EVENT,
    CTRL_BREAK_EVENT:
      if Assigned(Main) then begin
        PostMessage(Main.Handle, WM_QUIT, 0, 0);
        Result := True;
      end;
  end;
end;
// Main procedure /////////////////////////////////////////////////////////////
begin
  {$IFDEF DEBUG}
  ReportMemoryLeaksOnShutdown := True;
  {$ENDIF}
  try
    SetConsoleCtrlHandler(@ConsoleEventProc, True);
    Main := TMain.Create;
    try
      Main.Run;
    finally
      FreeAndNil(Main);
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.


Normally, when an item appears in the queue, the main thread should retrieve it as soon as possible. However, to test queue overflow, I added the TWorkThread.FDebugReadQueue field, which, while False, prevents the main thread from reading from the queue (the TWorkThread.Execute method introduces the constant TestQueueCountToFlush = 10, which lets the main thread read only after every 10 added items).

Unfortunately, the test case is too simple and does not produce read/write collisions between the threads, where a thread switch happens inside the read/write helper functions. I am not sure whether it is possible to exercise all the weak spots of the algorithm at all, or how the code would have to be contorted to do so.
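
One way to provoke more collisions might be a stress run without Sleep and without window messages: the writer adds items in a tight loop while the main thread extracts them just as fast. The sketch below assumes Delphi on Windows. QueueStressSketch, TQueue, TProducer and ItemCount are hypothetical names, and Add and Extract follow the simplified routines from the article. TQueue deliberately has no destructor, so an item dropped along the way would show up as a count mismatch rather than as a crash. A matching count does not prove the algorithm correct; it only means this run hit no problem.

```pascal
program QueueStressSketch;
{$APPTYPE CONSOLE}
uses
  Windows, Classes;

const
  QueueCapacity = 4;   // tiny buffer, so queues get chained constantly
  ItemCount = 1000000; // hypothetical load

type
  TQueue = class
  public
    Head, Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
    NextQueue: TQueue;
  end;

  TProducer = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  ReadQueue, WriteQueue: TQueue;

// Writer side, following AddQueueItem from the article
procedure Add(Item: TObject);
var
  NewQueue: TQueue;
begin
  if InterlockedCompareExchangePointer(Pointer(WriteQueue.Items[WriteQueue.Tail]),
    Pointer(Item), nil) = nil then
    WriteQueue.Tail := (WriteQueue.Tail + 1) mod QueueCapacity
  else begin
    NewQueue := TQueue.Create;
    NewQueue.Items[0] := Item;
    NewQueue.Tail := 1 mod QueueCapacity;
    InterlockedExchangePointer(Pointer(WriteQueue.NextQueue), NewQueue);
    WriteQueue := NewQueue;
  end;
end;

// Reader side, following ExtractQueueItem from the article
function Extract: TObject;
var
  NewQueue: TQueue;
begin
  repeat
    Result := InterlockedExchangePointer(Pointer(ReadQueue.Items[ReadQueue.Head]), nil);
    if Result = nil then begin
      NewQueue := InterlockedExchangePointer(Pointer(ReadQueue.NextQueue), nil);
      if not Assigned(NewQueue) then
        Exit;
      ReadQueue.Free;
      ReadQueue := NewQueue;
    end;
  until Result <> nil;
  ReadQueue.Head := (ReadQueue.Head + 1) mod QueueCapacity;
end;

procedure TProducer.Execute;
var
  I: Integer;
begin
  for I := 1 to ItemCount do
    Add(TObject.Create);
end;

var
  Producer: TProducer;
  Obj: TObject;
  Received: Integer;
  ProducerDone: Boolean;
begin
  ReadQueue := TQueue.Create;
  WriteQueue := ReadQueue;
  Received := 0;
  Producer := TProducer.Create(False);
  repeat
    // Sample the finished state BEFORE draining, so the last pass sees everything
    ProducerDone := WaitForSingleObject(Producer.Handle, 0) = WAIT_OBJECT_0;
    Obj := Extract;
    while Assigned(Obj) do begin
      Inc(Received);
      Obj.Free;
      Obj := Extract;
    end;
  until ProducerDone;
  Producer.Free;
  ReadQueue.Free;
  Writeln('Received ', Received, ' of ', ItemCount);
end.
```

Running it repeatedly, ideally on a multi-core machine and with different QueueCapacity values, gives the scheduler more chances to interrupt a thread in the middle of Add or Extract.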
