Basics of reactive programming using RxJS

Part 1. Reactivity and Streams


This series of articles focuses on reactivity and its use in JS using such a wonderful library as RxJS.

For whom this article : basically, here I will explain the basics, therefore, in the first place, the article is intended for beginners in this technology. At the same time, I hope that experienced developers will be able to learn something new for themselves. For understanding, knowledge of js (es5 / es6) is required.

Motivation: I first encountered RxJS when I started working with angular. It was then that I had difficulty understanding the mechanism of reactivity. The fact that at the beginning of my work most of the articles were devoted to the old version of the library added complexity. I had to read a lot of documentation, various manuals in order to at least understand something. And only some time later I began to realize how “everything is arranged.” To simplify the lives of others, I decided to sort it out.

What is reactivity?


It is difficult to find an answer to the seemingly so common term. In short: reactivity is the ability to react to any changes. But what changes are we talking about? First of all, about data changes. Consider an example:

let a = 2;
let b = 3;
let sum = a + b;
console.log(sum); // 5
a = 3;
console.log(sum); // 5 - данные нужно пересчитать

This example demonstrates the usual imperative programming paradigm. Unlike the imperative approach, the reactive approach is built on a push strategy for propagating changes. Push strategy implies that in the event of a change in data, these changes will be “pushed”, and the data dependent on them will be automatically updated. This is how our example would behave if the push strategy were applied:

let a = 2;
let b = 3;
let sum = a + b;
console.log(sum); // 5
a = 3;
console.log(sum); // 6 - значение переменной sum автоматически пересчиталось

This example shows a reactive approach. It should be noted that this example has nothing to do with reality, I cited it only in order to show the difference in approaches. Reactive code in real applications will look very different, and before proceeding to practice, we should talk about another important component of reactivity.

Data stream


If we search the term “reactive programming” in Wikipedia, the site will give us the following definition: “Reactive programming is a programming paradigm focused on data flows and propagation of changes”. From this definition, we can conclude that reactivity is based on two main “whales”. I mentioned the distribution of changes above, so we will not dwell on this further. But about the data streams should talk more. Let's look at the following example:

const input = document.querySelector('input'); // получаем ссылку на элемент
const eventsArray = [];
input.addEventListener('keyup',
   event => eventsArray.push(event)
); // пушим каждое событие в массив eventsArray

We listen to the keyup event and put the event object in our array. Over time, our array can contain thousands of KeyboardEvent objects. It should be noted that our array is sorted by time - the index of later events is larger than the index of earlier ones. Such an array is a simplified data flow model. Why simplified? Because the array can only store data. We can also iterate an array and somehow process its elements. But the array cannot tell us that a new element has been added to it. In order to find out whether new data has been added to the array, we will have to iterate it again.

But what if our array could tell us that it received new data? Such an array could surely be called a stream. So, we come to the definition of flow. A stream is an array of data sorted by time, which can indicate that the data has changed.

Observable


Now that we know what streams are, let's work with them. In RxJS, streams are represented by the Observable class. To create your own stream, simply call the constructor of this class and pass the subscription function as an argument to it:

const observable = new Observable(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
})

Through a call to the Observable class constructor, we create a new thread. As an argument to the constructor, we passed the subscription function. A subscription function is a normal function that takes an observer as a parameter. The observer himself is an object that has three methods:

  • next - throws a new value into the stream
  • error - throws an error into the stream, after which the thread ends
  • complete - terminates the stream

Thus, we have created a stream that emits two values ​​and ends.

Subscription


If we run the previous code, nothing will happen. We will only create a new stream and save a reference to it in the observable variable, but the stream itself will never emit any values. This is because threads are “lazy” objects and do not do anything in themselves. In order for our stream to start emitting values ​​and we could process these values, we need to start “listening” to the stream. This can be done by calling the subscribe method on the observable object.

const observer = {
  next: value => console.log(value), // 1, 2
  error: error => console.error(error), //
  complete: () => console.log("completed") // completed
};
observable.subscribe(observer);

We defined our observer and described three methods for him: next, error, complete. Methods simply log the data that is passed as parameters. Then we call the subscribe method and pass our observer to it. At the time of the call to subscribe, the subscription function is called, the same one that we passed to the constructor at the stage of declaring our stream. Further, the code of the subscription function will be executed, which sends two values ​​to our observer, and then terminates the stream.

Surely, many have a question, what will happen if we subscribe to the stream again? Everything will be the same: the stream will again transmit two values ​​to the observer and end. Each time the subscribe method is called, the subscription function will be called, and all its code will be executed anew. From this we can conclude: no matter how many times we subscribe to a stream, our observers will receive the same data.

Unsubscribe


Now we will try to implement a more complex example. We will write a timer that will count down the seconds from the moment of subscription, and transfer them to observers.

const timer = new Observable(observer => {
  let counter = 0; //объявляем счетчик
  setInterval(() => {
    observer.next(counter++); // передаем значение счетчика наблюдателю и увеличиваем его на единицу
  }, 1000);
});
timer.subscribe({
  next: console.log //просто логируем каждое значение
});

The code is pretty simple. Inside the subscription function, we declare a variable counter. Then, using the closure, we get access to the variable from the arrow function in setInterval. And every second we pass the variable to the observer, after which we increment it. Further we subscribe for a stream, we specify only one method - next. Do not worry that other methods we have not announced. None of the observer methods is required. We can even pass an empty object, but in this case the thread will be wasted.

After start we will see treasured logs which will appear every second. If you want, you can experiment and subscribe to the stream several times. You will see that each of the threads will run independently of the others.

If you think about it, then our thread will run for the entire life of the application, because we don’t have any cancellation logic for setInterval, and in the function-subscription there is no call to the complete method. But what if we need the thread to end?

In fact, everything is very simple. If you look in the documentation, you can see that the subscribe method returns a subscription object. This object has an unsubscribe method. Call it, and our observer will stop receiving values ​​from the stream.

const subscription = timer.subscribe({next: console.log});
setTimeout(() => subscription.unsubscribe(), 5000); //поток завершиться через 5 секунд

After the launch, we will see that the counter stops at the number 4. But, although we have unsubscribed from the stream, our setInterval function continues to work. It increments our counter every second and sends it to the dummy observer. To prevent this from happening, you need to write an interval cancellation logic. To do this, return a new function from the subscription function, in which the cancellation logic will be implemented.

const timer = new Observable(observer => {
  let counter = 0;
  const intervalId = setInterval(() => {
    observer.next(counter++);
  }, 1000);
  return () => {
    clearInterval(intervalId);
  }
});

Now we can breathe a sigh of relief. After calling the unsubscribe method, our unsubscribe function will be called, which will clear the interval.

Conclusion


This article shows the differences between the imperative approach and the reactive approach, and also gives examples of creating your own streams. In the next part, I will discuss what other methods for creating threads exist and how to use them.

Also popular now: