The basics of reactive programming using RxJS. Part 3. Higher Order Observables



In this article we will look at how to process one stream inside another, why this is needed, and how Higher Order Observables (hereinafter HOO) operators help with it.

When working with streams, we often need to feed the results of one stream into another as values. For example, we want to execute an ajax request and process its response in the current stream, run several requests in parallel, or implement polling. Many people are used to solving such problems with promises. But can they be solved with RxJS? Of course, and it is much easier than you might think!

This article is part of the series “Fundamentals of reactive programming using RxJS”.


Note: to follow the theoretical part of the article you do not need to have read the previous articles; you only need to know what observables, operators and pipes are. In the practical part we will refine the example from the second article, which you can find here.

Problem


Let's imagine the following task: we need to find out every second whether the server is accessible. How can we solve it?

First, create a stream using the timer method:

import { timer } from 'rxjs';

timer(0, 1000).subscribe({
  next: console.log
});

The timer function works much like interval. Unlike interval, it lets you set an initial delay before the first value, which is passed as the first parameter. The second parameter is the period between subsequent values. If the second parameter is omitted, timer emits a single value and completes the stream.

Since you and I do not have a server, I suggest just writing a function that emulates a request to the server:

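import { timer } from 'rxjs';
import { mapTo } from 'rxjs/operators';

// Emulates a server request: emits 'success' after one second, then completes
const makeRequest = () => {
  return timer(1000).pipe(
    mapTo('success')
  );
};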

What does this function do? It returns a stream created with timer, which emits a single value after one second and then completes. Since timer only emits a number, we use the mapTo operator to replace it with the string “success”.

This is what the stream created by the makeRequest method looks like:



Now we have a choice: call the makeRequest method inside the stream or assign this responsibility to the observer?

The first approach is preferable, since in this case we will be able to use the full potential of RxJS with its operators and relieve our observer of unnecessary duties. We use the timer method to execute requests by interval:

timer(0, 1000).pipe(
  map(() => makeRequest())
).subscribe({
  next: console.log
});

When we run this code, console.log prints not the string “success” but an object of type Observable:



This is expected, because the callback passed to map returns a stream. For a stream to start working, something has to subscribe to it. Let's first see how not to do it:

timer(0, 1000).pipe(
  map(() => makeRequest())
).subscribe({
  // anti-pattern: a subscription inside a subscription
  next: observable => observable.subscribe({
    next: console.log
  })
});

The problem with the example above is that we end up with a subscription inside a subscription. What if we need to chain more than one request? What if at some point we need to unsubscribe from the stream inside? The code will look more and more like spaghetti. To solve this problem, RxJS has a special group of operators called HOO.

HOO


HOO operators are a special kind of operator that accept streams as values. One such operator is mergeAll.

When a stream arrives at mergeAll, the operator subscribes to it. The stream that the operator subscribed to is called internal. The stream from which the operator receives other streams as values is called external.

When an internal stream emits a value, mergeAll pushes that value into the external stream. This removes the need to subscribe manually. If we unsubscribe from the external stream, mergeAll automatically unsubscribes from the internal ones.

Let's see how we can rewrite our example with mergeAll:

timer(0, 1000).pipe(
  map(() => makeRequest()),
  mergeAll()
).subscribe({
  next: console.log
});

In the example above, the external stream is created by timer, and the streams created in the map operator are internal. Each of them is passed on to the mergeAll operator.
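To make the internal/external relationship more tangible, here is a minimal sketch of the idea in plain JavaScript, without RxJS. The helpers createManualStream and mergeAllSketch are hypothetical names made up for this illustration. The sketch ignores errors and completion and only models the two behaviours described above: forwarding internal values and cascading the unsubscription.

```javascript
// Hypothetical helper: a stream we can push values into by hand.
// subscribe() returns an unsubscribe function.
const createManualStream = () => {
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
    next(value) {
      observers.forEach(observer => observer.next(value));
    }
  };
};

// Simplified model of mergeAll (not the RxJS source): subscribe to every
// internal stream as soon as it arrives and forward its values outward.
const mergeAllSketch = external => ({
  subscribe(observer) {
    const internalUnsubscribes = [];
    const externalUnsubscribe = external.subscribe({
      next: internal => {
        internalUnsubscribes.push(
          internal.subscribe({ next: value => observer.next(value) })
        );
      }
    });
    // Unsubscribing tears down the external and all internal subscriptions.
    return () => {
      externalUnsubscribe();
      internalUnsubscribes.forEach(unsubscribe => unsubscribe());
    };
  }
});

const external = createManualStream();
const first = createManualStream();
const second = createManualStream();
const merged = [];

const unsubscribe = mergeAllSketch(external).subscribe({
  next: value => merged.push(value)
});

external.next(first);  // the operator subscribes to the first internal stream
external.next(second); // ...and to the second, without waiting for the first
first.next('a1');
second.next('b1');
first.next('a2');
unsubscribe();
second.next('b2');     // ignored: the internal subscriptions are gone

console.log(merged);   // a1, b1, a2
```

Values from both internal streams are interleaved in arrival order, and nothing gets through after the single unsubscribe call.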



The map + mergeAll combination is so common that RxJS provides a dedicated mergeMap operator:

timer(0, 1000).pipe(
  mergeMap(() => makeRequest())
).subscribe({
  next: console.log
});

When the external stream emits a value, mergeMap calls the callback passed to it, which returns a new stream. Then mergeMap subscribes to that stream.



The distinctive feature of mergeAll / mergeMap is that when another stream arrives, the operator subscribes to it as well, without waiting for the previous ones to finish. As a result, the external stream can receive values from several internal streams at once. Consider the following example:

  timer(0, 1000)

This is how the external stream will look without the mergeMap operator:



And so with mergeMap:

timer(0, 1000).pipe(
  mergeMap(() => interval(1000)) 
)



Every second we create a new internal stream, and mergeMap subscribes to it. So many internal streams run at the same time, and the values from all of them end up in the external stream:





Note: be careful with mergeMap. Each new internal stream keeps running until it completes or until you unsubscribe from the external stream. In the example above the internal streams never complete, so their number grows every second, and eventually there can be so many of them that the computer cannot cope with the load.
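A rough count shows how quickly this adds up. The sketch below assumes the timer(0, 1000) and interval(1000) pair from the example above and ideal timing (ticks exactly at 0s, 1s, 2s and so on). The function name mergeMapLoad is made up for this illustration.

```javascript
// Counts, for timer(0, 1000) + mergeMap(() => interval(1000)), how many
// internal streams are alive after `seconds` whole seconds and how many
// values they have pushed into the external stream in total.
const mergeMapLoad = seconds => {
  let activeInternalStreams = 0;
  let totalValues = 0;
  for (let t = 0; t <= seconds; t += 1) {
    // Every internal stream started before this tick emits one value now.
    totalValues += activeInternalStreams;
    // The external tick at time t starts one more internal stream.
    activeInternalStreams += 1;
  }
  return { activeInternalStreams, totalValues };
};

const afterTenSeconds = mergeMapLoad(10);
const afterOneMinute = mergeMapLoad(60);
console.log(afterTenSeconds); // 11 streams alive, 55 values so far
console.log(afterOneMinute);  // 61 streams alive, 1830 values so far
```

The number of live streams grows linearly, while the number of emitted values grows quadratically.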

concatAll / concatMap


The mergeMap operator is great when the execution order of internal streams does not matter. But what if it does? Suppose we want the next server request to start only after the response to the previous one has arrived.

The HOO operator concatAll / concatMap is suited to this. Having subscribed to an internal stream, it waits until that stream completes, and only then subscribes to the next one.

If a new stream arrives while the current one is still running, it is placed in a queue until the current one completes.
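The queueing behaviour can be sketched in plain JavaScript without RxJS. The names createManualStream and concatAllSketch are hypothetical, and the sketch leaves out errors, unsubscription and completion of the external stream. It only shows that the second stream is not subscribed to until the first one completes.

```javascript
const log = [];

// Hypothetical helper: a hand-driven stream that records when it is subscribed to.
const createManualStream = name => {
  let observer = null;
  return {
    subscribe(nextObserver) {
      log.push(`subscribe ${name}`);
      observer = nextObserver;
    },
    next(value) {
      if (observer) observer.next(value);
    },
    complete() {
      if (observer) observer.complete();
    }
  };
};

// Simplified model of concatAll (not the RxJS source): only one internal
// stream is active at a time, the rest wait in a queue.
const concatAllSketch = observer => {
  const queue = [];
  let busy = false;
  const subscribeToNext = () => {
    if (busy || queue.length === 0) return;
    busy = true;
    queue.shift().subscribe({
      next: value => observer.next(value),
      complete: () => {
        busy = false;
        subscribeToNext();
      }
    });
  };
  // Returns the handler the external stream would call for each internal stream.
  return internal => {
    queue.push(internal);
    subscribeToNext();
  };
};

const first = createManualStream('first');
const second = createManualStream('second');
const push = concatAllSketch({ next: value => log.push(`value ${value}`) });

push(first);      // subscribed to immediately
push(second);     // queued, because the first stream is still running
first.next(1);
first.complete(); // only now does the sketch subscribe to the second stream
second.next(2);

console.log(log); // subscribe first, value 1, subscribe second, value 2
```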

// a stream that emits 1 after one second
const firstInnerObservable = timer(1000).pipe(
  mapTo(1)
);
// a stream that emits 2 after half a second
const secondInnerObservable = timer(500).pipe(
  mapTo(2)
);
of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  concatAll()
).subscribe({
  next: console.log
});

In the example above, we create two streams using timer. For clarity, I used the mapTo operator to emit different values: the first stream emits 1, the second emits 2. The external stream is created with of, which takes the two observables above as input.

The concatAll operator first receives firstInnerObservable, subscribes to it and waits for it to complete. Only after that does it subscribe to secondInnerObservable. Here is what the external stream looks like:



If we replace concatAll with mergeAll, then the stream will look like this:

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  mergeAll()
).subscribe({
  next: console.log
});



switchAll / switchMap


This operator differs from the previous ones in that when it receives a new stream, it immediately unsubscribes from the previous one and subscribes to the new one.

Take the example above, replace concatAll with switchAll, and see how the external stream behaves:

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  switchAll()
).subscribe({
  next: console.log
});



Only the value from the second internal stream got into the external stream. That is because switchAll unsubscribed from the first stream as soon as it received the second.

When is this needed? For example, when implementing a search. If the response from the server has not arrived yet and we have already sent a new request, there is no point in waiting for the previous one.
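The search scenario can be modelled in a few lines of plain JavaScript. This is a sketch without RxJS, and the helper names createManualStream and switchAllSketch are hypothetical. Each manual stream stands in for one in-flight request.

```javascript
// Hypothetical helper: a hand-driven stream; subscribe() returns an unsubscribe function.
const createManualStream = () => {
  let observer = null;
  return {
    subscribe(nextObserver) {
      observer = nextObserver;
      return () => { observer = null; };
    },
    next(value) {
      if (observer) observer.next(value);
    }
  };
};

// Simplified model of switchAll (not the RxJS source): keep at most one
// internal subscription and replace it whenever a new stream arrives.
const switchAllSketch = observer => {
  let unsubscribeFromCurrent = null;
  return internal => {
    if (unsubscribeFromCurrent) unsubscribeFromCurrent();
    unsubscribeFromCurrent = internal.subscribe({
      next: value => observer.next(value)
    });
  };
};

const results = [];
const push = switchAllSketch({ next: value => results.push(value) });

const searchForRx = createManualStream();   // stands in for the request for "rx"
const searchForRxjs = createManualStream(); // stands in for the request for "rxjs"

push(searchForRx);
push(searchForRxjs);                    // the user typed more: drop the first request
searchForRx.next('results for rx');     // late response, nobody is listening
searchForRxjs.next('results for rxjs');

console.log(results); // only 'results for rxjs'
```

The late response to the first request is silently ignored, so it can never overwrite the newer results.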

exhaust / exhaustMap


exhaust is the opposite of switchAll, and its behavior resembles concatAll. Having subscribed to a stream, the operator waits for it to complete. If a new stream arrives in the meantime, it is simply discarded rather than queued. (In RxJS 7 this operator was renamed to exhaustAll.)

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  exhaust()
).subscribe({
  next: console.log
});



In the example above we did not get the value 2, because at that moment the operator was waiting for the first stream to complete and simply dropped the second one.

You may wonder when such behavior is useful. A good example is a login form: there is no point in sending several requests to the server until the current one has completed.
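To compare the four strategies side by side, the following sketch computes when each value would reach the external stream for the two example streams used above (1 after 1000 ms, 2 after 500 ms, both arriving at time 0). It is a simplified model for single-value streams written in plain JavaScript, not RxJS itself. The simulate function and its { arrivesAt, delay, value } description format are assumptions made for this illustration, and the input is expected to be sorted by arrival time.

```javascript
// Each internal stream is described as { arrivesAt, delay, value }: it reaches
// the operator at `arrivesAt` ms and, once subscribed to, emits `value` after
// `delay` ms and completes, like timer(delay).pipe(mapTo(value)).
const simulate = (strategy, internals) => {
  const output = [];
  let busyUntil = 0; // when the active stream completes (concat, exhaust)
  internals.forEach((internal, index) => {
    const { arrivesAt, delay, value } = internal;
    if (strategy === 'merge') {
      output.push({ time: arrivesAt + delay, value });
    } else if (strategy === 'concat') {
      // Wait for the previous stream to complete before starting.
      const start = Math.max(arrivesAt, busyUntil);
      busyUntil = start + delay;
      output.push({ time: busyUntil, value });
    } else if (strategy === 'switch') {
      const next = internals[index + 1];
      // Cancelled if a newer stream arrives before this one has emitted.
      if (!next || next.arrivesAt >= arrivesAt + delay) {
        output.push({ time: arrivesAt + delay, value });
      }
    } else if (strategy === 'exhaust') {
      // Dropped if the previous stream is still running.
      if (arrivesAt >= busyUntil) {
        busyUntil = arrivesAt + delay;
        output.push({ time: busyUntil, value });
      }
    }
  });
  return output
    .sort((a, b) => a.time - b.time)
    .map(({ time, value }) => `${value}@${time}ms`);
};

const internals = [
  { arrivesAt: 0, delay: 1000, value: 1 }, // firstInnerObservable
  { arrivesAt: 0, delay: 500, value: 2 }   // secondInnerObservable
];

const timelines = {
  merge: simulate('merge', internals),     // 2@500ms, 1@1000ms
  concat: simulate('concat', internals),   // 1@1000ms, 2@1500ms
  switch: simulate('switch', internals),   // 2@500ms
  exhaust: simulate('exhaust', internals)  // 1@1000ms
};
console.log(timelines);
```

Changing the arrivesAt and delay values is a quick way to check which operator fits a given scenario before reaching for the real one.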

Refining the application


Let's recall the example from the second article. In it, we implemented a search on GitHub and used the mergeMap operator to send requests to the server. Now that we know how this operator behaves, is it really a good fit for our case?

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  mergeMap(value => {
    return from(getUsersRepsFromAPI(value)).pipe(
      catchError(err => of([]))
    )
  })
).subscribe({
  next: reps => recordRepsToList(reps)
})

Let's assume the GitHub server is heavily overloaded, so that processing our request takes a long time. What could go wrong in this case?

Suppose a user entered a query, did not wait for the answer, and entered a new one. We will then send a second request to the server. However, nothing guarantees that the response to the first request arrives before the response to the second.

Since mergeMap does not care in what order the internal streams produce values, if the first request finishes later than the second, we will overwrite the up-to-date results with stale ones. Therefore, I propose replacing mergeMap with switchMap:

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  switchMap(value => {
    return from(getUsersRepsFromAPI(value)).pipe(
      catchError(err => of([]))
    )
  })
).subscribe({
  next: reps => recordRepsToList(reps)
})

Now, if the user enters a new query, switchMap unsubscribes from the previous stream and subscribes to the new one.

Note that the HTTP request itself stays in flight until the server responds. But since we have unsubscribed from the internal stream, the response will not reach the external stream.

Note: if you work with Angular and use HttpClient for HTTP, you do not need to worry about cancelling the request itself. HttpClient does this for you on unsubscription.

Cancelling the HTTP request


The fetch API can cancel an HTTP request using AbortController. Combined with the switchMap operator, this saves the user's traffic.

Let's rewrite our example a bit and create a function that wraps the fetch call in an observable:
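As a quick reminder of how AbortController behaves on its own, here is a small sketch that makes no network request. It assumes an environment where AbortController is available as a global, such as a modern browser or a recent Node.js version.

```javascript
const controller = new AbortController();
const { signal } = controller;

const events = [];
// fetch(url, { signal }) reacts to this signal internally; here we listen ourselves.
signal.addEventListener('abort', () => events.push('abort event'));

events.push(`aborted before: ${signal.aborted}`);
controller.abort();
events.push(`aborted after: ${signal.aborted}`);

console.log(events);
```

Once a signal is aborted it stays aborted, so a controller cannot be reused: each request needs its own.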

import { Observable } from 'rxjs';

const createCancellableRequest = (url) => {
  return new Observable(observer => {
    // create a controller per subscription so the request can be cancelled
    const controller = new AbortController();
    const signal = controller.signal;
    fetch(url, { signal })
      .then(response => {
        if (response.ok) {
          return response.json();
        }
        throw new Error('Error');
      })
      // pass the successful response to the observer
      .then(result => observer.next(result))
      // complete the stream
      .then(() => observer.complete())
      // in case of an error, notify the observer
      .catch(error => observer.error(error));
    // function called on unsubscription
    return () => {
      // cancel the request
      controller.abort();
    };
  });
};

We also change the getUsersRepsFromAPI function:

const getUsersRepsFromAPI = (username) => {
  const url = `https://api.github.com/users/${ username }/repos`;
  return createCancellableRequest(url);
}

Now the function returns an observable instead of a promise. Therefore, we remove the from wrapper in switchMap:

switchMap(value => {
  return getUsersRepsFromAPI(value).pipe(
    catchError(err => of([]))
  );
})

Note: RxJS 6.5 added the fromFetch function, which calls abort under the hood on unsubscription, so you no longer need to reinvent the wheel.

That's all! All sample code can be found here .

Conclusion


Today we looked at what HOO operators are and at some very useful operators from this category. Of course, these are far from all of them. For more detailed information, I recommend the RxJS documentation.

In the next article I plan to look at the difference between Hot and Cold observables.

Finally: do not put a subscription inside a subscription, because there are HOO operators for that!
