Introduction to reactive programming

    Hello. In this article I will run a gallop across Europe, namely, I will tell you what they mean by reactive programming, introduce you to actors, reactive streams, and finally, using reactive streams, we will recognize mouse gestures like in the old Opera and its spiritual heir - Vivaldi .

    The goal is to introduce the basic concepts of reactive programming and show that everything is not as difficult and scary as it may seem at first glance.

    image
    A source

    What is reactive programming?


    To answer this question, we turn to the site . It has a beautiful picture, which shows 4 main criteria that must be met by reactive applications.

    image

    The application must be fast, fault tolerant and well scaled.
    Looks like "we are for all the good against all the bad," right?

    What is meant by these words:

    1. Responsiveness The

      application should give the user the result in half a second. This also includes the principle of fail fast - that is, when something goes wrong, it is better to return to the user an error message like “Sorry, there was a problem. Try later "than to make the sea wait for the weather. If the operation is long, we show the user a progress bar. If it is very long, “your request will be completed approximately March 18, 2042. We will send you a notification by mail. ”
    2. Scalability is a way to ensure responsiveness under load. Imagine the life cycle of a relatively successful service:
      1. Starting - the request flow is small, the service is spinning on a virtual machine with one core.
      2. The request flow is increasing - virtual kernels have added kernels and requests are processed in several threads.
      3. Even more loading - we connect batching - requests to base and to the hard disk are grouped.
      4. Even more load - you need to raise more servers and ensure the work in the cluster.
        Ideally, the system should scale up or down depending on the load.
    3. Fault tolerance

      We accept that we live in an imperfect world and anything happens. In case something goes wrong with our system, we need to provide error handling and ways to recover.
    4. Finally, we are encouraged to do all this with a system based on messaging ( message-driven ).

    Before continuing, I want to dwell on the difference between event-driven systems and message-driven systems.

    Event-driven:

    • Event - the system reports that it has reached a certain state.
    • Subscribers to the event can be many.
    • The chain of events is usually short, and event handlers are near (both physically and in code) with the source.
    • The source of the event and its handlers usually have a common state (physically - they use the same section of RAM for the exchange of information).

    In contrast to the event-driven, in the message-driven system:

    • Each message has only one addressee.
    • Messages are immutable: you can not change something in the received message so that the sender learned about it and was able to read the information.
    • Elements of the system react (or do not react) to receive messages and can send messages to other elements of the system.

    All this offers us

    Actor model


    Milestones of development:

    • The first mention of the actors is in the scientific work of 1973 - Carl Hewitt, Peter Bishop, and Richard Steiger, “A universal modular ACTOR formalism for artificial intelligence,”
    • 1986 - Erlang appeared. Ericson needed a language for telecommunications equipment that would provide fault tolerance and non-proliferation of errors. In the context of this article - its main features:

      • Everything is a process
      • Messages are the only means of communication (Erlang is a functional language, and the messages in it are immutable).
    • ..
    • 2004 - the first version of the Scala language. Its features are:
      • Powered by JVM,
      • Functional,
      • For multithreading model chosen actors.

    • 2009 - the implementation of actors separated into a separate library - Akka
    • 2014 - Akka.net - it was ported to .Net.

    What can actors do?


    Actors are the same objects, but:

    • Unlike ordinary objects, actors cannot call each other's methods.
    • Actors can transmit information only through unchangeable messages .
    • Upon receipt of a message, the actor may
      • Create new actors (they will be lower in the hierarchy),
      • Send messages to other actors
      • Stop actors lower in the hierarchy and yourself.

    Consider an example.

    image

    Actor A wants to send a message to actor B. All he has is ActorRef (some address). Actor B can be anywhere.
    Actor A sends letter B through the system (ActorSystem). The system puts the letter in the mailbox of actor B and the actor B. “wakes up” Actor B takes the letter from the mailbox and does something.

    Compared to calling another method, it looks unnecessarily difficult, but the model of actors fits perfectly with the real world, if you imagine that actors are people who are trained to do something in response to certain stimuli.

    Imagine a father and son:



    The father sends the text to the son “Clean the room” and continues to go about his business. Son reads SMSku and begins cleaning. Father plays poker in the meantime. The son finishes cleaning and sends SMS "Finish". Looks easy, right?

    Now imagine that father and son are not actors, but ordinary objects that can pull methods from each other. The father pulls the son for the “clean the room” method and follows him closely, waiting until the son finishes cleaning and transfers control back to his father. Father cannot play poker at this time. In this context, the model of actors becomes more attractive.

    Now move on to

    Akka.NET


    Everything that is written below is also true for the original Akka for the JVM, but for me C # is closer than Java, so I’m going to use the example of Akka.NET.

    So what advantages does Akka have?


    • Multithreading through messaging. You no longer have to suffer with all sorts of locks, semaphores, mutexes and other delights characteristic of classical multithreading with shared memory.
    • Transparent communication between the system and its components. No need to worry about the complex network code - the system will find the recipient of the message and guarantee the delivery of the message (here you can insert a joke about UDP vs TCP).
    • A resilient architecture that can automatically scale up or down. For example, under load, the system can raise additional nodes of the cluster and evenly distribute the load.

    But the topic of scaling is very extensive and worthy of a separate publication. Therefore, I will tell you more about the feature that will be useful in all projects:

    Error processing


    Actors have a hierarchy - it can be represented as a tree. Each actor has a parent and can be "children."

    image
    Akka.NET documentation Copyright 2013-2018 Akka.NET project

    For each actor, you can install the Supervision strategy - what to do if something went wrong with the “children”. For example, “nail down” an actor who has problems, and then create a new actor of the same type and charge him with the same work.

    For example, I made an application on Akka.net CRUD, in which the layer of "business logic" is implemented on actors. The goal of this project was to find out whether it is worth using actors in unscalable systems — whether they will make life better or add more pain.

    How built-in error handling in Akka can help:

    Gif


    1. everything is good, the application is working,
    2. something happened to the repository, and now it gives the result only 1 time out of 5,
    3. I set up Supervision strategy to "try 10 times in a second,"
    4. the application is working again (albeit more slowly), and I have time to figure out what's wrong.

    Then there is a temptation to say: “Well, I will write such error handling myself, why should any actors be fenced?”. Fair remark, but only if the points of failure are few.

    And some code. So initialization of system of actors in the IoC container looks like:

    publicContainer()
            {
                system = ActorSystem.Create("MySystem");
                var echo = system.ActorOf<EchoActor>("Echo");
                //stop initialization if something is wrong with actor systemvar alive = echo.Ask<bool>(true, TimeSpan.FromMilliseconds(100)).Result;
                container = new WindsorContainer();
                //search for dependencies//register controllers//register ActorSystem
                propsResolver = new WindsorDependencyResolver(container, (ActorSystem)system);
                system.AddDependencyResolver(propsResolver);
                actorSystemWrapper = new ActorSystemWrapper(system, propsResolver);
                container.Register(Component.For<IActorRefFactory>().Instance(actorSystemWrapper));
                container.Register(Component.For<IDependencyResolver>().Instance(propsResolver));
            }
    

    EchoActor is the simplest actor that returns a value to the sender:

    publicclassEchoActor : ReceiveActor
        {
            publicEchoActor()
            {
                Receive<bool>(flag =>
                {
                    Sender.Tell(flag);
                });
            }
        }
    

    To associate actors with a “normal” code, use the Ask command:

    publicasync Task<ActionResult> Index()
            {
                ViewBag.Type = typeof(Model);
                var res = await CrudActorRef.Ask<IEnumerable<Model>>(DataMessage.GetAll<Model>(), maxDelay);
                return View(res);
            }

    Total


    Pokhimich with actors, I can say:

    • They should look at if you need scalability
    • For complex business logic, it is better not to use them because
      • Strange Dependency Injection. To initialize the actor with the necessary dependencies, you must first create a Props object, then give it to the ActorSystem to create the actor of the desired type. To create Props using IoC containers (for example, Castle Windsor or Autofac) there are ready-made wrappers — DependencyResolvers. But I was faced with the fact that the IoC container was trying to manage the dependency lifetime, and after a while the system silently fell off.

        * Perhaps, instead of injecting a dependency into an object, this dependency should be issued as a child actor.
      • problems with typing. ActorRef knows nothing about the type of actor it refers to. That is, at compile time, it is not known whether the actor can process a message of this type or not.

    Part 2: Reactive Streams


    And now let's move on to a more popular and useful topic - jet streams. If you can never meet actors in the course of work, then Rx streams will be useful both in the frontend and in the backend. Their implementation is in almost all modern programming languages. I will give examples on RxJs, because nowadays even backend programmers sometimes have to do something in JavaScript.


    Rx-flows have for all popular programming languages,

    " Introduction to Programming Reactive you've Been missing " by Andre Staltz , according to CC BY-NC 4.0 license

    To explain what the jet stream, I'll start with the Pull and Push collections.
     Single return valueMultiple return values
    Pull
    Synchronous
    Interactive
    TIEnumerable <T>
    Push
    Asynchronous
    Reactive
    Task <T>IObservable <T>

    Pull collections are what we are all used to in programming. The most striking example is the array.

    const arr = [1,2,3,4,5];

    It already has data, it will not change this data, but it can give it on request.

    arr.forEach(console.log);

    Also, before you do something with the data, you can somehow handle them.

    arr.map(i => i+1).map(I => “my number is ”+i).forEach(console.log);

    And now let's imagine that initially there is no data in the collection, but it will definitely inform you that they have appeared (Push). And at the same time, we still can apply the necessary transformations to this collection.

    For example:

    source.map(i => i+1).map(I => “my number is ”+i).forEach(console.log);

    When a value appears in the source, for example, 1, console.log displays “my number is 1”.

    How it works:

    A new entity appears - Subject (or Observable):

    const observable = Rx.Observable.create(function (observer) {
     observer.next(1);
     observer.next(2);
     observer.next(3);
     setTimeout(() => {
    	 observer.next(4);
    	 observer.complete();
     }, 1000); });

    This is the push-collection, which will send notifications about changes in its state.

    In this case, numbers 1, 2 and 3 will appear in it immediately, 4 seconds later, and then the collection will “end”. This is such a special type of event.

    The second entity is the Observer. He can subscribe to Subject events and do something with the received data. For example:

    observable.subscribe(x =>console.log(x));
    observable.subscribe({ 
    next: x =>console.log('got value ' + x),
    error: err =>console.error('something wrong occurred: ' + err),
    complete: () =>console.log('done'),
    });
    observable
    	.map(x => ‘This is ‘ + x)
    	.subscribe(x =>console.log(x));

    Here you can see that one Subject can have many subscribers.

    It looks easy, but it is not entirely clear why this is needed. I will give 2 more definitions that need to be known when working with jet streams, and then I will show in practice how they work and in what situations their full potential is revealed.

    Cold observables


    • Notify on events when someone subscribes to them.
    • The entire data stream is sent anew to each subscriber regardless of the time of the subscription.
    • Data is copied for each subscriber.

    What this means: let's say the company (Subject) decided to arrange the distribution of gifts. Each employee (Observer) comes to work and receives his copy of the gift. No one is left out.

    Hot observables


    • Try to notify about the event regardless of the presence of subscribers. If at the time of the event there were no subscribers - the data is lost.

    Example: in the morning they bring hot cakes for employees to the company. When they are brought, all larks fly to the smell and dismantle cakes for breakfast. And the owls, who came later, no longer get the pies.

    What situations to use jet streams?


    When there is a data stream distributed in time. For example, user input. Or logs from any service. In one of the projects, I saw a samopinny logger who collected events in N seconds and then recorded the whole pack at a time. Battery code occupied page. If Rx streams were used, it would be much simpler:

    image
    RxJs Reference / Observable , documentation licensed under CC BY 4.0 .
    (there are many examples and pictures explaining what various operations with jet streams do)

    source.bufferTime(2000).subsribe(doThings); 

    And finally, an example of use.

    Recognize mouse gestures using Rx streams


    In the old Opera or its spiritual heir - Vivaldi - was browser control using mouse gestures.

    Gif - mouse gestures in Vivaldi


    That is, you need to recognize the movement of the mouse up / down, right / left and their combinations. This can be written without Rx streams, but the code will be complex and difficult to maintain.

    And this is what it looks like with Rx streams:


    I will start from the end - I will ask what data and in what format I will look for in the original sequence:

    //gestures to look forconst gestures = Rx.Observable.from([
        { name: "Left", 
    	sequence: Rx.Observable.from([{ x: -1, y: 0 }]) },
        { name: "Right", 
    	sequence: Rx.Observable.from([{ x: 1, y: 0 }]) },
        { name: "Up", 
    	sequence: Rx.Observable.from([{ x: 0, y: -1 }]) },
        { name: "Down", sequence:
    	 Rx.Observable.from([{ x: 0, y: 1 }]) },
        { name: "Down+Up", sequence:
    	 Rx.Observable.from([{ x: 0, y: 1 }, { x: 0, y: -1 }]) },
        { name: "Up+Right", sequence:
    	 Rx.Observable.from([{ x: 0, y: -1 }, { x: 1, y: 0 }]) }
    ]);

    These are single vectors and their combinations.

    Next, you need to convert mouse events to Rx streams. All Rx libraries have built-in tools for turning standard events into Observables.

    const mouseMoves = Rx.Observable.fromEvent(canvas, 'mousemove'),
          mouseDowns = Rx.Observable.fromEvent(canvas, 'mousedown'),
            mouseUps = Rx.Observable.fromEvent(canvas, 'mouseup');

    Next, I group the mouse coordinates by 2 and find their difference, getting the mouse offset.

    const mouseDiffs = mouseMoves
        .map(getOffset)
        .pairwise()
        .map(pair => { 
    	return { x: pair[1].x-pair[0].x, y: pair[1].y-pair[0].y }
        });

    And I group these movements using the 'mousedown' and 'mouseup' events.

    const mouseGestures = mouseDiffs
        .bufferToggle(mouseDowns, x => mouseUps)
        .map(concat);

    The concat function cuts out too short movements and groups movements that roughly coincide in direction.

    functionconcat(values) {//summarize move in same directionreturn values.reduce((a, v) => {
    		if (!a.length) {
    			a.push(v);
    		} else {
    const last = a[a.length - 1];
    const lastAngle = Math.atan2(last.x, last.y);
    const angle = Math.atan2(v.x, v.y);
    const angleDiff = normalizeAngle(angle - lastAngle);
    const dist = Math.hypot(v.x, v.y);
    if (dist < 1) return a;//move is too short – ignore//moving in same direction => adding vectorsif (Math.abs(angleDiff) <= maxAngleDiff) {
    last.x += v.x;
    last.y += v.y;
    	     } else {
    		a.push(v);
    	     }
    		}
    		return a;
    	}, []);
    }

    If the movement along the X or Y axis is too short, it is reset. And then only the sign remains from the received offset coordinates. Thus, we obtain the unit vectors that we were looking for.

    const normalizedMouseGestures = mouseGestures.map(arr =>
        arr.map(v => {
            const dist = Math.hypot(v.x, v.y);//length of vector
            v.x = Math.abs(v.x) > minMove && Math.abs(v.x) * treshold > dist ? v.x : 0;
            v.y = Math.abs(v.y) > minMove && Math.abs(v.y) * treshold > dist ? v.y : 0;
            return v;
         })
    ).map(arr =>
        arr
           .map(v => { return { x: Math.sign(v.x), y: Math.sign(v.y) }; })
           .filter(v =>Math.hypot(v.x, v.y) > 0)
    );

    Result:

    gestures.map(gesture =>
        normalizedMouseGestures.mergeMap(
            moves =>
               Rx.Observable.from(moves)
                   .sequenceEqual(gesture.sequence, comparer)
            ).filter(x => x).mapTo(gesture.name)
    ).mergeAll().subscribe(gestureName => actions[gestureName]());

    With the help of sequenceEqual, you can compare the received movements with the original ones and, if there is a match, perform a specific action.

    Gif


    You can play with gestures here.

    Please note that, in addition to gesture recognition, there is also a drawing of both the initial and normalized mouse movements on the HTML canvas. The readability of the code does not suffer from this.

    From which follows another advantage - the functionality written with the help of Rx streams can be easily supplemented and expanded.

    Total


    • Libraries with Rx streams are available for almost all programming languages.
    • Rx streams should be used when there is a stream of events stretched in time (for example, user input).
    • The functionality written with the help of Rx streams can be easily extended and expanded.
    • I did not find significant shortcomings.


    Also popular now: