"Modern" dining philosophers in C ++ through actors and CSP

    Some time ago, a link to the article "Modern dining philosophers" spread over resources like Reddit and HackerNews. The article is interesting, it shows several solutions to this well-known problem, implemented in modern C ++ using the task-based approach. If someone has not read this article, then it makes sense to spend time and read it.


    However, I cannot say that the solutions presented in the article seemed simple and clear to me. This is probably due to the use of Tasks. Too many of them are created and dispatched by various dispatchers / serializers. So it is not always clear where, when and what tasks are performed.


    At the same time, the task-based approach is not the only one possible to solve such problems. Why not see how the task of the "dining philosophers" is solved through the models of actors and CSP?


    Therefore, I tried to look at and implemented several solutions to this problem both with the use of Actors and with the use of CSP. The code for these solutions can be found in the BitBucket repository . And under the cut explanations and explanations, so who cares, you are welcome under the cat.


    A few common words


    I didn’t have the goal of exactly repeating the solutions shown in the very article "Modern dining philosophers" , especially since I fundamentally dislike one important thing: in fact, the "philosopher" does not do anything in those decisions. He only says “I want to eat,” and then either someone magically provides forks to him, or he says “now it won't work out”.


    It is clear why the author resorted to this behavior: it allows you to use the same implementation of the "philosopher" in conjunction with different implementations of the "protocols". However, it seems to me personally that it is more interesting when exactly the “philosopher” tries to take one fork first, then the other. And when the "philosopher" is forced to handle unsuccessful attempts to seize the plug.


    It is such an implementation of the task of the "dining philosophers" that I tried to do. In this case, some solutions used the same approaches as in the above-mentioned article (for example, implemented by the ForkLevelPhilosopherProtocol and WaiterFair protocols).


    I built my decisions on the basis of SObjectizer , which would hardly surprise those who read my articles before. If someone has not yet heard about SObjectizer, then in a nutshell: this is one of the few living and developing OpenSource "action frameworks" for C ++ (among others, CAF and QP / C ++ can also be mentioned ). I hope that the examples given with my comments will be clear enough even for those unfamiliar with SObjectizer. If not, I will be happy to answer questions in the comments.


    Actor based solutions


    The discussion of implemented solutions will begin with those made on the basis of the Actors. First, consider the implementation of the Edsger Dijkstra solution, then move on to several other solutions and see how the behavior of each of the solutions differs.


    Dijkstra's decision


    Edsger Dijkstra, not only did he formulate the task of “dining filophos” (formulated using forks and spaghetti voiced by Tony Hoar), he also suggested a very simple and beautiful solution. Namely: philosophers should seize the forks only in the order of increasing numbers of the forks, and if the philosopher managed to take the first fork, then he would not let go of it until he received the second fork.


    For example, if a philosopher needs to use plugs with numbers 5 and 6, the philosopher must first take plug number 5. Only after that he can take plug number 6. Thus, if forks with smaller numbers lie to the left of philosophers, then the philosopher must first take the left fork and only then can it take the right fork.


    The last philosopher on the list, who has to deal with forks for numbers (N-1) and 0, does the opposite: he first takes the right fork with the number 0, and then the left fork with the number (N-1).


    To implement this approach, two types of actors are required: one for forks, the second for philosophers. If the philosopher wants to eat, he sends a message to the corresponding actor-fork to seize the fork, and the actor-fork responds with a response message.


    The implementation code for this approach can be seen here .


    Messages


    Before talking about the actors, you need to look at the messages that the actors will exchange:


    structtake_t
    {const so_5::mbox_t m_who;
       std::size_t m_philosopher_index;
    };
    structtaken_t :public so_5::signal_t {};
    structput_t :public so_5::signal_t {};

    When a philosopher actor wants to take a fork, he sends a message to the fork take_tactor, and the fork actor answers with a message taken_t. When the philosopher actor finishes eating and wants to put the forks back on the table, he sends messages to the forks actors put_t.


    In the message take_tfield m_whoindicates the mailbox (aka mbox) of the actor-philosopher. A reply message must be sent to this mailbox taken_t. The second field from take_tthis example is not used, we will need it when we get to the implementations of waiter_with_queue and waiter_with_timestamps.


    Actor Fork


    Now we can look at what the actor-plug is. Here is his code:


    classfork_tfinal :public so_5::agent_t
    {
    public :
       fork_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) } {}
       voidso_define_agent() override
       {
          // Начальным должно быть состояние 'free'.this >>= st_free;
          // В состоянии 'free' обрабатывается только одно сообщение.
          st_free
             .event( [this]( mhood_t<take_t> cmd ) {
                   this >>= st_taken;
                   so_5::send< taken_t >( cmd->m_who );
                } );
          // В состоянии 'taken' обрабатываются два сообщения.
          st_taken
             .event( [this]( mhood_t<take_t> cmd ) {
                   // Философу придется подождать в очереди.
                   m_queue.push( cmd->m_who );
                } )
             .event( [this]( mhood_t<put_t> ) {
                   if( m_queue.empty() )
                      // Вилка сейчас никому не нужна.this >>= st_free;
                   else
                   {
                      // Вилка должна достаться первому из очереди.constauto who = m_queue.front();
                      m_queue.pop();
                      so_5::send< taken_t >( who );
                   }
                } );
       }
    private :
       // Определение состояний для актора.conststate_t st_free{ this, "free" };
       conststate_t st_taken{ this, "taken" };
       // Очередь ждущих философов.std::queue< so_5::mbox_t > m_queue;
    };

    Each actor in a SObjectizer must be derived from the base class agent_t. What we see here for the type fork_t.


    The class fork_toverrides the method so_define_agent(). This is a special method, it is automatically called by SObjectizer when registering a new agent. In the method so_define_agent(), the agent is “configured” to work in SObjectizer: the starting state changes, the necessary messages are subscribed to.


    Each actor in a SObjectizer is a state machine with states (even if the actor uses only one default state). Here the actor fork_thas two states: free and taken . When the actor is free , the plug can be "captured" by the philosopher. And after the capture of the "fork" the actor fork_tshould go into the state taken . Within the class of fork_tstates are represented by instances st_freeand st_takena special type state_t.


    States allow you to process incoming messages in different ways. For example, in the free state, the agent responds only to take_tand the reaction is very simple: the state of the actor changes and the response is sent taken_t:


    st_free
       .event( [this]( mhood_t<take_t> cmd ) {
             this >>= st_taken;
             so_5::send< taken_t >( cmd->m_who );
          } );

    While all other messages, including those put_tin the free state, are simply ignored.


    In the state taken, the actor handles two messages, and it even take_tprocesses the message differently:


    st_taken
       .event( [this]( mhood_t<take_t> cmd ) {
             m_queue.push( cmd->m_who );
          } )
       .event( [this]( mhood_t<put_t> ) {
             if( m_queue.empty() )
                this >>= st_free;
             else
             {
                constauto who = m_queue.front();
                m_queue.pop();
                so_5::send< taken_t >( who );
             }
          } );

    Here the handler is most interesting for put_t: if the queue of waiting philosophers is empty, then we can return to free , but if not empty, then the first of them needs to be sent taken_t.


    Actor-philosopher


    The code of the actor-philosopher is much more extensive, so I will not give it here in full. We will discuss only the most significant fragments.


    The actor-philosopher has several more states:


    state_t st_thinking{ this, "thinking.normal" };
    state_t st_wait_left{ this, "wait_left" };
    state_t st_wait_right{ this, "wait_right" };
    state_t st_eating{ this, "eating" };
    state_t st_done{ this, "done" };

    The actor starts his work in the state of thinking , then switches to wait_left , then to wait_right , then to eating . From eating the actor can return to thinking or can go to done if the philosopher ate everything he should have.


    The state diagram for the actor-philosopher can be represented as follows:


    image


    The logic of the behavior of an actor is described in the implementation of its method so_define_agent():


    voidso_define_agent() override
    {
       // В состоянии thinking реагируем только на сигнал stop_thinking.
       st_thinking
          .event( [=]( mhood_t<stop_thinking_t> ) {
                // Пытаемся взять левую вилку.this >>= st_wait_left;
                so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index );
             } );
       // Когда ждем левую вилку реагируем только на taken.
       st_wait_left
          .event( [=]( mhood_t<taken_t> ) {
                // У нас есть левая вилка. Пробуем взять правую.this >>= st_wait_right;
                so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index );
             } );
       // Пока ждем правую вилку, реагируем только на taken.
       st_wait_right
          .event( [=]( mhood_t<taken_t> ) {
                // У нас обе вилки, можно поесть.this >>= st_eating;
             } );
       // Пока едим реагируем только на stop_eating.
       st_eating
          // 'stop_eating' должен быть отослан как только входим в 'eating'.
          .on_enter( [=] {
                so_5::send_delayed< stop_eating_t >( *this, eat_pause() );
             } )
          .event( [=]( mhood_t<stop_eating_t> ) {
             // Обе вилки нужно вернуть на стол.
             so_5::send< put_t >( m_right_fork );
             so_5::send< put_t >( m_left_fork );
             // На шаг ближе к финалу.
             ++m_meals_eaten;
             if( m_meals_count == m_meals_eaten )
                this >>= st_done; // Съели все, что могли, пора завершаться.else
                think();
          } );
       st_done
          .on_enter( [=] {
             // Сообщаем о том, что мы закончили.completion_watcher_t::done( so_environment(), m_index );
          } );
    }

    Perhaps the only point on which it should stop especially is the approach to imitating the processes of "thinking" and "food." In the actor's code, there is no this_thread::sleep_foror some other way of blocking the current working thread. Instead, deferred messages are used. For example, when the actor enters the eating state, it sends a pending message to itself stop_eating_t. This message is given to the SObjectizer timer, and the timer delivers the message to the actor when the time comes.


    The use of deferred messages allows you to run all the actors in the context of a single working thread. Roughly speaking, one thread reads from some message queue and yanks the handler of the next message from the corresponding receiving actor. More on working contexts for actors will be discussed below.


    results


    The results of this implementation may look like this (small fragment):


        Socrates: tttttttttttLRRRRRRRRRRRRRREEEEEEEttttttttLRRRRRRRRRRRRRREEEEEEEEEEEEE
           Plato: ttttttttttEEEEEEEEEEEEEEEEttttttttttRRRRRREEEEEEEEEEEEEEttttttttttLLL
       Aristotle: ttttEEEEEtttttttttttLLLLLLRRRREEEEEEEEEEEEttttttttttttLLEEEEEEEEEEEEE
       Descartes: tttttLLLLRRRRRRRREEEEEEEEEEEEEtttLLLLLLLLLRRRRREEEEEEttttttttttLLLLLL
         Spinoza: ttttEEEEEEEEEEEEEttttttttttLLLRRRREEEEEEEEEEEEEttttttttttRRRREEEEEEtt
            Kant: ttttttttttLLLLLLLRREEEEEEEEEEEEEEEttttttttttLLLEEEEEEEEEEEEEEtttttttt
    Schopenhauer: ttttttEEEEEEEEEEEEEttttttLLLLLLLLLEEEEEEEEEttttttttLLLLLLLLLLRRRRRRRR
       Nietzsche: tttttttttLLLLLLLLLLEEEEEEEEEEEEEttttttttLLLEEEEEEEEEttttttttRRRRRRRRE
    Wittgenstein: ttttEEEEEEEEEEtttttLLLLLLLLLLLLLEEEEEEEEEttttttttttttRRRREEEEEEEEEEEt
       Heidegger: tttttttttttLLLEEEEEEEEEEEEEEtttttttLLLLLLREEEEEEEEEEEEEEEtttLLLLLLLLR
          Sartre: tttEEEEEEEEEttttLLLLLLLLLLLLRRRRREEEEEEEEEtttttttLLLLLLLLRRRRRRRRRRRR

    Read this as follows:


    • t denotes that the philosopher "reflects";
    • Lindicates that the philosopher is waiting for the capture of the left fork (is in the wait_left state );
    • Rindicates that the philosopher is waiting for the capture of the right fork (is in the wait_right state );
    • E means that the philosopher is "eating."

    We can see that Socrates can take the plug on the left only after Sartre gives it away. Then Socrates will wait for Plato to release the right fork. Only then can Socrates eat.


    A simple decision without an arbitrator (waiter)


    If we analyze the result of the work of Dijkstra’s solution, we will see that philosophers spend a lot of time waiting for the forks to be captured. What is not good, because This time can also be spent on meditation. Not for nothing is the opinion that if you think on an empty stomach, you can get much more interesting and unexpected results;)


    Let's look at the simplest solution, in which the philosopher returns the first fork seized if he cannot capture the second one (in the Modern Dining philosophers article mentioned above, this solution is implemented by ForkLevelPhilosopherProtocol).


    The source code of this implementation can be seen here , and the code of the corresponding actor-philosopher is here .


    Messages


    This solution uses almost the same set of messages:


    structtake_t
    {const so_5::mbox_t m_who;
       std::size_t m_philosopher_index;
    };
    structbusy_t :public so_5::signal_t {};
    structtaken_t :public so_5::signal_t {};
    structput_t :public so_5::signal_t {};

    The only difference is the presence of a signal busy_t. This signal actor-plug sends in response to the actor-philosopher if the plug is already captured by another philosopher.


    Actor Fork


    Actor-fork in this solution is even easier than in the decision Dijkstra:


    classfork_tfinal :public so_5::agent_t
    {
    public :
       fork_t( context_t ctx ) : so_5::agent_t( ctx )
       {
          this >>= st_free;
          st_free.event( [this]( mhood_t<take_t> cmd )
                {
                   this >>= st_taken;
                   so_5::send< taken_t >( cmd->m_who );
                } );
          st_taken.event( []( mhood_t<take_t> cmd )
                {
                   so_5::send< busy_t >( cmd->m_who );
                } )
             .just_switch_to< put_t >( st_free );
       }
    private :
       conststate_t st_free{ this };
       conststate_t st_taken{ this };
    };

    We don’t even have to keep a waiting line for waiting philosophers here.


    Actor-philosopher


    The actor-philosopher in this implementation is similar to that of Dijkstra’s decision, but here the actor-philosopher has to process more busy_t, so the state diagram looks like this:


    image


    Similarly, the entire logic of the actor-philosopher is defined in so_define_agent():


    voidso_define_agent() override
    {
       st_thinking
          .event< stop_thinking_t >( [=] {
             this >>= st_wait_left;
             so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index );
          } );
       st_wait_left
          .event< taken_t >( [=] {
             this >>= st_wait_right;
             so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index );
          } )
          .event< busy_t >( [=] {
             think( st_hungry_thinking );
          } );
       st_wait_right
          .event< taken_t >( [=] {
             this >>= st_eating;
          } )
          .event< busy_t >( [=] {
             so_5::send< put_t >( m_left_fork );
             think( st_hungry_thinking );
          } );
       st_eating
          .on_enter( [=] {
                so_5::send_delayed< stop_eating_t >( *this, eat_pause() );
             } )
          .event< stop_eating_t >( [=] {
             so_5::send< put_t >( m_right_fork );
             so_5::send< put_t >( m_left_fork );
             ++m_meals_eaten;
             if( m_meals_count == m_meals_eaten )
                this >>= st_done;
             else
                think( st_normal_thinking );
          } );
       st_done
          .on_enter( [=] {
             completion_watcher_t::done( so_environment(), m_index );
          } );
    }

    In general, this is practically the same code as in Dijkstra’s solution, except that a couple of handlers were added for busy_t.


    results


    The results of the work look different:


        Socrates: tttttttttL..R.....EEEEEEEEEEEEttttttttttR...L.L...EEEEEEEttEEEEEE
           Plato: ttttEEEEEEEEEEEttttttL.....L..EEEEEEEEEEEEEEEttttttttttL....L....
       Aristotle: ttttttttttttL..L.R..EEEEEEtttttttttttL..L....L....R.....EEEEEEEEE
       Descartes: ttttttttttEEEEEEEEttttttttttttEEEEEEEEttttEEEEEEEEEEEttttttL..L..
         Spinoza: ttttttttttL.....L...EEEEEEtttttttttL.L......L....L..L...R...R...E
            Kant: tttttttEEEEEEEttttttttL.L.....EEEEEEEEttttttttR...R..R..EEEEEtttt
    Schopenhauer: tttR..R..L.....EEEEEEEttttttR.....L...EEEEEEEEEEEEEEEEttttttttttt
       Nietzsche: tttEEEEEEEEEEtttttttttEEEEEEEEEEEEEEEttttL....L...L..L....EEEEEEE
    Wittgenstein: tttttL.L..L.....R.R.....L.....L....L...EEEEEEEEEEEEEEEtttttttttL.
       Heidegger: ttttR..R......EEEEEEEEEEEEEttttttttttR..L...L...L..L...EEEEtttttt
          Sartre: tttEEEEEEEtttttttL..L...L....R.EEEEEEEtttttEEEEtttttttR.....R..R.

    Here we see a new symbol, which means that the actor-philosopher is in "hungry thoughts".


    Even on this short fragment, one can see that there are long moments of time during which the philosopher cannot eat. This is because this solution is protected from the problem of deadlocks, but does not have protection against fasting.


    Solution with a waiter and a queue


    The simplest solution shown above without an arbitrator does not protect against fasting. The above-mentioned article "Modern dining philosophers" contains a solution to the problem of fasting in the form of the WaiterFair protocol. The bottom line is that an arbitrator (waiter) appears to whom philosophers turn to when they want to eat. And the waiter has a queue of applications from philosophers. And the philosopher gets the forks only if both of the forks are now free, and there is still no one in the queue of the neighbors of the philosopher who turned to the waiter.


    Let's look at how the same solution might look like on actors.


    The source code for this implementation can be found here .


    Trick


    The easiest thing would be to introduce a new set of messages through which philosophers could communicate with the waiter. But I wanted to keep not only the existing set of messages (eg take_t, taken_t, busy_t, put_t). I also wanted the same actor-philosopher to be used as in the previous decision. Therefore, I needed to solve a clever puzzle: how to make the actor-philosopher communicate with the only actor-waiter, but at the same time thought that he interacts directly with the actors-forks (which are no longer in fact).


    This problem was solved with the help of a simple trick: the actor-waiter creates a set of mboxes, links to which are given to actors-philosophers as references to the mboxes of the forks. At the same time, the actor-waiter subscribes to messages from all these mboxes (which is easily implemented in SObjectizer, since SObjectizer is not only / much of the Actor model, but also Pub / Sub is supported out of the box) .


    In code, it looks like this:


    classwaiter_tfinal :public so_5::agent_t
    {
    public :
       waiter_t( context_t ctx, std::size_t forks_count )
          :  so_5::agent_t{ std::move(ctx) }
          ,  m_fork_states( forks_count, fork_state_t::free )
       {
          // Нужны mbox-ы для каждой "вилки"
          m_fork_mboxes.reserve( forks_count );
          for( std::size_t i{}; i != forks_count; ++i )
             m_fork_mboxes.push_back( so_environment().create_mbox() );
       }
       ...
       voidso_define_agent() override
       {
          // Требуется создать подписки для каждой "вилки".for( std::size_t i{}; i != m_fork_mboxes.size(); ++i )
          {
             // Нам нужно знать индекс вилки. Поэтому используются лямбды.// Лямбда захватывает индекс и затем отдает этот индекс в// актуальный обработчик входящего сообщения.
             so_subscribe( fork_mbox( i ) )
                .event( [i, this]( mhood_t<take_t> cmd ) {
                      on_take_fork( std::move(cmd), i );
                   } )
                .event( [i, this]( mhood_t<put_t> cmd ) {
                      on_put_fork( std::move(cmd), i );
                   } );
          }
       }
    private :
       ...
       // Почтовые ящики для несуществующих "вилок".std::vector< so_5::mbox_t > m_fork_mboxes;

    Those. First, create a vector mboxes for non-existent "forks", then subscribe to each of them. Yes, so we subscribe to know which plug the request belongs to.


    The actual inbound request handler take_tis the method on_take_fork():


    voidon_take_fork( mhood_t<take_t> cmd, std::size_t fork_index ){
       // Используем тот факт, что у левой вилки индекс совпадает// с индексом самого философа.if( fork_index == cmd->m_philosopher_index )
          handle_take_left_fork( std::move(cmd), fork_index );
       else
          handle_take_right_fork( std::move(cmd), fork_index );
    }

    By the way, it was here that we needed the second field from the message take_t.


    So, on_take_fork()we have the original query and the index of the fork to which the query belongs. Therefore, we can determine whether the philosopher is asking for the left or right fork. And, accordingly, we can process them in different ways (and we have to process them in different ways).


    Since the philosopher always asks for the left plug first, we need to do all the necessary checks at this very moment. And we may end up in one of the following situations:


    1. Both forks are free and can be given to the philosopher who sent the request. In this case, we send the taken_tphilosopher, and mark the right fork as reserved so that no one else can take it.
    2. Plugs cannot be given to the philosopher. No matter why. Maybe some of them are busy right now. Or in the queue there is someone from the neighbors of the philosopher. Anyway, we put the philosopher who sent the request into a queue, after which we send him busy_t.

    Thanks to this logic, the philosopher who has received taken_tfor the left fork can safely send a request take_tfor the right fork. This request will be immediately satisfied, because the plug is already reserved for this philosopher.


    results


    If you run the resulting solution, you can see something like:


        Socrates: tttttttttttL....EEEEEEEEEEEEEEttttttttttL...L...EEEEEEEEEEEEEtttttL.
           Plato: tttttttttttL....L..L..L...L...EEEEEEEEEEEEEtttttL.....L....L.....EEE
       Aristotle: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L
       Descartes: ttEEEEEEEEEEtttttttL.L..EEEEEEEEEEEEtttL..L....L....L.....EEEEEEEEEE
         Spinoza: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L
            Kant: ttEEEEEEEEEEEEEtttttttL...L.....L.....EEEEEttttL....L...L..L...EEEEE
    Schopenhauer: ttttL...L.....L.EEEEEEEEEEEEEEEEEtttttttttttL..L...L..EEEEEEEttttttt
       Nietzsche: tttttttttttL....L..L..L...L...L.....L....EEEEEEEEEEEEttL.....L...L..
    Wittgenstein: tttttttttL....L...L....L....L...EEEEEEEttttL......L.....L.....EEEEEE
       Heidegger: ttttttL..L...L.....EEEEEEEEEEEEtttttL...L..L.....EEEEEEEEEEEttttttL.
          Sartre: ttEEEEEEEEEEEEEttttttttL.....L...EEEEEEEEEEEEttttttttttttL.....EEEEE

    You can pay attention to the lack of characters R. This is because there can be no failure or waiting on the request of the right fork.


    Another decision using the arbitrator (waiter)


    In some cases, the previous solution waiter_with_queue may show results similar to this one:


        Socrates: tttttEEEEEEEEEEEEEEtttL.....L.L....L....EEEEEEEEEttttttttttL....L.....EE
           Plato: tttttL..L..L....L.L....EEEEEEEEEEEEEEEttttttttttttL.....EEEEEEEEEttttttt
       Aristotle: tttttttttttL..L...L.....L.....L....L.....EEEEEEEEEEEEtttttttttttL....L..
       Descartes: ttttttttttEEEEEEEEEEttttttL.....L....L..L.....L.....L..L...L..EEEEEEEEtt
         Spinoza: tttttttttttL..L...L.....L.....L....L.....L..L..L....EEEEEEEEEEtttttttttt
            Kant: tttttttttL....L....L...L...L....L..L...EEEEEEEEEEEttttttttttL...L......E
    Schopenhauer: ttttttL....L..L...L...L.L....L...EEEEEtttttL....L...L.....EEEEEEEEEttttt
       Nietzsche: tttttL..L..L....EEEEEEEEEEEEEttttttttttttEEEEEEEEEEEEEEEttttttttttttL...
    Wittgenstein: tttEEEEEEEEEEEEtttL....L....L..EEEEEEEEEtttttL..L..L....EEEEEEEEEEEEEEEE
       Heidegger: tttttttttL...L..EEEEEEEEttttL..L.....L...EEEEEEEEEtttL.L..L...L....L...L
          Sartre: ttttttttttL..L....L...L.EEEEEEEEEEEtttttL...L..L....EEEEEEEEEEtttttttttt

    One can see the presence of sufficiently long periods of time when philosophers cannot eat even in spite of the presence of free forks. For example, the left and right forks for Kant are free for a long time, but Kant cannot take them, since his neighbors are already waiting in line. Who are waiting for their neighbors. Who are waiting for their neighbors, etc.


    Therefore, the above implementation of waiter_with_queue protects against starvation in the sense that sooner or later the philosopher will eat. This is guaranteed to him. But periods of fasting can be quite long. And resource utilization may not be optimal at times.


    In order to solve this problem, I implemented another solution, waiter_with_timestamp (its code can be found here ). Instead of a queue, they use prioritization of requests from philosophers, taking into account the time of their fasting. The longer the philosopher goes hungry, the more priority is his request.


    We will not consider the code of this decision, since By and large, the main thing in it is the same trick with a set of mboxes for non-existent "forks", which we have already discussed in a conversation about the implementation of waiter_with_queue.


    Several implementation details that I would like to draw attention to.


    There are several details in implementations based on Actors that I would like to draw attention to, since These details demonstrate the interesting features of SObjectizer.


    Working context for actors


    In the above embodiments, all the main actors ( fork_t, philosopher_t, waiter_t) worked in the context of a common working thread. That does not mean at all that in SObjectizer all actors work only on one single thread. In SObjectizer, actors can be tied to different contexts, as can be seen, for example, in the function code run_simulation()in the no_waiter_simple solution.


    Run_simulation code from no_waiter_simple
    voidrun_simulation( so_5::environment_t & env, constnames_holder_t & names ){
       env.introduce_coop( [&]( so_5::coop_t & coop ) {
          coop.make_agent_with_binder< trace_maker_t >(
                so_5::disp::one_thread::create_private_disp( env )->binder(),
                names,
                random_pause_generator_t::trace_step() );
          coop.make_agent_with_binder< completion_watcher_t >(
                so_5::disp::one_thread::create_private_disp( env )->binder(),
                names );
          constauto count = names.size();
          std::vector< so_5::agent_t * > forks( count, nullptr );
          for( std::size_t i{}; i != count; ++i )
             forks[ i ] = coop.make_agent< fork_t >();
          for( std::size_t i{}; i != count; ++i )
             coop.make_agent< philosopher_t >(
                   i,
                   forks[ i ]->so_direct_mbox(),
                   forks[ (i + 1) % count ]->so_direct_mbox(),
                   default_meals_count );
       });
    }

    This function creates additional type actors trace_maker_tand completion_watcher_t. They will work in separate work contexts. For this, two instances of the type dispatcher are created one_threadand the actors are bound to these dispatcher instances. Which means that these actors will work as active objects : everyone will have their own working thread.


    SObjectizer provides a set of several different dispatchers that can be used directly out of the box. In this case, the developer can create as many dispatcher instances in his application as the developer needs.


    But the most important thing is that in the actor it is not necessary to change anything in order to make it work on another dispatcher. Let's say we can easily start actors fork_ton one pool of worker threads, and actors philosopher_ton another pool.


    Run_simulation code from no_waiter_simple_tp
    voidrun_simulation( so_5::environment_t & env, constnames_holder_t & names ){
       env.introduce_coop( [&]( so_5::coop_t & coop ) {
          coop.make_agent_with_binder< trace_maker_t >(
                so_5::disp::one_thread::create_private_disp( env )->binder(),
                names,
                random_pause_generator_t::trace_step() );
          coop.make_agent_with_binder< completion_watcher_t >(
                so_5::disp::one_thread::create_private_disp( env )->binder(),
                names );
          constauto count = names.size();
          // Параметры для настройки поведения thread_pool-диспетчера.
          so_5::disp::thread_pool::bind_params_t bind_params;
          bind_params.fifo( so_5::disp::thread_pool::fifo_t::individual );
          std::vector< so_5::agent_t * > forks( count, nullptr );
          // Создаем пул нитей для акторов-вилок.auto fork_disp = so_5::disp::thread_pool::create_private_disp(
                   env,
                   3u// Размер пула.
                );
          for( std::size_t i{}; i != count; ++i )
             // Каждая вилка будет привязана к пулу.
             forks[ i ] = coop.make_agent_with_binder< fork_t >(
                   fork_disp->binder( bind_params ) );
          // Создаем пул нитей для акторов-философов.auto philosopher_disp = so_5::disp::thread_pool::create_private_disp(
                   env,
                   6u// Размер пула.
                );
          for( std::size_t i{}; i != count; ++i )
             coop.make_agent_with_binder< philosopher_t >(
                   philosopher_disp->binder( bind_params ),
                   i,
                   forks[ i ]->so_direct_mbox(),
                   forks[ (i + 1) % count ]->so_direct_mbox(),
                   default_meals_count );
       });
    }

    And while we did not need to change a single line in the classes fork_tand philosopher_t.


    Actor state change tracing


    If you look at the implementation of philosophers in the above-mentioned article Modern Dining philosophers, you can easily see the code related to the trace of the philosopher’s actions, for example:


    voiddoEat(){
        eventLog_.startActivity(ActivityType::eat);
        wait(randBetween(10, 50));
        eventLog_.endActivity(ActivityType::eat);

    At the same time, in the SObjectizer-based implementations shown above, there is no such code. But tracing is nevertheless performed. Due to what?


    The fact is that in SObjectizer there is a special thing: an agent state listener. Such a listener is implemented as a class heir agent_state_listener_t. When a listener contacts an agent, the SObjectizer automatically notifies the listener about every change in agent status.


    The installation of the listener can be seen in the Agent Designer greedy_philosopher_tand philosopher_t:


    philosopher_t(...)
       ...
       {
          so_add_destroyable_listener(
                state_watcher_t::make( so_environment(), index ) );
       }

    Here state_watcher_tis the implementation of the listener I need.


    State_watcher_t definition
    classstate_watcher_tfinal :public so_5::agent_state_listener_t
    {
       const so_5::mbox_t m_mbox;
       conststd::size_t m_index;
       state_watcher_t( so_5::mbox_t mbox, std::size_t index );
    public :
       staticautomake( so_5::environment_t & env, std::size_t index ){
          return so_5::agent_state_listener_unique_ptr_t{
                newstate_watcher_t{ trace_maker_t::make_mbox(env), index }
          };
       }
       voidchanged( so_5::agent_t &, const so_5::state_t & state ) override;
    };

    When an instance state_watcher_tis associated with an agent, the SObjectizer method calls changed()when the agent status changes. And state_watcher_t::changedactions are initiated inside to trace the actions of the actor-philosopher.


    Fragment of the state_watcher_t :: changed implementation
    voidstate_watcher_t::changed( so_5::agent_t &, const so_5::state_t & state ) 
    {
       constauto detect_label = []( conststd::string & name ) {...};
       constchar state_label = detect_label( state.query_name() ); 
       if( '?' == state_label )
          return;
       so_5::send< trace::state_changed_t >( m_mbox, m_index, state_label );
    }

    CSP based solutions


    Now we will talk about implementations that do not use actors at all. Let's look at the same solutions (no_waiter_dijkstra, no_waiter_simple, waiter_with_timestamps) for the implementation of which std::threadSObjectizer-based mchains (which, in fact, are CSP-shny channels) are also used. Moreover, stress especially in the CSP-shnyh solutions use the same set of messages (all the same take_t, taken_t, busy_t, put_t).


    In the CSP approach, OS threads are used instead of "processes". Therefore, each fork, each philosopher and each waiter is implemented by a separate object std::thread.


    Dijkstra's decision


    The source code of this solution can be seen here .


    Thread for the plug


    The thread for the plug in the Dijkstra solution works very simply: the cycle of reading messages from the input channel + processing messages like take_tand put_t. What is implemented by the function of the fork_processfollowing form:


    voidfork_process(
       so_5::mchain_t fork_ch ){
       // Состояние вилки: занята или нет.bool taken = false;
       // Очередь ждущих философов.std::queue< so_5::mbox_t > wait_queue;
       // Читаем и обрабатываем сообщения пока канал не закроют.
       so_5::receive( so_5::from( fork_ch ),
             [&]( so_5::mhood_t<take_t> cmd ) {
                if( taken )
                   // Вилка занята, философ должен ждать в очереди.
                   wait_queue.push( cmd->m_who );
                else
                {
                   // Философ может взять вилку.
                   taken = true;
                   so_5::send< taken_t >( cmd->m_who );
                }
             },
             [&]( so_5::mhood_t<put_t> ) {
                if( wait_queue.empty() )
                   taken = false; // Вилка больше никому не нужна.else
                {
                   // Вилку нужно отдать первому философу из очереди.constauto who = wait_queue.front();
                   wait_queue.pop();
                   so_5::send< taken_t >( who );
                }
             } );
    }

    The function has fork_processonly one argument: an input channel that is created somewhere else.


    The most interesting thing fork_processis the "cycle" of retrieving messages from the channel until the channel is closed. This loop is implemented with just one function call receive():


    so_5::receive( so_5::from( fork_ch ),
          [&]( so_5::mhood_t<take_t> cmd ) {...},
          [&]( so_5::mhood_t<put_t> ) {...} );

    In SObjectizer, there are several versions of the function, receive()and here we see one of them. This version reads messages from the channel until the channel is closed. For each message read, a handler is searched. If the handler is found, it is called and the message is processed. If not found, the message is simply discarded.


    Message handlers are specified as lambda functions. These lambdas look like twins of the respective handlers in the actor fork_tfrom the Actor-based solution. That, in principle, is not surprising.


    Thread for the philosopher


    The logic of the philosopher is implemented in the function philosopher_process. This function has a fairly large code, so we will deal with it in parts.


    Full code philosopher_process
    oid philosopher_process(
       trace_maker_t & tracer,
       so_5::mchain_t control_ch,
       std::size_t philosopher_index,
       so_5::mbox_t left_fork,
       so_5::mbox_t right_fork,
       int meals_count ){
       int meals_eaten{ 0 };
       random_pause_generator_t pause_generator;
       // Этот канал будет использован для получения ответов от вилок.auto self_ch = so_5::create_mchain( control_ch->environment() );
       while( meals_eaten < meals_count )
       {
          tracer.thinking_started( philosopher_index, thinking_type_t::normal );
          // Имитируем размышления приостанавливая нить.std::this_thread::sleep_for(
                pause_generator.think_pause( thinking_type_t::normal ) );
          // Пытаемся взять левую вилку.
          tracer.take_left_attempt( philosopher_index );
          so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );
          // Запрос отправлен, ждем ответа.
          so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
             [&]( so_5::mhood_t<taken_t> ) {
                // Взяли левую вилку, пытаемся взять правую.
                tracer.take_right_attempt( philosopher_index );
                so_5::send< take_t >(
                      right_fork, self_ch->as_mbox(), philosopher_index );
                // Запрос отправлен, ждем ответа.
                so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
                   [&]( so_5::mhood_t<taken_t> ) {
                      // У нас обе вилки. Можно поесть.
                      tracer.eating_started( philosopher_index );
                      // Имитируем поглощение пищи приостанавливая нить.std::this_thread::sleep_for( pause_generator.eat_pause() );
                      // На шаг ближе к финалу.
                      ++meals_eaten;
                      // После еды возвращаем правую вилку.
                      so_5::send< put_t >( right_fork );
                   } );
                // А следом возвращаем и левую.
                so_5::send< put_t >( left_fork );
             } );
       }
       // Сообщаем о том, что мы закончили.
       tracer.philosopher_done( philosopher_index );
       so_5::send< philosopher_done_t >( control_ch, philosopher_index );
    }

    Let's start with the function prototype:


    voidphilosopher_process(
       trace_maker_t & tracer,
       so_5::mchain_t control_ch,
       std::size_t philosopher_index,
       so_5::mbox_t left_fork,
       so_5::mbox_t right_fork,
       int meals_count )

    The meaning and purpose of some of these parameters will have to be explained.


    Since we do not use SObjectizer agent agents, we are not able to remove the philosopher’s trail through the agent state listener, as was done in the Actor version. Therefore, in the philosopher’s code one has to do such inserts:


    tracer.thinking_started( philosopher_index, thinking_type_t::normal );

    And the argument traceris just a reference to an object that traces the work of philosophers.


    The argument control_chspecifies the channel in which the message should be written philosopher_done_tafter the philosopher has eaten everything that is necessary for him. This channel will then be used to determine when all philosophers have completed their work.


    Arguments left_forkand right_forkspecify channels for interacting with forks. These channels will be sent messages take_tand put_t. But if these are channels, then why use type mbox_tinstead mchain_t?


    This is a good question! But we will see the answer to it below, when discussing another solution. In the meantime, it can be said that mchain is something like a kind of mbox, so links to mchain can be passed through objects mbox_t.


    Next, a series of variables is defined that shape the state of the philosopher:


    int meals_eaten{ 0 };
    random_pause_generator_t pause_generator;
    auto self_ch = so_5::create_mchain( control_ch->environment() );

    Probably the most important variable is this self_ch. This is the personal channel of the philosopher, through which the philosopher will receive response messages from forks.


    Well, now we can go to the main logic of the philosopher. Those. to the cycle of repeating such operations as thinking, grabbing forks and absorbing food.


    It can be noted that, in contrast to the Actor-based solution, it is used here this_thread::sleep_forinstead of pending messages to simulate long-term operations .


    Attempting to take the plug looks almost the same as in the case of the Actors:


    so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

    It uses the same type take_t. But it has a type field mbox_t, whereas it self_chhas a type mchain_t. Therefore it is necessary to convert the link to the channel into a link to the mailbox through a call as_mbox().


    Then you can see the call receive():


    so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
          [&]( so_5::mhood_t<taken_t> ) {...} );

    This call returns control only when one instance taken_tis retrieved and processed. Well, or if the channel is closed. In general, we are waiting for the response we need from the plug.


    In general, this is almost all that could be told about philosopher_process. Although it is worth focusing on the nested call receive()for the same channel:


    so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
          [&]( so_5::mhood_t<taken_t> ) {
             ...
             so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
                   [&]( so_5::mhood_t<taken_t> ) {...} );
             ...
          } );

    This nesting allows you to write the logic of the philosopher in a simple and more or less compact form.


    Simulation start function


    When discussing solutions based on Actors, we did not stop at the analysis of functions run_simulation(), since there was nothing particularly interesting or important there. But in the case of the CSP approach, run_simulation()it makes sense to stop. Including in order to once again see how difficult it is to write the correct multi-threaded code (by the way, regardless of the programming language).


    Full code of the run_simulation function
    voidrun_simulation(
       so_5::environment_t & env,
       constnames_holder_t & names )noexcept{
       constauto table_size = names.size();
       constauto join_all = []( std::vector<std::thread> & threads ) {
          for( auto & t : threads )
             t.join();
       };
       trace_maker_t tracer{
             env,
             names,
             random_pause_generator_t::trace_step() };
       // Создаем вилки.std::vector< so_5::mchain_t > fork_chains;
       std::vector< std::thread > fork_threads( table_size );
       for( std::size_t i{}; i != table_size; ++i )
       {
          // Персональный канал для очередной вилки.
          fork_chains.emplace_back( so_5::create_mchain(env) );
          // Рабочая нить для очередной вилки.
          fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
       }
       // Канал для получения уведомлений от философов.auto control_ch = so_5::create_mchain( env );
       // Создаем философов.constauto philosopher_maker =
             [&](auto index, auto left_fork_idx, auto right_fork_idx) {
                returnstd::thread{
                      philosopher_process,
                      std::ref(tracer),
                      control_ch,
                      index,
                      fork_chains[ left_fork_idx ]->as_mbox(),
                      fork_chains[ right_fork_idx ]->as_mbox(),
                      default_meals_count };
             };
       std::vector< std::thread > philosopher_threads( table_size );
       for( std::size_t i{}; i != table_size - 1u; ++i )
       {
          // Запускаем очередного философа на отдельной нити.
          philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
       }
       // Последний философ должен захватывать вилки в обратном порядке.
       philosopher_threads[ table_size - 1u ] = philosopher_maker(
             table_size - 1u,
             table_size - 1u,
             0u );
       // Ждем пока все философы закончат.
       so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
             [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
                fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
             } );
       // Дожидаемся завершения рабочих нитей философов.
       join_all( philosopher_threads );
       // Принудительно закрываем каналы для вилок.for( auto & ch : fork_chains )
          so_5::close_drop_content( ch );
       // После чего дожидаемся завершения рабочих нитей вилок.
       join_all( fork_threads );
       // Показываем результат.
       tracer.done();
       // И останавливаем SObjectizer.
       env.stop();
    }

    Basically, the code run_simulation()should be more or less clear. Therefore, I will analyze only some of the points.


    We need working threads for forks. This fragment is just responsible for creating them:


    std::vector< so_5::mchain_t > fork_chains;
    std::vector< std::thread > fork_threads( table_size );
    for( std::size_t i{}; i != table_size; ++i )
    {
       fork_chains.emplace_back( so_5::create_mchain(env) );
       fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
    }

    At the same time, we need to save both the channels created for the forks and the working threads themselves. Channels will be required below to transmit references to them to the philosophers. And working threads will be required in order to then call joinfor them.


    After that we create working threads for philosophers and also collect the working threads in a container, since we will then need to call join:


    std::vector< std::thread > philosopher_threads( table_size );
    for( std::size_t i{}; i != table_size - 1u; ++i )
    {
       philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
    }
    philosopher_threads[ table_size - 1u ] = philosopher_maker(
          table_size - 1u,
          table_size - 1u,
          0u );

    Next, we need to give philosophers some time to perform a simulation. We are waiting for the end of the simulation with the help of this fragment:


    so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
          [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
             fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
          } );

    This option receive()returns control only after receiving table_sizetype messages philosopher_done_t.


    Well, after receiving all it philosopher_done_tremains to perform the cleaning of resources.


    We make joinfor all workers the threads of philosophers:


    join_all( philosopher_threads );

    Then you need to do joinfor all the threads forks. But you joincan not just call , because then we stupidly hang. After all, workers forks threads sleep inside calls receive()and no one will wake them up. Therefore, we need to close all the channels for the plugs first and only then call join:


    for( auto & ch : fork_chains )
       so_5::close_drop_content( ch );
    join_all( fork_threads );

    Now the main resource cleanup operations can be considered complete.


    A few words about noexcept


    I hope that the code is run_simulationnow completely clear and I can try to explain why this feature is marked as noexcept . The fact is that in her exception-safety is not guaranteed from the word at all. Therefore, the best reaction to the occurrence of an exception in such a trivial implementation is to force the completion of the entire application.


    But why run_simulationnot provide security in relation to exceptions?


    Let's imagine that we create working threads for forks and when creating a new thread we have an exception. In order to ensure at least some exception-safety, we need to forcibly complete those threads that have already been launched. And someone might think that it is enough to rewrite the thread start code for the plugs as follows:


    try
    {
       for( std::size_t i{}; i != table_size; ++i )
       {
          fork_chains.emplace_back( so_5::create_mchain(env) );
          fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
       }
    }
    catch( ... )
    {
       for( std::size_t i{}; i != fork_chains.size(); ++i )
       {
          so_5::close_drop_content( fork_chains[ i ] );
          if( fork_threads[ i ].joinable() )
             fork_threads[ i ].join();
       }
       throw;
    }

    Unfortunately, this is an obvious and wrong decision. Since it will be useless if an exception occurs after we create all the threads for the forks. Therefore, it is better to use something like this:


    structfork_threads_stuff_t {std::vector< so_5::mchain_t > m_fork_chains;
       std::vector< std::thread > m_fork_threads;
       fork_threads_stuff_t( std::size_t table_size )
          :  m_fork_threads( table_size )
       {}
       ~fork_threads_stuff_t()
       {
          for( std::size_t i{}; i != m_fork_chains.size(); ++i )
          {
             so_5::close_drop_content( m_fork_chains[ i ] );
             if( m_fork_threads[ i ].joinable() )
                m_fork_threads[ i ].join();
          }
       }
       voidrun(){
          for( std::size_t i{}; i != m_fork_threads.size(); ++i )
          {
             m_fork_chains.emplace_back( so_5::create_mchain(env) );
             m_fork_threads[ i ] = std::thread{ fork_process, m_fork_chains.back() };
          }
       }
    } fork_threads_stuff{ table_size }; // Преаллоцируем нужные ресурсы.
    fork_threads_stuff.run(); // Создаем каналы и запускаем нити.// Вся нужная очистка произойдет в деструкторе fork_threads_stuff.

    Well, or you can use tricks that allow us to execute the code we need when exiting the scop (for example, Boost ScopeExit, GSL finally (), and the like).


    A similar problem exists with the launch of threads for philosophers. And it will need to be addressed in this way.


    However, if you put all the necessary code to ensure exception-safety in run_simulation(), then the code run_simulation()will be both more voluminous and more difficult to perceive. What is not good for a function written exclusively for demonstration purposes and not applying for production quality. Therefore, I decided to score on the provision of exception-safety inside run_simulation()and marked the function as noexcept , which will lead to a call std::terminatein case of an exception. IMHO, for this kind of demo is quite a reasonable option.


    However, the problem is that you need to have relevant experience in order to understand which resource cleanup code will work and which will not. An inexperienced programmer may find that it is enough just to call join, although in fact you still need to close the channels before the call join. And such a problem in the code will be very difficult to catch.


    A simple solution without the use of an arbitrator (waiter)


    Now we can consider how in the CSP-approach a simple solution will look with the return of forks in case of an unsuccessful capture, but without an arbitrator.


    The source code for this solution can be found here .


    Thread for the plug


    The thread for the plug is implemented by a function fork_processthat looks like this:


    voidfork_process(
       so_5::mchain_t fork_ch ){
       // Состояние вилки: захвачена или свободна.bool taken = false;
       // Обрабатываем сообщения пока канал не закроют.
       so_5::receive( so_5::from( fork_ch ),
             [&]( so_5::mhood_t<take_t> cmd ) {
                if( taken )
                   so_5::send< busy_t >( cmd->m_who );
                else
                {
                   taken = true;
                   so_5::send< taken_t >( cmd->m_who );
                }
             },
             [&]( so_5::mhood_t<put_t> ) {
                if( taken )
                   taken = false;
             } );
    }

    It can be seen that this one is fork_processsimpler than the analogous one in the Dijkstra decision (we could observe the same picture when considering solutions based on Actors).


    Thread for the philosopher


    The thread for the philosopher is implemented by a function philosopher_processthat turns out to be somewhat more complicated than its counterpart in the decision of Dijkstra.


    Full code philosopher_process
    voidphilosopher_process(
       trace_maker_t & tracer,
       so_5::mchain_t control_ch,
       std::size_t philosopher_index,
       so_5::mbox_t left_fork,
       so_5::mbox_t right_fork,
       int meals_count ){
       int meals_eaten{ 0 };
       // Этот флаг потребуется для трассировки действий философа.thinking_type_t thinking_type{ thinking_type_t::normal };
       random_pause_generator_t pause_generator;
       // Канал для получения ответов от вилок.auto self_ch = so_5::create_mchain( control_ch->environment() );
       while( meals_eaten < meals_count )
       {
          tracer.thinking_started( philosopher_index, thinking_type );
          // Имитируем размышления приостанавливая нить.std::this_thread::sleep_for( pause_generator.think_pause( thinking_type ) );
          // На случай, если захватить вилки не получится.
          thinking_type = thinking_type_t::hungry;
          // Пытаемся взять левую вилку.
          tracer.take_left_attempt( philosopher_index );
          so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );
          // Запрос отправлен, ждем ответа.
          so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
             []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
             [&]( so_5::mhood_t<taken_t> ) {
                // Левая вилка взята, попробуем взять правую.
                tracer.take_right_attempt( philosopher_index );
                so_5::send< take_t >(
                      right_fork, self_ch->as_mbox(), philosopher_index );
                // Запрос отправлен, ждем ответа.
                so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
                   []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
                   [&]( so_5::mhood_t<taken_t> ) {
                      // У нас обе вилки, можно поесть.
                      tracer.eating_started( philosopher_index );
                      // Имитируем поглощение пищи приостанавливая нить.std::this_thread::sleep_for( pause_generator.eat_pause() );
                      // На шаг ближе к финалу.
                      ++meals_eaten;
                      // После еды нужно вернуть правую вилку.
                      so_5::send< put_t >( right_fork );
                      // Следующий период размышлений должен быть помечен как "normal".
                      thinking_type = thinking_type_t::normal;
                   } );
                // Левую вилку нужно вернуть в любом случае.
                so_5::send< put_t >( left_fork );
             } );
       }
       // Уведомляем о завершении работы.
       tracer.philosopher_done( philosopher_index );
       so_5::send< philosopher_done_t >( control_ch, philosopher_index );
    }

    In general, the code is philosopher_processvery similar to the code philosopher_processfrom the Dijkstra solution. But there are two important differences.


    First, it is variable thinking_type. It is needed in order to form the correct trail of the philosopher’s work, as well as to calculate the pauses when imitating the philosopher’s “thoughts”.


    Secondly, these are handlers for messages busy_t. We can see them when calling receive():


    so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
       []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
       [&]( so_5::mhood_t<taken_t> ) {
          ...
          so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
             []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
             [&]( so_5::mhood_t<taken_t> ) {...} );

    Yes, the handlers for busy_tare empty, but this is because all the necessary actions have either already been done before the call receive(), or will be done after exiting receive(). Therefore, when receiving busy_tnothing to do. But the handlers themselves must be defined, since their presence prohibits receive()throwing instances busy_twithout processing. Due to the presence of such handlers, it receive()returns control when a message arrives in the channel busy_t.


    Solution with a waiter and time stamps


    On the basis of the CSP approach, another decision was made that I would like to briefly highlight here. Analyzing decisions on the basis of Actors it was about decisions with an arbitrator (waiter): we considered waiter_with_queue decision, in which the waiter uses a queue of applications, and waiter_with_timestamps solution was also mentioned. Both of these solutions used the same trick: the waiter created a set of mboxes for non-existent forks, these mboxes were distributed to philosophers, but messages from mboxes were processed by the waiter.


    A similar trick is needed in the CSP approach so that I can reuse the existing implementation philosopher_processfrom the no_waiter_simple solution. But can a waiter create a set of mchains to be used by philosophers and from which the waiter will read messages addressed to forks?


    Unfortunately not.


    Creating a set of mchains is not a problem. The problem is to read messages from several mchains at once.


    In SObjectizer, there is a function select()that allows you to do this, for example, it allows you to write like this:


    so_5::select( so_5::from_all(),
       case_(ch1, one_handler_1, one_handler_2, one_handler_3, ...),
       case_(ch2, two_handler_1, two_handler_2, two_handler_3, ...),
       ...);

    But for select()it is necessary that the list of channels and message handlers from them be available in compile-time. Whereas in my solutions to the "dinner philosophers" problem, this list becomes known only during execution. Therefore, in the CSP-approach it is impossible to reuse the trick from the Actor-based approach in its pure form.


    But we can rethink it.


    So, the bottom line is that in the original messages take_tand put_tthere is no field to store the index of the fork. And so we need to somehow get these indexes. Since we cannot push the indices inside take_tand put_t, then let's make extended versions of these messages:


    structextended_take_tfinal :public so_5::message_t
    {
       const so_5::mbox_t m_who;
       conststd::size_t m_philosopher_index;
       conststd::size_t m_fork_index;
       extended_take_t(
          so_5::mbox_t who,
          std::size_t philosopher_index,
          std::size_t fork_index )
          :  m_who{ std::move(who) }
          ,  m_philosopher_index{ philosopher_index }
          ,  m_fork_index{ fork_index }
       {}
    };
    structextended_put_tfinal :public so_5::message_t
    {
       conststd::size_t m_fork_index;
       extended_put_t(
          std::size_t fork_index )
          :  m_fork_index{ fork_index }
       {}
    };

    Generally speaking, there is no need to inherit the message types from so_5::message_t, although I usually just use such inheritance (it has some small benefits). In this case, inheritance is used simply to demonstrate this method of defining SObjectizer messages.

    Now you need to make the waiter read the expanded versions of messages instead of the original ones. So we need to learn how to intercept messages take_tand put_t, convert them to extended_take_tand extended_put_t, send new messages to the waiter.


    For this we need our own mbox. Only :)


    Own mbox code
    classwrapping_mbox_tfinal :public so_5::extra::mboxes::proxy::simple_t
    {
       usingbase_type_t = so_5::extra::mboxes::proxy::simple_t;
       // Куда сообщения должны доставляться.const so_5::mbox_t m_target;
       // Индекс вилки, который должен использоваться в новых сообщениях.conststd::size_t m_fork_index;
       // Типы сообщений для перехвата.staticstd::type_index original_take_type;
       staticstd::type_index original_put_type;
    public :
       wrapping_mbox_t(
          const so_5::mbox_t & target,
          std::size_t fork_index )
          :  base_type_t{ target }
          ,  m_target{ target }
          ,  m_fork_index{ fork_index }
       {}
       // Это основной метод so_5::abstract_message_box_t для доставки сообщений.// Переопределяем его для перехвата и преобразования сообщений.voiddo_deliver_message(
          conststd::type_index & msg_type,
          const so_5::message_ref_t & message,
          unsignedint overlimit_reaction_deep )const override
       {
          if( original_take_type == msg_type )
          {
             // Получаем доступ к исходному сообщению.constauto & original_msg = so_5::message_payload_type<::take_t>::
                   payload_reference( *message );
             // Шлем новое сообщение вместо старого.
             so_5::send< extended_take_t >(
                   m_target,
                   original_msg.m_who,
                   original_msg.m_philosopher_index,
                   m_fork_index );
          }
          elseif( original_put_type == msg_type )
          {
             // Шлем новое сообщение вместо старого.
             so_5::send< extended_put_t >( m_target, m_fork_index );
          }
          elsebase_type_t::do_deliver_message(
                   msg_type,
                   message,
                   overlimit_reaction_deep );
       }
       // Фабричный метод чтобы было проще создавать wrapping_mbox_t.staticautomake(
          const so_5::mbox_t & target,
          std::size_t fork_index ){
          return so_5::mbox_t{ newwrapping_mbox_t{ target, fork_index } };
       }
    };
    std::type_index wrapping_mbox_t::original_take_type{ typeid(::take_t) };
    std::type_index wrapping_mbox_t::original_put_type{ typeid(::put_t) };

    Here was used the easiest way to create your own mbox-a: in the accompanying project so_5_extra there is a template that you can reuse and save yourself a lot of time. Without using this preset, I would have to inherit directly from so_5::abstract_message_box_tand implement a number of pure virtual methods.


    Anyway, now there is a class wrapping_mbox_t. So we can now create a set of instances of this class, links to which will be distributed to philosophers. Philosophers will send messages in wrapping_mbox, and these messages will be transformed and redirected to a single waiter mchain. Therefore, the function waiter_process, which is the main function of the waiter's thread, will have this simple form:


    voidwaiter_process(
       so_5::mchain_t waiter_ch,
       details::waiter_logic_t & logic ){
       // Получаем и обрабатываем сообщения пока канал не закроют.
       so_5::receive( so_5::from( waiter_ch ),
             [&]( so_5::mhood_t<details::extended_take_t> cmd ) {
                logic.on_take_fork( std::move(cmd) );
             },
             [&]( so_5::mhood_t<details::extended_put_t> cmd ) {
                logic.on_put_fork( std::move(cmd) );
             } );
    }

    Of course, the waiter’s application logic is implemented elsewhere and its code is not so simple and short, but we will not dive there. Those interested can see the solution code waiter_with_timestamps here .


    Right now we can answer the question: "Why are the channels for forks transmitted to philosopher_processmboxes?" This is because a custom mbox was implemented to solve the waiter_with_timestamps, not the mchain.


    Of course, you could create your own mchain. But it would have required some more work, because in so_5_extra, there is not yet the same preset for its own mchains (maybe it will appear over time). So to save time, I just stopped at mbox instead of mchain.


    Conclusion


    That's probably all that I wanted to tell you about the solutions implemented on the basis of the Actors and CSP. I saw my task not to make the most effective decisions. And to show exactly how they can look. I hope that someone was interested.


    Let me divert the attention of those interested in SObjectizer. Everything goes to the fact that work on the next "big" version of SObjectizer will start in the near future - branch 5.6, breaking compatibility with branch 5.5. Those who wish to have their say on this subject can do it here (or here ). A more or less up-to-date list of what changes in SO-5.6 can be found here (you can also add your own wishes there).


    I have everything on it, many thanks to all readers for the time spent on this article!


    Ps. The word "modern" in the title is quoted because there is nothing modern in the solutions themselves. Is that except for the use of code in C ++ 14.


    Also popular now: