Completion Port
Hi. In this article I will describe the I/O completion port (IOCP) mechanism in Windows. Its developers describe the completion port as "a means of improving the performance of applications that often use I/O." By and large they are not lying, which is why IOCP is often used for scalable server applications. Still, the completion port has a reputation as a tricky topic that is hard to understand.
Theory.
The port object is essentially a kernel event queue to which records about completed I/O operations are added and from which they are retrieved. Naturally, not every operation in the system ends up there, only those on files that we have associated with the port. The association is made by binding the file's handle (not necessarily a file on disk: it can be a socket, pipe, mailslot, etc.) to the port's handle. When an asynchronous I/O operation is started on such a file, a corresponding record is added to the port once the operation completes.
To process the results, a pool of threads is used, whose size is chosen by the user. When a thread joins the pool, it extracts one operation result from the queue and processes it. If the queue is empty at that moment, the thread sleeps until a record appears. An interesting feature of the completion port is that you can also put a record into the queue by hand and retrieve it later.
Looks confusing? In practice it is a little easier.
Implementation.
Having described how it works, let us move on to something more concrete, namely the implementation of an application that uses IOCP. As an example I will use a server that simply accepts incoming connections and packets from clients. The language is C.
First we need to create the port itself. This is done with the CreateIoCompletionPort API function. Remarkably, the same function is also used to bind a file handle to an existing port; why it was designed this way is unknown. The call shown after the prototype creates a new completion port object and returns its handle. INVALID_HANDLE_VALUE as the first argument means that no file is being bound and we want a new port. The next two arguments must be 0 in this case. The last argument specifies how many threads may run concurrently for this port; if it is 0, the number of processors in the system is used.
HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey,
    DWORD NumberOfConcurrentThreads);

// create a new port: no file handle, no existing port, default concurrency
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
The next step is to create the threads that will make up the pool. There is no universal advice here. Some say there should be twice as many threads as processors in the system, others that the two numbers should be equal, and others resize the pool dynamically. What to do depends on the application and the machine's configuration. I have an old machine with Hyper-Threading, so the system sees my processor as two. For this reason my example has 2 worker threads in the pool. Please note that we pass the completion port handle to the worker threads as a parameter. They will need it when they announce that they are ready for work. The WorkingThread() function itself is given below.
for (int i = 1; i <= 2; i++)
{
    HANDLE hWorking = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)&WorkingThread, iocp, 0, 0);
    CloseHandle(hWorking);
}
Now that the threads are created, we can start accepting clients and their messages. I will not show the Winsock initialization code here (it is in the source code for this article), so I will just write:
while (1)
{
    SOCKET clientsock = WSAAccept(listensock, (struct sockaddr *)&clientaddr, &clientsize, 0, 0);
    ...
}
The accept call returns the next client's socket, which can be written to and read from like an ordinary file. In your case it may be a file on disk or any other I/O object. Now we need to tell the completion port to watch this socket. To do this, bind the socket handle to the port handle:
CreateIoCompletionPort((HANDLE)clientsock, iocp, (ULONG_PTR)key, 0);
The last argument is ignored here, since the port already exists. The penultimate one deserves special attention. In the prototype it is called CompletionKey. In practice the key is usually a pointer to arbitrary data, i.e. to a structure or class instance that you define. It lets the thread tell one operation from another, or it can hold a client's state. At a minimum you will have to store a byte there that says which operation has completed: sending or receiving (writing or reading).
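To make the idea of the key more tangible, here is a minimal sketch of what such a structure might look like. The names CLIENT_KEY, OP_TYPE, key_to_integer and key_from_integer are my own illustrations, not part of the Windows API, and the socket is stored as a plain integer so that the fragment compiles anywhere. It only shows that a pointer survives the trip through a pointer-sized integer, which is what ULONG_PTR is on Windows.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical operation tags; the article only requires "a byte" for this. */
typedef enum { OP_RECV = 0, OP_SEND = 1 } OP_TYPE;

/* Hypothetical per-client key structure. */
typedef struct {
    uintptr_t sock;    /* the client's SOCKET, kept as an integer in this sketch */
    OP_TYPE   OpType;  /* which operation is currently in flight */
} CLIENT_KEY;

/* The key is handed to the port as a pointer-sized integer... */
static uintptr_t key_to_integer(CLIENT_KEY *key)
{
    assert(key != 0);
    return (uintptr_t)key;
}

/* ...and the worker thread turns it back into a pointer. */
static CLIENT_KEY *key_from_integer(uintptr_t raw)
{
    return (CLIENT_KEY *)raw;
}
```

Keep in mind that one OpType per client only works while a single operation is in flight for that client at a time.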
Once the handles are bound, asynchronous I/O can be started. For sockets, the Winsock2 functions WSASend() and WSARecv() are used, with a pointer to an OVERLAPPED structure, which is what marks the operation as asynchronous. For files, use WriteFile() and ReadFile() respectively.
Besides the OVERLAPPED structure, you will need to pass some other I/O information to the thread, for example the buffer address, its length, or even the buffer itself. This can be done either through the completion key, or by creating a structure that has OVERLAPPED as its first field and passing a pointer to it to WSASend()/WSARecv().
Now consider the API function that attaches the calling thread to the pool:
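The second option, a structure with OVERLAPPED as its first field, can be sketched roughly as follows. IoContext, IoOp and context_from_overlapped are hypothetical names of mine, and the buffer size is arbitrary. Outside Windows the fragment falls back to a simplified stand-in for OVERLAPPED so that it still compiles; the real definition comes from <windows.h>.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#ifdef _WIN32
#include <windows.h>
#else
/* Simplified stand-in, only so the sketch compiles off Windows. */
typedef struct {
    uintptr_t Internal, InternalHigh;
    void *Pointer;
    void *hEvent;
} OVERLAPPED;
#endif

typedef enum { OP_RECV, OP_SEND } IoOp;   /* hypothetical operation tags */

/* Hypothetical per-operation context. */
typedef struct {
    OVERLAPPED ov;        /* must stay the first member */
    IoOp       op;        /* what kind of operation this is */
    size_t     len;       /* number of valid bytes in buf */
    char       buf[512];  /* the data buffer itself */
} IoContext;

/* The worker thread receives an OVERLAPPED pointer; because ov is the first
   member, that pointer has the same address as the enclosing context. */
static IoContext *context_from_overlapped(OVERLAPPED *ov)
{
    assert(offsetof(IoContext, ov) == 0);
    return (IoContext *)ov;
}
```

Unlike a single byte in the completion key, a context of this kind belongs to one operation, so a read and a write on the same socket can be in flight at once without overwriting each other's state.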
BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,
    LPDWORD lpNumberOfBytes,
    PULONG_PTR lpCompletionKey,
    LPOVERLAPPED *lpOverlapped,
    DWORD dwMilliseconds);
Here CompletionPort is the handle of the port whose pool the thread joins; lpNumberOfBytes is a pointer to a variable that receives the number of bytes transferred by the completed operation (in effect, the return value of recv() and send() in synchronous mode); lpCompletionKey is a pointer to a variable that receives the completion key; lpOverlapped is a pointer to a variable that receives the pointer to the OVERLAPPED structure associated with this I/O operation; finally, dwMilliseconds is how long the thread may sleep while waiting for a request to complete. If you specify INFINITE, it will wait forever.
Now that we are acquainted with the function that dequeues results, we can look at the function that the worker threads execute.
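DWORD WINAPI WorkingThread(LPVOID param)
{
    HANDLE iocp = (HANDLE)param;   // the port handle passed to CreateThread
    DWORD bytes;
    CLIENT_KEY *key;               // illustrative name for the user-defined key structure
    LPOVERLAPPED overlapped;

    while (1)
    {
        if (!GetQueuedCompletionStatus(iocp, &bytes, (PULONG_PTR)&key, &overlapped, INFINITE))
            // port error
            break;
        if (!bytes)
        {
            // 0 means the file handle was closed, i.e. the client disconnected;
            // release the client's state here
            continue;
        }
        switch (key->OpType)
        {
            ...
        }
    }
    return 0;
}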
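The loop above treats every FALSE result as a port error. As far as I know from the documentation, a FALSE result with a non-NULL OVERLAPPED pointer means that a packet was dequeued but the I/O operation itself failed, while a NULL pointer means that nothing was dequeued at all (a timeout or a problem with the port). A small sketch of that decision, with names of my own (CompletionKind, classify_completion) and plain C types instead of the Windows ones, might look like this:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical classification of one GetQueuedCompletionStatus result. */
typedef enum {
    COMPLETION_OK,           /* packet dequeued, data was transferred */
    COMPLETION_CLOSED,       /* packet dequeued with 0 bytes: treated as a disconnect,
                                following the convention used in this article */
    COMPLETION_IO_FAILED,    /* packet dequeued, but the I/O operation failed */
    COMPLETION_WAIT_FAILED   /* nothing dequeued: timeout or port error */
} CompletionKind;

/* ok         - the BOOL returned by GetQueuedCompletionStatus
   overlapped - the pointer it stored through lpOverlapped
   bytes      - the value it stored through lpNumberOfBytes */
static CompletionKind classify_completion(int ok, const void *overlapped,
                                          unsigned long bytes)
{
    if (!ok)
        return overlapped ? COMPLETION_IO_FAILED : COMPLETION_WAIT_FAILED;
    if (bytes == 0)
        return COMPLETION_CLOSED;
    return COMPLETION_OK;
}
```

With such a helper the worker can leave the loop only on COMPLETION_WAIT_FAILED and clean up a single client on COMPLETION_IO_FAILED, instead of stopping the whole thread because one connection was reset.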
Inside the switch, new asynchronous operations are started; their results will be handled on later iterations of the loop. If we do not want the completion of some operation to be reported to the port (for example, when the result does not matter to us), we can use the following trick: set the low-order bit of the OVERLAPPED.hEvent field (which must hold a valid event handle) to 1. It is worth noting that, from a performance standpoint, processing the received data in the same loop is not the wisest choice, because it slows the server's response to incoming packets. To solve this, the analysis of the data that was read can be moved to a separate thread, and here the third API function comes in handy:
BOOL PostQueuedCompletionStatus(
    HANDLE CompletionPort,
    DWORD dwNumberOfBytesTransferred,
    ULONG_PTR dwCompletionKey,
    LPOVERLAPPED lpOverlapped);
Its purpose is clear from the name: it puts a record into the port's queue. In effect, the system does the same thing behind the scenes when an asynchronous operation completes. All the arguments given here are passed unchanged to one of the threads. Thanks to PostQueuedCompletionStatus, the completion port can be used not only for handling I/O operations but also simply as an efficient queue served by a thread pool.
In our example it makes sense to create a second port and, after an operation completes, call PostQueuedCompletionStatus(), passing the received packet in the key so that it is processed in another thread.
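The hEvent trick mentioned earlier in this section comes down to one bit operation. Here is a minimal sketch; suppress_completion and original_event are hypothetical helper names of mine, and outside Windows the HANDLE and ULONG_PTR types are replaced with stand-ins so that the fragment compiles.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#ifdef _WIN32
#include <windows.h>
#else
/* Stand-ins, only so the sketch compiles off Windows. */
typedef void *HANDLE;
typedef uintptr_t ULONG_PTR;
#endif

/* Returns the event handle with its low-order bit set. Stored in
   OVERLAPPED.hEvent, this value asks the system not to queue a
   completion packet to the port for that one operation. */
static HANDLE suppress_completion(HANDLE event)
{
    assert(event != NULL);   /* the trick needs a valid event handle */
    return (HANDLE)((ULONG_PTR)event | 1);
}

/* Clears the bit again if the untagged handle value is needed. */
static HANDLE original_event(HANDLE tagged)
{
    return (HANDLE)((ULONG_PTR)tagged & ~(ULONG_PTR)1);
}
```

On Windows the result would be assigned along the lines of ov.hEvent = suppress_completion(hEvent) before the call to WSASend() or WriteFile(); the event is then the only way to learn that the operation has finished.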
Internal organization.
The completion port is organized as follows. As noted above, it is just a kernel event queue. Here is the description of the IO_COMPLETION and KQUEUE structures:
typedef struct _IO_COMPLETION
{
    KQUEUE Queue;
} IO_COMPLETION;
typedef struct _KQUEUE
{
    DISPATCHER_HEADER Header;
    LIST_ENTRY EntryListHead;   // queue of packets
    DWORD CurrentCount;
    DWORD MaximumCount;
    LIST_ENTRY ThreadListHead;  // queue of waiting threads
} KQUEUE;
When a port is created, the CreateIoCompletionPort function calls the internal NtCreateIoCompletion service, and the queue is then initialized with the KeInitializeQueue function. When the port is bound to a file object, the Win32 function CreateIoCompletionPort calls NtSetInformationFile. For this call, FILE_INFORMATION_CLASS is set to FileCompletionInformation, and a pointer to an IO_COMPLETION_CONTEXT or FILE_COMPLETION_INFORMATION structure is passed as the FileInformation parameter.
NTSTATUS NtSetInformationFile(
    HANDLE FileHandle,
    PIO_STATUS_BLOCK IoStatusBlock,
    PVOID FileInformation,
    ULONG Length,
    FILE_INFORMATION_CLASS FileInformationClass);
typedef struct _IO_COMPLETION_CONTEXT
{
    PVOID Port;
    PVOID Key;
} IO_COMPLETION_CONTEXT;
typedef struct _FILE_COMPLETION_INFORMATION
{
    HANDLE IoCompletionHandle;
    ULONG CompletionKey;
} FILE_COMPLETION_INFORMATION, *PFILE_COMPLETION_INFORMATION;
When an asynchronous I/O operation on the associated file completes, the I/O manager builds a request packet from the OVERLAPPED structure and the completion key and puts it into the queue by calling KeInsertQueue. When a thread calls GetQueuedCompletionStatus, the NtRemoveIoCompletion function is what actually gets called. NtRemoveIoCompletion checks the parameters and calls KeRemoveQueue, which blocks the thread if there are no requests in the queue or if the CurrentCount field of the KQUEUE structure is greater than or equal to MaximumCount. If there are requests and the number of active threads is below the maximum, KeRemoveQueue removes the calling thread from the queue of waiting threads and increases the number of active threads by 1. When a thread is placed in the queue of waiting threads, the Queue field of its KTHREAD structure is set to the address of the completion port.
That is all from me. Below is a link to a program with a usage example.
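The blocking rule just described can be modeled in a few lines of ordinary C. This is only a simplified user-space model for illustration, not kernel code: QueueModel and its functions are names I made up, the packet list is reduced to a counter, and the release() step (an active thread giving its slot back) is my assumption about how the count goes down again, since the text above does not cover it.

```c
#include <assert.h>

/* Simplified model of the KQUEUE bookkeeping. */
typedef struct {
    unsigned pending;        /* packets waiting (stands in for EntryListHead) */
    unsigned current_count;  /* threads currently active on the port */
    unsigned maximum_count;  /* concurrency limit set when the port was created */
} QueueModel;

/* Models KeInsertQueue: one more packet is available. */
static void insert_packet(QueueModel *q)
{
    q->pending++;
}

/* Models the check in KeRemoveQueue. Returns 1 if the caller takes a packet
   and becomes active, 0 if it would be put to sleep. */
static int try_remove(QueueModel *q)
{
    if (q->pending == 0 || q->current_count >= q->maximum_count)
        return 0;
    q->pending--;
    q->current_count++;
    return 1;
}

/* Assumption: an active thread that blocks or comes back for more work
   gives up its slot. */
static void release(QueueModel *q)
{
    assert(q->current_count > 0);
    q->current_count--;
}
```

With a limit of 2, a third thread is refused even though a packet is waiting, and gets it only after one of the active threads gives up its slot.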
iocp.c