UDP and C # Reactive Extensions

  • Tutorial
I recently read a post on UDP and C # async / await , which describes how to solve the simple task of polling devices over UDP with a single client. Solving the problem with async \ await really reduces the amount of code compared to the manual implementation of asynchronous calls. On the other hand, it creates many problems with task synchronization, concurrent data access, and exception handling. The resulting solution is very error prone. The original version of the author contained errors of non-release of resources.

Is it possible to make it easier and more reliable?

And what is the problem actually?

The problem is with the UdpClient.Receive(- Async) method . This method is not reentrant, that is, if the client is already waiting for the datagram to arrive, then you cannot call this method again. Even if an error does not occur, then it is quite possible to get the datagram expected by another "stream". Therefore, you need to write additional code that synchronizes user actions and status UdpClient.

async \ await and Tasks Parallel Library does not have ready-made synchronization tools. You need to either write code with your hands, as in the original article, or use ready-made libraries like TPL Dataflow . But alas, Dataflow is very heavy.

Reactive extensions

Instead of TPL Dataflow, you can use Reactive Extensions (RX). RX describes asynchronous data streams (asynchronous sequences). RX has many functions for creating and manipulating data streams. RX allows you to work not only with IO, but also "event streams" generated by interface elements. This allows the entire program to be described as a set of data streams.

Code example
To solve the initial problem, you need to add a library Rx-Mainfrom NuGet to the project and write several helpers:
public static IObservable ReceiveObservable(this UdpClient client)
    return client.ReceiveAsync().ToObservable();
public static IObservable SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port)
    return client.SendAsync(msg, bytes, ip, port).ToObservable();
public static IObservable ReceiveStream(this UdpClient client)
    return Observable.Defer(() => client.ReceiveObservable()).Repeat();

The first two helpers turn Task into an IObservable (asynchronous sequence of one element) using an extension method.
The last helper just shows an example of sequence manipulation.
Observable.Defer- Delays the call to the sequence constructor in the parameter until the subscriber appears.
The extension method .Repeat()repeats endlessly the original sequence.
Together, the two methods create an endless loop for retrieving datagrams from a socket.

Now the method of sending and receiving data:
public IObservable SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
    var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port)
            from r in receiveStream
            where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port
            select r.Buffer;
    return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut));

Yes, yes, RX supports Linq for asynchronous sequences.
This Linq expression is rather hard to understand without RX knowledge, but its essence is very simple: after receiving the result from the stream, SendObservablesubscribe to the stream receiveStreamand get only those elements that satisfy the where predicate , return the buffer from the received datagram. Next, one result of the resulting sequence is taken and a timeout is hung.

The most important part of the code is the definition receiveStream:
receiveStream = client.ReceiveStream().Publish().RefCount();

Hot, cold and warm sequences
When you work with RX sequences, it is important to know their “temperature”.

Cold sequences are those that appear when the sequence subscriber appears and disappear when the subscriber ceases to exist.
The extension method ReceiveStreamreturns just such a sequence. This means that each subscriber will have his own sequence, that is, several challenges will occur in parallel UdpClient.ReceiveAsyncand the problem described at the beginning cannot be solved.

Hot sequences - which exist independently of subscribers. For example, the sequence of user mouse movements. The function Publishin the code above allows you to turn a cold sequence into a hot one. But this carries another problem. If the constructor UdpClientdoes not specify a port and callReceivebefore the call Send, an error will occur.

Therefore, we need an intermediate option - the sequence should be common to all subscribers and should exist as long as there is at least one subscriber. This sequence is called “warm” and is created by a challenge RefCount.

Event Subscription
For testing, I also wrote a “server” function:
public IDisposable Listen(Func process)
    return receiveStream.Subscribe(async r =>
        var msg = process(r);
        await client.SendObservable(msg, msg.Length, r.RemoteEndPoint);

The Subscribe method allows you to specify the action that will be called on each element of the asynchronous sequence. You can also set an action to end the sequence and to exclude.

It is also worth noting that RX supports async \ await, that is, you do not need to know RX to use code based on asynchronous sequences.


The resulting code does not contain a single cycle, not a single explicit synchronization, not a single thread or task creation. In this case, the code is completely asynchronous and safe.
RX is definitely worth exploring, even if you will not use it. The main part of Rx was invented by applying the principle of monad duality to the standard IEnumerable and IEnumerator interfaces, so RX turned out to be compact and powerful. In addition, RX is also available for JavaScript, C ++, Java, Scala and Python, Ruby.

The source code along with the client and server was uploaded to github - github.com/gandjustas/UdpRxSample .

Also popular now: