Building blocks of distributed applications. First approach


    In the last article, we examined the theoretical foundations of reactive architecture. It's time to talk about data streams, ways to implement reactive Erlang / Elixir systems, and messaging patterns in them:


    • Request response
    • Request-Chunked Response
    • Response with Request
    • Publish-subscribe
    • Inverted Publish-subscribe
    • Task distribution

    SOA, MSA and Messaging


    SOA, MSA - system architectures that define the rules for building systems, while messaging provides primitives for their implementation.


    I do not want to promote this or that architecture of building systems. I support the use of the most effective and useful practices for a specific project and business. Whatever paradigm we choose, it is better to create system blocks with an eye on the Unix-way: components with minimal connectivity that are responsible for individual entities. API methods perform the simplest actions with entities.


    Messaging - as the name implies, is a message broker. Its main purpose is to receive and give messages. He is responsible for the interfaces for sending information, the formation of logical channels for transmitting information within the system, routing and balancing, and the processing of failures at the system level.
    The messaging being developed is not trying to compete with or replace rabbitmq. Its main features:


    • Distribution
      Exchange points can be created on all nodes of the cluster, as close as possible to the code that uses them.
    • Simplicity.
      Focus on minimizing boilerplate code and usability.
    • The best performance.
      We are not trying to repeat the functionality of rabbitmq, but only highlight the architectural and transport layer, which is as simple as possible in OTP, minimizing costs.
    • Flexibility.
      Each service can combine many exchange templates.
    • Fault tolerance inherent in design.
    • Scalability.
      Messaging grows with the application. As the load increases, you can move the exchange points to individual machines.

    Comment. From the point of view of code organization, meta-projects are well suited for complex systems with Erlang / Elixir. All project code is in one repository - an umbrella project. At the same time, microservices are as isolated as possible and perform simple operations that are responsible for a separate entity. With this approach, it is easy to support the API of the entire system, just make changes, it is convenient to write units and integration tests.


    System components interact directly or through a broker. From a messaging perspective, each service has several life phases:


    • Service initialization.
      At this stage, the configuration and launch of the service executing process and dependencies takes place.
    • Creating an exchange point.
      The service can use the static exchange point specified in the configuration of the node, or create exchange points dynamically.
    • Service registration.
      In order for the service to be able to service requests, it must be registered at the exchange point.
    • Normal functioning.
      Service produces useful work.
    • Shutdown.
      There are 2 types of shutdown: regular and emergency. With a regular service, it disconnects from the exchange point and stops. In emergency cases, messaging runs one of the failover scenarios.

    It looks pretty complicated, but not everything is so scary in the code. Examples of code with comments will be given in the analysis of templates a little later.


    Exchanges


    An exchange point is a messaging process that implements the logic of interacting with components within a messaging template. In all the examples below, the components interact through exchange points, the combination of which forms messaging.


    Message exchange patterns (MEPs)


    Globally, sharing patterns can be divided into two-way and one-way. The former imply a response to the message received, the latter do not. A classic example of a two-way pattern in a client-server architecture is the Request-response pattern. Consider the template and its modifications.


    Request – response or RPC


    RPC is used when we need to get a response from another process. This process can be launched on the same site or located on a different continent. Below is a diagram of the interaction of the client and server through messaging.



    Since messaging is completely asynchronous, for the client the exchange is divided into 2 phases:


    1. Request Submission


      messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).

      Exchange - unique name of the exchange point.
      ResponseMatchingTag - local label for processing the response. For example, in the case of sending several identical requests belonging to different users.
      RequestDefinition - request body.
      HandlerProcess - PID of the handler. This process will receive a response from the server.


    2. Response processing


      handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)

      ResponsePayload - server response.



    For the server, the process also consists of 2 phases:


    1. Exchange Point Initialization
    2. Processing incoming requests

    Let's illustrate this template with code. Suppose we need to implement a simple service that provides the only exact time method.


    Server code


    Take out the definition of the service API in api.hrl:


    %% =====================================================
    %%  entities
    %% =====================================================
    -record(time, {
      unixtime :: non_neg_integer(),
      datetime :: binary()
    }).
    -record(time_error, {
      code :: non_neg_integer(),
      error :: term()
    }).
    %% =====================================================
    %%  methods
    %% =====================================================
    -record(time_req, {
      opts :: term()
    }).
    -record(time_resp, {
      result :: #time{} | #time_error{}
    }).

    Define a service controller in time_controller.erl


    %% В примере показан только значимый код. Вставив его в шаблон gen_server можно получить рабочий сервис.
    %% инициализация gen_server
    init(Args) ->
      %% подключение к точке обмена
      messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self())
      {ok, #{}}.
    %% обработка события потери связи с точкой обмена. Это же событие приходит, если точка обмена еще не запустилась.
    handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
      erlang:send(self(), monitor_exchange),
      {noreply, State};
    %% обработка API
    handle_info(#time_req{opts = _Opts}, State) ->
      messaging:response_once(Client, #time_resp{
    result = #time{ unixtime = time_utils:unixtime(now()), datetime = time_utils:iso8601_fmt(now())}
      });
      {noreply, State};
    %% завершение работы gen_server
    terminate(_Reason, _State) ->
      messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
      ok.

    Client code


    In order to send a request to a service, you can call the messaging request API anywhere in the client:


    case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
        ok -> ok;
        _ -> %% repeat or fail logic
    end

    In a distributed system, the configuration of the components can be very different and at the time of the request messaging may not yet start, or the service controller will not be ready to serve the request. Therefore, we need to check the messaging response and handle the failure case.
    After successful sending, the client will receive a response or error from the service.
    Handle both cases in handle_info:


    handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
      ?debugVal(Utime),
      {noreply, State};
    handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
      ?debugVal({error, ErrorCode}),
      {noreply, State};

    Request-Chunked Response


    Better not to allow the transmission of huge messages. The responsiveness and stable operation of the entire system depends on this. If the response to the request takes up a lot of memory, then a breakdown into parts is mandatory.



    I will give a couple of examples of such cases:


    • Components exchange binary data, such as files. Breakdown of the answer into small parts helps to work efficiently with files of any size and not to catch memory overflows.
    • Listings. For example, we need to select all the records from a huge table in the database and transfer it to another component.

    I call these answers a locomotive. In any case, 1024 1 MB messages are better than a single 1 GB message.


    In the Erlang cluster, we get an additional gain - reducing the load on the exchange point and the network, since the answers are immediately sent to the recipient, bypassing the exchange point.


    Response with Request


    This is a fairly rare modification of the RPC pattern for building interactive systems.



    Publish-subscribe (data distribution tree)


    Event-oriented systems deliver data to consumers as data is available. Thus, systems are more prone to push models than pull or poll. This feature allows you not to waste resources by constantly querying and waiting for data.
    The figure shows the process of distributing a message to consumers who are subscribed to a specific topic.



    Classical examples of the use of this template is the distribution of the state: the gaming world in computer games, market data on exchanges, useful information in data feeds.


    Consider the subscriber code:


    init(_Args) ->
      %% подписываемся на обменник, ключ = key
      messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
      {ok, #{}}.
    handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
      %% если точка обмена недоступна, то пытаемся переподключиться
      messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
      {noreply, State};
    %% обрабатываем пришедшие сообщения
    handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
      ?debugVal(Msg),
      {noreply, State};
    %% при остановке потребителя - отключаемся от точки обмена
    terminate(_Reason, _State) ->
      messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
      ok.

    The source can call the post publication function in any convenient place:


    messaging:publish_message(Exchange, Key, Message).

    Exchange - the name of the exchange point,
    Key - the routing key
    Message - payload


    Inverted Publish-subscribe



    By expanding pub-sub, you can get a pattern that is convenient for logging. The set of sources and consumers can be completely different. The figure shows a case with one consumer and many sources.


    Task distribution pattern


    In almost every project, tasks of deferred processing arise, such as generating reports, delivering notifications, receiving data from third-party systems. The throughput of a system that performs these tasks is easily scalable by adding handlers. All that remains for us is to form a cluster of handlers and evenly distribute tasks between them.


    Consider the situations that arise with the example of 3 handlers. Even at the stage of task distribution, the question arises of the fairness of the distribution and overflow of handlers. The round-robin distribution will be responsible for justice, and in order to avoid a situation of overflow of handlers, we introduce the prefetch_limit restriction . In transient modes, prefetch_limit will prevent one handler from receiving all tasks.


    Messaging manages queues and processing priority. Handlers receive tasks as they become available. The task may succeed or fail:


    • messaging:ack(Tack) - called in case of successful message processing
    • messaging:nack(Tack)- is called in all emergency situations. After the task returns, messaging will transfer it to another handler.


    Suppose a complex failure occurred during the processing of three tasks: handler 1, after receiving the task, crashed before it could communicate to the exchange point. In this case, the exchange point after ack timeout expires will transfer the job to another handler. Handler 3 for some reason abandoned the task and sent nack, as a result, the task also passed to another handler that successfully completed it.


    Preliminary result


    We took apart the basic building blocks of distributed systems and got a basic understanding of their application in Erlang / Elixir.


    By combining basic patterns, you can build complex paradigms for solving emerging problems.


    In the final part of the cycle, we will consider the general issues of the organization of services, routing and balancing, and also talk about the practical side of scalability and fault tolerance of systems.


    The end of the second part.


    Photo by Marius Christensen
    Illustrations prepared by websequencediagrams.com


    Also popular now: