The basics of reactive programming using RxJS. Part 2. Operators and pipes



In the previous article, we looked at what streams are and how to work with them. In this part, we will get acquainted with the methods RxJS provides for creating streams, and with what operators and pipes are and how to use them.

RxJS has a rich API. The documentation describes more than a hundred methods. To get to know some of them, we will write a simple application and see in practice what reactive code looks like. You will see that tasks which used to seem routine and required a lot of code have an elegant solution when viewed through the prism of reactivity. But before we get into practice, we will look at how streams can be represented graphically and get acquainted with convenient methods for creating and processing them.

Graphical representation of streams


To clearly demonstrate how a particular stream behaves, I will use the notation adopted in the reactive approach. Recall our example from the previous article:

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

Here's what its graphical representation will look like:



A stream is usually depicted as a horizontal line. Each value the stream emits is shown on the line as a circle. A vertical bar on the line marks the completion of the stream. An error is shown with the symbol "×".

const observable = new Observable((observer) => {
  observer.error();
});



One-line streams


In my practice, I have rarely had to create Observable instances directly. Most of the methods needed for creating streams already exist in RxJS. To create a stream that emits the values 1 and 2, it is enough to use the of method:

const observable = of(1, 2);

The of method accepts any number of arguments and returns a ready-made Observable instance. After subscription, it emits the values it received and completes:



If you want to represent an array as a stream, you can use the from method. The from method expects any iterable object (array, string, etc.) or a promise as its argument and projects this object onto a stream. Here is what a stream obtained from a string looks like:

const observable = from('abc');
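
As a rough illustration of what "projecting an iterable onto a stream" means, the sketch below walks an iterable with for...of and pushes every item into an observer. It is a simplified model, not the way RxJS implements from, and fromIterable is a hypothetical name chosen for this example.

```javascript
// Simplified model: a "stream" is a function that pushes values into an observer.
// fromIterable is a hypothetical helper, not part of RxJS.
const fromIterable = (iterable) => (observer) => {
  for (const item of iterable) {
    observer.next(item); // emit every item of the iterable in order
  }
  observer.complete(); // signal that nothing more will arrive
};

const received = [];
fromIterable('abc')({
  next: (value) => received.push(value),
  complete: () => received.push('complete'),
});
console.log(received); // [ 'a', 'b', 'c', 'complete' ]
```

A string is iterable character by character, which is why the stream emits three separate values before completing.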



And this is how you can wrap a promise in a stream:

const promise = new Promise((resolve, reject) => {
  resolve(1);
});
const observable = from(promise);



Note: streams are often compared to promises. In fact, they have only one thing in common: a push strategy for propagating changes. Otherwise they are completely different entities. A promise cannot produce multiple values. It can only be resolved or rejected once, i.e. it ends up in one of two states. A stream can emit several values and can be reused.

Do you remember the interval example from the first article? That stream is a timer that counts the seconds elapsed since the moment of subscription.

const timer = new Observable(observer => {
  let counter = 0;
  const intervalId = setInterval(() => {
    observer.next(counter++);
  }, 1000);
  return () => {
   clearInterval(intervalId);
  }
});

Here's how you can implement the same thing in one line:

const timer = interval(1000);



And finally, a method that allows you to create a stream of events for DOM elements:

const observable = fromEvent(domElementRef, 'keyup');

This stream will emit keyup event objects as its values.
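
To show the idea behind such a stream, here is a minimal sketch of an event-based source written without RxJS. It assumes a simplified model in which a stream is a function that takes an observer and returns a teardown function; fromEventModel and fakeInput are hypothetical names, and the real fromEvent is more general than this.

```javascript
// Simplified model of fromEvent (an assumption for illustration, not the RxJS source).
// Subscribing attaches a listener; the returned function detaches it again.
const fromEventModel = (target, eventName) => (observer) => {
  const handler = (event) => observer.next(event);
  target.addEventListener(eventName, handler);
  return () => target.removeEventListener(eventName, handler);
};

// EventTarget and Event are available globally in browsers and in recent Node.js versions.
const fakeInput = new EventTarget();
const seenTypes = [];
const unsubscribe = fromEventModel(fakeInput, 'keyup')({
  next: (event) => seenTypes.push(event.type),
});

fakeInput.dispatchEvent(new Event('keyup')); // delivered to the observer
unsubscribe();
fakeInput.dispatchEvent(new Event('keyup')); // ignored: the listener is gone
console.log(seenTypes); // [ 'keyup' ]
```

The teardown function plays the same role as the clearInterval callback in the timer example: it releases the resource once nobody is listening.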

Pipes & Operators


Pipe is a method of the Observable class that was added in RxJS 5.5. With it, we can build chains of operators that process the values received in the stream one after another. A pipe is a unidirectional channel that connects operators to each other. The operators themselves are ordinary functions provided by RxJS that process values from a stream.

For example, they can transform a value and pass it further down the stream, or they can act as filters and hold back values that do not meet a given condition.
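
To make the idea of a chain concrete, here is a simplified model written without RxJS. It assumes that a stream is just a function taking an observer and that an operator is a function turning one stream into another. The names ofModel, mapModel, filterModel and pipeModel are hypothetical, and the real library does considerably more (error handling, unsubscription).

```javascript
// Simplified model, not the RxJS implementation:
// a stream is a function (observer) => void, an operator is a function stream => stream.
const ofModel = (...values) => (observer) => {
  values.forEach((value) => observer.next(value));
  observer.complete();
};

const mapModel = (project) => (source) => (observer) =>
  source({
    next: (value) => observer.next(project(value)),
    complete: () => observer.complete(),
  });

const filterModel = (predicate) => (source) => (observer) =>
  source({
    next: (value) => { if (predicate(value)) observer.next(value); },
    complete: () => observer.complete(),
  });

// pipe applies the operators left to right, each one wrapping the previous stream.
const pipeModel = (source, ...operators) =>
  operators.reduce((stream, operator) => operator(stream), source);

const output = [];
pipeModel(
  ofModel(1, 2, 3, 4),
  filterModel((value) => value % 2 === 0), // keeps 2 and 4
  mapModel((value) => value * 10)          // turns them into 20 and 40
)({
  next: (value) => output.push(value),
  complete: () => output.push('done'),
});
console.log(output); // [ 20, 40, 'done' ]
```

Each operator only knows about the stream directly before it, which is what makes the channel unidirectional.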

Let's look at the operators in action. Multiply each value from the stream by 2 using the map operator:

of(1,2,3).pipe(
  map(value => value * 2)
).subscribe({
  next: console.log
});

Here's what the stream looks like before applying the map operator:



After the map operator:



Let's use the filter operator. It works just like the filter method of Array. The operator takes a function as its first argument, which describes a condition. If a value from the stream satisfies the condition, it is passed on:

of(1, 2, 3).pipe(
  // let only odd values through
  filter(value => value % 2 !== 0),
  map(value => value * 2)
).subscribe({
  next: console.log
});

And this is how the whole scheme of our stream will look:



After filter:



After map:



Note: pipe !== subscribe. The pipe method declares the behavior of the stream but does not subscribe to it. Until you call the subscribe method, your stream will not start working.
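
The difference between declaring and subscribing can be sketched with a simplified model in plain JavaScript. It assumes that a stream is a function that starts its producer only when an observer is passed in; the names source, double and doubled are hypothetical, and this is not RxJS code.

```javascript
// Simplified model (not RxJS itself): a stream is a function that runs its producer
// only when an observer is passed to it, which plays the role of subscribe.
let producerRuns = 0;
const source = (observer) => {
  producerRuns += 1; // side effect that shows when the producer really starts
  observer.next(1);
  observer.next(2);
};

const double = (stream) => (observer) =>
  stream({ next: (value) => observer.next(value * 2) });

// "Piping": builds a new stream description, nothing is executed yet.
const doubled = double(source);
const runsAfterPipe = producerRuns; // still 0

// "Subscribing": only now the producer runs and values flow.
const values = [];
doubled({ next: (value) => values.push(value) });
console.log(runsAfterPipe, producerRuns, values); // 0 1 [ 2, 4 ]
```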

Writing an application


Now that we have figured out what pipe and operators are, we can get down to practice. Our application will perform one simple task: display the list of public GitHub repositories for the owner nickname entered by the user.

There are only a few requirements:

  • Do not send an API request if the string entered in the input contains fewer than 3 characters;
  • To avoid sending a request for every character the user types, wait (debounce) 700 milliseconds before calling the API.

To search for repositories, we will use the GitHub API. I recommend running the examples on stackblitz, where I have published the finished implementation. Links are provided at the end of the article.

Let's start with the HTML markup. We need an input element and a ul element, for example:

    <input type="text">
    <ul></ul>

Then, in the js or ts file, we get references to these elements using the browser API:

    const input = document.querySelector('input');
    const ul = document.querySelector('ul');
    

We also need a function that sends a request to the GitHub API. Below is the code of the getUsersRepsFromAPI function, which accepts the user's nickname and performs an AJAX request using fetch. It returns a promise and converts a successful response to JSON along the way:

    const getUsersRepsFromAPI = (username) => {
      const url = `https://api.github.com/users/${ username }/repos`;
      return fetch(url)
        .then(response => {
          if(response.ok) {
            return response.json();
          }
          throw new Error('Request failed');
        });
    }
    

Next, we write a function that renders the repository names into the list:

    const recordRepsToList = (reps) => {
      for (let i = 0; i < reps.length; i++) {
        // if the element does not exist yet, create it
        if (!ul.children[i]) {
          const newEl = document.createElement('li');
          ul.appendChild(newEl);
        }
        // write the repository name into the element as plain text
        const li = ul.children[i];
        li.textContent = reps[i].name;
      }
      // remove the leftover elements
      while (ul.children.length > reps.length) {
        ul.removeChild(ul.lastChild);
      }
    }
    

The preparations are complete. It is time to see RxJS in action. We need to listen to the keyup event of our input. First of all, we must remember that in the reactive approach we work with streams, so the event has to be represented as one. Fortunately, RxJS already provides this: remember the fromEvent method I mentioned above. Let's use it:

    const keyUp = fromEvent(input, 'keyup');
    keyUp.subscribe({
      next: console.log
    });
    

Now our event is represented as a stream. If we look at the console output, we will see objects of type KeyboardEvent. But what we need is the value entered by the user. This is where the pipe method and the map operator come in handy:

    fromEvent(input, 'keyup').pipe(
      map(event => event.target.value)
    ).subscribe({
      next: console.log
    });
    

Let's move on to the requirements. To begin with, we will execute the request only when the entered value contains more than two characters. To do this, we use the filter operator:

    fromEvent(input, 'keyup').pipe(
      map(event => event.target.value),
      filter(value => value.length > 2)
    )
    

We have dealt with the first requirement, so let's proceed to the second: we need to implement debounce. RxJS has a debounceTime operator. As its first argument it takes the number of milliseconds for which a value is held before being passed on. Each new value resets the timer. As a result, the output contains only the last value, emitted once 700 milliseconds have passed without a newer one.

    fromEvent(input, 'keyup').pipe(
      debounceTime(700),
      map(event => event.target.value),
      filter(value => value.length > 2)
    )
    

Here is what our stream might look like without debounceTime:



And here is what the same stream looks like after passing through this operator:



With debounceTime we will call the API less often, which saves traffic and reduces the load on the server.
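
The timer-reset rule can be modelled as a pure function over timestamped events, which makes it easy to check by hand. This is a sketch under assumptions: simulateDebounce and the keystrokes data are hypothetical, the source is assumed never to complete (as with DOM events), and no event arrives at the exact moment a timer expires.

```javascript
// Simplified simulation of debounce semantics, not the RxJS implementation.
// events: [{ time, value }] sorted by time (milliseconds); delay: quiet period in ms.
function simulateDebounce(events, delay) {
  const emitted = [];
  events.forEach((event, index) => {
    const nextEvent = events[index + 1];
    const fireAt = event.time + delay; // when this value's timer would expire
    // The value survives only if no newer event resets the timer before it expires.
    if (!nextEvent || nextEvent.time >= fireAt) {
      emitted.push({ time: fireAt, value: event.value });
    }
  });
  return emitted;
}

// Hypothetical typing session: three quick keystrokes, a pause, then one more.
const keystrokes = [
  { time: 0, value: 'r' },
  { time: 200, value: 'rx' },
  { time: 400, value: 'rxj' },
  { time: 1500, value: 'rxjs' },
];
const result = simulateDebounce(keystrokes, 700);
console.log(result);
// [ { time: 1100, value: 'rxj' }, { time: 2200, value: 'rxjs' } ]
```

Four keystrokes turn into two emitted values, so in this session the API would be called twice instead of four times.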

For additional optimization, I suggest using another operator: distinctUntilChanged. It removes consecutive duplicates. Its behavior is best shown with an example:

    from('aaabccc').pipe(
      distinctUntilChanged()
    )
    

Without distinctUntilChanged:



With distinctUntilChanged:



Add this operator after debounceTime, once the event has been mapped to the input value (event objects themselves are always different, so comparing them would filter nothing). This way we will not call the API if the new value coincides with the previous one. This can happen when the user types new characters and then erases them again: because of the delay, only the final value reaches the stream, and we already have the response for it.
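
The "consecutive duplicates only" rule can be sketched over a plain array. This is a simplified model that assumes the default comparison with ===; dropConsecutiveDuplicates is a hypothetical helper name, not an RxJS function.

```javascript
// Simplified model over a finished list of values; values are compared with ===.
// dropConsecutiveDuplicates is a hypothetical helper name.
const dropConsecutiveDuplicates = (values) =>
  values.filter((value, index) => index === 0 || value !== values[index - 1]);

const fromRepeats = dropConsecutiveDuplicates([...'aaabccc']);
console.log(fromRepeats); // [ 'a', 'b', 'c' ]

// Only neighbours are compared, so a value may reappear later in the stream.
const alternating = dropConsecutiveDuplicates([...'abab']);
console.log(alternating); // [ 'a', 'b', 'a', 'b' ]

// Objects are compared by reference, so two separate objects never count as equal.
const objects = dropConsecutiveDuplicates([{ v: 1 }, { v: 1 }]);
console.log(objects.length); // 2
```

The last case is the reason the operator is useful on the input's string value rather than on the event objects.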

Calling the server


We can already describe the logic of the request and the processing of the response. For now we can only work with a promise, so we add another map operator that calls the getUsersRepsFromAPI function. In the observer, we describe how the promise is handled:

    /* Educational code! In production RxJS projects it is better to avoid promises and use streams instead */
    fromEvent(input, 'keyup').pipe(
      debounceTime(700),
      map(event => event.target.value),
      filter(val => val.length > 2),
      distinctUntilChanged(),
      map(value => getUsersRepsFromAPI(value))
    ).subscribe({
      next: promise => promise.then(reps => recordRepsToList(reps))
    });
    

At this point, we have implemented everything we wanted. But our example has one big drawback: there is no error handling. Our observer receives only a promise and has no idea that something could go wrong.

Of course, we could attach catch to the promise in the next method, but then our code would start to resemble "callback hell" more and more. If we suddenly need to execute one more request, the complexity of the code will grow further.

Note: using promises in RxJS code is considered an antipattern. A promise has many disadvantages compared to an observable: it cannot be cancelled and cannot be reused. If you have a choice, choose the observable. The same goes for the toPromise method of the Observable class, which was implemented for compatibility with libraries that cannot work with streams.

We could use the from method to project the promise onto a stream, but that approach requires additional calls to subscribe and again makes the code longer and more complex.

This problem can be solved using the mergeMap operator:

    fromEvent(input, 'keyup').pipe(
      debounceTime(700),
      map(event => event.target.value),
      filter(val => val.length > 2),
      distinctUntilChanged(),
      mergeMap(value => from(getUsersRepsFromAPI(value)))
    ).subscribe({
      next: reps => recordRepsToList(reps),
      error: console.log
    })
    

Now we do not need to write the promise handling logic. The from method turned the promise into a stream, and the mergeMap operator processed it. If the promise is fulfilled, the next method is called and our observer receives the finished object. If an error occurs, the error method is called and our observer logs the error to the console.

The mergeMap operator differs slightly from the operators we worked with earlier: it works with so-called higher-order observables, which I will discuss in the next article. Looking ahead, I will just say that mergeMap itself subscribes to the inner stream.
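
The difference between map and mergeMap can be sketched with a simplified synchronous model written without RxJS. It assumes that a stream is a function taking an observer; ofModel, mapModel, mergeMapModel and fakeRepos are hypothetical names, and error handling and unsubscription are left out on purpose.

```javascript
// Simplified synchronous model, not the RxJS implementation.
const ofModel = (...values) => (observer) => {
  values.forEach((value) => observer.next(value));
  observer.complete();
};

// map hands the projected result to the observer as is, even if it is itself a stream.
const mapModel = (project) => (source) => (observer) =>
  source({
    next: (value) => observer.next(project(value)),
    complete: () => observer.complete(),
  });

// mergeMap subscribes to every inner stream and forwards the inner values instead.
const mergeMapModel = (project) => (source) => (observer) => {
  let activeInner = 0;
  let outerDone = false;
  const completeIfFinished = () => {
    if (outerDone && activeInner === 0) observer.complete();
  };
  source({
    next: (value) => {
      activeInner += 1;
      project(value)({
        next: (innerValue) => observer.next(innerValue),
        complete: () => { activeInner -= 1; completeIfFinished(); },
      });
    },
    complete: () => { outerDone = true; completeIfFinished(); },
  });
};

// Hypothetical stand-in for the API call: a stream with one list of repository names.
const fakeRepos = (user) => ofModel([`${user}/repo`]);

const viaMap = [];
mapModel(fakeRepos)(ofModel('alice', 'bob'))({
  next: (value) => viaMap.push(typeof value),
  complete: () => {},
});

const viaMergeMap = [];
mergeMapModel(fakeRepos)(ofModel('alice', 'bob'))({
  next: (value) => viaMergeMap.push(value),
  complete: () => viaMergeMap.push('complete'),
});

console.log(viaMap);      // [ 'function', 'function' ]: the observer got streams
console.log(viaMergeMap); // [ [ 'alice/repo' ], [ 'bob/repo' ], 'complete' ]
```

With map the observer has to unwrap each inner stream itself, just as it had to call then on the promise earlier; with mergeMap it receives the finished lists directly.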

Error handling


If our stream receives an error, it terminates. If we then try to interact with the application, we will get no reaction, because the stream has already ended.

Here the catchError operator will help us. catchError is invoked only when an error occurs in the stream. It lets you intercept the error, handle it, and return an ordinary value to the stream instead.

    fromEvent(input, 'keyup').pipe(
      debounceTime(700),
      map(event => event.target.value),
      filter(val => val.length > 2),
      distinctUntilChanged(),
      mergeMap(value => from(getUsersRepsFromAPI(value))),
      catchError(err => of([]))
    ).subscribe({
      next: reps => recordRepsToList(reps),
      error: console.log
    })
    

We catch the error in catchError and return a stream with an empty array instead. Now, when an error occurs, we clear the list of repositories. But after that the stream ends again.

The thing is that catchError replaces our original stream with a new one, and from then on our observer listens only to it. Once the of stream emits the empty array, it completes, and the observer's complete method is called.

To avoid replacing the original stream, we call catchError on the from stream, inside the mergeMap operator.

    fromEvent(input, 'keyup').pipe(
      debounceTime(700),
      map(event => event.target.value),
      filter(val => val.length > 2),
      distinctUntilChanged(),
      mergeMap(value => {
        return from(getUsersRepsFromAPI(value)).pipe(
          catchError(err => of([]))
        )
      })
    ).subscribe({
      next: reps => recordRepsToList(reps),
      error: console.log
    })
    

This way, our original stream will not notice anything. Instead of an error, it will receive an empty array.
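
The difference between the two placements of catchError can be reproduced with a simplified synchronous model written without RxJS. Everything here is an assumption made for illustration: guard, fromArray, failWith, catchErrorModel, mergeMapModel and fakeApi are hypothetical names, and the user "bad" stands for a request that fails.

```javascript
// Simplified synchronous model of streams, not the RxJS implementation.
// guard makes an observer ignore everything after the first error or complete.
const guard = (observer) => {
  let closed = false;
  return {
    next: (value) => { if (!closed) observer.next(value); },
    error: (err) => { if (!closed) { closed = true; observer.error(err); } },
    complete: () => { if (!closed) { closed = true; observer.complete(); } },
  };
};

const fromArray = (values) => (observer) => {
  const safe = guard(observer);
  values.forEach((value) => safe.next(value));
  safe.complete();
};

const failWith = (message) => (observer) => guard(observer).error(new Error(message));

// catchError: on error, continue with the replacement stream returned by the selector.
const catchErrorModel = (selector) => (source) => (observer) => {
  const safe = guard(observer);
  source({
    next: safe.next,
    error: (err) => selector(err)(safe),
    complete: safe.complete,
  });
};

// mergeMap: forward values of every inner stream; an inner error ends the whole stream.
const mergeMapModel = (project) => (source) => (observer) => {
  const safe = guard(observer);
  let active = 0;
  let outerDone = false;
  let failed = false;
  const finish = () => { if (outerDone && active === 0) safe.complete(); };
  source({
    next: (value) => {
      if (failed) return;
      active += 1;
      project(value)({
        next: safe.next,
        error: (err) => { failed = true; safe.error(err); },
        complete: () => { active -= 1; finish(); },
      });
    },
    error: (err) => { failed = true; safe.error(err); },
    complete: () => { outerDone = true; finish(); },
  });
};

// Hypothetical API stand-in: the user "bad" fails, everyone else has one repository.
const fakeApi = (user) => (user === 'bad' ? failWith('404') : fromArray([[`${user}-repo`]]));
const users = fromArray(['a', 'bad', 'c']);
const collect = (log) => ({
  next: (value) => log.push(value),
  error: (err) => log.push(`error: ${err.message}`),
  complete: () => log.push('complete'),
});

// catchError at the end of the outer pipe: the replacement stream takes over and completes.
const outerLog = [];
catchErrorModel(() => fromArray([[]]))(mergeMapModel(fakeApi)(users))(collect(outerLog));

// catchError on the inner stream: only the failed request is replaced.
const innerLog = [];
mergeMapModel((user) => catchErrorModel(() => fromArray([[]]))(fakeApi(user)))(users)(collect(innerLog));

console.log(JSON.stringify(outerLog)); // [["a-repo"],[],"complete"]
console.log(JSON.stringify(innerLog)); // [["a-repo"],[],["c-repo"],"complete"]
```

In the first log the value for user "c" never arrives because the stream has already completed; in the second log the failed request is replaced by an empty array and the following request is still processed.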

Conclusion


We have finally moved on to practice and seen what pipe and operators are for. We looked at how to reduce the amount of code using the rich API that RxJS provides. Of course, our application is not finished. In the next part we will look at how to handle one stream inside another and how to cancel our HTTP request in order to save even more traffic and application resources. So that you can see the difference, I have also published an example that does not use RxJS; you can see it here. At this link you will find the full code of the current application. To generate the diagrams, I used the RxJS visualizer.

I hope this article has helped you better understand how RxJS works. I wish you success in your study!
