Windows Sockets, IOCP, and Delphi

Prologue


Recently I needed to work efficiently with sockets in a Windows application. The task is typical of a busy server; only the implementation language, Delphi, may seem unusual.
I want to describe a way to handle a large number of sockets asynchronously using I/O Completion Ports (IOCP), the technology Microsoft recommends in its documentation. Many readers probably know it already, but just in case, see its description on MSDN. The idea is that the system maintains a highly efficient queue of completion notifications, and the program drains that queue from a thread pool whose size is chosen according to the number of CPU cores. The approach pays off when many asynchronous I/O operations to different endpoints are in flight at the same time. The finished source code can be viewed here. It is not perfect, but it is good enough for experiments.

Roadmap


To some extent I will follow the Node.js philosophy in everything related to organizing objects and I/O operations.
On the server side, the following must be implemented:
  • Listening on a socket; accepting or rejecting new connections.
  • Tracking the close signal of client sockets.

For the client, the first item on this list does not apply; instead, an asynchronous connection to the server must be implemented. Both classes will support reading and writing on the same endpoint simultaneously.
All client and server socket instances will share one message queue and one thread pool. This lets both socket types be used efficiently within a single application.

Implementation


Let's get started. Because the model is completely asynchronous and event-driven, I will expose interfaces rather than classes. This is very convenient here, because it relieves the end programmer of responsibility for the allocated memory; tracking object lifetimes any other way would be either very costly or outright impossible. A lot of work happens during unit initialization:
  • Creating lists for the different socket types.
  • Initializing the socket subsystem.
  • Creating the message queue.
  • Creating the thread pool that processes the queue.
  • Creating events for the sockets.
  • Creating threads that monitor socket events (for example, a new client connecting).

Accordingly, the initialization section calls the following procedure, which implements this list item by item.
procedure Init;
var
  WSAData: TWsaData;
  i: Integer;
begin
  gClients := TProtoStore.Create;
  gListeners := TProtoStore.Create;
  gServerClients := TProtoStore.Create;
  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise IOCPClientException.Create(sErrorInit_WSAtartup);
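  // Create a new completion port not yet associated with any handle.
  // On failure CreateIoCompletionPort returns 0 (NULL), not INVALID_HANDLE_VALUE.
  gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
  if gIOCP = 0 then
    raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);
  // Worker pool: size the array once, then start one thread per slot.
  SetLength(gWorkers, CPUCount * 2);
  for i := 0 to Length(gWorkers) - 1 do
    gWorkers[i] := TWorkerThread.Create();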
  gListenerAcceptEvent := WSACreateEvent;
  if gListenerAcceptEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gServerClientsCloseEvent := WSACreateEvent;
  if gServerClientsCloseEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClisentsConnectAndCloseEvents := WSACreateEvent;
  if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClientSocketEventThread := TSocketEventThread.Create
    (gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
  gClientSocketEventThread.Start;
  gServerClientsSocketEventThread := TSocketEventThread.Create
    (gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
  gServerClientsSocketEventThread.Start;
  gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent,
    gListeners, ET_EVENT_SIGNALED);
  gServerSocketEventThread.Start;
end;

Called with INVALID_HANDLE_VALUE and no existing port, CreateIoCompletionPort creates a new completion port, which serves as the message queue.
Note that the same TSocketEventThread class is used to watch events on sockets with different roles. Threads of this class run a procedure that waits for socket events and, as soon as one occurs, queues a message for every socket of the type that the thread serves.
procedure TSocketEventThread.WaitForClientsEvents;
var
  WaitResult: DWORD;
const
  TimeOut: DWORD = 100;
begin
  WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE);
  if WaitResult = WSA_WAIT_FAILED then
    raise IOCPClientException.Create
      (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);
  if WaitResult = WSA_WAIT_EVENT_0 then
  begin
    if not WSAResetEvent(fEvent) then
      raise IOCPClientException.Create
        (sErrorWaitForClientsEvents_WSAResetEvent);
    fStore.Post(fKey);
  end;
end;

The fStore.Post(fKey) method simply posts these messages to the queue.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
  i: Integer;
begin
  fLock.Enter;
  try
    for i := 0 to Length(ProtoArray) - 1 do
    begin
      ProtoArray[i]._AddRef;
      if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey,
        POverlapped(ProtoArray[i])) then
      begin
        ProtoArray[i]._Release;
        raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
      end;
    end;
  finally
    fLock.Leave;
  end;
end;

The use of reference-counted interfaced objects deserves special attention.
The _AddRef call marks the object as "in the queue" so that it is not destroyed (_Release is called later, after processing). PostQueuedCompletionStatus itself places the message in the queue.
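To make this lifetime rule concrete, here is a small portable Delphi sketch (no Winsock; it is intended to compile with Free Pascal in Delphi mode as well). The completion port is replaced by a plain array, and TProto, Pin/Unpin, StoreAdd, StoreRemove, Post and DrainQueue are hypothetical names, not the article's API. It shows that an object removed from its store while a message for it is still queued stays alive until its handler has run.

```pascal
program QueuePinningSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+} // keep Assert active

type
  // Hypothetical stand-in for a socket object. Pin/Unpin expose the
  // protected _AddRef/_Release inherited from TInterfacedObject.
  TProto = class(TInterfacedObject)
  public
    procedure Pin;
    procedure Unpin;
    procedure ProcessEvents;
    destructor Destroy; override;
  end;

var
  Processed: Integer = 0;  // handler invocations
  Destroyed: Integer = 0;  // destroyed TProto instances
  Store: array of TProto;  // like TProtoStore: sockets of one kind
  Queue: array of TProto;  // hypothetical stand-in for the completion port

procedure TProto.Pin;
begin
  _AddRef;
end;

procedure TProto.Unpin;
begin
  _Release; // destroys the object when the count drops to zero
end;

procedure TProto.ProcessEvents;
begin
  Inc(Processed);
end;

destructor TProto.Destroy;
begin
  Inc(Destroyed);
  inherited;
end;

procedure StoreAdd(P: TProto);
begin
  P.Pin; // the store's own reference
  SetLength(Store, Length(Store) + 1);
  Store[High(Store)] := P;
end;

procedure StoreRemove(P: TProto); // e.g. the socket was closed
var
  i, j: Integer;
begin
  for i := 0 to High(Store) do
    if Store[i] = P then
    begin
      for j := i to High(Store) - 1 do
        Store[j] := Store[j + 1];
      SetLength(Store, Length(Store) - 1);
      P.Unpin;
      Exit;
    end;
end;

// Like TProtoStore.Post: one message per stored object, pinned before queuing.
procedure Post;
var
  i: Integer;
begin
  for i := 0 to High(Store) do
  begin
    Store[i].Pin;
    SetLength(Queue, Length(Queue) + 1);
    Queue[High(Queue)] := Store[i];
  end;
end;

// Like the ET_EVENT_SIGNALED branch of ProcessIOCP: handle, then unpin.
procedure DrainQueue;
var
  i: Integer;
begin
  for i := 0 to High(Queue) do
  begin
    Queue[i].ProcessEvents;
    Queue[i].Unpin;
  end;
  SetLength(Queue, 0);
end;

begin
  StoreAdd(TProto.Create);
  Post;                  // count = 2: store + queued message
  StoreRemove(Store[0]); // closed while its message is still queued
  Assert(Destroyed = 0); // the queued message keeps it alive
  DrainQueue;            // handler runs, then the last Unpin frees it
  Assert((Processed = 1) and (Destroyed = 1));
  WriteLn('ok');
end.
```

Without the Pin in Post, StoreRemove would have destroyed the object and DrainQueue would call a method on freed memory, which is exactly what the real _AddRef prevents.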
The pool processes each message asynchronously.
Each worker thread runs the following procedure.
procedure TWorkerThread.ProcessIOCP;
var
  NumberOfBytes: DWORD;
  CompletionKey: NativeUInt;
  Overlapped: POverlapped;
  Proto: TIOCPSocketProto;
begin
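  // Proceed unless the call failed without dequeuing a packet (Overlapped = nil).
  // A failed I/O request still returns its Overlapped pointer and is handled below.
  if not((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey,
    Overlapped, INFINITE)) and (Overlapped = nil)) then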
  begin
    if CompletionKey = ET_EVENT_SIGNALED then
    begin
      Proto := TIOCPSocketProto(Overlapped);
      with Proto do
      begin
        IOCPProcessEventsProc();
        _Release;
      end
    end
    else if CompletionKey <> 0 then
    begin
      Proto := TIOCPSocketProto(CompletionKey);
      if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then
        Proto._Release;
    end;
  end
end;

GetQueuedCompletionStatus retrieves a message from the queue. The code then determines whether the message reports a completed I/O operation or a socket event. This shows two ways to pass information through the queue, in this case a reference to a specific socket object: for event messages the object travels in the Overlapped pointer (with the ET_EVENT_SIGNALED completion key), while for I/O completions it is the completion key itself, set when the socket was associated with the port.
Processing is the same for all socket types. This is achieved through inheritance from a common ancestor that contains the shared handlers, which descendants may override.
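The two ways of carrying the object through the queue can be simulated without Windows. In this hedged Delphi sketch, TPacket plays the role of the values returned by GetQueuedCompletionStatus, TOverlappedStub stands in for TOverlapped, and the value of ET_EVENT_SIGNALED is an assumption, since the article does not show it (it only has to differ from 0 and from every object address).

```pascal
program DispatchSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+}

const
  ET_EVENT_SIGNALED = 1; // hypothetical value; the article does not show it

type
  // Hypothetical stand-in for TOverlapped; only its address matters here.
  TOverlappedStub = record
    Internal: NativeUInt;
  end;
  POverlappedStub = ^TOverlappedStub;

  TSock = class
  public
    OverlappedRead, OverlappedWrite: TOverlappedStub;
    LastAction: string;
    procedure ProcessEvents;
    procedure ProcessIO(Ov: POverlappedStub);
  end;

  // What GetQueuedCompletionStatus hands back (byte count omitted).
  TPacket = record
    Key: NativeUInt;
    Ov: POverlappedStub;
  end;

procedure TSock.ProcessEvents;
begin
  LastAction := 'events';
end;

procedure TSock.ProcessIO(Ov: POverlappedStub);
begin
  // The OVERLAPPED address tells which operation completed.
  if Ov = @OverlappedRead then
    LastAction := 'read'
  else if Ov = @OverlappedWrite then
    LastAction := 'write'
  else
    LastAction := 'unknown';
end;

procedure DispatchPacket(const Pkt: TPacket);
begin
  if Pkt.Key = ET_EVENT_SIGNALED then
    TSock(Pointer(Pkt.Ov)).ProcessEvents         // object in the Overlapped slot
  else if Pkt.Key <> 0 then
    TSock(Pointer(Pkt.Key)).ProcessIO(Pkt.Ov);   // object in the completion key
end;

// Like the PostQueuedCompletionStatus call in TProtoStore.Post.
function EventPacket(S: TSock): TPacket;
begin
  Result.Key := ET_EVENT_SIGNALED;
  Result.Ov := POverlappedStub(Pointer(S));
end;

// Like a real I/O completion: key registered via CreateIoCompletionPort.
function IOPacket(S: TSock; Ov: POverlappedStub): TPacket;
begin
  Result.Key := NativeUInt(Pointer(S));
  Result.Ov := Ov;
end;

var
  S: TSock;
begin
  S := TSock.Create;
  try
    DispatchPacket(EventPacket(S));
    Assert(S.LastAction = 'events');
    DispatchPacket(IOPacket(S, @S.OverlappedRead));
    Assert(S.LastAction = 'read');
    DispatchPacket(IOPacket(S, @S.OverlappedWrite));
    Assert(S.LastAction = 'write');
    WriteLn('ok');
  finally
    S.Free;
  end;
end.
```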
Now consider how socket events are processed.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
  WSAEvents: TWsaNetworkEvents;
  AcceptedSocket: TSocket;
  RemoteAddress: string;
begin
  if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
  begin
    fClosingLock.BeginRead;
    try
      if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then
        if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then
        begin
          if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then
          begin
            if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then
              InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
            CallOnConnect;
          end;
          if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then
            CallOnClose;
          if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then
          begin
            AcceptedSocket := DoAccept(RemoteAddress);
            if AcceptedSocket <> INVALID_SOCKET then
            begin
              fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose,
                RemoteAddress).Prepare;
            end;
          end;
        end
    finally
      fClosingLock.EndRead;
    end;
  end;
end;

Here the TMultiReadExclusiveWriteSynchronizer class is put to interesting use. It prevents another pool thread from closing the socket and destroying the object while this handler runs (fClosingLock.BeginRead). All socket operations take this synchronizer as readers, except creating and closing the socket: those take it as a writer and can therefore proceed only with exclusive ownership of the resource.
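The locking discipline can be reduced to a small, single-threaded Delphi sketch. TGuardedSocket, TryIO, Close and the STATE_* constants are hypothetical; only TMultiReadExclusiveWriteSynchronizer with its BeginRead/EndRead/BeginWrite/EndWrite methods comes from the RTL. The real code holds the read side across Winsock calls; here a comment marks that spot.

```pascal
program ClosingLockSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+}

uses
  {$IFDEF FPC}SysUtils{$ELSE}System.SysUtils{$ENDIF};

const
  STATE_CONNECTED = 1; // hypothetical stand-ins for the CLI_SOCKET_LOCK_* values
  STATE_CLOSED = 2;

type
  TGuardedSocket = class
  private
    fClosingLock: TMultiReadExclusiveWriteSynchronizer;
    fState: Integer;
  public
    constructor Create;
    destructor Destroy; override;
    function TryIO: Boolean; // any read, write or event-handling path
    procedure Close;         // the only path that takes the write side
  end;

constructor TGuardedSocket.Create;
begin
  inherited Create;
  fClosingLock := TMultiReadExclusiveWriteSynchronizer.Create;
  fState := STATE_CONNECTED;
end;

destructor TGuardedSocket.Destroy;
begin
  fClosingLock.Free;
  inherited;
end;

function TGuardedSocket.TryIO: Boolean;
begin
  fClosingLock.BeginRead; // many threads may hold the read side at once
  try
    // While the read side is held, Close cannot change fState.
    Result := fState <> STATE_CLOSED;
    // ...WSARecv / WSASend / WSAEnumNetworkEvents would be issued here...
  finally
    fClosingLock.EndRead;
  end;
end;

procedure TGuardedSocket.Close;
begin
  fClosingLock.BeginWrite; // waits until every reader has called EndRead
  try
    fState := STATE_CLOSED; // closesocket(fSocket) would go here
  finally
    fClosingLock.EndWrite;
  end;
end;

var
  S: TGuardedSocket;
begin
  S := TGuardedSocket.Create;
  try
    Assert(S.TryIO);
    S.Close;
    Assert(not S.TryIO); // refused after close instead of touching a dead handle
    WriteLn('ok');
  finally
    S.Free;
  end;
end.
```

The point of the re-check under the read side (as in IOCPProcessEventsProc) is that the state cannot flip to "closed" between the check and the Winsock call.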
Otherwise, the socket handling in this procedure is entirely ordinary.
The only part worth a closer look is accepting a new client connection, the DoAccept method.
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
  addr: TSockAddr;
  addrlen: Integer;
  dwCallbackData: NativeUInt;
  RemoteAddrLen: DWORD;
begin
  dwCallbackData := NativeUInt(self);
  addrlen := SizeOf(addr);
  Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack,
    dwCallbackData);
  if Result <> INVALID_SOCKET then
  begin
    SetLength(RemoteAddress, 255);
    RemoteAddrLen := Length(RemoteAddress);
    if WSAAddressToString(addr, addrlen, nil, PChar(@RemoteAddress[1]),
      RemoteAddrLen) = SOCKET_ERROR then
      raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
    SetLength(RemoteAddress, RemoteAddrLen - 1)
  end
end;

The key point here is the use of WSAAccept. It lets the server reject a client connection from a condition callback (here ServerAcceptCallBack), and the client learns the outcome through its FD_CONNECT event.
This is the preferred way to implement so-called blacklists.
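The article does not show ServerAcceptCallBack, so the following is only a hypothetical sketch of the decision a blacklist callback might make, written as a portable Delphi function over an address string in the "a.b.c.d:port" form that WSAAddressToString produces for AF_INET. Extracting the caller's address from the callback arguments is left out; CF_ACCEPT and CF_REJECT are declared locally with their winsock2.h values, and HostPart and DecideConnection are illustrative names.

```pascal
program BlacklistSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+}

const
  // Return codes of a WSAAccept condition function (values as in winsock2.h).
  CF_ACCEPT = 0;
  CF_REJECT = 1;

// Drops the ':port' suffix that WSAAddressToString appends to an IPv4 address.
// IPv6 strings such as '[::1]:80' are not handled by this sketch.
function HostPart(const Addr: string): string;
var
  i: Integer;
begin
  Result := Addr;
  for i := Length(Addr) downto 1 do
    if Addr[i] = ':' then
    begin
      Result := Copy(Addr, 1, i - 1);
      Break;
    end;
end;

// Hypothetical decision logic for a condition callback such as ServerAcceptCallBack.
function DecideConnection(const RemoteAddr: string;
  const Blacklist: array of string): Integer;
var
  Host: string;
  i: Integer;
begin
  Result := CF_ACCEPT;
  Host := HostPart(RemoteAddr);
  for i := 0 to High(Blacklist) do
    if Host = Blacklist[i] then
    begin
      Result := CF_REJECT;
      Break;
    end;
end;

begin
  Assert(DecideConnection('10.0.0.5:51234', ['10.0.0.5']) = CF_REJECT);
  Assert(DecideConnection('10.0.0.50:51234', ['10.0.0.5']) = CF_ACCEPT); // exact match only
  Assert(DecideConnection('192.168.1.7', ['10.0.0.5']) = CF_ACCEPT);
  WriteLn('ok');
end.
```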
Moving on, let's look at how input/output is organized, using the read operation as an example.
procedure TIOCPSocketProto.Read(Length: DWORD;
  OnRead, OnReadProcess: TOnReadEvent);
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  fClosingLock.BeginRead;
  try
    if fStateLock = CLI_SOCKET_LOCK_CONNECTED then
    begin
      if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE
      then
      begin
        fOnRead := OnRead;
        fOnReadProcess := OnReadProcess;
        fReaded := 0;
        fReadBufLength := Length;
        fReadBuffer := nil;
        GetMem(fReadBuffer, Length);
        if fReadBuffer <> nil then
        begin
          FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
          WsaBuf.buf := fReadBuffer;
          WsaBuf.len := fReadBufLength;
          Flags := 0;
          Bytes := 0;
          _AddRef;
          if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil)
            = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
          begin
            FreeMem(fReadBuffer, Length);
            InterlockedExchange(fReadLock, IO_IDLE);
            _Release;
            raise IOCPClientException.Create(sErrorRead_WSARecv);
          end;
        end
        else
          raise IOCPClientException.Create(sErrorRead_GetMem);
      end
      else
        raise IOCPClientException.Create(sErrorRead_InProcess);
    end
    else
      raise IOCPClientException.Create(sErrorRead_NotConnected);
  finally
    fClosingLock.EndRead;
  end;
end;

Here I used interlocked operations for locking because they are very fast and are enough to reject an attempt to start a second I/O operation of the same kind while one is still in progress. The buffer is allocated once per operation. Next, an asynchronous read from the socket is started. The object is also "marked" with _AddRef so that it is not destroyed while it is in the queue. When the read completes, the system automatically queues a completion message.
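The IO_IDLE/IO_PROCESS guard used in Read can be tried in isolation. This Delphi sketch assumes the values 0 and 1 for the constants (the article names them but does not show them) and wraps the platform's atomic compare-exchange: AtomicCmpExchange in Delphi XE3 and later, InterlockedCompareExchange under Free Pascal. TryBeginRead and EndRead are hypothetical helper names.

```pascal
program IoGuardSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+}

const
  IO_IDLE = 0;    // hypothetical values; the article does not show them
  IO_PROCESS = 1;

var
  ReadLock: Integer = IO_IDLE;

// Atomically sets Target to NewValue if it equals Comparand; returns the old value.
function CompareExchange(var Target: Integer; NewValue, Comparand: Integer): Integer;
begin
{$IFDEF FPC}
  Result := InterlockedCompareExchange(Target, NewValue, Comparand);
{$ELSE}
  Result := AtomicCmpExchange(Target, NewValue, Comparand); // Delphi XE3+
{$ENDIF}
end;

// Succeeds only for the caller that moves the flag from IDLE to PROCESS.
function TryBeginRead: Boolean;
begin
  Result := CompareExchange(ReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE;
end;

// Called when the operation finishes or fails to start, as in Read's error path.
procedure EndRead;
begin
{$IFDEF FPC}
  InterlockedExchange(ReadLock, IO_IDLE);
{$ELSE}
  AtomicExchange(ReadLock, IO_IDLE);
{$ENDIF}
end;

begin
  Assert(TryBeginRead);     // the first Read starts
  Assert(not TryBeginRead); // a second Read is refused (sErrorRead_InProcess)
  EndRead;
  Assert(TryBeginRead);     // available again after completion
  WriteLn('ok');
end.
```

Because only one thread can win the compare-exchange, no critical section is needed to keep two reads from sharing fReadBuffer and fOverlappedRead.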
Now consider what happens when an I/O completion message is taken from the queue.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD;
  Overlapped: POverlapped): Boolean;
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  Result := FALSE;
  fClosingLock.BeginRead;
  try
    if Overlapped = @fOverlappedRead then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fReadLock = IO_PROCESS then
        begin
          inc(fReaded, NumberOfBytes);
          if fReaded < fReadBufLength then
          begin
            CallOnReadProcess;
            WsaBuf.buf := fReadBuffer;
            inc(WsaBuf.buf, fReaded);
            WsaBuf.len := fReadBufLength;
            dec(WsaBuf.len, fReaded);
            Flags := 0;
            Bytes := 0;
            if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnRead;
              Result := True;
            end
          end
          else
          begin
            CallOnReadProcess;
            CallOnRead;
            Result := True;
          end;
        end
      end
      else
      begin
        CallOnRead;
        Result := True;
      end;
    end
    else if Overlapped = @fOverlappedWrite then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fWriteLock = IO_PROCESS then
        begin
          inc(fWrited, NumberOfBytes);
          if fWrited < fWriteBufLength then
          begin
            CallOnWriteProcess;
            WsaBuf.buf := fWriteBuffer;
            inc(WsaBuf.buf, fWrited);
            WsaBuf.len := fWriteBufLength;
            dec(WsaBuf.len, fWrited);
            Flags := 0;
            Bytes := 0;
            if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnWrite;
              Result := True;
            end
          end
          else
          begin
            CallOnWriteProcess;
            CallOnWrite;
            Result := True;
          end;
        end
      end
      else
      begin
        CallOnWrite;
        Result := True;
      end;
    end
  finally
    fClosingLock.EndRead;
  end;
end;

The essence of this procedure is that it keeps issuing reads or writes on the socket until the allocated buffer has been completely filled (or sent). An interesting detail is how the type of operation is identified: by the pointer to the OVERLAPPED structure. The queue returns this pointer, and it only needs to be compared with the addresses of the object's fields that hold the read and write structures (fOverlappedRead and fOverlappedWrite).
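The buffer arithmetic behind this loop (start at fReadBuffer + fReaded, ask for fReadBufLength - fReaded) can be checked without a socket. In this hedged Delphi sketch FakeRecv is a hypothetical stand-in for WSARecv plus its completion that delivers at most MaxChunk bytes per call, and ReadFully plays the role of the repeated completions handled by IOCPProcessIOProc.

```pascal
program PartialReadSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}
{$C+}

// Hypothetical stand-in for WSARecv and its completion: copies at most
// MaxChunk bytes, imitating a stream socket that delivers data in pieces.
function FakeRecv(Dest: PByte; Len: Cardinal; const Source: array of Byte;
  var SrcPos: Integer; MaxChunk: Cardinal): Cardinal;
begin
  Result := Len;
  if Result > MaxChunk then
    Result := MaxChunk;
  if Result > Cardinal(Length(Source) - SrcPos) then
    Result := Cardinal(Length(Source) - SrcPos);
  if Result > 0 then
    Move(Source[SrcPos], Dest^, Result);
  Inc(SrcPos, Result);
end;

// Re-issue the read at Buffer + Result for the remaining Total - Result bytes
// until the buffer is full, as the read branch of IOCPProcessIOProc does.
function ReadFully(Buffer: PByte; Total: Cardinal; const Source: array of Byte;
  MaxChunk: Cardinal; out Calls: Integer): Cardinal;
var
  Got: Cardinal;
  SrcPos: Integer;
  Buf: PByte;
begin
  Result := 0; // bytes read so far (fReaded)
  SrcPos := 0;
  Calls := 0;
  while Result < Total do
  begin
    Buf := Buffer;
    Inc(Buf, Result);                                              // WsaBuf.buf
    Got := FakeRecv(Buf, Total - Result, Source, SrcPos, MaxChunk); // WsaBuf.len
    Inc(Calls);
    if Got = 0 then
      Break; // 0 bytes: the peer closed the connection (NumberOfBytes = 0)
    Inc(Result, Got);
  end;
end;

const
  Data: array[0..9] of Byte = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  Short: array[0..5] of Byte = (1, 2, 3, 4, 5, 6);

var
  Buffer: array[0..9] of Byte;
  Calls: Integer;
begin
  FillChar(Buffer, SizeOf(Buffer), 0);
  Assert(ReadFully(@Buffer[0], SizeOf(Buffer), Data, 4, Calls) = 10);
  Assert((Calls = 3) and (Buffer[9] = 10)); // 4 + 4 + 2 bytes
  Assert(ReadFully(@Buffer[0], SizeOf(Buffer), Short, 4, Calls) = 6);
  Assert(Calls = 3); // 4 + 2 bytes, then 0: connection closed
  WriteLn('ok');
end.
```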
Note also that even if a read or write completes immediately, a completion message is still queued; this behavior can be changed through the API (SetFileCompletionNotificationModes with the FILE_SKIP_COMPLETION_PORT_ON_SUCCESS flag).
Finally, let's look at how a socket object is created and attached to the queue.
constructor TIOCPClientSocket.Create(RemoteAddress: string;
  OnConnect, OnClose: TOnSimpleSocketEvenet);
var
  lRemoteAddress: TSockAddr;
  lRemoteAddressLength: Integer;
begin
  inherited Create();
  fStore := gClients;
  fOnConnect := OnConnect;
  fOnClose := OnClose;
  fStateLock := 0;
  fRemoteAddressStr := RemoteAddress;
  fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if fSocket = INVALID_SOCKET then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);
  if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents,
    FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);
  if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_CreateIoCompletionPort);
  fStateLock := CLI_SOCKET_LOCK_CREATED;
  fStore.Add(self);
  lRemoteAddressLength := SizeOf(lRemoteAddress);
  lRemoteAddress.sa_family := AF_INET;
  if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil,
    lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_WSAStringToAddress);
  if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil,
    nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end; 

The client socket constructor immediately creates the socket (WSASocket), associates it with the shared event (WSAEventSelect) and with the completion port (CreateIoCompletionPort), and then starts an asynchronous connection (WSAConnect). Because WSAEventSelect switches the socket to non-blocking mode, WSAConnect is expected to fail with WSAEWOULDBLOCK. Completion of the connection is awaited by the thread discussed first (the thread that waits for socket events), which in turn puts the event in the queue.

Epilogue


This article has briefly covered techniques for building classes for event-driven programming that, in my opinion, work well.
I managed to build a class for high-performance socket work in Delphi. The topic is generally very poorly covered, and I plan to follow this publication with two or three more posts: on socket contexts built with interfaces, and on secure connections over IOCP (cryptographic providers and the Winsock Secure Socket Extensions). The full example code is here.
