Simple Lock-Free Objects for Two Threads
There have been many articles on universal lock-free objects; however, for some special cases they are unnecessarily cumbersome. My case was exactly like that: I needed a one-way transfer of information from one thread to another. The main thread starts the worker thread, after which it can only ask the worker to stop and has no other control over it. The worker thread, in turn, can notify the main thread of its current state (progress of execution) and send intermediate results. So only data transfer from the worker thread to the main thread is required.
Of course, I may have reinvented the wheel or, worse, a wheel with bugs. So comments and criticism are very welcome!
State object
The state of the worker thread is represented by a class. The main thread does not have to pick up every value stored in the state object. For example, it does not matter if the main thread skips an intermediate progress value; what matters is that it gets the latest one.
To implement lock-free state transfer, we need three instances of this class (three separate objects of the same class):
var
ReadItem: TLockFreeWorkState;
CurrentItem: TLockFreeWorkState;
WriteItem: TLockFreeWorkState;
The idea is this: the worker thread has exclusive access to the WriteItem object. Once all the data has been stored in it, the worker swaps WriteItem with CurrentItem using InterlockedExchange (InterlockedExchangePointer in the code). It then notifies the main thread that a new state is ready; in my example, a plain PostMessage is used. In the notification handler, the main thread swaps CurrentItem with ReadItem the same way, after which it can freely read data from ReadItem.
The result is a kind of "bubble": state data appears in WriteItem and then "floats up" through CurrentItem into ReadItem. By the way, I could not come up with a good name for the base class of such a structure, so I simply called it TLockFreeWorkState (maybe someone has a better idea).
There is one caveat: the main thread can request the current state at any time. If it always performed InterlockedExchange, it would alternately get the current and the previous state.
An ordinary Newest flag in the class prevents this. When writing a state, the worker thread always sets WriteItem.Newest := True, and after InterlockedExchange this object, with the flag set, becomes CurrentItem. The main thread first checks CurrentItem.Newest. Only if it is True does it perform InterlockedExchange, and it then immediately resets ReadItem.Newest to False. I assumed it is safe to read CurrentItem.Newest from the main thread, but correct me if I am wrong.
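To make the caveat concrete, here is a minimal single-threaded sketch. It is not part of the original code, and TDemoState, WriteState, NaiveRead, FlagRead and ResetAll are hypothetical names. The sketch assumes Delphi on Windows, where InterlockedExchangePointer comes from the Windows unit. One write is followed by three reads in a row, so there is no real concurrency; the point is only to show which object ends up in ReadItem.

```pascal
program NewestFlagSketch;
{$APPTYPE CONSOLE}
// Illustration only: TDemoState and the routines below are hypothetical.
uses
  Windows;

type
  TDemoState = class
  public
    Newest: Boolean;
    Progress: Integer;
  end;

var
  ReadItem, CurrentItem, WriteItem: TDemoState;
  NaiveSeen, FlagSeen: array[1..3] of Integer;
  i: Integer;

procedure ResetAll;
begin
  // Put all three objects into a known state (progress 0, not newest)
  ReadItem.Progress := 0;    ReadItem.Newest := False;
  CurrentItem.Progress := 0; CurrentItem.Newest := False;
  WriteItem.Progress := 0;   WriteItem.Newest := False;
end;

procedure WriteState(Progress: Integer);
begin
  WriteItem.Progress := Progress;
  WriteItem.Newest := True;
  WriteItem := TDemoState(InterlockedExchangePointer(Pointer(CurrentItem), Pointer(WriteItem)));
end;

// Reader that ignores Newest and always swaps
function NaiveRead: Integer;
begin
  ReadItem := TDemoState(InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem)));
  Result := ReadItem.Progress;
end;

// Reader that swaps only when CurrentItem holds a state it has not read yet
function FlagRead: Integer;
begin
  if CurrentItem.Newest then begin
    ReadItem := TDemoState(InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem)));
    ReadItem.Newest := False;
  end;
  Result := ReadItem.Progress;
end;

begin
  ReadItem := TDemoState.Create;
  CurrentItem := TDemoState.Create;
  WriteItem := TDemoState.Create;
  try
    ResetAll;
    WriteState(1);
    for i := 1 to 3 do
      NaiveSeen[i] := NaiveRead;
    ResetAll;
    WriteState(1);
    for i := 1 to 3 do
      FlagSeen[i] := FlagRead;
    Writeln('Without Newest: ', NaiveSeen[1], ' ', NaiveSeen[2], ' ', NaiveSeen[3]); // 1 0 1
    Writeln('With Newest:    ', FlagSeen[1], ' ', FlagSeen[2], ' ', FlagSeen[3]);    // 1 1 1
  finally
    ReadItem.Free;
    CurrentItem.Free;
    WriteItem.Free;
  end;
end.
```

Without the flag, the second read swaps the old object back into ReadItem, so progress jumps back to 0. With the flag, repeated reads keep returning the latest value.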
Here is all of this as simplified code (type casts are omitted for clarity):
type
  TLockFreeWorkState = class
  public
    Newest: Boolean;
  end;

// Main thread: take the state from CurrentItem only if it has not been read yet
function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean;
begin
  if CurrentItem.Newest then begin
    ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem);
    ReadItem.Newest := False;
    Result := True;
  end else
    Result := False;
end;

// Worker thread: publish WriteItem; the previous CurrentItem becomes the new WriteItem
procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState);
begin
  WriteItem.Newest := True;
  WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem);
end;
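One way to probe whether reading CurrentItem.Newest from the main thread is safe is a stress test. The following is a hedged sketch, not the author's test. TWriter, WriteCount and the counters are hypothetical, and it assumes Delphi on x86/x64 Windows. A writer thread publishes Progress = 1..WriteCount through the three-object scheme. Meanwhile, the main thread polls with the same logic as Read above and counts how often the observed progress goes backwards.

```pascal
program StateStressSketch;
{$APPTYPE CONSOLE}
// Stress-test sketch; all names are hypothetical. Assumes Delphi on x86/x64 Windows.
uses
  Windows, Classes;

const
  WriteCount = 1000000; // number of states the worker publishes

type
  TDemoState = class
  public
    Newest: Boolean;
    Progress: Integer;
  end;

  TWriter = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  ReadItem, CurrentItem, WriteItem: TDemoState;
  Swaps: Integer = 0;     // successful reads (Newest was True)
  Backwards: Integer = 0; // times the reader saw an older progress value

procedure TWriter.Execute;
var
  i: Integer;
begin
  for i := 1 to WriteCount do begin
    // Fill every field of WriteItem first (the object is recycled)...
    WriteItem.Progress := i;
    WriteItem.Newest := True;
    // ...then publish it; the old CurrentItem becomes the next WriteItem
    WriteItem := TDemoState(InterlockedExchangePointer(Pointer(CurrentItem), Pointer(WriteItem)));
  end;
end;

// Same logic as Read above: swap only if CurrentItem holds an unread state
function ReadState: Boolean;
begin
  Result := CurrentItem.Newest;
  if Result then begin
    ReadItem := TDemoState(InterlockedExchangePointer(Pointer(CurrentItem), Pointer(ReadItem)));
    ReadItem.Newest := False;
  end;
end;

var
  Writer: TWriter;
  Last: Integer = 0;
begin
  ReadItem := TDemoState.Create;
  CurrentItem := TDemoState.Create;
  WriteItem := TDemoState.Create;
  Writer := TWriter.Create(False);
  try
    repeat
      if ReadState then begin
        Inc(Swaps);
        if ReadItem.Progress < Last then
          Inc(Backwards);
        Last := ReadItem.Progress;
      end;
    until Last = WriteCount;
  finally
    Writer.Free; // waits for the worker to finish
  end;
  Writeln('Swaps: ', Swaps, ', backward steps: ', Backwards);
  ReadItem.Free;
  CurrentItem.Free;
  WriteItem.Free;
end.
```

The backward count is expected to stay at 0. A stale read of Newest only causes an extra or a missed swap: the swap itself is atomic, and the object it returns was either just checked by the reader or published later by the worker. A passing run is evidence, not proof.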
Queue object
The approach is similar in some ways, but initially we need only one object and two references to it:
var
ReadQueue: TLockFreeWorkQueue;
WriteQueue: TLockFreeWorkQueue;
Initially, a single instance of TLockFreeWorkQueue is created and assigned to both the ReadQueue and WriteQueue variables. The class is a circular (ring) buffer declared as follows:
TLockFreeWorkQueue = class
public
  Head: Integer;
  Tail: Integer;
  Items: array[0..QueueCapacity - 1] of TObject;
end;
where QueueCapacity is some constant (greater than zero) that determines the length of the ring buffer.
When an item is added to the queue, the worker thread calls InterlockedCompareExchangePointer on WriteQueue.Items[Tail]. The element is compared with Nil and, if it is Nil, the item being added is written into it. If the operation succeeds, Tail is incremented by 1 and wrapped to 0 when it reaches QueueCapacity. We can manipulate Tail freely, since only the worker (writer) thread accesses this variable. After that, the worker thread should also notify the main thread that new items have appeared in the queue. If the operation fails, the queue is full; more on that later.
The main thread, when notified by the worker, starts a loop that reads elements from the queue (in fact, reading can start at any time). To retrieve an element, it calls InterlockedExchangePointer on ReadQueue.Items[Head], writing Nil into it. If the extracted item is not Nil, Head is incremented by 1 and wrapped to 0 when it reaches QueueCapacity.
Now let's deal with buffer overflow. For new elements, we can simply create a new queue object and continue writing to it. So that the reader thread can find this object, we must store a reference to it in the current, full queue object. To do this, we add a NextQueue field to the class:
TLockFreeWorkQueue = class
public
  Head: Integer;
  Tail: Integer;
  Items: array[0..QueueCapacity - 1] of TObject;
  NextQueue: TLockFreeWorkQueue;
end;
Now, if InterlockedCompareExchangePointer on the element returns a non-Nil value (the queue is full), we create a new queue object NewWriteQueue: TLockFreeWorkQueue and put the item being added into it. We then store the new object in WriteQueue.NextQueue with InterlockedExchangePointer and finally save NewWriteQueue in the WriteQueue variable. After this operation, ReadQueue and WriteQueue refer to different objects.
In the main thread, we need to handle an empty queue. If InterlockedExchangePointer on ReadQueue.Items[Head] returns Nil, we additionally check the NextQueue field, again with InterlockedExchangePointer(ReadQueue.NextQueue, Nil). If it returns a non-Nil value, we save that object in NewReadQueue, free the current ReadQueue, and set ReadQueue to NewReadQueue.
Here is the simplified code for adding an item to a queue:
procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail], Item, Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Handles QueueCapacity = 1
      NewWriteQueue.Tail := 0;
    // Publish the new queue to the reader, then switch the writer to it
    InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;
and extracting an item from the queue:
function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  repeat
    Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil);
    if Result = Nil then begin
      // No items left in this queue. Check whether a chained queue exists
      NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;
In this code I may be playing it too safe: I am not sure that InterlockedExchangePointer is needed at all for operations on the NextQueue field. Direct reads and writes may be sufficient.
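Regarding NextQueue, there seems to be a more serious issue than the choice of InterlockedExchangePointer. Suppose the reader finds Items[Head] empty and is preempted right there. The writer may then fill the whole queue and chain a new one before the reader looks at NextQueue. In that case ExtractQueueItem frees the old queue together with the items it still holds. The sketch below replays this interleaving deterministically in a single thread and shows one possible fix: after seeing NextQueue assigned, look at Items[Head] once more before switching. Since the writer sets NextQueue only after its last write to the old queue, one re-check should be enough. All names are hypothetical, and items are integers cast to Pointer. The BetweenSteps hook stands in for the preemption. The reader uses a plain aligned read of NextQueue, which is assumed to be sufficient on x86/x64.

```pascal
program NextQueueRaceSketch;
{$APPTYPE CONSOLE}
// Deterministic replay of a reader/writer interleaving; all names are hypothetical.
uses
  Windows, SysUtils;

const
  QueueCapacity = 2; // tiny, so a third item forces chaining

type
  // Simplified queue: items are plain non-nil pointers, no destructor
  TQueue = class
  public
    Head, Tail: Integer;
    NextQueue: TQueue;
    Items: array[0..QueueCapacity - 1] of Pointer;
  end;

var
  ReadQueue, WriteQueue: TQueue;
  // Hypothetical test hook, called once between "Items[Head] is empty" and
  // the NextQueue check, to emulate the reader being preempted there
  BetweenSteps: procedure = nil;
  WithoutRecheck, WithRecheck: string;

procedure Add(Item: Pointer);
var
  NewQueue: TQueue;
begin
  if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail], Item, nil) = nil then
    WriteQueue.Tail := (WriteQueue.Tail + 1) mod QueueCapacity
  else begin
    // Full: chain a new queue, as in the article
    NewQueue := TQueue.Create;
    NewQueue.Items[0] := Item;
    NewQueue.Tail := 1 mod QueueCapacity;
    InterlockedExchangePointer(Pointer(WriteQueue.NextQueue), Pointer(NewQueue));
    WriteQueue := NewQueue;
  end;
end;

function Extract(Recheck: Boolean): Pointer;
var
  Next: TQueue;
begin
  repeat
    Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], nil);
    if Result = nil then begin
      if Assigned(BetweenSteps) then begin
        BetweenSteps;
        BetweenSteps := nil;
      end;
      Next := ReadQueue.NextQueue; // plain aligned read
      if Next = nil then
        Exit; // nothing here and nothing chained: queue is empty
      if Recheck then
        // NextQueue is set only after the writer's last write to this queue,
        // so one more look at Items[Head] catches any late items
        Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], nil);
      if Result = nil then begin
        // Switch queues; without Recheck this drops whatever is still stored here
        ReadQueue.NextQueue := nil; // matters if a destructor frees NextQueue
        ReadQueue.Free;
        ReadQueue := Next;
      end;
    end;
  until Result <> nil;
  ReadQueue.Head := (ReadQueue.Head + 1) mod QueueCapacity;
end;

procedure FillAndChain;
begin
  // The "writer" adds three items: two fill the queue, the third is chained
  Add(Pointer(1));
  Add(Pointer(2));
  Add(Pointer(3));
end;

function RunScenario(Recheck: Boolean): string;
var
  P: Pointer;
begin
  Result := '';
  ReadQueue := TQueue.Create;
  WriteQueue := ReadQueue;
  BetweenSteps := FillAndChain;
  repeat
    P := Extract(Recheck);
    if P <> nil then
      Result := Result + IntToStr(NativeInt(P));
  until P = nil;
  ReadQueue.Free; // here ReadQueue = WriteQueue, the last queue in the chain
end;

begin
  WithoutRecheck := RunScenario(False);
  WithRecheck := RunScenario(True);
  Writeln('Without re-check: ', WithoutRecheck); // prints 3 (items 1 and 2 lost)
  Writeln('With re-check:    ', WithRecheck);    // prints 123
end.
```

Without the re-check, only item 3 arrives. In TLockFreeWorkQueue, items 1 and 2 would even be freed by the destructor. With the re-check, all three arrive in order. The window is narrow, but with a small QueueCapacity it is not impossible in practice.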
Test case
The complete, cleaned-up working code, together with a simple console example, is given below.
program LockFreeTest;

{$APPTYPE CONSOLE}
{$R *.res}

uses
  SysUtils, Classes, Windows, Messages;

// Lock-free work thread state ////////////////////////////////////////////////

type
  TLockFreeWorkState = class
  protected
    FNewest: Boolean;
  public
    class function Read(var CurrentItem, ReadItem): Boolean;
    class procedure Write(var CurrentItem, WriteItem);
    property Newest: Boolean read FNewest write FNewest;
  end;

class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean;
begin
  if TLockFreeWorkState(CurrentItem).Newest then begin
    pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem));
    TLockFreeWorkState(ReadItem).Newest := False;
    Result := True;
  end else
    Result := False;
end;

class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem);
begin
  TLockFreeWorkState(WriteItem).Newest := True;
  pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem));
end;
// Lock-free work thread queue ////////////////////////////////////////////////

type
  TLockFreeWorkQueue = class
  public const
    QueueCapacity = 4; // Small value for test purposes
  public type
    TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject;
  public
    Head: Integer; // Access from main thread only
    Tail: Integer; // Access from work thread only
    NextQueue: TLockFreeWorkQueue;
    Items: TLockFreeWorkQueueItems;
  public
    destructor Destroy; override;
    class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static;
    class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static;
  end;

destructor TLockFreeWorkQueue.Destroy;
var
  i: Integer;
begin
  // Free non-extracted items
  for i := 0 to QueueCapacity - 1 do
    Items[i].Free;
  // Free the chained NextQueue, if any
  NextQueue.Free;
  inherited;
end;

class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  // Nil items can't be added (Nil marks a free slot); the queue must exist
  if not Assigned(Item) or not Assigned(WriteQueue) then
    Exit;
  if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Handles QueueCapacity = 1
      NewWriteQueue.Tail := 0;
    InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;

class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  if not Assigned(ReadQueue) then
    Exit;
  repeat
    Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil);
    if Result = Nil then begin
      // No items left in this queue. Check whether a chained queue exists
      NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;
// Test work thread ///////////////////////////////////////////////////////////

const
  WM_MAINNOTIFY = WM_USER + 1;

type
  TWorkThreadState = class(TLockFreeWorkState)
  public
    Progress: Integer;
  end;

  TWorkThreadQueueItem = class
  public
    ItemData: Integer;
  end;

  TWorkThread = class(TThread)
  protected
    FMainHandle: THandle;
    FMainNotified: Integer;
    // State fields
    FStateRead: TWorkThreadState;
    FStateCurrent: TWorkThreadState;
    FStateWrite: TWorkThreadState;
    // Queue fields
    FQueueRead: TLockFreeWorkQueue;
    FQueueWrite: TLockFreeWorkQueue;
    // Debug (test) fields
    FDebugReadQueue: Boolean;
    procedure Execute; override;
    procedure SetState;
    procedure AddQueueItem(Item: TWorkThreadQueueItem);
    procedure NotifyMain;
  public
    constructor Create(CreateSuspended: Boolean);
    destructor Destroy; override;
    function GetState: TWorkThreadState;
    function ExtractQueueItem: TWorkThreadQueueItem;
    procedure NotificationProcessed;
    property MainHandle: THandle read FMainHandle;
  end;

constructor TWorkThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  // State objects
  FStateRead := TWorkThreadState.Create;
  FStateCurrent := TWorkThreadState.Create;
  FStateWrite := TWorkThreadState.Create;
  // Queue objects
  FQueueRead := TLockFreeWorkQueue.Create;
  FQueueWrite := FQueueRead;
end;

destructor TWorkThread.Destroy;
begin
  inherited;
  FStateRead.Free;
  FStateCurrent.Free;
  FStateWrite.Free;
  // Always destroy read queue only: only read queue may have NextQueue reference
  FQueueRead.Free;
end;

procedure TWorkThread.NotifyMain;
begin
  // Post at most one notification until the main thread has processed it
  if InterlockedExchange(FMainNotified, 1) = 0 then
    PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0);
end;

procedure TWorkThread.NotificationProcessed;
begin
  InterlockedExchange(FMainNotified, 0);
end;

function TWorkThread.GetState: TWorkThreadState;
begin
  TLockFreeWorkState.Read(FStateCurrent, FStateRead);
  Result := FStateRead;
end;

procedure TWorkThread.SetState;
begin
  TLockFreeWorkState.Write(FStateCurrent, FStateWrite);
end;

procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem);
begin
  TLockFreeWorkQueue.Add(FQueueWrite, Item);
end;

function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem;
begin
  Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead));
end;

procedure TWorkThread.Execute;
const
  TestQueueCountToFlush = 10;
var
  ProgressIndex: Integer;
  TestQueueCount: Integer;
  Item: TWorkThreadQueueItem;
begin
  TestQueueCount := 0;
  ProgressIndex := 0;
  while not Terminated do begin
    // Send current progress
    if FStateWrite.Progress <> ProgressIndex then begin
      // Every field of the state object must be set here: FStateWrite is a
      // recycled object that may still hold an older state
      FStateWrite.Progress := ProgressIndex;
      SetState;
      NotifyMain;
    end;
    // Emulate calculation
    Sleep(500);
    Inc(ProgressIndex);
    // Put intermediate result in queue
    Item := TWorkThreadQueueItem.Create;
    Item.ItemData := ProgressIndex;
    AddQueueItem(Item);
    Inc(TestQueueCount);
    if TestQueueCount = TestQueueCountToFlush then begin
      TestQueueCount := 0;
      // Allow queue reading from main thread
      FDebugReadQueue := True;
      NotifyMain;
    end;
  end;
end;
// Test application ///////////////////////////////////////////////////////////

type
  TMain = class
  protected
    FHandle: THandle;
    FThread: TWorkThread;
    procedure WndProc(var Message: TMessage);
  public
    constructor Create;
    destructor Destroy; override;
    function Run: Boolean;
    property Handle: THandle read FHandle;
  end;

var
  Main: TMain;

constructor TMain.Create;
begin
  FHandle := AllocateHWnd(WndProc);
  FThread := TWorkThread.Create(True);
  FThread.FMainHandle := Handle;
  FThread.Start;
  writeln('Work thread started');
end;

destructor TMain.Destroy;
begin
  writeln('Stopping work thread...');
  FThread.Free;
  writeln('Work thread stopped');
  DeallocateHWnd(FHandle);
  inherited;
end;

procedure TMain.WndProc(var Message: TMessage);
var
  State: TWorkThreadState;
  Item: TWorkThreadQueueItem;
begin
  if Message.Msg = WM_MAINNOTIFY then begin
    FThread.NotificationProcessed;
    State := FThread.GetState;
    // Show current progress
    writeln('Work progress ', State.Progress);
    // Check queue reading allowed
    if FThread.FDebugReadQueue then begin
      writeln('Read queue...');
      repeat
        Item := FThread.ExtractQueueItem;
        try
          if Assigned(Item) then
            writeln('Queue item: ', Item.ItemData);
        finally
          Item.Free;
        end;
      until not Assigned(Item);
      FThread.FDebugReadQueue := False;
    end;
  end else
    Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam);
end;

function TMain.Run: Boolean;
var
  Msg: TMsg;
begin
  writeln('Start message loop (Ctrl+C to break)');
  Result := True;
  while Result do
    case Integer(GetMessage(Msg, Handle, 0, 0)) of
      0:
        Break;
      -1:
        Result := False;
    else
      begin
        TranslateMessage(Msg);
        DispatchMessage(Msg);
      end;
    end;
end;

// Console event handler //////////////////////////////////////////////////////

function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall;
begin
  Result := False;
  case CtrlType of
    CTRL_CLOSE_EVENT,
    CTRL_C_EVENT,
    CTRL_BREAK_EVENT:
      if Assigned(Main) then begin
        PostMessage(Main.Handle, WM_QUIT, 0, 0);
        Result := True;
      end;
  end;
end;

// Main procedure /////////////////////////////////////////////////////////////

begin
  {$IFDEF DEBUG}
  ReportMemoryLeaksOnShutdown := True;
  {$ENDIF}
  try
    SetConsoleCtrlHandler(@ConsoleEventProc, True);
    Main := TMain.Create;
    try
      Main.Run;
    finally
      FreeAndNil(Main);
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.
Normally, an element that appears in the queue should be retrieved by the main thread as soon as possible. To test queue overflow, however, I added the TWorkThread.FDebugReadQueue field: while it is False, the main thread does not read from the queue. The TWorkThread.Execute method introduces the constant TestQueueCountToFlush = 10, which lets the main thread read only after every 10 added elements.
Unfortunately, the test case is too simple. It does not produce read/write collisions, where a thread is switched out in the middle of one of the read/write helper functions. Then again, I am not sure whether it is possible at all to check every weak spot of the algorithm, or how the code would have to be adapted for that.
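As for stronger testing, one option is to drop the PostMessage machinery and let the two threads hammer the queue directly. This is a sketch, not a proof, and all names in it are hypothetical; it assumes Delphi on x86/x64 Windows. The worker adds ItemCount integers cast to Pointer, the main thread busy-polls, and a tiny QueueCapacity forces frequent chaining. The queue follows the article's Add/Extract with one deliberate change. Before switching to NextQueue, Extract looks at Items[Head] once more, because the writer may have filled and abandoned the old queue between the reader's empty-slot check and its NextQueue check.

```pascal
program QueueStressSketch;
{$APPTYPE CONSOLE}
// Two-thread stress-test sketch; all names are hypothetical.
uses
  Windows, Classes;

const
  QueueCapacity = 4;    // small, so the writer overflows and chains often
  ItemCount = 1000000;  // number of items sent from the worker to the main thread

type
  TQueue = class
  public
    Head, Tail: Integer;  // Head: reader only; Tail: writer only
    NextQueue: TQueue;
    Items: array[0..QueueCapacity - 1] of Pointer;
  end;

  TWriter = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  ReadQueue, WriteQueue: TQueue;
  WriterDone: Integer = 0;
  Received: Integer = 0;
  OutOfOrder: Integer = 0;

procedure Add(Item: Pointer); // writer thread only
var
  NewQueue: TQueue;
begin
  if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail], Item, nil) = nil then
    WriteQueue.Tail := (WriteQueue.Tail + 1) mod QueueCapacity
  else begin
    NewQueue := TQueue.Create;
    NewQueue.Items[0] := Item;
    NewQueue.Tail := 1 mod QueueCapacity;
    InterlockedExchangePointer(Pointer(WriteQueue.NextQueue), Pointer(NewQueue));
    WriteQueue := NewQueue;
  end;
end;

function Extract: Pointer; // main thread only
var
  Next: TQueue;
begin
  repeat
    Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], nil);
    if Result = nil then begin
      Next := ReadQueue.NextQueue;
      if Next = nil then
        Exit;
      // Re-check before switching: catches items written just before chaining
      Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], nil);
      if Result = nil then begin
        ReadQueue.Free;
        ReadQueue := Next;
      end;
    end;
  until Result <> nil;
  ReadQueue.Head := (ReadQueue.Head + 1) mod QueueCapacity;
end;

procedure TWriter.Execute;
var
  i: Integer;
begin
  for i := 1 to ItemCount do
    Add(Pointer(NativeInt(i))); // items are non-nil integers cast to Pointer
  InterlockedExchange(WriterDone, 1);
end;

var
  Writer: TWriter;
  P: Pointer;
  Done: Boolean;
  Expected: NativeInt = 1;
begin
  ReadQueue := TQueue.Create;
  WriteQueue := ReadQueue;
  Writer := TWriter.Create(False);
  try
    repeat
      Done := WriterDone <> 0; // sample before extracting
      P := Extract;
      if P <> nil then begin
        if NativeInt(P) <> Expected then
          Inc(OutOfOrder);
        Expected := NativeInt(P) + 1;
        Inc(Received);
      end;
    until (P = nil) and Done;
  finally
    Writer.Free; // waits for the worker thread
  end;
  ReadQueue.Free;
  Writeln('Received ', Received, ' of ', ItemCount, ' items, ', OutOfOrder, ' out of order');
end.
```

A run is expected to report all items received and none out of order. Random scheduling rarely hits the narrow windows, so it may help to run the test many times, on a multi-core machine, or with Sleep(0) calls inserted inside Add and Extract to widen them.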