Windows Sockets, IOCP, and Delphi
Prologue
Recently I needed to work efficiently with sockets in a Windows application. The task is typical for a heavily loaded server; only the implementation language, Delphi, may seem unusual here.
I want to describe a way to work asynchronously with a large number of sockets using I/O Completion Ports (IOCP), the technique Microsoft recommends in its own literature. Many readers probably know it already, but just in case, here is a link to MSDN. The essence of the technique is that the system maintains a highly efficient queue of events, and the program drains that queue from a thread pool sized according to the number of CPU cores. This pays off when many asynchronous I/O operations are in progress at the same time for different endpoints. If you prefer, you can go straight to the finished source code here. Not everything in it is perfect, but it is good enough for experiments.
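To make the idea concrete, here is a minimal sketch (not taken from the author's source) of a completion port used purely as a work queue: the main thread posts packets with PostQueuedCompletionStatus, and a small pool of TThread workers drains them with GetQueuedCompletionStatus. It assumes Delphi XE2 or later on Windows; ITEM_KEY and STOP_KEY are hypothetical completion keys invented for this example.

```pascal
program IocpWorkQueue;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes;

const
  ITEM_COUNT = 1000;
  ITEM_KEY = 1; // hypothetical key: "a unit of work"
  STOP_KEY = 2; // hypothetical key: "worker, please exit"

var
  Port: THandle;
  Processed: Integer = 0;

type
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TWorker.Execute;
var
  Bytes: DWORD;
  Key: NativeUInt;
  Ovl: POverlapped;
begin
  // Block until a packet is available, then handle it.
  while GetQueuedCompletionStatus(Port, Bytes, Key, Ovl, INFINITE) do
  begin
    if Key = STOP_KEY then
      Break;
    InterlockedIncrement(Processed);
  end;
end;

var
  Workers: array of TWorker;
  i: Integer;
begin
  // INVALID_HANDLE_VALUE with no existing port creates a new, unassociated
  // port; 0 concurrent threads means "as many as there are processors".
  Port := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if Port = 0 then
    RaiseLastOSError;
  try
    SetLength(Workers, TThread.ProcessorCount);
    for i := 0 to High(Workers) do
      Workers[i] := TWorker.Create(False);

    for i := 1 to ITEM_COUNT do
      if not PostQueuedCompletionStatus(Port, 0, ITEM_KEY, nil) then
        RaiseLastOSError;

    // Packets are dequeued in FIFO order, so every work item is taken
    // before any worker sees its stop packet.
    for i := 0 to High(Workers) do
      PostQueuedCompletionStatus(Port, 0, STOP_KEY, nil);

    for i := 0 to High(Workers) do
    begin
      Workers[i].WaitFor;
      Workers[i].Free;
    end;
  finally
    CloseHandle(Port);
  end;
  Writeln('Processed: ', Processed);
end.
```

Sizing the pool from TThread.ProcessorCount follows the article's idea of sizing the pool by the number of cores; the article itself uses twice that number.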
Roadmap
To some extent, I will follow the Node.js philosophy in everything related to organizing objects and I/O operations.
On the server side, the following needs to be implemented:
- Listening on a socket, and accepting or rejecting new connections.
- Tracking the close signal of client sockets.
For the client, the first item on this list does not apply, but an asynchronous connection to the server has to be implemented instead. Both classes will support reading and writing on the same endpoint simultaneously.
All client and server socket instances share one message queue and one thread pool. This allows both socket types to be used together in one application without wasting resources.
Implementation
Let's get started. Because the model is completely asynchronous and event-driven, I expose interfaces rather than classes. This is very convenient here, since interface reference counting relieves the end programmer of responsibility for freeing the allocated memory; tracking object lifetimes in any other way would be either very costly or outright impossible in this design. A lot of work has to happen when the unit is initialized:
- Creating the lists of sockets of each type.
- Initializing the socket subsystem.
- Creating the message queue.
- Creating a thread pool to process the queue.
- Creating event objects for the sockets.
- Creating threads that monitor socket events (for example, a new client connecting).
The unit's initialization section therefore calls the following procedure, which implements this list item by item.
procedure Init;
var
  WSAData: TWsaData;
  i: Integer;
begin
  // Lists of client, listening and server-side client sockets
  gClients := TProtoStore.Create;
  gListeners := TProtoStore.Create;
  gServerClients := TProtoStore.Create;
  // Socket subsystem
  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise IOCPClientException.Create(sErrorInit_WSAtartup);
  // Message queue. On failure CreateIoCompletionPort returns 0 (NULL),
  // not INVALID_HANDLE_VALUE.
  gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
  if gIOCP = 0 then
    raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);
  // Thread pool that drains the queue
  SetLength(gWorkers, CPUCount * 2);
  for i := 0 to High(gWorkers) do
    gWorkers[i] := TWorkerThread.Create();
  // Event objects for the three kinds of sockets
  gListenerAcceptEvent := WSACreateEvent;
  if gListenerAcceptEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gServerClientsCloseEvent := WSACreateEvent;
  if gServerClientsCloseEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClisentsConnectAndCloseEvents := WSACreateEvent;
  if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  // Threads that wait for socket events and post them to the queue
  gClientSocketEventThread := TSocketEventThread.Create
    (gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
  gClientSocketEventThread.Start;
  gServerClientsSocketEventThread := TSocketEventThread.Create
    (gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
  gServerClientsSocketEventThread.Start;
  gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent,
    gListeners, ET_EVENT_SIGNALED);
  gServerSocketEventThread.Start;
end;
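Called with INVALID_HANDLE_VALUE and no existing port, CreateIoCompletionPort creates a new completion port that is not yet associated with any handle; this port serves as the message queue.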
Note that the same TSocketEventThread class is used to track events on sockets with different purposes. Each thread of this class runs a procedure that waits for socket events and, as soon as one occurs, posts a message to the queue for every socket of the type that this thread serves.
procedure TSocketEventThread.WaitForClientsEvents;
var
  WaitResult: DWORD;
const
  TimeOut: DWORD = 100;
begin
  WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE);
  if WaitResult = WSA_WAIT_FAILED then
    raise IOCPClientException.Create
      (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);
  if WaitResult = WSA_WAIT_EVENT_0 then
  begin
    if not WSAResetEvent(fEvent) then
      raise IOCPClientException.Create
        (sErrorWaitForClientsEvents_WSAResetEvent);
    fStore.Post(fKey);
  end;
end;
The fStore.Post(fKey) call simply posts a message to the queue for every socket in the store.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
  i: Integer;
begin
  fLock.Enter;
  try
    for i := 0 to Length(ProtoArray) - 1 do
    begin
      ProtoArray[i]._AddRef;
      if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey,
        POverlapped(ProtoArray[i])) then
      begin
        ProtoArray[i]._Release;
        raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
      end;
    end;
  finally
    fLock.Leave;
  end;
end;
Note in particular how interface reference counting is used here.
Calling _AddRef marks the object as "in the queue" so that it is not destroyed while the message is pending; _Release is called later, once the message has been processed. PostQueuedCompletionStatus itself places the message in the queue.
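Both ideas, automatic destruction through interface reference counting and the explicit _AddRef that keeps an object alive while it sits in the queue, can be demonstrated without sockets. In this hypothetical sketch a TList<Pointer> stands in for the completion port: the producer adds a reference before enqueuing a raw pointer, and the consumer adopts that reference with Pointer(Intf) := P, so clearing the variable later performs the _Release that balances the producer's _AddRef. TProto and the Alive flag are invented for illustration, and Delphi XE2 or later is assumed. Unlike the article, which posts the object pointer itself, the sketch queues an interface pointer so that the compiler-managed variable issues the final _Release.

```pascal
program QueueHandOff;

{$APPTYPE CONSOLE}
{$ASSERTIONS ON}

uses
  System.Generics.Collections;

type
  TProto = class(TInterfacedObject)
  public
    destructor Destroy; override;
  end;

var
  Alive: Boolean = False;

destructor TProto.Destroy;
begin
  Alive := False; // the reference count has dropped to zero
  inherited;
end;

var
  Queue: TList<Pointer>; // stand-in for the completion port
  Intf: IInterface;
  P: Pointer;
begin
  Queue := TList<Pointer>.Create;
  try
    Intf := TProto.Create;
    Alive := True;

    // Producer: mark the object as "in the queue", then enqueue a raw pointer.
    Intf._AddRef;
    Queue.Add(Pointer(Intf));
    Intf := nil;    // the producer's own reference is gone...
    Assert(Alive);  // ...but the queued reference keeps the object alive

    // Consumer: adopt the queued reference without another _AddRef.
    P := Queue[0];
    Queue.Delete(0);
    Pointer(Intf) := P;
    Assert(Alive);
    Intf := nil;    // the _Release that balances the producer's _AddRef
    Assert(not Alive);
    Writeln('object released after processing');
  finally
    Queue.Free;
  end;
end.
```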
The pool threads process these messages asynchronously.
Each of them runs the following procedure.
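procedure TWorkerThread.ProcessIOCP;
var
  NumberOfBytes: DWORD;
  CompletionKey: NativeUInt;
  Overlapped: POverlapped;
  Proto: TIOCPSocketProto;
begin
  // A failed call with Overlapped = nil means no packet was dequeued at all.
  // A failed call with Overlapped <> nil is a failed I/O operation, which
  // must still be handled so that its reference gets released.
  if GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey,
    Overlapped, INFINITE) or (Overlapped <> nil) then
  begin
    if CompletionKey = ET_EVENT_SIGNALED then
    begin
      // Event message from TProtoStore.Post: the object travels in lpOverlapped
      Proto := TIOCPSocketProto(Overlapped);
      Proto.IOCPProcessEventsProc();
      Proto._Release; // balances the _AddRef in TProtoStore.Post
    end
    else if CompletionKey <> 0 then
    begin
      // I/O completion: the object is the key given to CreateIoCompletionPort
      Proto := TIOCPSocketProto(CompletionKey);
      if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then
        Proto._Release; // operation finished, drop the reference taken for it
    end;
  end;
end;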
GetQueuedCompletionStatus retrieves a message from the queue. The code then determines whether the message reports a completed I/O operation or an event that has occurred on a socket. This shows two ways of passing information through the queue, in this case a reference to a specific socket object: for event messages the object travels in the lpOverlapped pointer (with ET_EVENT_SIGNALED as the completion key), while for I/O completions it is the completion key assigned when the socket was associated with the port.
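Both channels can be seen in isolation in the following sketch, which assumes Delphi XE2 or later on Windows. It posts two packets to a fresh completion port: one carries an object reference in the completion key (as real I/O completions do, because the key is fixed when a socket is associated with the port), the other carries it in lpOverlapped together with a marker key (as TProtoStore.Post does). TPayload and EVENT_KEY are hypothetical names; EVENT_KEY plays the role of the article's ET_EVENT_SIGNALED.

```pascal
program IocpTwoChannels;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils;

const
  EVENT_KEY = 1; // hypothetical marker key, like ET_EVENT_SIGNALED

type
  TPayload = class
  public
    Name: string;
    constructor Create(const AName: string);
  end;

constructor TPayload.Create(const AName: string);
begin
  inherited Create;
  Name := AName;
end;

var
  Port: THandle;
  ByKey, ByOverlapped: TPayload;
  Bytes: DWORD;
  Key: NativeUInt;
  Ovl: POverlapped;
  i: Integer;
begin
  Port := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1);
  if Port = 0 then
    RaiseLastOSError;
  ByKey := TPayload.Create('via completion key');
  ByOverlapped := TPayload.Create('via lpOverlapped');
  try
    // Channel 1: the object reference travels in the completion key.
    Win32Check(PostQueuedCompletionStatus(Port, 0, NativeUInt(ByKey), nil));
    // Channel 2: a marker key, with the object reference in lpOverlapped.
    Win32Check(PostQueuedCompletionStatus(Port, 0, EVENT_KEY,
      POverlapped(ByOverlapped)));

    for i := 1 to 2 do
    begin
      // A zero timeout is enough: both packets are already queued.
      Win32Check(GetQueuedCompletionStatus(Port, Bytes, Key, Ovl, 0));
      if Key = EVENT_KEY then
        Writeln(TPayload(Ovl).Name)
      else
        Writeln(TPayload(Key).Name);
    end;
  finally
    ByKey.Free;
    ByOverlapped.Free;
    CloseHandle(Port);
  end;
end.
```

Because packets leave the port in FIFO order, the program prints "via completion key" first and "via lpOverlapped" second.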
Processing is unified for all socket types. This is achieved through inheritance from a common ancestor that contains the shared handlers, which descendants are allowed to override.
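A stripped-down sketch of that arrangement follows; the class names are hypothetical and no sockets are involved. The pool-side code only ever sees the common ancestor, while each descendant overrides the handlers it cares about and can still call the shared behavior with inherited.

```pascal
program CommonAncestor;

{$APPTYPE CONSOLE}

type
  // Hypothetical counterpart of TIOCPSocketProto: default handlers live here.
  TProtoBase = class
  public
    function ProcessEvents: string; virtual;
  end;

  TClientProto = class(TProtoBase)
  public
    function ProcessEvents: string; override;
  end;

  TListenerProto = class(TProtoBase)
  end; // keeps the inherited handler

function TProtoBase.ProcessEvents: string;
begin
  Result := 'default handler';
end;

function TClientProto.ProcessEvents: string;
begin
  Result := 'client handler, then ' + inherited ProcessEvents;
end;

// What a pool thread does: it dispatches through the base type only.
procedure DispatchPacket(Proto: TProtoBase);
begin
  Writeln(Proto.ClassName, ': ', Proto.ProcessEvents);
end;

var
  Client, Listener: TProtoBase;
begin
  Client := TClientProto.Create;
  Listener := TListenerProto.Create;
  try
    DispatchPacket(Client);   // TClientProto: client handler, then default handler
    DispatchPacket(Listener); // TListenerProto: default handler
  finally
    Client.Free;
    Listener.Free;
  end;
end.
```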
Now consider how socket events are processed.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
  WSAEvents: TWsaNetworkEvents;
  AcceptedSocket: TSocket;
  RemoteAddress: string;
begin
  if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
  begin
    fClosingLock.BeginRead;
    try
      // Check again under the lock: the socket may have been closed meanwhile
      if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then
        if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then
        begin
          if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then
          begin
            if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then
              InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
            CallOnConnect;
          end;
          if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then
            CallOnClose;
          if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then
          begin
            AcceptedSocket := DoAccept(RemoteAddress);
            if AcceptedSocket <> INVALID_SOCKET then
            begin
              fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose,
                RemoteAddress).Prepare;
            end;
          end;
        end
    finally
      fClosingLock.EndRead;
    end;
  end;
end;
The TMultiReadExclusiveWriteSynchronizer class (fClosingLock) is applied here in an interesting way. It prevents another pool thread from closing the socket and destroying the object while this thread is working with it (fClosingLock.BeginRead). All socket operations count as read operations for this synchronization object, except creating and closing the socket: those are write operations and can therefore be performed only with exclusive ownership of the resource.
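This locking discipline can be reduced to the following sketch; TGuardedSocket is a hypothetical class, not the article's. I/O paths take the shared read lock and check the state inside it, while closing takes the exclusive write lock, so it waits for every operation already inside the lock to finish, and operations that arrive afterwards see the closed state and back off.

```pascal
program ClosingLockSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  TGuardedSocket = class
  private
    fClosingLock: TMultiReadExclusiveWriteSynchronizer;
    fClosed: Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    function TryIO: Boolean;
    procedure Close;
  end;

constructor TGuardedSocket.Create;
begin
  inherited Create;
  fClosingLock := TMultiReadExclusiveWriteSynchronizer.Create;
end;

destructor TGuardedSocket.Destroy;
begin
  fClosingLock.Free;
  inherited;
end;

function TGuardedSocket.TryIO: Boolean;
begin
  fClosingLock.BeginRead; // many I/O operations may hold this at once
  try
    Result := not fClosed; // cannot change while the read lock is held
    // ...a real class would start WSARecv/WSASend here...
  finally
    fClosingLock.EndRead;
  end;
end;

procedure TGuardedSocket.Close;
begin
  fClosingLock.BeginWrite; // waits until no I/O operation holds the read lock
  try
    fClosed := True;
    // ...a real class would call closesocket here...
  finally
    fClosingLock.EndWrite;
  end;
end;

var
  S: TGuardedSocket;
begin
  S := TGuardedSocket.Create;
  try
    Writeln('before close: ', S.TryIO); // TRUE
    S.Close;
    Writeln('after close: ', S.TryIO);  // FALSE
  finally
    S.Free;
  end;
end.
```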
Otherwise, the socket handling in IOCPProcessEventsProc is entirely ordinary.
The only part of it that deserves a closer look is accepting a new client connection, in the DoAccept method.
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
  addr: TSockAddr;
  addrlen: Integer;
  dwCallbackData: NativeUInt;
  RemoteAddrLen: DWORD;
begin
  dwCallbackData := NativeUInt(self);
  addrlen := SizeOf(addr);
  // ServerAcceptCallBack decides whether the connection is accepted
  Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack,
    dwCallbackData);
  if Result <> INVALID_SOCKET then
  begin
    SetLength(RemoteAddress, 255);
    RemoteAddrLen := Length(RemoteAddress);
    if WSAAddressToString(addr, addrlen, nil, PChar(@RemoteAddress[1]),
      RemoteAddrLen) = SOCKET_ERROR then
    begin
      closesocket(Result); // do not leak the accepted socket
      raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
    end;
    // The returned length includes the terminating null character
    SetLength(RemoteAddress, RemoteAddrLen - 1);
  end;
end;
The key element here is WSAAccept. Its condition callback (ServerAcceptCallBack) lets the server reject a client connection in such a way that the client is properly notified of it through its FD_CONNECT event.
This is the preferred way to implement so-called blacklists.
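The decision at the heart of such a blacklist can be sketched as a pure function; DecideConnection is a hypothetical helper, not part of the article's code. In a real WSAAccept condition function the caller's SOCKADDR arrives in lpCallerId^.buf; to stay self-contained, this sketch takes the IPv4 string form that WSAAddressToString produces (for example '10.0.0.5:51234') and returns CF_ACCEPT or CF_REJECT, redeclared locally with the same values as in Winsock 2.

```pascal
program AcceptBlacklist;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

const
  // Same values as CF_ACCEPT / CF_REJECT in Winsock 2; redeclared so the
  // sketch compiles without Winapi.Winsock2.
  CF_ACCEPT = 0;
  CF_REJECT = 1;

// Hypothetical helper: strips the port from an IPv4 'host:port' string and
// rejects the caller if the host is on the blacklist.
function DecideConnection(const CallerAddress: string;
  const Blacklist: array of string): Integer;
var
  Host: string;
  ColonPos, i: Integer;
begin
  ColonPos := Pos(':', CallerAddress);
  if ColonPos > 0 then
    Host := Copy(CallerAddress, 1, ColonPos - 1)
  else
    Host := CallerAddress;
  for i := Low(Blacklist) to High(Blacklist) do
    if SameText(Host, Blacklist[i]) then
      Exit(CF_REJECT);
  Result := CF_ACCEPT;
end;

begin
  Writeln(DecideConnection('10.0.0.5:51234', ['10.0.0.5', '192.168.1.20'])); // 1
  Writeln(DecideConnection('10.0.0.7:40000', ['10.0.0.5', '192.168.1.20'])); // 0
end.
```

The condition function would return this value to WSAAccept, which then either creates the accepted socket or refuses the connection.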
Moving on, let's look at how input and output are organized, using the read operation as an example.
procedure TIOCPSocketProto.Read(Length: DWORD;
  OnRead, OnReadProcess: TOnReadEvent);
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  fClosingLock.BeginRead;
  try
    if fStateLock = CLI_SOCKET_LOCK_CONNECTED then
    begin
      // Only one read may be in progress at a time
      if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE
      then
      begin
        fOnRead := OnRead;
        fOnReadProcess := OnReadProcess;
        fReaded := 0;
        fReadBufLength := Length;
        fReadBuffer := nil;
        GetMem(fReadBuffer, Length);
        if fReadBuffer <> nil then
        begin
          FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
          WsaBuf.buf := fReadBuffer;
          WsaBuf.len := fReadBufLength;
          Flags := 0;
          Bytes := 0;
          _AddRef; // keep the object alive until the completion is processed
          if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil)
            = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
          begin
            FreeMem(fReadBuffer, Length);
            InterlockedExchange(fReadLock, IO_IDLE);
            _Release;
            raise IOCPClientException.Create(sErrorRead_WSARecv);
          end;
        end
        else
        begin
          InterlockedExchange(fReadLock, IO_IDLE); // allow future reads
          raise IOCPClientException.Create(sErrorRead_GetMem);
        end;
      end
      else
        raise IOCPClientException.Create(sErrorRead_InProcess);
    end
    else
      raise IOCPClientException.Create(sErrorRead_NotConnected);
  finally
    fClosingLock.EndRead;
  end;
end;
Here I use an interlocked compare-and-exchange as a lock: it is very fast, and all that is needed is to reject an attempt to start a second read while one is already in progress. The buffer is allocated once per operation. Then an asynchronous read (WSARecv) is started. The object is also "marked" with _AddRef so that it is not destroyed while the operation is pending in the queue. When the read completes, the system automatically posts a completion message to the queue.
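The guard on its own looks like this. It is a minimal sketch that uses TInterlocked from System.SyncObjs (Delphi XE2 or later assumed) instead of the Windows InterlockedCompareExchange used in the article; both atomically store the new value only if the target still equals the comparand, and return the previous value. The numeric values of IO_IDLE and IO_PROCESS are assumptions, since the article does not show them.

```pascal
program InterlockedGuard;

{$APPTYPE CONSOLE}

uses
  System.SyncObjs;

const
  IO_IDLE = 0;    // assumed values; only their distinctness matters
  IO_PROCESS = 1;

var
  ReadLock: Integer = IO_IDLE;

// Returns True only for the caller that flips IDLE -> PROCESS.
function TryBeginRead: Boolean;
begin
  Result := TInterlocked.CompareExchange(ReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE;
end;

// Called from the completion handler once the whole buffer has been read.
procedure EndRead;
begin
  TInterlocked.Exchange(ReadLock, IO_IDLE);
end;

begin
  Writeln(TryBeginRead); // TRUE: the first read starts
  Writeln(TryBeginRead); // FALSE: a read is already in progress
  EndRead;
  Writeln(TryBeginRead); // TRUE: idle again
end.
```

Unlike a critical section, a thread that loses the race never blocks: it simply gets False and can report "read already in progress", which is what the article's sErrorRead_InProcess branch does.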
Now consider what happens when a message about a completed I/O operation is taken from the queue.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD;
  Overlapped: POverlapped): Boolean;
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  // True means the operation is over and its reference can be released
  Result := FALSE;
  fClosingLock.BeginRead;
  try
    // The OVERLAPPED address identifies which operation has completed
    if Overlapped = @fOverlappedRead then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fReadLock = IO_PROCESS then
        begin
          inc(fReaded, NumberOfBytes);
          if fReaded < fReadBufLength then
          begin
            // Partial read: request the rest of the buffer
            CallOnReadProcess;
            WsaBuf.buf := fReadBuffer;
            inc(WsaBuf.buf, fReaded);
            WsaBuf.len := fReadBufLength;
            dec(WsaBuf.len, fReaded);
            Flags := 0;
            Bytes := 0;
            if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnRead;
              Result := True;
            end
          end
          else
          begin
            CallOnReadProcess;
            CallOnRead;
            Result := True;
          end;
        end
      end
      else
      begin
        // Zero bytes typically means the peer closed the connection
        CallOnRead;
        Result := True;
      end;
    end
    else if Overlapped = @fOverlappedWrite then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fWriteLock = IO_PROCESS then
        begin
          inc(fWrited, NumberOfBytes);
          if fWrited < fWriteBufLength then
          begin
            // Partial write: send the rest of the buffer
            CallOnWriteProcess;
            WsaBuf.buf := fWriteBuffer;
            inc(WsaBuf.buf, fWrited);
            WsaBuf.len := fWriteBufLength;
            dec(WsaBuf.len, fWrited);
            Flags := 0;
            Bytes := 0;
            if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnWrite;
              Result := True;
            end
          end
          else
          begin
            CallOnWriteProcess;
            CallOnWrite;
            Result := True;
          end;
        end
      end
      else
      begin
        CallOnWrite;
        Result := True;
      end;
    end
  finally
    fClosingLock.EndRead;
  end;
end;
The essence of this procedure is that it keeps issuing reads (or writes) on the socket until the allocated buffer has been completely filled (or completely sent). An interesting point is how the type of operation is determined: by the address of the OVERLAPPED structure. The queue returns this pointer, so it only has to be compared with the addresses of the class fields that hold the read and write structures (fOverlappedRead and fOverlappedWrite).
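Both mechanisms can be tried without a network. The sketch below is a hypothetical TConn class (Windows, Delphi XE2 or later assumed, using only the TOverlapped type from Winapi.Windows): Classify identifies an operation by comparing the returned pointer with the field addresses, and ReadCompleted shows the offset arithmetic that decides whether another WSARecv is needed and for which part of the buffer.

```pascal
program OverlappedDispatch;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows;

type
  // Hypothetical connection object with one OVERLAPPED per direction,
  // mirroring fOverlappedRead / fOverlappedWrite in the article.
  TConn = class
  public
    OverlappedRead, OverlappedWrite: TOverlapped;
    ReadBufLength, Readed: DWORD;
    function Classify(Ovl: POverlapped): string;
    // Returns True when the whole read buffer has been filled.
    function ReadCompleted(NumberOfBytes: DWORD;
      out NextOffset, NextLength: DWORD): Boolean;
  end;

function TConn.Classify(Ovl: POverlapped): string;
begin
  // The port returns exactly the pointer that was passed to WSARecv/WSASend.
  if Ovl = @OverlappedRead then
    Result := 'read'
  else if Ovl = @OverlappedWrite then
    Result := 'write'
  else
    Result := 'unknown';
end;

function TConn.ReadCompleted(NumberOfBytes: DWORD;
  out NextOffset, NextLength: DWORD): Boolean;
begin
  Inc(Readed, NumberOfBytes);
  // A follow-up WSARecv would target Buffer + Readed for the remaining bytes.
  NextOffset := Readed;
  NextLength := ReadBufLength - Readed;
  Result := Readed >= ReadBufLength;
end;

var
  C: TConn;
  Offset, Len: DWORD;
  Done: Boolean;
begin
  C := TConn.Create;
  try
    Writeln(C.Classify(@C.OverlappedRead));  // read
    Writeln(C.Classify(@C.OverlappedWrite)); // write

    C.ReadBufLength := 100;
    Done := C.ReadCompleted(60, Offset, Len);
    Writeln(Done, ' ', Offset, ' ', Len);    // FALSE 60 40
    Done := C.ReadCompleted(40, Offset, Len);
    Writeln(Done, ' ', Offset, ' ', Len);    // TRUE 100 0
  finally
    C.Free;
  end;
end.
```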
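Note also that even when a read or write completes immediately, its completion message is still queued. This behavior can be changed through the API: on Windows Vista and later, SetFileCompletionNotificationModes with FILE_SKIP_COMPLETION_PORT_ON_SUCCESS suppresses the message for operations that succeed at once.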
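A minimal sketch of enabling that mode for a socket follows. It assumes Windows Vista or later and Delphi XE2 or later; the function and its flag are declared by hand in case the RTL version in use lacks them (the flag value 1 comes from the Windows SDK). Once the mode is set, the code that issues WSARecv/WSASend must handle an immediate success inline, because no packet will arrive for it. Microsoft's documentation for this function also warns about using it on sockets when non-IFS layered service providers are installed, so that is worth checking before relying on it.

```pascal
program SkipCompletionOnSuccess;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, Winapi.Winsock2, System.SysUtils;

const
  // Value from the Windows SDK; declared here in case the RTL lacks it.
  FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = $1;

// Available since Windows Vista; imported manually for the same reason.
function SetFileCompletionNotificationModes(FileHandle: THandle;
  Flags: Byte): BOOL; stdcall; external kernel32;

var
  WSAData: TWSAData;
  Sock: TSocket;
begin
  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise Exception.Create('WSAStartup failed');
  try
    Sock := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
    if Sock = INVALID_SOCKET then
      raise Exception.CreateFmt('WSASocket failed: %d', [WSAGetLastError]);
    try
      // From now on, an overlapped WSARecv/WSASend that returns 0 (completed
      // at once) queues no packet: the caller must process the result inline.
      Win32Check(SetFileCompletionNotificationModes(THandle(Sock),
        FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
      Writeln('immediate completions will not be queued for this socket');
    finally
      closesocket(Sock);
    end;
  finally
    WSACleanup;
  end;
end.
```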
Finally, consider how a socket object is created and registered with the queue.
constructor TIOCPClientSocket.Create(RemoteAddress: string;
  OnConnect, OnClose: TOnSimpleSocketEvenet);
var
  lRemoteAddress: TSockAddr;
  lRemoteAddressLength: Integer;
begin
  inherited Create();
  fStore := gClients;
  fOnConnect := OnConnect;
  fOnClose := OnClose;
  fStateLock := 0;
  fRemoteAddressStr := RemoteAddress;
  fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if fSocket = INVALID_SOCKET then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);
  if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents,
    FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);
  if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_CreateIoCompletionPort);
  fStateLock := CLI_SOCKET_LOCK_CREATED;
  fStore.Add(self);
  lRemoteAddressLength := SizeOf(lRemoteAddress);
  lRemoteAddress.sa_family := AF_INET;
  if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil,
    lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_WSAStringToAddress);
  if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil,
    nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end;
The client socket constructor immediately creates the socket (WSASocket), associates it with the shared event object (WSAEventSelect), registers it with the queue (CreateIoCompletionPort), and starts an asynchronous connection (WSAConnect). Completion of the connection is awaited by the thread described first, the one that waits for socket events. That thread, in turn, posts the event to the queue.
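The whole client-side sequence can be followed in one place in this sketch, which reuses the article's call shapes but waits for the event inline instead of in a separate thread (Windows, Delphi XE2 or later assumed). TARGET is a hypothetical endpoint; if nothing listens there, the program reports the connection error carried by FD_CONNECT instead of "connected".

```pascal
program AsyncConnectSketch;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, Winapi.Winsock2, System.SysUtils;

const
  TARGET = '127.0.0.1:80'; // hypothetical endpoint; change to a real server

var
  WSAData: TWSAData;
  Sock: TSocket;
  Ev: WSAEVENT;
  Addr: TSockAddr;
  AddrLen: Integer;
  Events: TWSANetworkEvents;
begin
  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise Exception.Create('WSAStartup failed');
  Sock := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  Ev := WSACreateEvent;
  try
    if (Sock = INVALID_SOCKET) or (Ev = WSA_INVALID_EVENT) then
      raise Exception.Create('socket or event creation failed');
    // WSAEventSelect also switches the socket to non-blocking mode.
    if WSAEventSelect(Sock, Ev, FD_CONNECT or FD_CLOSE) = SOCKET_ERROR then
      raise Exception.Create('WSAEventSelect failed');
    AddrLen := SizeOf(Addr);
    if WSAStringToAddress(PChar(TARGET), AF_INET, nil, Addr, AddrLen) =
      SOCKET_ERROR then
      raise Exception.Create('bad address');
    // A non-blocking connect normally "fails" at once with WSAEWOULDBLOCK.
    if (WSAConnect(Sock, Addr, AddrLen, nil, nil, nil, nil) = SOCKET_ERROR)
      and (WSAGetLastError <> WSAEWOULDBLOCK) then
      raise Exception.Create('WSAConnect failed');
    // What the event thread does: wait, then ask which events have fired.
    if WSAWaitForMultipleEvents(1, @Ev, FALSE, 5000, FALSE) <>
      WSA_WAIT_EVENT_0 then
      raise Exception.Create('no connect result within 5 seconds');
    if WSAEnumNetworkEvents(Sock, Ev, Events) = SOCKET_ERROR then
      raise Exception.Create('WSAEnumNetworkEvents failed');
    if (Events.lNetworkEvents and FD_CONNECT) <> 0 then
      if Events.iErrorCode[FD_CONNECT_BIT] = 0 then
        Writeln('connected')
      else
        Writeln('connect failed, error ', Events.iErrorCode[FD_CONNECT_BIT]);
  finally
    if Ev <> WSA_INVALID_EVENT then
      WSACloseEvent(Ev);
    if Sock <> INVALID_SOCKET then
      closesocket(Sock);
    WSACleanup;
  end;
end.
```

In the article's design the wait and WSAEnumNetworkEvents are split between TSocketEventThread and IOCPProcessEventsProc, which lets one thread serve the shared event of every client socket.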
Epilogue
This article has briefly covered what I consider successful techniques for building classes for event-driven programming.
I managed to build a class for high-performance socket work in Delphi. This topic is very poorly covered in general, and I plan to follow this post with two or three more: on socket contexts built with interfaces, and on secure connections over IOCP (cryptographic providers and the Winsock Secure Socket Extensions). The full example code is here.