Using Boost.Asio with Coroutines TS

Introduction


Using callback functions is a popular approach to building network applications using the Boost.Asio library (and not only it). The problem with this approach is the deterioration of the readability and maintainability of the code while complicating the logic of the data exchange protocol [1] .


As an alternative to callbacks, coroutines can be used to write asynchronous code, the readability level of which will be close to the readability of synchronous code. Boost.Asio supports this approach by providing the ability to use the Boost.Coroutine library to handle callbacks.


Boost.Coroutine implements coroutines by storing the execution context of the current thread. This approach competed for inclusion in the next edition of the C ++ standard with a proposal from Microsoft, which introduces the new keywords co_return, co_yield and co_await. The Microsoft proposal has received the status of Technical Specification (TS) [2] and has a high chance of becoming a standard.


Article [3] demonstrates the use of Boost.Asio with Coroutines TS and boost :: future. In my article I want to show how to do without boost :: future. We will take an example of an asynchronous TCP echo server from Boost.Asio as a basis and we will modify it using coroutines from Coroutines TS.



At the time of this writing, Coroutines TS is implemented in Visual C ++ 2017 and clang 5.0 compilers. We will use clang. You must set the compiler flags to enable experimental support for the C ++ 20 standard (-std = c ++ 2a) and Coroutines TS (-fcoroutines-ts). Also include header file.



Coroutine for reading from a socket



In the original example, the function for reading from the socket looks like this:


void do_read() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
                do_write(length);
            }
        });
}

We initiate an asynchronous read from the socket and set a callback that will be called when the data is received and initiates its sending back. The recording function in the original looks like this:


void do_write(std::size_t length) {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
            if (!ec) {
                do_read();
            }
        });
}

Upon successful data writing to the socket, we again initiate asynchronous reading. In essence, the program logic is reduced to a loop (pseudo-code):



while (!ec)
{
	ec = read(buffer);
	if (!ec)
	{
		ec = write(buffer);
	}
}

It would be convenient to encode this in the form of an explicit loop, however, in this case, we would have to make reading and writing synchronous operations. This is not suitable for us, because we want to serve several client sessions in one thread of execution at the same time. Coroutines come to the rescue. We rewrite the do_read () function as follows:



void do_read() {
    auto self(shared_from_this());
    const auto[ec, length] = co_await async_read_some(
        socket_, boost::asio::buffer(data_, max_length));
    if (!ec) {
        do_write(length);
    }
}

Using the co_await keyword (as well as co_yield and co_return) turns the function into a coroutine. Such a function has several points (suspension point), where its execution is suspended (suspend) while maintaining the state (values ​​of local variables). Later coroutine execution can be resumed (resume), starting from the last stop. The co_await keyword in our function creates a suspension point: after asynchronous reading is initiated, do_read () coroutine execution will be suspended until reading is completed. There is no return from the function, but the execution of the program continues, starting from the point of calling the coroutine. When the client connects, session :: start () is called, where do_read () is called the first time for this session. After the start of asynchronous reading, execution of the start () function continues, it returns from it and the reception of the next connection is initiated. Next, the code from Asio, which called the async_accept () argument handler, continues to execute.


In order for co_await magic to work, its expression - in our case, the async_read_some () function - must return an object of a class that matches a specific contract. The implementation of async_read_some () is taken from the commentary on the article [3] .



template 
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;
        std::error_code ec;
        size_t sz;
        bool await_ready() { return false; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            s.async_read_some(std::move(buffers),
                              [this, coro](auto ec, auto sz) mutable {
                                  this->ec = ec;
                                  this->sz = sz;
                                  coro.resume();
                              });
        }
        auto await_resume() { return std::make_pair(ec, sz); }
    };
    return Awaiter{s, std::forward(buffers)};
}

async_read_some () returns an object of the Awaiter class that implements the contract required by co_await:


  • await_ready () is called at the beginning of the wait to check if the result of the asynchronous operation is already ready. Since in order to get the result, we always need to wait until the data is read, we return false.
  • await_suspend () is called before the calling coroutine is paused. Here we initiate an asynchronous read and pass a handler that will save the results of the asynchronous operation in the member variables of the Awaiter class and resume the coroutine.
  • await_resume () - the return value of this function will be the result of executing co_await. We simply return the previously saved results of the asynchronous operation.

If now we try to build our program, we get a compilation error:



error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits' has no member named 'promise_type'
    void do_read() {
         ^

The reason is that the compiler requires that a certain contract be implemented for the coroutine as well. This is done by specializing the std :: experimental :: coroutine_traits template:



template 
struct std::experimental::coroutine_traits {
    struct promise_type {
        void get_return_object() {}
        std::experimental::suspend_never initial_suspend() { return {}; }
        std::experimental::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

We specialized coroutine_traits for coroutines with a return value of type void and any number and types of parameters. The do_read () coroutine fits this description. The template specialization contains the promise_type type with the following functions:


  • get_return_object () is called to create an object that the coroutine will subsequently populate and return. In our case, you do not need to create anything, since do_read () does not return anything.
  • initial_suspend () determines whether the coroutine will be suspended before the first call. The analogy is to start a suspended thread on Windows. We need do_read () to execute without an initial stop, so we return suspend_never.
  • final_suspend () determines whether the coroutine is paused before returning the value and terminating. Return suspend_never.
  • return_void () tells the compiler that the coroutine returns nothing.
  • unhandled_exception () is thrown if an exception was thrown inside the coroutine and it was not processed inside the coroutine. In this case, the program crashes.

Now you can start the server and check its functionality by opening several connections using telnet.



Coroutine for writing to a socket


The do_write () write function is still based on using a callback. Fix it. We rewrite do_write () as follows:



auto do_write(std::size_t length) {
    auto self(shared_from_this());
    struct Awaiter {
        std::shared_ptr ssn;
        std::size_t length;
        std::error_code ec;
        bool await_ready() { return false; }
        auto await_resume() { return ec; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            const auto[ec, sz] = co_await async_write(
                ssn->socket_, boost::asio::buffer(ssn->data_, length));
            this->ec = ec;
            coro.resume();
        }
    };
    return Awaiter{self, length};
}

Let's write an awaitable wrapper for writing to the socket:



template 
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;
        std::error_code ec;
        size_t sz;
        bool await_ready() { return false; }
        auto await_resume() { return std::make_pair(ec, sz); }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            boost::asio::async_write(
                s, std::move(buffers), [this, coro](auto ec, auto sz) mutable {
                    this->ec = ec;
                    this->sz = sz;
                    coro.resume();
                });
        }
    };
    return Awaiter{s, std::forward(buffers)};
}

The last step is to rewrite do_read () as an explicit loop:



void do_read() {
    auto self(shared_from_this());
    while (true) {
        const auto[ec, sz] = co_await async_read_some(
            socket_, boost::asio::buffer(data_, max_length));
        if (!ec) {
            auto ec = co_await do_write(sz);
            if (ec) {
                std::cout << "Error writing to socket: " << ec << std::endl;
                break;
            }
        } else {
            std::cout << "Error reading from socket: " << ec << std::endl;
            break;
        }
    }
}

The program logic is now written in a form close to synchronous code, however, it runs asynchronously. The fly in the ointment is that we had to write an additional awaitable class for the return value of do_write (). This illustrates one of the drawbacks of Coroutines TS - the spread of co_await up the stack of asynchronous calls [4] .


Remaking the server :: do_accept () function in a coroutine is left as an exercise. The full text of the program can be found on GitHub .



Conclusion


We examined the use of Boost.Asio with Coroutines TS for programming asynchronous network applications. The advantage of this approach is improved readability of the code, since it becomes close in form to synchronous. The disadvantage is the need to write additional wrappers to support the coroutine model implemented in Coroutines TS.



References


  1. Asynchrony: Back to the Future
  2. Working Draft, Technical Specification for C ++ Extensions for Coroutines
  3. Using C ++ Coroutines with Boost C ++ Libraries
  4. Objections to accepting Coroutines with await in C ++ 17

Also popular now: