The quickest introduction to Reactive Programming

    The purpose of this article is to show by example why reactive programming is needed, how it relates to functional programming, and how it helps you write declarative code that adapts easily to new requirements. I also want to keep it as brief and simple as possible, using an example that is close to a real one.


    Let's take the following task:
    There is a service with a REST API and an endpoint /people. A POST request to this endpoint creates a new entity. Write a function that accepts an array of objects of the form { name: 'Max' } and creates a set of entities through the API (this is known as a batch operation).


    Let's solve this problem in the imperative style:


    const request = require('superagent')

    // Create one entity per body; resolves with the HTTP status of each call.
    function batchCreate(bodies) {
      const calls = []
      for (const body of bodies) {
        calls.push(
          request
            .post('/people')
            .send(body)
            .then(r => r.status)
        )
      }
      return Promise.all(calls)
    }
    

    For comparison, let's rewrite this piece of code in a functional style. For simplicity, we will take functional style to mean:


    1. Using functional primitives (.map, .filter, .reduce) instead of imperative loops (for, while)
    2. Organizing the code into "pure" functions: they depend only on their arguments and not on the state of the system

    Functional code:


    const request = require('superagent')

    // Same behavior as before, but the loop is replaced with .map.
    function batchCreate(bodies) {
      const calls = bodies.map(body =>
        request
          .post('/people')
          .send(body)
          .then(r => r.status)
      )
      return Promise.all(calls)
    }

    We ended up with a piece of code of the same size, and admittedly it is not obvious how it is better than the previous one.
    To see why the second version is better, you have to start changing the code. Imagine that a new requirement is added to the original task:
    The service that we call limits the number of requests per period of time: one client can perform no more than five requests per second. Exceeding the limit causes the service to return HTTP error 429 (Too Many Requests).


    This is probably a good place to stop and try to solve the problem yourself.
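
    If you want something to compare your attempt with, here is one possible sketch that uses only plain Promises: send the requests in chunks of five and wait out the rest of the window between chunks. It is an illustration, not the solution this article builds toward. The names batchCreateThrottled, sleep and send are hypothetical; send stands for any function that takes a body and returns a Promise of the status, for example a superagent call.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Sends `bodies` in chunks of `limit`, starting at most one chunk per `windowMs`.
// `send` is an assumed function: body => Promise<status>.
async function batchCreateThrottled(bodies, send, limit = 5, windowMs = 1000) {
  const statuses = []
  for (let i = 0; i < bodies.length; i += limit) {
    const chunk = bodies.slice(i, i + limit)
    const started = Date.now()
    // Requests inside one chunk run concurrently.
    const chunkStatuses = await Promise.all(chunk.map(body => send(body)))
    statuses.push(...chunkStatuses)
    // Wait out the rest of the window, but only if another chunk follows.
    const remaining = windowMs - (Date.now() - started)
    if (i + limit < bodies.length && remaining > 0) await sleep(remaining)
  }
  return statuses
}
```

    It works, but the rate-limiting logic is now tangled with the request logic, and each further requirement would grow the loop.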


    Let's take our functional code as a basis and try to change it. The main problem of "pure" functional programming is that it "knows" nothing about the runtime environment and I/O (this is what is called a side effect), yet in practice we work with them constantly.
    To fill this gap, Reactive Programming comes to the rescue: a set of approaches that try to solve the problem of side effects. The best-known implementation of this paradigm is the Rx library, which uses the concept of reactive streams.


    What are reactive streams? In short, it is an approach that lets you apply functional primitives (.map, .filter, .reduce) to something distributed over time.
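
    To make "functional primitives over time" concrete, here is a minimal sketch that uses plain async generators instead of a stream library. The helpers ticks, map and filter are hypothetical names written for this illustration; they are not part of Rx or any other library.

```javascript
// Emits each value after a delay, simulating data that arrives over time.
async function* ticks(values, delayMs) {
  for (const v of values) {
    await new Promise(resolve => setTimeout(resolve, delayMs))
    yield v
  }
}

// map and filter for async iterables: each item is processed as soon as it arrives.
async function* map(source, fn) {
  for await (const item of source) yield fn(item)
}

async function* filter(source, predicate) {
  for await (const item of source) {
    if (predicate(item)) yield item
  }
}

async function main() {
  const evens = filter(ticks([1, 2, 3, 4], 10), n => n % 2 === 0)
  const evensSquared = map(evens, n => n * n)
  const seen = []
  for await (const n of evensSquared) seen.push(n)
  return seen // [4, 16]
}
```

    The pipeline is declared exactly like an array pipeline, but no item is touched until it actually arrives.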


    For example, suppose we transmit a set of commands over the network. We do not need to wait until the entire set has arrived: we represent it as a reactive stream and can start working with it right away. There are two more important concepts here:


    • the stream can be infinite or spread over an arbitrarily long time
    • the sender sends a command only when the receiver is ready to process it (backpressure)
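
    The second point can be seen in a small sketch built on a pull-based async generator, where the producer advances only when the consumer asks for the next item. This is a simplified model of backpressure, not how any particular library implements it; commands, consume and log are hypothetical names.

```javascript
const log = []

// A lazy producer: its body advances only when the consumer requests the next item.
async function* commands() {
  for (let i = 1; i <= 3; i++) {
    log.push(`produce ${i}`)
    yield i
  }
}

// A slow consumer: the producer stays paused while each item is being handled.
async function consume() {
  for await (const cmd of commands()) {
    await new Promise(resolve => setTimeout(resolve, 10))
    log.push(`handled ${cmd}`)
  }
  return log
}
```

    After consume() resolves, the log alternates strictly between "produce N" and "handled N": the producer never runs ahead of the consumer.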

    The purpose of this article is to find an easy path, so we will take the Highland library, which tries to solve the same problem as Rx but is much easier to learn. The idea behind it is simple: take Node.js streams as a basis and "pour" data from one stream into another.


    Let's start with something simple: make our code "reactive" without adding new functionality.


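    const request = require('superagent')
    const H = require('highland')

    function batchCreate(bodies) {
      return H(bodies)
        // Each request becomes a single-value stream; flatMap flattens them into one stream.
        .flatMap(body =>
          H(request
            .post('localhost:3000/people')
            .send(body)
            .then(r => r.status)
          )
        )
        .collect()          // gather all statuses into one array
        .toPromise(Promise) // resolve with that array
    }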

    What you should pay attention to:


    • H(bodies): we create a stream from an array
    • .flatMap and the callback it accepts. The idea is quite simple: we wrap the Promise in the stream constructor to get a stream with a single value (or an error; it is important to understand that this is the value, not the Promise).
      As a result we get a stream of streams, and flatMap flattens it into a single stream of values that we can operate on (did someone say monad?)
    • .collect: we need to collect all the values at one "point" into an array
    • .toPromise: returns a Promise that is fulfilled once we have the value from the stream
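
    The flattening step is easier to see without the library. The sketch below models it with plain async generators; fromPromise, flatMap, collect and fakePost are hypothetical stand-ins written for this illustration and only approximate what the Highland calls do.

```javascript
// Wraps a Promise into a stream that emits exactly one value (the resolved value).
async function* fromPromise(promise) {
  yield await promise
}

// Maps every item to a stream and flattens the results into one stream of values.
async function* flatMap(source, fn) {
  for await (const item of source) yield* fn(item)
}

// Gathers all values of a stream into an array.
async function collect(source) {
  const out = []
  for await (const item of source) out.push(item)
  return out
}

// Stand-in for the HTTP call so the sketch runs without a server.
const fakePost = body => Promise.resolve({ status: 201, body })

function batchCreateSketch(bodies) {
  return collect(
    flatMap(bodies, body => fromPromise(fakePost(body).then(r => r.status)))
  )
}
```

    Calling batchCreateSketch([{ name: 'Max' }, { name: 'Ann' }]) resolves to an array of plain status values, not an array of Promises or nested streams.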

    Now let's try to implement our requirement:


    const request = require('superagent')
    const H = require('highland')

    function batchCreate(bodies) {
      return H(bodies)
        .flatMap(body =>
          H(request
            .post('localhost:3000/people')
            .send(body)
            .then(r => r.status)
          )
        )
        .ratelimit(5, 1000) // let through at most 5 values per 1000 ms
        .collect()
        .toPromise(Promise)
    }

    Thanks to the concept of backpressure, in this paradigm it takes just one line, .ratelimit. In Rx it takes about the same amount of space.
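
    To show why a pull-based stream makes rate limiting this cheap, here is a rough sketch of a limiter over plain async iterables. It is not Highland's implementation: it uses a simple fixed window, and the names ratelimit, sleep, batchCreateLimited and send are hypothetical. Unlike the code above, the limiter sits before the request step, so a request starts only once the limiter releases its body.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Lets at most `num` items through per `ms` window (fixed window, not sliding).
async function* ratelimit(source, num, ms) {
  let windowStart = Date.now()
  let sent = 0
  for await (const item of source) {
    if (sent === num) {
      // The window is full: wait until it ends, then open a new one.
      const wait = ms - (Date.now() - windowStart)
      if (wait > 0) await sleep(wait)
      windowStart = Date.now()
      sent = 0
    }
    sent++
    yield item
  }
}

// `send` is an assumed function: body => Promise<status>.
async function batchCreateLimited(bodies, send, num = 5, ms = 1000) {
  const statuses = []
  // The loop pulls one body at a time, so the limiter controls the request pace.
  for await (const body of ratelimit(bodies, num, ms)) {
    statuses.push(await send(body))
  }
  return statuses
}
```

    The consumer does not know that a limiter exists: it simply asks for the next item, and the limiter decides when to hand it over.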


    Well, that's all. I would like to hear your opinion:


    • Did I manage to achieve the result declared at the beginning of the article?
    • Is it possible to achieve a similar result using the imperative approach?
    • Are you interested in Reactive programming?

    PS: here you can find another one of my articles about Reactive Programming

