UDP and C # async / await
- Tutorial
Recently, the need arose to solve the following simple problem: there are several dozen devices (training complexes) that need to be regularly asked for their current status. The complexes communicate via UDP, and I wanted to make it so as not to think about the polling cycle and determining which device the answer came from, but just send a request - and when the result came - write it down. I solved this problem before, but I wanted to see how the concept of async / await will simplify and reduce the code. It turned out that the final result takes less than a page.
All polling logic consists of only two methods - a UDP socket read loop and a method of sending a command to a device.
When we send a command, there are two things that need to be taken into account - this is 1) after sending the command, we need to wait for a response from the device and 2) the answer may not come - then we need to return an exception that will tell us about the timeout.
The asynchronous method of sending a command is as follows (* see Update 1) :
Here _client is the standard UdpClient.
We send a command and await for the result that Task should return to us, stored in the dictionary with the key of our connection (it is from him that we are waiting for an answer). When the reading begins, we put TaskCompletionSource into the dictionary, when we get the answer and the connection is no longer needed, or if the exception is thrown, we delete it from the dictionary.
The dictionary itself (we use ConcurrentDictionary instead of Dictionary to avoid problems with cross-talk calls):
There is a point that deserves attention - this is an extension method WithCancellation (token). It is needed in order to support the cancellation of the operation using the CancellationToken, and cancels the task, returning an exception when the specified timeout is exceeded.
And here is the reading cycle itself: we read until we have enough strength, and if the received datagram has a connection address, the key with the parameters of which we have already entered in the dictionary, then the result is placed in the TaskCompletionSource with this key, and we go back to the method of sending a message to await tcs.Task, only having on hand the desired result from the device, we will return this result to the place of the call.
The result is pleasing. This is how async-await simplified the task of polling multiple devices using UDP.
Update 1
As was rightly noted in the comments, the SendReceiveUdpAsync method must be wrapped in try {} finally {}, so that in case of canceling the task and throwing an exception, the value from the dictionary is deleted.
Update 2
Using Reactive Extensions for the same task
habrahabr.ru/post/238445
All polling logic consists of only two methods - a UDP socket read loop and a method of sending a command to a device.
When we send a command, there are two things that need to be taken into account - this is 1) after sending the command, we need to wait for a response from the device and 2) the answer may not come - then we need to return an exception that will tell us about the timeout.
The asynchronous method of sending a command is as follows (* see Update 1) :
public async Task SendReceiveAsync(byte[] msg, string ip, int port, int timeOut)
{
var endPoint = new IPEndPoint(IPAddress.Parse(ip), port);
var tcs = new TaskCompletionSource();
try
{
var tokenSource = new CancellationTokenSource(timeOut);
var token = tokenSource.Token;
if (!_tcsDictionary.ContainsKey(endPoint)) _tcsDictionary.TryAdd(endPoint, tcs);
_client.Send(msg, msg.Length, ip, port);
var result = await tcs.Task.WithCancellation(token);
return result;
}
finally
{
_tcsDictionary.TryRemove(endPoint, out tcs);
}
}
Here _client is the standard UdpClient.
We send a command and await for the result that Task should return to us, stored in the dictionary with the key of our connection (it is from him that we are waiting for an answer). When the reading begins, we put TaskCompletionSource into the dictionary, when we get the answer and the connection is no longer needed, or if the exception is thrown, we delete it from the dictionary.
The dictionary itself (we use ConcurrentDictionary instead of Dictionary to avoid problems with cross-talk calls):
private ConcurrentDictionary, TaskCompletionSource> _tcsDictionary;
There is a point that deserves attention - this is an extension method WithCancellation (token). It is needed in order to support the cancellation of the operation using the CancellationToken, and cancels the task, returning an exception when the specified timeout is exceeded.
static class TaskExtension
{
public static async Task WithCancellation(this Task task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource();
using (cancellationToken.Register(
s => ((TaskCompletionSource)s).TrySetResult(true), tcs))
if (task != await Task.WhenAny(task, tcs.Task))
throw new OperationCanceledException(cancellationToken);
return await task;
}
}
And here is the reading cycle itself: we read until we have enough strength, and if the received datagram has a connection address, the key with the parameters of which we have already entered in the dictionary, then the result is placed in the TaskCompletionSource with this key, and we go back to the method of sending a message to await tcs.Task, only having on hand the desired result from the device, we will return this result to the place of the call.
Task.Run(() =>
{
IPEndPoint ipEndPoint = null;
while (true)
{
try
{
var receivedBytes = _client.Receive(ref ipEndPoint);
TaskCompletionSource tcs;
if (_tcsDictionary.TryGetValue(ipEndPoint, out tcs)) tcs.SetResult(receivedBytes);
}
catch (SocketException)
{
;//при невозможности соединения продолжаем работать
}
}
});
The result is pleasing. This is how async-await simplified the task of polling multiple devices using UDP.
Update 1
As was rightly noted in the comments, the SendReceiveUdpAsync method must be wrapped in try {} finally {}, so that in case of canceling the task and throwing an exception, the value from the dictionary is deleted.
Update 2
Using Reactive Extensions for the same task
habrahabr.ru/post/238445