Learning to intercept raw messages or an example of how SObjectizer is overgrown with new features ...

    We are very pleased when new features are added to SObjectizer as a result of prompts and / or wishes of users of SObjectizer. Although this is far from always easy. Indeed, on the one hand, we, as a team of developers and old users of SObjectizer, already have our own stereotypes about how SObjectizer is customary to use. And it’s not always possible to immediately evaluate the “fresh view from the outside”, to understand what the user really wants to see in the framework and why he is not satisfied with the available means. On the other hand, SObjectzer is not such a small framework; adding new features requires some caution. It is necessary that the new functionality does not conflict with existing features. And, all the more so that after adding something new something that already exists and has been working for a long time does not break. Plus, we have a point about maintaining compatibility between versions of SObjectizer, so we are strongly against radical changes ...

    In general, adding a new one to SObjectizer is always pleasant from the point of view of increasing the capabilities of the framework and increasing its usability. But far from always it is just as pleasant and simple from the point of view of implementation.

    Under the cut is a short story about how one new feature was added to SObjectizer. Maybe some of the readers will be interested to see how the old framework adapts to the requests of new users.

    Preamble


    So, it all started with the fact that one of the users of SObjectizer, PavelVainerman , drew our attention to the fact that in SObjectizer there are no ready-made convenient tools for performing occasional one-time interactions between agents.

    It turned out that this is what is meant. Suppose agent A wants to send a request to agent B and wants to receive a response message from agent B. But at the same time, agent A does not want to wait for a response longer than 5 seconds. A trivial “forehead” solution that immediately comes to mind may look like this:

    // Тип сообщения-запроса.
    struct request {
      const so_5::mbox_t reply_to_; // Куда отсылать ответ.
      ... // Другие параметры, конструкторы и т.д.
    };
    // Тип сообщения-ответа.
    struct reply {
      ...
    };
    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений.
      void on_reply(mhood_t cmd) {...}
      void on_reply_timeout(mhood_t) {...}
      ...
      // Место, где нам требуется сделать запрос агенту B.
      void ask_something(const so_5::mbox_t & B_mbox) {
        // Подписываемся на ответное сообщение и на сообщение о тайм-ауте.
        so_subscribe_self().event(&A::on_reply);
        so_subscribe_self().event(&A::on_reply_timeout);
        // Теперь отсылаем запрос. В запросе передаем свой mbox, на который
        // будет отсылаться ответ.
        so_5::send(B_mbox, so_direct_mbox(), ...);
        // И отсылаем себе отложенное сообщение для того, чтобы отсчитать тайм-аут.
        so_5::send_delayed(*this, 5s);
    };
    

    Unfortunately, this simple version is only a clear demonstration of the veracity of the aphorism that "any complex task has a simple, easy to understand wrong solution." There are several problems here.

    The first problem is related to the pending message A :: reply_timed_out. If the response from Agent B did not arrive on time, then with reply_timed_out everything is fine with us. We get it, process it and forget about it. But what happens if the response from Agent B arrived on time? What will happen to reply_timed_out?

    It will still come to Agent A. No one canceled reply_timed_out. So, as soon as the thread of the SObjectizer timer counts down 5 set seconds, the reply_timed_out message will be delivered to agent A. And we will receive and process it despite the fact that we no longer need it. What's wrong. It would be correct to ensure that the reply_timed_out message does not reach us after we receive a reply from Agent B.

    The most reliable way to do this is to unsubscribe from reply_timed_out. Why exactly this is the topic of a separate big conversation. If anyone is interested, you can separately talk about this topic. In the meantime, we restrict ourselves to the fact that unsubscribing from a delayed message is a "reinforced concrete" option for solving problems with a delayed message.

    The second problem is that agent A is unlikely to need to communicate in this way only with agent B. Most likely, agent A exchanges request / reply messages with several agents at once. Accordingly, when request flies to agents B and C at the same time, then agent A needs to somehow understand who the response came from. Or whose response was not received within 5 seconds.

    The second problem is more or less conveniently solved by refusing to use your own mbox agent A as a return address. It's easier to create a new mbox for every new interaction. And it is this new mbox that will be used both for receiving a response and for a pending message for this particular request.

    However, as soon as we introduce the new mbox, we must make sure that the mbox is removed after it is no longer needed. To do this, we must remove the subscription to this mbox. If the subscriptions are not deleted, then the mbox will remain alive, and this will lead to a constant increase in memory consumption - we will create new mbox-s for each new request, and these mbox-s will not be deleted.

    In general, if you take into account these two problems, then a simple solution will be converted to not very simple:

    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений теперь будут получать дополнительный параметр,
      // который будет идентифицировать конкретный запрос, к которому относится сообщение.
      void on_reply(const request_info & info, mhood_t cmd) {...}
      void on_reply_timeout(const request_info & info, mhood_t) {...}
      ...
      // Место, где нам требуется сделать запрос стороннему агенту.
      // Здесь мы так же получаем дополнительный параметр, описывающий запрос.
      void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // Нам нужен уникальный mbox для взаимодействия в рамках этого запроса.
        const auto uniq_mbox = so_environment().create_mbox();
        // Этот вспомогательный объект будет помогать удалять подписки.
        auto subscriptions_dropper = [this, uniq_mbox] {
          so_drop_subscription(uniq_mbox);
          so_drop_subscription(uniq_mbox);
        };
        // Подписываемся на ответное сообщение и на сообщение о тайм-ауте.
        so_subscribe(uniq_mbox)
          .event([this, info, subscriptions_dropper](mhood_t cmd) {
            // Уничтожаем подписки.
            subscription_dropper();
            // Выполняем основную обработку.
            on_reply(info, cmd);
          })
          .event([this, info, subscriptions_dropper](mhood_t cmd) {
            subscription_dropper();
            on_reply_timeout(info, cmd);
          });
        // Теперь отсылаем запрос. В запросе передаем свой уникальный mbox, который
        // мы создали специально для этого запроса.
        so_5::send(B_mbox, uniq_mbox, ...);
        // И отсылаем себе отложенное сообщение для того, чтобы отсчитать тайм-аут.
        so_5::send_delayed(so_environment(), uniq_mbox, 5s);
      }
    };
    

    It turns out not so simple and compact as we would like. But this is far from all. So, in this solution there is no exception safety. There is no explicit cancellation of a pending message when it is no longer needed. But, more importantly, if Agent A wants to have not one default state, as in the example above, but several states, in each of which he needs to respond to messages differently, then everything will become even worse. Well, everything will become even worse if the exchange between A and B requires not one response message, but several. Say, if instead of reply there will be successful_reply and failed_reply, then the amount of work for the agent A developer will increase markedly.

    Why didn’t we ourselves face such a problem?


    A small retreat to the side. When it became clear to us what PavelVainerman was telling us , we ourselves were surprised. After all, the problem is really obvious. But why didn’t we come across it ourselves? At least not encountered so often as to draw attention to it and include a solution for this problem in SObjectizer.

    Probably, there were two factors.

    First, we quickly came up with the ideas for the SEDA approach . There the number of agents is small, stable communications are established between them, so there are no such problems in principle.

    Secondly, probably, a single one-time interaction with us is most often used between short-lived agents. And for an agent who lives only to process a single operation, these problems are not relevant.

    Be that as it may, it is impossible not to note the fact that as soon as new people begin to use your tool, it immediately turns out that they want to use the tool in a completely different way from how you yourself are used to doing it.

    What did we do in the end?


    As a result, we expanded our add-on on SObjectizer under the name so_5_extra , adding support for the so-called. asynchronous operations . Through asynchronous operations, the above example can be rewritten as follows:

    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений получают дополнительный параметр,
      // который будет идентифицировать конкретный запрос, к которому относится сообщение.
      void on_reply(const request_info & info, mhood_t cmd) {...}
      void on_reply_timeout(const request_info & info, mhood_t) {...}
      ...
      // Место, где нам требуется сделать запрос стороннему агенту.
      // Здесь мы так же используем дополнительный параметр, описывающий запрос.
      void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // Нам нужен уникальный mbox для взаимодействия в рамках этого запроса.
        const auto uniq_mbox = so_environment().create_mbox();
        // Описываем и активируем асинхронную операцию.
        so_5::extra::async_op::time_limited::make(*this)
          .completed_on(uniq_mbox, so_default_state(), [this, info](mhood_t cmd) {
              on_reply(info, cmd);
            })
          .timeout_handler(so_default_state(), [this, info](mhood_t cmd) {
              on_reply_timeout(info, cmd);
            })
          .activate(5s);
        // Теперь отсылаем запрос. В запросе передаем свой уникальный mbox, который
        // мы создали специально для этого запроса.
        so_5::send(B_mbox, uniq_mbox, ...);
      }
    };
    

    Read more about the new asynchronous operations in so_5_extra here .

    But today we will not talk about how the asynchronous messages themselves are made. And about what was required to be done in SObjectizer in order for asynchronous messages to work in so_5_extra.

    What was the problem with implementing time_limited asynchronous operations?


    So_5_extra includes two implementations of asynchronous operations: time_unlimited, when there are no restrictions at all on the execution time of the operation, and time_limited, when the operation needs to be completed in the allotted time. Above, it was just about time_limited operations, because it was with their implementation that was one of the main snag.

    The bottom line is that when we start a time_limited operation, we must definitely receive and process a pending message, which limits the time of the asynchronous operation. And with this “necessarily” it was just not all simple.

    The fact is that one of the key features of SObjectizer is the state of agents. States allow agents to process different sets of messages in each of the states. Or, to process the same messages in different states in different ways. But there is a flip side: if a message needs to be processed in all states, then you need to explicitly sign the message handler for each of the states. Those. write something like:

    class default_msg_handler_demo : public so_5::agent_t {
      // Список состояний агента.
      state_t st_first{this}, st_second{this}, st_third{this};
      ...
      // Обработчик, который мы хотим повесить на каждое состояние.
      void some_msg_default_handler(mhood_t cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем свой обработчик "по умолчанию".
        so_subscribe(some_mbox)
          .in(st_first).in(st_second).in(st_third)
          .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
      }
    };

    Naturally, this is not the best and most convenient solution.

    By using the capabilities of hierarchical finite state machines, you can make it easier, more convenient and more reliable:

    class default_msg_handler_demo : public so_5::agent_t {
      // Список состояний агента.
      // Специальное родительское состояние.
      state_t st_parent{this},
        // И список актуальных состояний, которые будут дочерними.
        st_first{initial_substate_of{st_parent}},
        st_second{substate_of{st_parent}},
        st_third{substate_of{st_parent}};
      ...
      // Обработчик, который мы хотим повесить на каждое состояние.
      void some_msg_default_handler(mhood_t cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем свой обработчик "по умолчанию" только в родительском состоянии.
        so_subscribe(some_mbox)
          .in(st_parent)
          .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
      }
    };

    Now the “default” handler will be called regardless of which state the agent is in.

    But, unfortunately, this approach requires the agent to be initially designed using hierarchical state machines. It is unlikely that it would be convenient to use asynchronous operations from so_5_extra if they imposed such a strict requirement on users: they say you want to use asynchronous operations - so please create a parent state in your agent.

    And it is not always possible in principle to do this. Suppose someone wrote you an agent library that has the base type basic_device_manager. You make your own inheritance class my_device_manager and you need to use asynchronous operations in my_device_manager. If the developer didn’t do something like st_parent in basic_device_manager, then you won’t add your st_parent there.

    In general, it was necessary to do something that would catch messages that were addressed to the agent, but which were not processed by the agent. Such messages are sometimes called deadletters .

    What and how did we end up doing?


    Deadletter handlers


    We made it so that the developer can now hang his own handler on a message that was not processed by the "normal" handler. For instance:

    class deadletter_handler_handler_demo : public so_5::agent_t {
      state_t st_first{this}, st_second{this}, st_third{this};
      ...
      void deadletter_handler(mhood_t cmd) {...}
      ...
      void normal_handler(mhood_t cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем "нормальный" обработчик только для st_first.
        so_subscribe(some_mbox)
          .in(st_first).event(&deadletter_handler_demo::normal_handler);
        // Подписываем обработчик "потерянных" сообщений.
        so_subscribe_deadletter_handler(some_mbox, &deadletter_handler_demo::deadletter_handler);
        ...
      }
    };

    Now, if the agent receives the some_msg message from the some_mbox mailbox while in the st_first state, normal_handler will be called to process the message. But if the agent is in any other state, then deadletter_handler is called to process this message.

    This feature is used by time_limited operations. When the operation is activated, deadletter_handler is hung up on the message about the expiration of the timeout. And no matter what state the agent is in at the time this message arrives, the message will be received and processed. Which allows you to complete the asynchronous operation. Even in the case when the developer was mistaken and did not define all the timeout handlers he needed.

    Attractive idea that has not been implemented


    The first thought that arose as soon as the problem of deadletter handlers was formulated was to provide each agent with some kind of parental state. And all other states to automatically become subsidiaries to it. Those. there was an idea to force some superstate into each agent. Which is simple and there is nothing to be done about it :)

    This idea was very attractive from the point of view of the current mechanism for storing and searching for subscriptions (this mechanism is not so simple).

    Also, this idea is very beautiful from an ideological point of view. Hierarchical finite state machines as they are.

    But I had to abandon it (maybe for a while?).

    The main reason for the failure is that the state_t object is pretty heavyweight. Depending on the compiler, the standard library, and compilation options, state_t in 64-bit mode can take from 150 to 250 bytes. If you forcefully add superstate to each agent, then the "weight" of each agent increases by one and a half to two hundred bytes. Just like that, out of the blue. Even if this agent does not need superstate at all.

    There was, and indeed is, yet another reason. Superstate for each agent is too big an innovation for SObjectizer to make it from floundering bay,
    without carefully weighing all the consequences. I personally have great suspicions about the fact
    that it is worth adding superstates to SObjectizer and they will begin to abuse them.

    In general, the idea of ​​superstate did not go to work on version 5.5.21. But the notch remained in memory. Perhaps she will still find her embodiment. If anyone has any thoughts on this, it would be interesting to hear and discuss.

    Actual solution


    They abandoned the idea of ​​superstate, but still did not want to change the current mechanism for storing subscriptions. Therefore, a solution was found in which an additional state_t object was nevertheless required. But he exists one for all and all agents refer to him.

    Thanks to this, it was possible to use the same tools for registering deadletter handlers and for finding them. In fact, so_subscribe_deadletter_handler is nothing more than a message handler subscription for a special, invisible to the user state . Well, the search for a deadletter handler for a message is just a normal search for a handler , but not for the current state of the agent, but for this special, invisible state. True, there aresome additional actions for the case when the trace mode of the message delivery mechanism is turned on, but these are completely boring details.

    Was everything so obvious and simple?


    When I read this article before publication, I caught myself thinking that some kind of triviality was being told. Well, everything seems to be simple and clear. But the path to this “simple and clear” turned out to be not at all quick, not direct and not obvious. If anyone is interested, then traces of the evolution of the idea of ​​asynchronous operations can be found in this mini-series of blog posts: No. 1 , No. 2 and No. 3 . Although, as it turned out, even the final post of this series did not describe a resultant solution. I had to stumble upon a serious miscalculation of my own and puzzle over how to prevent memory leaks in the presence of cyclic links between objects. But this is a completely different story ...

    A few words in conclusion


    Thanks first ...


    I would like to thank everyone who helps us to develop SObjectizer: those who use SO-5 and express their thoughts and suggestions (special thanks here to PavelVainerman ), those who still do not use SO-5, but help with tips and more (great thanks, in particular, masterspline ), and just those who are not too lazy to put +1 in the news about SObjectizer on various resources and stars on github :) Thank you all very much!

    ... and briefly about plans for the near future


    We are going to start work on the next version of SObjectizer under the number 5.5.22 in the near future. The main new feature that we want to see in 5.5.22 is support for parallel states for agents. Agents can already use the advanced capabilities of hierarchical states. Something like: nesting of states, shallow and deep-history for states, input / output handlers, time limits for staying in a state. But what SObjectizer did not have so far was parallel states.

    At one time, we did not do them for some reason. But practice has shown that some of the users need parallel states and make their life easier. So we will do them. Everyone is invited to a discussion: any constructive considerations, and especially examples from practice and personal experience will be very useful to us.

    Well, in general, it would be interesting to find out what your impressions of SObjectizer are, what you like, what you don't like, what you would like to have in SO-5, which interferes ... Well, of course, we are ready to answer any questions about SObjectizer .

    Also popular now: