Asynchrony 2: Teleporting Through Portals



    Not even a year has passed, and here, finally, is the continuation of the article about asynchrony. This article develops the ideas of the very first article about asynchrony [1]. It discusses a rather complicated task, through which we will reveal the power and flexibility of coroutines in various non-trivial scenarios. In conclusion, we will consider two race-condition puzzles, as well as a small but very pleasant bonus.

    In the meantime, the first article has made its way into the top search results.



    So let's go!



    Task


    The original wording is straightforward and sounds like this:
    Get a heavy object over the network and transfer it to the UI.

    We will complicate the task by adding some “interesting” UI requirements:
    1. The action is initiated from the UI thread by an event.
    2. The result must be delivered back to the UI.
    3. We do not want to block the UI, so the operation must be performed asynchronously.

    Add some “amusing” conditions for obtaining the object:
    1. Network operations are slow, so we will cache the object.
    2. We would like the cache to be persistent, so that objects survive a restart.
    3. Persistent storage is slow, so for faster retrieval we will additionally cache objects in memory.

    Let's take a look at the performance aspects:
    1. Writes to the caches (persistent storage and memory) should be parallel, not sequential.
    2. Reads from the caches should also be parallel; if the value is found in one of the caches, it should be used immediately, without waiting for the response from the other cache.
    3. Network operations must not interfere with the caches in any way: if, for example, the caches are slow, this must not affect network interactions.
    4. We want to support a large number of connections on a limited number of threads, that is, asynchronous network interaction for a more careful use of resources.

    To spice up the logic:
    1. We will need to cancel operations.
    2. Moreover, once we have received our object over the network, cancellation must not apply to the subsequent cache-update operations; that is, we need to make a certain set of actions non-cancellable.

    If that still does not sound hardcore enough, let's add more requirements:
    1. We must implement timeouts for operations. Moreover, timeouts are needed both for the entire operation and for its parts. For instance:
      • a timeout for all network interaction: connect, request, response;
      • a timeout for the entire operation, including network interaction and work with the caches.
    2. Schedulers may be our own or external (for example, a scheduler running in the UI thread).
    3. No operation may block a thread. This means that mutexes and other synchronization primitives are forbidden, as they would block our threads.



    That is enough for now. If the answer to how to do all this came to someone immediately, I will be happy to see that solution. Below I propose mine: the emphasis will not be on implementing, say, the caches or persistence, but on the parallel and asynchronous interaction itself, taking into account the requirements on locks and schedulers.

    Solution


    For the solution we will use the following model.



    I will describe the essence of what is happening:
    1. UI, MemCache, DiskCache, and Network are objects that perform the corresponding operations for our newly created handler Handler.
    2. Handler performs a simple sequence:
      • It starts a parallel fetch from the cache objects MemCache and DiskCache. On success, that is, when a response with a found result arrives from at least one cache, it returns that result immediately. On failure (as in the diagram), execution continues.
      • Having confirmed the absence of a result in both caches, Handler turns to Network to obtain the object over the network. To do this, it connects to the service (connect), sends a request (send), and receives a response (receive). These operations run asynchronously and do not block other network interactions.
      • The object received from the Network component is written to both caches in parallel.
      • After both cache writes complete, the value is returned to the UI thread.
    3. The program contains the following schedulers and their associated objects:
      • the UI thread, which initiates the asynchronous Handler operation and in which the result must be returned;
      • a common thread pool, in which all basic operations are performed, including MemCache and DiskCache;
      • a network thread pool for Network. It is created separately from the common pool so that load on the common pool does not affect network interactions.

    As I wrote earlier, we will implement objects in the simplest way, since for the asynchronous aspects this does not really matter:

    // stub: disk cache
    struct DiskCache
    {
       boost::optional<std::string> get(const std::string& key)
       {
           JLOG("get: " << key);
           return boost::optional<std::string>();
       }
       void set(const std::string& key, const std::string& val)
       {
           JLOG("set: " << key << ";" << val);
       }
    };
    // in-memory cache: a hash table
    struct MemCache
    {
       boost::optional<std::string> get(const std::string& key)
       {
          auto it = map.find(key);
          return it == map.end()
             ? boost::optional<std::string>()
             : boost::optional<std::string>(it->second);
       }
       void set(const std::string& key, const std::string& val)
       {
          map[key] = val;
       }
    private:
       std::unordered_map<std::string, std::string> map;
    };
    struct Network
    {
       // ...
       // fetch the object over the network
       std::string get(const std::string& key)
       {
           net::Socket socket;
           JLOG("connecting");
           socket.connect(address, port);
           // the first byte is the string size
           Buffer sz(1, char(key.size()));
           socket.write(sz);
           // then the string itself
           socket.write(key);
           // read the size of the result
           socket.read(sz);
           Buffer val(size_t(sz[0]), 0);
           // read the result itself
           socket.read(val);
           JLOG("val received");
           return val;
       }
    private:
        std::string address;
        int port;
        // ...
    };
    // UI object: interaction with the UI
    struct UI : IScheduler
    {
       void schedule(Handler handler)
       {
           // launch the operation in the UI thread
           // ...
       }
       void handleResult(const std::string& key, const std::string& val)
       {
           TLOG("UI result inside UI thread: " << key << ";" << val);
           // TODO: add some actions
       }
    };
    

    As a rule, every UI framework contains a method that allows you to run an action in the UI thread (for example, Android: Activity.runOnUiThread; Ultimate++: PostCallback; Qt: through the signal-slot mechanism). These methods should be used in the implementation of UI::schedule.
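
    As an illustration, here is a minimal sketch of UI::schedule on top of Qt. This is an assumption for the example, not the article's actual implementation; it relies on the functor overload of QMetaObject::invokeMethod available since Qt 5.10:

    #include <QCoreApplication>
    #include <QMetaObject>

    void UI::schedule(Handler handler)
    {
        // queue the handler for execution in the thread of the application
        // object, i.e. in the UI (main) thread's event loop
        QMetaObject::invokeMethod(qApp, std::move(handler), Qt::QueuedConnection);
    }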

    The initialization of all this machinery is done in an imperative style:

    // create a thread pool for general actions
    ThreadPool cpu(3, "cpu");
    // create a thread pool for network actions
    ThreadPool net(2, "net");
    // scheduler for serializing disk operations
    Alone diskStorage(cpu, "disk storage");
    // scheduler for serializing memory operations
    Alone memStorage(cpu, "mem storage");
    // set the default scheduler
    scheduler<DefaultTag>().attach(cpu);
    // bind the network service to the network pool
    service<NetworkTag>().attach(net);
    // bind timeout handling to the common pool
    service<TimeoutTag>().attach(cpu);
    // bind the disk portal to the disk scheduler
    portal<DiskCache>().attach(diskStorage);
    // bind the memory portal to the corresponding scheduler
    portal<MemCache>().attach(memStorage);
    // bind the network portal to the network pool
    portal<Network>().attach(net);
    UI& ui = single<UI>();
    // bind the UI portal to the UI scheduler
    portal<UI>().attach(ui);
    

    In the UI thread, for some user action, we perform:

    go([key] {
       // timeout for all the operations: 1 s = 1000 ms
       Timeout t(1000);
       std::string val;
       // fetch the result from both caches in parallel
       boost::optional<std::string> result = goAnyResult<std::string>({
           [&key] {
               return portal<DiskCache>()->get(key);
           }, [&key] {
               return portal<MemCache>()->get(key);
           }
       });
       if (result)
       {
           // the result was found
           val = std::move(*result);
           JLOG("cache val: " << val);
       }
       else
       {
           // the caches do not contain the result,
           // fetch the object over the network
           {
               // timeout for the network part: 0.5 s = 500 ms
               Timeout tNet(500);
               val = portal<Network>()->get(key);
           }
           JLOG("net val: " << val);
           // from this point to the end of the block,
           // cancellation (and timeouts) are disabled
           EventsGuard guard;
           // write to both caches in parallel
           goWait({
               [&key, &val] {
                   portal<DiskCache>()->set(key, val);
               }, [&key, &val] {
                   portal<MemCache>()->set(key, val);
               }
           });
           JLOG("cache updated");
       }
       // go to the UI and process the result
       portal<UI>()->handleResult(key, val);
    });
    



    Implementation of used primitives


    As the attentive reader has noticed, I used a considerable number of primitives whose implementation one can only guess at. Below is a description of the approach and of the classes used. I think it will clarify what portals are and how to use them, and will also answer the question about teleportation.



    Waiting primitives


    Let's start with the simplest: the waiting primitives.

    goWait: start an asynchronous operation and wait for completion


    To begin with, let's implement a function that starts an operation asynchronously and waits for its completion:

    void goWait(Handler);
    

    Of course, simply running the handler in the current coroutine would do as an implementation. But in more complex scenarios that will not suit us, so we will create a new coroutine to implement this function:

    void goWait(Handler handler) {
        deferProceed([&handler](Handler proceed) {
            go([proceed, &handler] { // create a new coroutine
                handler();
                proceed(); // resume the original coroutine
            });
        });
    }
    

    Let me briefly describe what happens here. The goWait function receives a handler that must be launched in a new coroutine. To perform the necessary operations we use the function deferProceed, which is implemented as follows:

    typedef std::function<void(Handler)> ProceedHandler;
    void deferProceed(ProceedHandler proceed) {
        auto& coro = currentCoro();
        defer([&coro, proceed] {
            proceed([&coro] { coro.resume(); });
        });
    }
    

    What does this function do? It wraps the defer call for more convenient use (what defer is and why it is needed was described in my previous article): instead of a Handler it takes a ProceedHandler, to which it passes a Handler that continues the coroutine's execution. That proceed handler stores a reference to the current coroutine and calls coro.resume() on it. Thus we encapsulate all the work with coroutines, and the user only has to deal with the proceed handler.

    Back to goWait. When deferProceed is called, we obtain a proceed that must be invoked at the end of the handler's work. All that remains is to create a new coroutine, run our handler inside it, and call proceed as soon as it completes; proceed in turn calls coro.resume(), resuming the execution of the original coroutine.

    This gives us waiting without blocking a thread: upon calling goWait we suspend our operations in the current coroutine, and when the passed handler finishes, we continue execution as if nothing had happened.
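
    A usage sketch (heavyComputation is a hypothetical long synchronous call):

    go([] {
        JLOG("before");
        // the current coroutine is suspended here, but its thread is not
        // blocked: it is free to run other coroutines in the meantime
        goWait([] { heavyComputation(); });
        JLOG("after: same coroutine, possibly another thread");
    });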

    goWait: start several asynchronous operations and wait for them to complete


    Now let's implement a function that starts a whole group of asynchronous operations and waits for all of them to complete:

    void goWait(std::initializer_list<Handler> handlers);
    

    On input we receive a list of handlers, each of which must be run asynchronously, that is, each handler starts in its own coroutine. The significant difference from the previous function is that we must resume the original coroutine only after all the handlers have completed. Someone would use mutexes and condition variables for this (and some actually do!), but we cannot (see the requirements), so we will look for another way.

    The idea is actually quite simple: we need a counter that, on reaching a certain value, calls proceed. Each handler updates the counter upon completion, so the last handler resumes the original coroutine. There is one small complication, though: the counter must be shared between the running coroutines, and the last handler must not only call proceed but also remove the counter from memory. This can be implemented as follows:

    void goWait(std::initializer_list<Handler> handlers)
    {
        deferProceed([&handlers](Handler proceed) {
            std::shared_ptr<void> proceeder(nullptr, [proceed](void*) { proceed(); });
            for (const auto& handler: handlers)
            {
                go([proceeder, &handler] {
                    handler();
                });
            }
        });
    }
    

    At the very beginning we launch the good old deferProceed, but a little magic is hidden inside it. Few people know that when constructing a shared_ptr you can pass not only a pointer to the data, but also a deleter that destroys the object in place of delete ptr. That is exactly where we put the proceed call that resumes the original coroutine at the end. And since we stored “nothing” (nullptr) there, there is nothing to actually delete. The rest is simple: in a loop we go through all the handlers and run each in its own freshly created coroutine. There is one nuance here: we capture proceeder by value, which copies it and increments the atomic reference counter inside shared_ptr. When a handler finishes, its lambda with the captured proceeder is destroyed, decrementing the counter. The last one to drop the counter to zero destroys the shared proceeder, which invokes the deleter and thereby, at the very end, calls proceed() and resumes the original coroutine.
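
    The deleter trick is easy to observe in isolation, without any coroutines; here is a minimal self-contained sketch with plain threads:

    #include <chrono>
    #include <cstdio>
    #include <memory>
    #include <thread>

    int main()
    {
        {
            // the deleter fires exactly once: when the last copy of 'latch' dies
            std::shared_ptr<void> latch(nullptr, [](void*) { std::puts("all done"); });
            std::thread([latch] { std::puts("handler 1"); }).detach();
            std::thread([latch] { std::puts("handler 2"); }).detach();
            // our own copy dies here; "all done" is printed by whichever of
            // the three holders (this scope, handler 1, handler 2) releases last
        }
        std::this_thread::sleep_for(std::chrono::seconds(1)); // let the threads finish
    }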



    For clarity, here is the sequence of operations for the example of two handlers started in different threads:



    Example: Recursively Parallel Fibonacci Numbers


    To illustrate the use, consider the following example. Suppose a whim came over us, and we wanted to compute Fibonacci numbers recursively and in parallel. Not a problem:

    int fibo(int v)
    {
        if (v < 2)
            return v;
        int v1, v2;
        goWait({
            [v, &v1] { v1 = fibo(v-1); },
            [v, &v2] { v2 = fibo(v-2); }
        });
        return v1 + v2;
    }
    

    Note that a stack overflow can never happen here: each call to fibo runs in its own coroutine with its own stack.

    Waiter: start several asynchronous operations and wait for their completion


    Often we need not merely to wait for a fixed set of handlers, but to do something useful in between and only then wait. Sometimes we do not even know in advance how many handlers will be needed, that is, we create them as our operations proceed. In effect, we need to operate on a group of handlers as a whole. For this we can use the Waiter primitive with the following interface:

    struct Waiter
    {
        Waiter& go(Handler);
        void wait();
    };
    

    There are only two methods:
    1. go: launch another handler;
    2. wait: wait for all launched handlers.

    These methods may be called multiple times during the lifetime of a Waiter object, as in the usage sketch after this list.
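
    For instance, when the number of handlers becomes known only at run time (items, process, and doSomethingUsefulMeanwhile are hypothetical here):

    Waiter w;
    for (auto& item: items)                 // hypothetical container
        w.go([&item] { process(item); });   // hypothetical per-item operation
    doSomethingUsefulMeanwhile();           // useful work between go and wait
    w.wait();                               // resumes only when all handlers finish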

    The idea of the implementation is exactly the same: we need a proceeder that resumes our coroutine. A small subtlety is added, though: the proceeder is now shared between the running coroutines and the Waiter object itself. Accordingly, when wait is called, the Waiter must get rid of its own copy. Here is how:

    void Waiter::wait()
    {
        if (proceeder.unique())
        {
            // only the Waiter owns the proceeder => nothing is running
            JLOG("everything done, nothing to do");
            return;
        }
        defer([this] {
            // move the proceeder out of the coroutine's scope;
            // the shared proceeder is destroyed either here
            // or in one of the handler coroutines
            auto toDestroy = std::move(proceeder);
        });
        // the proceeder has been released by this point,
        // recreate it for subsequent use
        init0();
    }
    

    And again we barely had to do anything, all thanks to shared_ptr. Amen!



    Example: Recursively Parallel Fibonacci Numbers


    To consolidate the material, let's consider an alternative implementation of our whim using Waiter:

    int fibo(int v)
    {
        if (v < 2)
            return v;
        int v1;
        Waiter w;
        w.go([v, &v1] { v1 = fibo(v-1); });
        int v2 = fibo(v-2);
        w.wait();
        return v1 + v2;
    }
    

    Another option:

    int fibo(int v)
    {
        if (v < 2)
            return v;
        int v1, v2;
        Waiter()
            .go([v, &v1] { v1 = fibo(v-1); })
            .go([v, &v2] { v2 = fibo(v-2); })
            .wait();
        return v1 + v2;
    }
    

    Take whichever you like more.

    goAnyWait: start several asynchronous operations and wait for at least one to complete


    We continue running several operations at once. But now we will wait only until at least one of them completes:

    size_t goAnyWait(std::initializer_list<Handler> handlers);
    

    On input we receive a list of handlers; on output we return the index of the handler that finished first.

    To implement this primitive we slightly modernize our approach. Instead of sharing a void* ptr == nullptr, we now share a perfectly concrete atomic counter, initialized to 0 at the start. Each handler increments the counter when it finishes. The one that changes the value from 0 to 1 — and only it — calls proceed():

    size_t goAnyWait(std::initializer_list<Handler> handlers)
    {
       VERIFY(handlers.size() >= 1, "Handlers amount must be positive");
       size_t index = static_cast<size_t>(-1);
       deferProceed([&handlers, &index](Handler proceed) {
          std::shared_ptr<std::atomic<int>> counter =
             std::make_shared<std::atomic<int>>();
          size_t i = 0;
          for (const auto& handler: handlers)
          {
             go([counter, proceed, &handler, i, &index] {
                handler();
                if (++ *counter == 1)
                {
                   // aha, gotcha!
                   index = i;
                   proceed();
                }
             });
             ++ i;
          }
       });
       VERIFY(index < handlers.size(), "Incorrect index returned");
       return index;
    }
    

    As you might guess, the same trick can be used when we need to wait for two, three, or more handlers, as in the sketch below.
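
    For instance, a hypothetical goNWait, resuming once n of the handlers have finished, reuses the same shared counter and only changes the threshold; a sketch:

    // sketch: resume the original coroutine after n handlers have completed
    void goNWait(size_t n, std::initializer_list<Handler> handlers)
    {
        VERIFY(n >= 1 && n <= handlers.size(), "n must be in [1, size]");
        deferProceed([n, &handlers](Handler proceed) {
            auto counter = std::make_shared<std::atomic<size_t>>(0);
            for (const auto& handler: handlers)
            {
                go([counter, proceed, n, &handler] {
                    handler();
                    // only the n-th finisher calls proceed
                    if (++ *counter == n)
                        proceed();
                });
            }
        });
    }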

    goAnyResult: start several asynchronous operations and wait for at least one result


    Now let's move on to the juiciest part, the one our task actually needs: launching several operations and waiting for the result we want. Moreover, a handler may fail to produce a result: it finishes its work, but says, in effect: “Sorry, I just couldn't.”

    This approach introduces extra complexity: all the handlers may finish without producing any result. Therefore we must, first, somehow check at the end of all operations whether we obtained the desired result, and second, be able to return an “empty” result. We will use boost::optional to signal emptiness, and goAnyResult ends up with this simple prototype:

    template<typename T_result>
    boost::optional<T_result> goAnyResult(
       std::initializer_list<
          std::function<
             boost::optional<T_result>()
          >
       > handlers)
    

    There is nothing scary here: we just pass a list of handlers that optionally return our T_result. That is, the handlers must have the signature:

    boost::optional<T_result> handler();
    

    Compared to the previous primitive, the situation changes only slightly. The counter remains, but now we must also check it when the counter is destroyed: if incrementing it there yields 1, then nobody managed to bump the counter and return the needed result earlier, so we must return the “empty” value. Thus, instead of a plain atomic value, counter becomes a full-fledged Counter object:

    template<typename T_result>
    boost::optional<T_result> goAnyResult(
       std::initializer_list<
          std::function<
             boost::optional<T_result>()
          >
       > handlers)
    {
        typedef boost::optional<T_result> Result;
        typedef std::function<void(Result&&)> ResultHandler;
        struct Counter
        {
            Counter(ResultHandler proceed_) : proceed(std::move(proceed_)) {}
            ~Counter()
            {
                tryProceed(Result()); // in the destructor we proceed no matter what
            }
            void tryProceed(Result&& result)
            {
                if (++ counter == 1)
                    proceed(std::move(result));
            }
        private:
            std::atomic<int> counter{0};
            ResultHandler proceed;
        };
        Result result;
        deferProceed([&handlers, &result](Handler proceed) {
            std::shared_ptr<Counter> counter = std::make_shared<Counter>(
                [&result, proceed](Result&& res) {
                    result = std::move(res);
                    proceed();
                }
            );
            for (const auto& handler: handlers)
            {
                go([counter, &handler] {
                    Result result = handler();
                    if (result) // try to proceed only if there is a result
                        counter->tryProceed(std::move(result));
                });
            }
        });
        return result;
    }
    

    The subtle point here is that std::move actually moves the result only when the condition inside tryProceed is satisfied. That is because std::move performs no move by itself, however much one might wish it did: it is merely a cast to an rvalue reference.
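
    A tiny illustration of this point:

    std::string s = "value";
    (void) std::move(s);          // nothing happens: just a cast, nobody consumed it
    assert(s == "value");         // s is intact
    std::string d = std::move(s); // only here does the move constructor actually run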

    With waiting sorted out, let's move on to schedulers and thread pools.

    Scheduler, pools, synchronization


    Scheduler Interface


    Having covered, so to speak, the basic basics, we move on to dessert.
    Let's introduce the scheduler interface:

    struct IScheduler : IObject
    {
        virtual void schedule(Handler handler) = 0;
    };
    

    Its task is to execute handlers. Note that the scheduler interface contains no cancellation, no timeouts, no deferred operations. The interface should be crystal clear so that it can easily be bridged with various frameworks (cf. [2]: there you get cancellation, actors, and delays all in one interface; crossing that with UI schedulers would be quite awkward).



    Thread pool


    To perform various actions we need a thread pool that implements the scheduler interface:

    typedef boost::asio::io_service IoService;
    struct IService : IObject
    {
        virtual IoService& ioService() = 0;
    };
    struct ThreadPool : IScheduler, IService
    {
        ThreadPool(size_t threadCount, const char* name);
        void schedule(Handler handler)
        {
            service.post(std::move(handler));
        }
    private:
        IoService& ioService();
        std::unique_ptr<IoService::work> work;
        boost::asio::io_service service;
        std::vector<std::thread> threads;
    };
    

    What do we have here?
    1. A constructor in which we set the number of threads.
    2. An implementation of the scheduler interface via boost::asio::io_service::post.
    3. A work member that keeps the io_service event loop alive: without it, in the absence of events the loop would finish and the threads would terminate.
    4. An array of threads.

    In addition, our class implements (privately!) the somewhat murky IService interface with an ioService method that returns IoService, which is boost::asio::io_service. It all looks strange, but let me try to explain the trick.

    The point is that to work with network sockets and timeouts we need a richer scheduler interface, and that interface is effectively hidden inside boost::asio::io_service. The remaining components, which I will use later, must somehow get at the boost::asio::io_service instance. To avoid giving easy access to it, I introduced the IService interface that yields the coveted instance, while in the implementation the method is private. This gives a certain level of protection against misuse: to pull the object out you must first convert the ThreadPool to IService and only then call the method, as the snippet below shows. An alternative would be friend classes, but I did not want to pollute ThreadPool with knowledge of its possible uses, so I considered this approach a reasonable price for encapsulation.
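
    A sketch of what this protection looks like at the call site:

    ThreadPool pool(2, "pool");
    // pool.ioService();              // does not compile: private in ThreadPool
    IService& svc = pool;             // the deliberate extra step
    IoService& io = svc.ioService();  // fine: the method is public in the interface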

    Coroutine class


    Having introduced the thread pool and the scheduler, it is time to introduce a class for manipulating coroutines. It is called, oddly enough, Journey (why so will become clear later):

    struct Journey
    {
        void proceed();
        Handler proceedHandler();
        void defer(Handler handler);
        void deferProceed(ProceedHandler proceed);
        static void create(Handler handler, mt::IScheduler& s);
    private:
        Journey(mt::IScheduler& s);
        struct CoroGuard
        {
            CoroGuard(Journey& j_) : j(j_)  { j.onEnter0();   }
            ~CoroGuard()                    { j.onExit0();    }
            coro::Coro* operator->()        { return &j.coro; }
        private:
            Journey& j;
        };
        void start0(Handler handler);
        void schedule0(Handler handler);
        CoroGuard guardedCoro0();
        void proceed0();
        void onEnter0();
        void onExit0();
        mt::IScheduler* sched;
        coro::Coro coro;
        Handler deferHandler;
    };
    

    What catches the eye here?
    • The private constructor: a Journey is created by the static public method create.
    • Journey contains a pointer to the scheduler sched, the coroutine itself coro, and the deferred handler deferHandler that is invoked inside defer.
    • CoroGuard is a proxy class that, around every operation on the coroutine, automatically runs onEnter0 on entry and onExit0 on exit.

    To understand how this works, let's look at the implementation of several simple methods:

    void Journey::schedule0(Handler handler)
    {
        VERIFY(sched != nullptr, "Scheduler must be set in journey");
        sched->schedule(std::move(handler));
    }
    void Journey::proceed0()
    {
        // use the guard to resume the coroutine
        guardedCoro0()->resume();
    }
    Journey::CoroGuard Journey::guardedCoro0()
    {
        return CoroGuard(*this);
    }
    // returning into the coroutine must go through the scheduler
    void Journey::proceed()
    {
        schedule0([this] {
            proceed0();
        });
    }
    // the very handler that returns control to the coroutine
    Handler Journey::proceedHandler()
    {
        return [this] {
            proceed();
        };
    }
    // starting a new coroutine
    // see also task 1
    void Journey::start0(Handler handler)
    {
        schedule0([handler, this] {
            // the guard again
            guardedCoro0()->start([handler] {
                JLOG("started");
                // don't forget about exceptions
                try
                {
                    handler();
                }
                catch (std::exception& e)
                {
                    (void) e;
                    JLOG("exception in coro: " << e.what());
                }
                JLOG("ended");
            });
        });
    }
    

    Let's now examine the defer operation:

    void Journey::defer(Handler handler)
    {
        // remember the handler
        deferHandler = handler;
        // and leave the current coroutine
        coro::yield();
    }
    // the deferProceed used earlier
    void Journey::deferProceed(ProceedHandler proceed)
    {
        defer([this, proceed] {
            proceed(proceedHandler());
        });
    }
    

    Everything is simple! It remains to understand where our deferred handler deferHandler actually runs:

    TLS Journey* t_journey = nullptr;
    void Journey::onEnter0()
    {
        t_journey = this;
    }
    // see also task 2
    void Journey::onExit0()
    {
        if (deferHandler == nullptr)
        {
            // no handler => the actions are finished, time to self-destruct
            delete this;
        }
        else
        {
            // otherwise, execute the deferred action
            deferHandler();
            deferHandler = nullptr;
        }
        // reset the value, since we are now outside the coroutine
        t_journey = nullptr;
    }
    

    And finally, consider the implementation of a static function create:

    void Journey::create(Handler handler, mt::IScheduler& s)
    {
        (new Journey(s))->start0(std::move(handler));
    }
    

    It is worth noting that the user has no way to create a Journey explicitly; he does not even suspect that such a class exists. But more on that later. And now...

    Teleportation


    Finally we move on to the strawberry! Teleportation... We are talking about a primitive that can only be implemented with coroutines. This primitive is so powerful and yet so simple that it is worth dwelling on it in detail and savoring it. It is a strawberry indeed!

    The easiest way to start the discussion is with the implementation:

    void Journey::teleport(mt::IScheduler& s)
    {
        if (&s == sched)
        {
            JLOG("the same destination, skipping teleport <-> " << s.name());
            return;
        }
        JLOG("teleport " << sched->name() << " -> " << s.name());
        sched = &s;
        defer(proceedHandler());
    }
    

    Two things happen here:
    1. We check whether the coroutine's scheduler differs from the one passed to the method. If it is the same, nothing needs to be done: we are already on the needed scheduler.
    2. If it differs, the coroutine's scheduler is replaced and defer is called: it exits the coroutine and schedules the continuation handler. The new scheduler is used for the return, so re-entry into the coroutine happens in the new thread pool.



    The diagram below explains the process of switching coroutine execution from Scheduler/Thread to Scheduler2/Thread2:



    What does this give us? It gives switching between thread pools and, generally speaking, between schedulers. In particular, we can switch between the UI thread and computation threads so that the UI, as they say, does not freeze:

    auto result = someCalculations();
    teleport(uiScheduler);
    showResult(result);
    teleport(calcScheduler);
    auto newResult = continueSmartCalculations(result);
    teleport(uiScheduler);
    updateResult(newResult);
    //…
    

    That is, to get into the UI you just teleport to the needed thread, and everything will be thread safe, as required by UI application development. The same technique applies when we need to get into, say, a network thread pool or a database thread pool — generally, into any place where the scheduler interface can be used.

    Portals





    Now we can move on to the cherry. I would even say a sweet and juicy cherry. As you have noticed, to update the state of a UI application we had to teleport first to the UI scheduler and then back. To avoid doing this by hand every time, let's create a portal that automatically returns us back as soon as the necessary action completes:

    struct Portal
    {
        Portal(mt::IScheduler& destination) :
            source(journey().scheduler())
        {
            JLOG("creating portal " << source.name() << " <=> " << destination.name());
            teleport(destination);
        }
        ~Portal()
        {
            teleport(source);
        }
    private:
        mt::IScheduler& source;
    };
    

    That is, in the constructor we remember the source (the current scheduler of the coroutine) and teleport the coroutine to the given destination. In the destructor we teleport back to the original scheduler.

    Thanks to this RAII idiom, we no longer have to worry about suddenly ending up in the wrong place (say, running heavy computations in the UI thread or in the network pool): everything happens automatically.

    Let's look at an example:

    ThreadPool tp1(1, "tp1");
    ThreadPool tp2(1, "tp2");
    go([&tp2] {
        Portal p(tp2);
        JLOG("throwing exception");
        throw std::runtime_error("exception occur");
    }, tp1);
    

    The coroutine starts in tp1, then a portal is created and we switch to tp2. When the exception is thrown, the portal's destructor is called, which effectively suspends the propagation of the exception, teleports back to tp1, and resumes the coroutine, which then continues propagating the exception in the other thread pool. Free of charge and with no SMS!



    To aggravate the use of portals even further (though where further, one might ask), let's write:

    struct Scheduler
    {
        Scheduler();
        void attach(mt::IScheduler& s)
        {
            scheduler = &s;
        }
        void detach()
        {
            scheduler = nullptr;
        }
        operator mt::IScheduler&() const
        {
            VERIFY(scheduler != nullptr, "Scheduler is not attached");
            return *scheduler;
        }
    private:
        mt::IScheduler* scheduler;
    };
    struct DefaultTag;
    template<typename T_tag = DefaultTag>
    Scheduler& scheduler()
    {
        return single<Scheduler, T_tag>();
    }
    template<typename T>
    struct WithPortal : Scheduler
    {
        struct Access : Portal
        {
            Access(Scheduler& s) : Portal(s) {}
            T* operator->()             { return &single<T>(); }
        };
        Access operator->()             { return *this; }
    };
    template<typename T>
    WithPortal<T>& portal()
    {
        return single<WithPortal<T>>();
    }
    

    This allows us to bind portals to classes, as in the following example:

    ThreadPool tp1(1, "tp1");
    ThreadPool tp2(1, "tp2");
    struct X
    {
        void op() {}
    };
    portal<X>().attach(tp2);
    go([] {
        portal<X>()->op();
    }, tp1);
    

    In this example we attached the portal of X to the thread pool tp2. Thus, on every call to a method of the single instance of class X (obtained through single<X>()), the coroutine moves into the thread pool we need. Our Journey execution context travels back and forth, teleporting through the portals of the objects it uses!



    This frees us from keeping track of where the methods of our classes must be called: the classes themselves take care of switching to the desired thread and back. It is exactly this approach that was used at the very beginning to solve our task. It gives high code clarity while exploiting the full power of coroutines, teleportation, and portals.

    Non-Blocking Mutexes


    Mutexes are often used for working with shared resources, and understandably so: the primitive is easy to use, and in most cases it justifies itself.

    But what happens to a mutex when someone has already grabbed the resource? Then we wait on the mutex until the resource is released; meanwhile the thread is blocked and stops doing useful work.

    What would we like? In terms of performance, we would like our threads to be loaded slightly more than fully, not distracted by waiting. “Consider it done,” replied the coroutine and smiled smugly.

    Non-blocking mutexes can be implemented on coroutines in various ways. I will use an elegant method that reuses existing functionality with a minimum of additions. To do this, let's create a new scheduler:

    struct Alone : mt::IScheduler
    {
        Alone(mt::IService& service, const char* name = "alone");
        void schedule(Handler handler)
        {
            strand.post(std::move(handler));
        }
    private:
        boost::asio::io_service::strand strand;
    };
    

    The Alone constructor takes an IService interface as an input parameter, which lets us correctly initialize the io_service::strand from boost.asio. A strand is in essence another boost.asio scheduler, one that guarantees that no more than one handler runs at any given moment. Which is precisely our idea of what a mutex (mutual exclusion) is.

    Since Alone implements the scheduler interface, we can apply the full power of our teleportations and portals to it without a twinge of conscience, as if that were exactly what it was made for.

    To consolidate the material, consider the code:

    struct MemCache
    {
        boost::optional<std::string> get(const std::string& key);
        void set(const std::string& key, const std::string& val);
    };
    // initialization
    ThreadPool common_pool(3, "common");   // common thread pool
    Alone mem_alone(common_pool);          // serialization of memory operations
    portal<MemCache>().attach(mem_alone);  // bind the portal for the memory cache
    // now perform the necessary operations
    auto value = portal<MemCache>()->get(key);
    // or
    portal<MemCache>()->set(anotherKey, anotherValue);
    

    Access to the object will be serialized automatically, and no thread will be blocked even under concurrent access. Miracles, no less!



    External events


    Of course, it is nice to live in your own little sandbox and perform operations regardless of the environment. But life looks down on our naive idealism and periodically spits at it. So there is nothing left but to wipe ourselves off and move on, despite all the hardships and deprivations.



    What awaits us in asynchronous programming? The fact that actions which today seem indisputable may, at the very next moment, have to be revised and adjusted (see “cancellation”). That is, we want to cancel our actions depending on the current situation.

    And we must not only take the changing execution conditions into account: we must also respond to network factors, that is, correctly handle timeouts. Of course it is nice when we manage to get the result, but if it did not arrive in time, it may no longer be needed at all. What is the use of studying for an exam today if the exam was yesterday and we did not show up?

    All these requirements place a heavy burden on existing frameworks. Therefore, as a rule, they are ignored at first and then raked up in production, when something hangs, something lags, resources are busy, and operations grind on despite the futility of their results. Let's try to make an approach to this barbell.

    First, we introduce the types of external events and associated exceptions:

    enum EventStatus
    {
        ES_NORMAL,
        ES_CANCELLED,
        ES_TIMEDOUT,
    };
    struct EventException : std::runtime_error
    {
        EventException(EventStatus s);
        EventStatus status();
    private:
        EventStatus st;
    };
    

    To control a coroutine from outside (see cancellation), we need an object that shares state between the caller and the callee:

    struct Goer
    {
        Goer();
        EventStatus reset();
        bool cancel();
        bool timedout();
    private:
        struct State
        {
            State() : status(ES_NORMAL) {}
            EventStatus status;
        };
        bool setStatus0(EventStatus s);
        State& state0();
        std::shared_ptr<State> state;
    };
    

    Everything is fairly trivial here: we have a shared status that we can change and check.
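
    The methods are essentially one-liners; a possible sketch (an assumption: setStatus0 changes the status only if no event has fired yet):

    EventStatus Goer::reset()
    {
        EventStatus s = state0().status;
        state0().status = ES_NORMAL;
        return s;
    }
    bool Goer::cancel()   { return setStatus0(ES_CANCELLED); }
    bool Goer::timedout() { return setStatus0(ES_TIMEDOUT); }
    bool Goer::setStatus0(EventStatus s)
    {
        if (state0().status != ES_NORMAL)
            return false; // an event has already fired, don't overwrite it
        state0().status = s;
        return true;
    }
    Goer::State& Goer::state0() { return *state; }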

    Next, we add event processing to our Journey traveler class:

    void Journey::handleEvents()
    {
        // may be called from a destructor
        if (!eventsAllowed || std::uncaught_exception())
            return;
        auto s = gr.reset();
        if (s == ES_NORMAL)
            return; // no events
        throw EventException(s);
    }
    void Journey::disableEvents()
    {
        handleEvents();
        eventsAllowed = false;
    }
    void Journey::enableEvents()
    {
        eventsAllowed = true;
        handleEvents();
    }
    

    Note the flag that determines whether events are processed at all. Sometimes we need to finish important actions before throwing an exception and unwinding the stack. That is what the guard is for:

    struct EventsGuard
    {
        EventsGuard();  // calls disableEvents()
        ~EventsGuard(); // calls enableEvents()
    };
    

    The question arises: when is that notorious handleEvents actually called? Here:

    void Journey::defer(Handler handler)
    {
        // before leaving the coroutine
        handleEvents();
        deferHandler = handler;
        coro::yield();
        // and right after waking up
        handleEvents();
    }
    

    That is, at the moment of any context switch, for example during an asynchronous operation or a teleportation. If we perform heavy synchronous operations, then for a faster reaction to events we must insert additional handleEvents calls into our handlers. This solves the problem of our operations' responsiveness to external events.

    Now we implement the launch of the coroutine:

    Goer go(Handler handler, mt::IScheduler& scheduler)
    {
        return Journey::create(std::move(handler), scheduler);
    }
    

    Journey::create returns the shared Goer state, through which we can react to external events:

    struct Journey
    {
        // …
        Goer goer() const
        {
            return gr;
        }
        // …
    private:
        // …
        Goer gr;
    };
    Goer Journey::create(Handler handler, mt::IScheduler& s)
    {
        return (new Journey(s))->start0(std::move(handler));
    }
    // see task 1
    Goer Journey::start0(Handler handler)
    {
        // …
        return goer();
    }


    A small usage example:

    Goer op = go(myMegaHandler);
    // …
    if (weDontNeedMegaHandlerAnymore)
        op.cancel();
    

    After the op.cancel() call the state changes, and on the next handleEvents() call the cancellation gets down to its exceptional, much-hyped business.



    As you have probably noticed, the creation of the Journey traveler, who goes back and forth teleporting through portals, happens covertly inside the go function. So the user does not even know he is dealing with a hidden object; he simply calls the free functions go, defer, deferProceed, etc., which retrieve the Journey traveler instance internally through TLS.

    Timeout Processing


    Let's take a look at the implementation of nested timeouts:

    struct Timeout
    {
        Timeout(int ms);
        ~Timeout();
    private:
        boost::asio::deadline_timer timer;
    };
    


    To do this, we will use boost::asio::deadline_timer:

    Timeout::Timeout(int ms) :
        timer(service<TimeoutTag>(), boost::posix_time::milliseconds(ms))
    {
        // take the current shared state
        Goer goer = journey().goer();
        // start the asynchronous timeout handler
        timer.async_wait([goer](const Error& error) mutable {
            // mutable, because we modify the captured goer state
            if (!error) // if the timer was not cancelled, time out the coroutine
                goer.timedout();
        });
    }
    Timeout::~Timeout()
    {
        // cancel the started timer
        timer.cancel_one();
        // and check: an event might have fired already
        handleEvents();
    }
    

    Thanks to the RAII idiom we can nest timeouts inside one another independently, without restricting ourselves in any way.

    Let me give you a trivial example:

    // inside a coroutine
    Timeout t(100); // 100 ms
    for (auto element: container)
    {
        performOperation(element);
        handleEvents();
    }
    

    Didn't make it in 100 ms? Goodbye!



    Another example demonstrates the possibility of nested timeouts:

    // set a 200 ms timeout on all the operations
    Timeout outer(200);
    portal<X>()->performOp();
    {
        // set a 100 ms timeout
        // only on the operations inside this scope
        Timeout inner(100);
        portal<X>()->performAnotherOp();
        // and protect this operation from any events
        EventsGuard guard;
        performGuardedAction();
    }
    

    Tasks


    I have two race-condition puzzles for you. Catching race conditions is, in general, a very “fascinating” pastime, so I am giving you the chance to think them over yourselves.

    What is the point of such an exercise? Let me list:
    1. Analyzing the behavior will help you understand in more detail how coroutines work.
    2. Concurrency and asynchrony have specific pitfalls; one should know them so as not to have to rake them up later.
    3. And finally, it simply stretches the brain. They say that makes it work better.

    Task 1


    So, task number 1.

    There is a function for launching a coroutine:

    Goer Journey::start0(Handler handler)
    {
        schedule0([handler, this] {
            guardedCoro0()->start([handler] {
                JLOG("started");
                try
                {
                    handler();
                }
                catch (std::exception& e)
                {
                    (void) e;
                    JLOG("exception in coro: " << e.what());
                }
                JLOG("ended");
            });
        });
        return goer();
    }
    

    There is a race condition right in this function. Where is it, and what must be changed to fix this fatal flaw?

    Answer
    Goer Journey::start0(Handler handler)
     {
    +    Goer gr = goer();
         schedule0([handler, this] {
             guardedCoro0()->start([handler] {
                 JLOG("started");
    @@ -121,7 +122,7 @@
                 JLOG("ended");
             });
         });
    -    return goer();
    +    return gr;
     }

    Once schedule0 has been called, the coroutine may run to completion in another thread and delete itself, so calling goer() on this afterwards races with delete this; the shared state must be copied out beforehand.
    





    Task 2


    The conditions are the same. There is this code:

    void Journey::onExit0()
    {
        if (deferHandler == nullptr)
        {
            delete this;
        }
        else
        {
            deferHandler();
            deferHandler = nullptr;
        }
        t_journey = nullptr;
    }
    

    Where is the bug, and how do we fix it?

    Answer
         {
    @@ -153,8 +154,8 @@
    -        deferHandler();
    -        deferHandler = nullptr;
    +        Handler handler = std::move(deferHandler);
    +        handler();
         }
    




    Besides the problem of the deferred handler being replaced, a double destruction of this handler is also possible.
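
    Putting the fix together, the corrected method reads:

    void Journey::onExit0()
    {
        if (deferHandler == nullptr)
        {
            delete this;
        }
        else
        {
            // move the handler out before invoking it: the call may resume
            // the coroutine in another thread, which may set a new
            // deferHandler that 'deferHandler = nullptr' would have clobbered
            Handler handler = std::move(deferHandler);
            handler();
        }
        t_journey = nullptr;
    }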


    Bonus: Garbage Collector (GC)





    Yes, yes, we are going to whip up the simplest GC on top of our coroutines. Let's start right away with an example:

    struct A   { ~A() { TLOG("~A"); } };
    struct B:A { ~B() { TLOG("~B"); } };
    struct C   { ~C() { TLOG("~C"); } };
    ThreadPool tp(1, "tp");
    go([] {
        A* a = gcnew<B>();
        C* c = gcnew<C>();
    }, tp);
    

    Console output:
    tp#1: ~C
    tp#1: ~B
    tp#1: ~A
    

    Note the non-virtual destructors and the nonetheless correct destruction of the objects (the deleter remembers the exact type at gcnew time) — even though some insist that inheritance always requires virtual destructors.

    All magic, as usual, is hidden inside:

    template<typename T, typename... V>
    T* gcnew(V&&... v) {
        return gc().add(new T(std::forward<V>(v)...));
    }
    GC& gc() { return journey().gc; }
    struct GC {
        ~GC()
        {
            // delete in reverse order
            for (auto& deleter: boost::adaptors::reverse(deleters))
                deleter();
        }
        template<typename T> T* add(T* t)
        {
            // store a deleter that knows the exact type T
            deleters.emplace_back([t] { delete t; });
            return t;
        }
    private:
        std::vector<Handler> deleters;
    };
    

    The GC instance is stored inside the Journey traveler and is destroyed when our coroutine ends. This imposes a restriction: such objects must not be shared; they may be used only inside their coroutine.
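
    Since gcnew forwards its arguments to the constructor, parameterized objects work as well; for example (Widget here is hypothetical):

    struct Widget
    {
        Widget(int id_, std::string name_) : id(id_), name(std::move(name_)) {}
        int id;
        std::string name;
    };
    go([] {
        Widget* w = gcnew<Widget>(42, "teleporter");
        // ... use w; it is deleted automatically when the coroutine ends
    }, tp);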

    Conclusions


    So, we have examined several extremely useful primitives for building fairly complex applications:
    1. Non-blocking primitives for awaiting completion of actions and results.
    2. Thread pools and schedulers.
    3. Non-blocking synchronization.
    4. Teleportation, that is, switching between different schedulers.
    5. Portals: a powerful and flexible abstraction for executing actions in a given environment — a particular thread, a group of threads, serialized within a group of threads, and so on.

    The proposed approaches can significantly simplify program code without paying for it with performance. The waiting primitives do not block threads, which has a positive effect on keeping processors loaded with useful work. And non-blocking mutexes take synchronization to a whole new, hitherto unseen level.

    Portals allow you to stop thinking about what requirements the called components impose on their execution environment. This matters when working in heterogeneous conditions: databases, network, disk, UI, shared data, heavy computations — that is, in all the problems where data is processed and passed between various producers and consumers.

    This is really just an introduction to asynchrony on coroutines. The meat is yet to come! I hope the article brings you programming pleasure.



    Code
    github.com/gridem/Synca
    bitbucket.org/gridem/synca

    Presentation, C++ Party, Yandex
    tech.yandex.ru/events/cpp-party/march-msk/talks/1761

    Presentation, C++ User Group
    youtu.be/uUQX5QS1CCg
    habrahabr.ru/post/212793

    Literature
    [1] Asynchrony: back to the future. habrahabr.ru/post/201826
    [2] Akka scheduler interface. doc.akka.io/docs/akka/2.1.4/scala/scheduler.html
