A look at Tokio: how this asynchronous event handler works

    And why it is used in the Exonum private blockchain framework


    Tokio is a framework for developing scalable network applications in Rust, built from components for asynchronous I/O. It often serves as the basis for other libraries and for implementations of high-performance protocols. Although the framework is fairly young, it has already become part of the cross-platform software ecosystem.

    Although Tokio has been criticized for being difficult to learn, it is already used in production, since code written with Tokio is easier to maintain. For example, it has been integrated into hyper, tower-grpc, and conduit. We also turned to Tokio when developing our Exonum platform.

    Work on Exonum began in 2016, when Tokio did not yet exist, so we first used the Mio v0.5 library. With the advent of Tokio, it became clear that the Mio version we were using was outdated. Moreover, Mio made it difficult to organize the Exonum event model, which includes several types of events (network messages, timeouts, messages from the REST API, and so on) and sorts them by priority.

    Each event changes the state of the node, which means that all events must be processed in a single thread, in a defined order, and according to the same rules. With Mio, we had to describe the processing scheme for each event by hand, which could lead to many errors as the code was maintained (when adding or changing parameters). Tokio simplified this process with built-in functions.

    Below we discuss the components of the Tokio stack that let you organize the processing of asynchronous tasks efficiently.

    / Image: Kevin Dooley, CC




    Tokio Architecture


    At its core, Tokio is a “wrapper” over Mio. Mio is a Rust crate that provides a platform-independent API for low-level I/O: it works on top of epoll on Linux, kqueue on macOS, and IOCP on Windows. Thus, the Tokio architecture can be represented as follows:



    Futures

    As the diagram above shows, the main functional component of Tokio is futures, a Rust crate that lets you write asynchronous code in a synchronous style. In other words, the library makes it possible to work with tasks that have not yet completed as if they already had.

    In essence, futures are values that will be computed at some point in the future but are not yet known. Many kinds of events can be represented as futures (database queries, timeouts, long-running CPU tasks, reading data from a socket, and so on), and futures can be used to synchronize them.

    A real-life example of a future is the delivery notification for a registered letter: once delivery is complete, the sender is notified that the addressee has received the letter. After receiving the notification, the sender decides what to do next.

    Developer David Simmons, who has worked with Intel, Genuity, and Sparco Media, illustrates asynchronous I/O with futures using the example of exchanging messages with an HTTP server.

    Imagine that a server spawns a new thread for each established connection. With synchronous I/O, the thread first reads the bytes in order, then processes the information and writes the result back. While a read or write is in progress, the thread is blocked and cannot continue until the operation completes. As a result, scaling becomes difficult when the number of connections is large (the so-called C10k problem).

    With asynchronous processing, the thread queues an I/O request and continues execution (that is, it is not blocked). The system performs the read or write some time later, and before using the results the thread asks whether the request has completed. Futures can thus split the work into separate tasks: for example, one reads the request, a second processes it, and a third forms the response.
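
    The "queue a request, keep working, ask later" pattern can be sketched with the standard library alone. This is only an illustration under simplifying assumptions: a worker thread and a channel stand in for the operating system's I/O queue, and queue_read and serve_without_blocking are hypothetical names, not part of Tokio or Mio.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;

/// Hypothetical stand-in for queuing an I/O request: a worker thread
/// "reads" the data and delivers it through a channel when it is done.
fn queue_read(request: &'static str) -> Receiver<String> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Pretend this is a slow socket read.
        let _ = tx.send(request.to_uppercase());
    });
    rx
}

/// Queues a request and keeps working, asking from time to time whether
/// the result has arrived. Returns the result together with the number of
/// units of other work completed in the meantime.
fn serve_without_blocking() -> Option<(String, u64)> {
    let pending = queue_read("get /index.html");
    let mut other_work = 0u64;
    loop {
        match pending.try_recv() {
            // The request has completed: use the result.
            Ok(data) => return Some((data, other_work)),
            // Not completed yet: the thread is free to do something else.
            Err(TryRecvError::Empty) => {
                other_work += 1;
                thread::yield_now();
            }
            // The worker went away without delivering anything.
            Err(TryRecvError::Disconnected) => return None,
        }
    }
}
```

    The calling thread never blocks on the read itself; how much other work it completes before the data arrives depends on scheduling.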

    The futures crate defines the Future type, which is the core of the entire library. This type describes objects whose result becomes available not immediately but after some time. Its main part looks like this in code:

    trait Future {
        type Item;
        type Error;
        fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
        fn wait(self) -> Result<Self::Item, Self::Error> { ... }
        fn map<F, U>(self, f: F) -> Map<Self, F>
            where F: FnOnce(Self::Item) -> U { ... }
    }
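
    To make the trait more concrete, here is a minimal, self-contained sketch. It defines simplified local versions of Async, Poll, and Future instead of importing the futures crate, so it only approximates the real API. Countdown and poll_until_ready are hypothetical names introduced for illustration.

```rust
/// Simplified local model of the futures 0.1 types (not the real crate).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical future that becomes ready after `remaining` unsuccessful polls.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.remaining == 0 {
            // The value is available now.
            Ok(Async::Ready("done"))
        } else {
            // Still waiting: the caller should poll again later.
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Polls a future until it is ready and reports how many polls it took.
/// A real executor would sleep between polls instead of spinning.
fn poll_until_ready<F: Future>(mut future: F) -> Result<(F::Item, u32), F::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(value) = future.poll()? {
            return Ok((value, polls));
        }
    }
}
```

    With `Countdown { remaining: 2 }`, the first two polls report NotReady and the third returns the value.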
    

    The “heart” of Future is the poll() method. It returns either the computed value, an indication that the future is not ready yet, or an error. Futures run in the context of a task. A task is associated with exactly one future, but that future can be composite, that is, contain several other futures joined by join_all() or and_then(). For instance:

    let client_to_server = copy(client_reader, server_writer)
                        .and_then(|(n, _, server_writer)| {
                            shutdown(server_writer).map(move |_| n)
                        });
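
    How a composite future can drive its parts is easier to see in a small model. The sketch below is our own simplified re-implementation of an and_then-style combinator on top of a local Future trait. It is not the code of the futures crate, and Delayed, delayed, run, and copy_then_report are hypothetical helpers.

```rust
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical leaf future: reports NotReady once, then yields its value.
struct Delayed<T> {
    value: Option<T>,
    polled: bool,
}

fn delayed<T>(value: T) -> Delayed<T> {
    Delayed { value: Some(value), polled: false }
}

impl<T> Future for Delayed<T> {
    type Item = T;
    type Error = String;

    fn poll(&mut self) -> Poll<T, String> {
        if !self.polled {
            self.polled = true;
            return Ok(Async::NotReady);
        }
        self.value
            .take()
            .map(Async::Ready)
            .ok_or_else(|| "polled after completion".to_string())
    }
}

/// Composite future: runs `A`, then feeds its result to `f` to build `B`.
enum AndThen<A, B, F> {
    First(A, Option<F>),
    Second(B),
}

fn and_then<A, B, F>(first: A, f: F) -> AndThen<A, B, F>
where
    A: Future,
    B: Future<Error = A::Error>,
    F: FnOnce(A::Item) -> B,
{
    AndThen::First(first, Some(f))
}

impl<A, B, F> Future for AndThen<A, B, F>
where
    A: Future,
    B: Future<Error = A::Error>,
    F: FnOnce(A::Item) -> B,
{
    type Item = B::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<B::Item, A::Error> {
        loop {
            let second = match self {
                AndThen::First(first, f) => match first.poll()? {
                    // First stage finished: build the second stage from its result.
                    Async::Ready(value) => {
                        let f = f.take().expect("AndThen polled after completion");
                        f(value)
                    }
                    Async::NotReady => return Ok(Async::NotReady),
                },
                AndThen::Second(second) => return second.poll(),
            };
            *self = AndThen::Second(second);
        }
    }
}

/// Drives a future to completion by polling it in a loop (no real executor).
fn run<F: Future>(mut future: F) -> Result<F::Item, F::Error> {
    loop {
        if let Async::Ready(value) = future.poll()? {
            return Ok(value);
        }
    }
}

fn copy_then_report() -> Result<String, String> {
    // Stage 1 "copies" 42 bytes; stage 2 starts only once stage 1 is done.
    let chained = and_then(delayed(42u64), |n| delayed(format!("copied {} bytes", n)));
    run(chained)
}
```

    From the caller's point of view the chain is a single future: one task polls it, and the composite decides internally which stage to poll.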
    

    The executor coordinates tasks and their futures. If several tasks are running at the same time and some of them are waiting for external asynchronous events (for example, data arriving on a network socket), the executor must distribute processor time efficiently. In practice, it hands the processor to tasks that can make progress while other tasks are blocked waiting for external data.
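
    As a rough illustration of this scheduling idea, the sketch below polls a queue of tasks in turn and moves on whenever a task is not ready. It is a deliberately naive model: Task and run_all are hypothetical, and a real executor does not re-poll blindly but waits until it is notified that a task can make progress.

```rust
use std::collections::VecDeque;

/// Result of polling a task once.
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical task that needs `waits` more polls before its external
/// data "arrives".
struct Task {
    name: &'static str,
    waits: u32,
}

impl Task {
    fn poll(&mut self) -> Async<&'static str> {
        if self.waits == 0 {
            Async::Ready(self.name)
        } else {
            self.waits -= 1;
            Async::NotReady
        }
    }
}

/// Polls tasks in turn and returns their names in completion order.
/// A task that is not ready goes to the back of the queue, so the
/// processor is handed to the next task instead of being blocked.
fn run_all(tasks: Vec<Task>) -> Vec<&'static str> {
    let mut queue: VecDeque<Task> = tasks.into();
    let mut finished = Vec::new();
    while let Some(mut task) = queue.pop_front() {
        match task.poll() {
            Async::Ready(name) => finished.push(name),
            Async::NotReady => queue.push_back(task),
        }
    }
    finished
}
```

    Tasks that wait less finish first, regardless of the order in which they were submitted.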

    When a deferred task can make progress again, the executor is informed through the notify() method. An example is the executor in the futures crate, which “wakes up” inside a call to wait(). The source code for the example is available in the official Rust repository on GitHub:

        pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
            ThreadNotify::with_current(|notify| {
                loop {
                    match self.poll_future_notify(notify, 0)? {
                        Async::NotReady => notify.park(),
                        Async::Ready(e) => return Ok(e),
                    }
                }
            })
        }
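
    The same park-and-notify loop can be sketched with the standard library alone. Note that this sketch uses the std::future API of current Rust rather than futures 0.1, so a Waker plays the role that notify plays above. ThreadWaker, Shared, ExternalEvent, and wait_for_external_event are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the executor thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// State shared between the future and the thread that completes it.
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once another thread supplies a value.
struct ExternalEvent(Arc<Mutex<Shared>>);

impl Future for ExternalEvent {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember whom to notify when the value arrives.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Minimal executor: poll, and park the thread while the future is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn wait_for_external_event() -> u32 {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let producer = Arc::clone(&shared);
    // The "external event": another thread stores a value and notifies.
    let handle = thread::spawn(move || {
        let mut state = producer.lock().unwrap();
        state.value = Some(7);
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    });
    let value = block_on(ExternalEvent(shared));
    handle.join().unwrap();
    value
}
```

    Because the loop re-polls after every wake-up, a spurious return from park() is harmless: the future simply reports that it is still pending.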
    

    Streams

    In addition to futures, Tokio works with another component for asynchronous I/O: streams. While a future returns only one final result, a stream works with a series of events and can return multiple results.

    Again, a real-life example: periodic readings from a temperature sensor can be represented as a stream. The sensor sends the measured temperature to the user at regular intervals.

    The stream type may look like this:

    trait Stream {
        type Item;
        type Error;
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
    }
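
    The temperature sensor can be modelled in a few lines. As before, this is a sketch on top of simplified local definitions rather than the futures crate itself. Sensor and collect are hypothetical names, and the "every other poll" behaviour merely simulates waiting for the next measurement.

```rust
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical temperature sensor: a new reading is available on every
/// other poll, and the stream ends when the readings run out.
struct Sensor {
    readings: std::vec::IntoIter<f64>,
    measuring: bool,
}

impl Sensor {
    fn new(readings: Vec<f64>) -> Self {
        Sensor { readings: readings.into_iter(), measuring: true }
    }
}

impl Stream for Sensor {
    type Item = f64;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<f64>, ()> {
        if self.measuring {
            // The next measurement is not available yet.
            self.measuring = false;
            return Ok(Async::NotReady);
        }
        self.measuring = true;
        // Some(reading) is the next item; None signals the end of the stream.
        Ok(Async::Ready(self.readings.next()))
    }
}

/// Drains a stream by polling it in a loop (no real executor involved).
fn collect<S: Stream>(mut stream: S) -> Result<Vec<S::Item>, S::Error> {
    let mut items = Vec::new();
    loop {
        match stream.poll()? {
            Async::Ready(Some(item)) => items.push(item),
            Async::Ready(None) => return Ok(items),
            Async::NotReady => continue,
        }
    }
}
```

    Unlike a future, the stream is polled again after it has produced a value, until it reports `None`.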
    

    The mechanics of working with a stream are the same as for futures: similar combinators are used to transform and modify the items of a stream. Moreover, a stream can easily be converted to a future with the into_future adapter.

    Below we look in detail at how futures and streams are used in our Exonum framework.

    Tokio in Exonum


    As already mentioned, the Exonum developers decided to use the Tokio library to implement the event loop in the framework.

    A simplified diagram of the organization of the event model in Exonum can be represented as follows:


    Each network node exchanges messages with other nodes. All incoming messages go into the queue of network events, which also receives internal events (timeouts and internal API events). Each type of event forms a separate stream. However, as noted earlier, these events must be processed synchronously, since each one changes the state of the node. The Event Aggregator combines the separate event streams into one and sends the events through a channel to the event loop, where they are processed in order of priority.
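
    The aggregation step can be sketched with standard-library channels. This is not the Exonum implementation: the Event variants, the Aggregator type, and in particular the priority order used here (timeouts, then network, then API) are assumptions made for illustration only.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical event types; the real Exonum events carry more data.
#[derive(Debug, PartialEq)]
enum Event {
    Timeout(u32),
    Network(String),
    Api(String),
}

/// Merges three event sources into one, always preferring the source
/// with the higher (assumed) priority.
struct Aggregator {
    timeouts: Receiver<Event>,
    network: Receiver<Event>,
    api: Receiver<Event>,
}

impl Aggregator {
    /// Returns the next pending event, or None if every source is empty.
    fn next_event(&self) -> Option<Event> {
        self.timeouts
            .try_recv()
            .ok()
            .or_else(|| self.network.try_recv().ok())
            .or_else(|| self.api.try_recv().ok())
    }
}

fn aggregate_demo() -> Vec<Event> {
    let (timeout_tx, timeouts) = channel();
    let (network_tx, network) = channel();
    let (api_tx, api) = channel();

    // Events arrive in the "wrong" order on purpose.
    api_tx.send(Event::Api("get status".to_string())).unwrap();
    network_tx.send(Event::Network("block proposal".to_string())).unwrap();
    timeout_tx.send(Event::Timeout(1)).unwrap();

    let aggregator = Aggregator { timeouts, network, api };

    // Single channel leading to the event loop.
    let (loop_tx, loop_rx) = channel();
    while let Some(event) = aggregator.next_event() {
        loop_tx.send(event).unwrap();
    }
    drop(loop_tx);

    // The "event loop": processes events one by one, in a single thread.
    loop_rx.iter().collect()
}
```

    Whatever order the events arrive in, the event loop sees them one at a time and in priority order, which is what keeps changes to the node state sequential.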

    When communicating with other nodes, Exonum performs the following chain of operations for each of them:
     
    Connect to node N (open a socket, configure a socket) -> Receive messages from node N (receive bytes from a socket, split bytes into messages) -> Forward messages to the channel of the current node

    let connect_handle = Retry::spawn(handle.clone(), strategy, action)
                .map_err(into_other)
                // Configure socket
                .and_then(move |sock| {
                    sock.set_nodelay(network_config.tcp_nodelay)?;
                    let duration =
                        network_config.tcp_keep_alive.map(Duration::from_millis);
                    sock.set_keepalive(duration)?;
                    Ok(sock)
                })
                // Connect socket with the outgoing channel
                .and_then(move |sock| {
                    trace!("Established connection with peer={}", peer);
                    let stream = sock.framed(MessagesCodec::new(max_message_len));
                    let (sink, stream) = stream.split();
                    let writer = conn_rx
                        .map_err(|_| other_error("Can't send data into socket"))
                        .forward(sink);
                    let reader = stream.for_each(result_ok);
                    reader
                        .select2(writer)
                        .map_err(|_| other_error("Socket error"))
                        .and_then(|res| match res {
                            Either::A((_, _reader)) => Ok("by reader"),
                            Either::B((_, _writer)) => Ok("by writer"),
                        })
                })
    

    Because futures make it possible to combine various abstractions into a chain without losing performance, the code is divided into small functional blocks and is therefore easier to maintain.

    Working with the network is a non-trivial task. To work with a node, you need to connect to it and also provide reconnection logic in case the connection breaks:

    let connect_handle = Retry::spawn(handle.clone(), strategy, action)
                .map_err(into_other)

    In addition, you must configure the socket:

    .and_then(move |sock| {
        sock.set_nodelay(network_config.tcp_nodelay)?;
        let duration =
            network_config.tcp_keep_alive.map(Duration::from_millis);
        sock.set_keepalive(duration)?;
        Ok(sock)
    })
    

    And parse incoming bytes as messages:

    let stream = sock.framed(MessagesCodec::new(max_message_len));
    let (sink, stream) = stream.split();

    Each of these code blocks, in turn, consists of smaller parts. However, because futures let us combine the blocks freely, we do not need to think about the internal structure of each of them.

    In conclusion, we would like to note that Exonum currently uses a somewhat outdated version of Iron, which is based on the hyper library, for its API. However, we are now considering switching to pure hyper, which uses Tokio.


