Grokking RxJava, Part Two: Operators

Original author: Dan Lew
In the first part, we looked at the basic building blocks of RxJava and met the map() operator. I can understand if you still do not feel the urge to drop everything and start using this framework, since so far we have only seen the tip of the iceberg. That is about to change: most of the power of RxJava lies in its operators, and I have prepared an example that will let you explore some of them.

Task


Suppose I have a method like this:

// Returns a list of URLs based on a text search of web page content
Observable<List<String>> query(String text);

I want to build a system that searches for text and displays the results. Based on what we learned in the previous part, we can write something like this:

query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });

This solution is unsatisfying because we lose the ability to transform the data stream. If we want to modify each url, we have to do it all in the Subscriber, which leaves all our map() tricks out of the picture.
We could write a map() that takes the list of urls and returns a list of modified urls, but then every map() call would contain a for-each loop. Not pretty either.

A glimpse of hope


There is a method, Observable.from(), that takes a collection and emits its elements one after another:

Observable.from("url1", "url2", "url3")
    .subscribe(url -> System.out.println(url));

It looks like what we need, let's try to use it in our task:

query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });

We got rid of the loop, but the result is a mess: instead of a loop, we now have subscriptions nested inside each other. Not only does the code look confusing, which will most likely make it hard to modify; it also conflicts with some features of RxJava that I haven't mentioned yet 1. Hmm.

Is there a better way?


Hold your breath and meet your savior: flatMap().
Observable.flatMap() takes the items emitted by one Observable and returns the items emitted by another Observable, replacing the first Observable with the second. It is a plot twist of sorts: you thought you were getting one stream of data, but you actually get another. Here is how flatMap() solves our problem:

query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));

I showed the full version in order to make it easier for you to understand what is happening, but if you rewrite the code with lambdas, it starts to look just fine:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));

It's a pretty strange thing, if you think about it. Why does flatMap() return another Observable? The key point is that the new Observable is what our Subscriber ends up seeing. It does not receive a List<String>; it receives a stream of individual String objects, just as it would from Observable.from(). This was the hardest part for me to grasp, but once I did, much of how RxJava works fell into place.
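
To see the same flattening outside RxJava, here is a small sketch that uses java.util.stream from the JDK as an analogy. It is not RxJava code: streams are pulled synchronously, while an Observable pushes items to its Subscriber. The query() below is a hypothetical stand-in that emits one hard-coded list, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapSketch {
    // Hypothetical stand-in for query(): a single emission that is a whole list of urls.
    static Stream<List<String>> query(String text) {
        return Stream.of(Arrays.asList("url1", "url2", "url3"));
    }

    // flatMap() replaces each emitted list with a stream of its elements,
    // so downstream code sees individual Strings rather than a List<String>.
    static List<String> flattened(String text) {
        return query(text)
                .flatMap(urls -> urls.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        flattened("Hello, world!").forEach(url -> System.out.println(url));
    }
}
```

The consumer at the end of the chain handles one url at a time and never touches the list, which is the same shift in perspective that flatMap() gives a Subscriber.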


It gets even better


I'll stress this once more, because it is important: flatMap() can return any Observable you want.
For example, I have a second method:

// Returns the title of the website, or null if we got a 404 error
Observable<String> getTitle(String URL);

Instead of printing urls, I now want to print the title of every site I manage to reach. There are two problems: my method accepts only a single url, and it does not return a String; it returns an Observable that emits a String.
flatMap() solves both problems: first we go from the list of urls to a stream of individual urls, and then we call getTitle() inside flatMap() before passing the final result to the Subscriber:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String url) {
            return getTitle(url);
        }
    })
    .subscribe(title -> System.out.println(title));

And once again we simplify everything with the help of lambdas:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));

Great, huh? We are composing several independent methods that each return an Observable.
Notice how I combined two API calls into a single chain. The same can be done for any number of API calls. You probably know how hard it can be to coordinate several API calls to get the result you need: make a request, get the result in a callback, make a new request from inside that callback... Brr. Here we sidestepped all of that hell and put the same logic into one short reactive chain 2.

Abundance of operators


So far we have looked at only two operators, but RxJava has many more. How else can we improve our code?
getTitle() returns null if we got a 404 error. We do not want to print "null", so we can filter out the values we do not need:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .subscribe(title -> System.out.println(title));

filter() emits the same item it received, but only if the item passes the test.
Now suppose we want to show at most 5 results:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .subscribe(title -> System.out.println(title));

take() emits at most the specified number of items (if there are fewer than 5 titles in our case, take() simply finishes early).
And while we're at it, let's also save each title we receive to disk:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));

doOnNext() lets us add an extra action that runs every time a new item is emitted; in this case, the action saves the title.
Look at how easy it is to manipulate the stream of data. You can keep adding ingredients to your recipe without ending up with an indigestible mess.
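
For readers who want to run the whole chain without an RxJava dependency, here is a rough analogue built on java.util.stream. It is only a sketch under stated assumptions: limit() and peek() stand in for take() and doOnNext(), the query() and getTitle() bodies are hypothetical stubs with hard-coded data, and saveTitle() writes to an in-memory list rather than to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineSketch {
    // Hypothetical "disk": every title passed to saveTitle() ends up here.
    static final List<String> saved = new ArrayList<>();

    // Hypothetical stand-in for query(): one emission holding a list of urls.
    static Stream<List<String>> query(String text) {
        return Stream.of(Arrays.asList("a", "missing", "b", "c", "d", "e", "f"));
    }

    // Hypothetical stand-in for getTitle(): one title, or null to simulate a 404.
    static Stream<String> getTitle(String url) {
        String title = url.equals("missing") ? null : "Title of " + url;
        return Collections.singletonList(title).stream();
    }

    static void saveTitle(String title) {
        saved.add(title);
    }

    static List<String> firstFiveTitles(String text) {
        saved.clear();
        return query(text)
                .flatMap(urls -> urls.stream())     // list of urls -> individual urls
                .flatMap(url -> getTitle(url))      // url -> its title (possibly null)
                .filter(title -> title != null)     // drop the simulated 404s
                .limit(5)                           // analogue of take(5)
                .peek(title -> saveTitle(title))    // analogue of doOnNext()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        firstFiveTitles("Hello, world!").forEach(title -> System.out.println(title));
    }
}
```

Each step is one line in the chain, and removing or reordering a step does not require touching the others.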
RxJava ships with a huge number of operators. The list may look intimidating, but it is worth skimming at least to get an idea of what is available. It will take a while to internalize the operators, but once you do, you will have real power at your fingertips.
Oh, and by the way, you can also write your own operators! That topic is beyond the scope of this series, but let's put it this way: if you can think up an operator, you can almost certainly implement it 3.

So what?


Well, I get it: you are a skeptic, and I have failed to convince you yet again. Why should you care about all these operators?

Idea #3: Operators Let You Do Anything With a Data Stream


The only limit is yourself.
You can write quite complex data manipulation logic using nothing but chains of simple operators. This is functional reactive programming. The more you use it, the more it changes the way you think about how code should look.
Also, think about how easy it was to present our data to the end consumer once it had been transformed. By the end of our example, we were making two API calls, processing the data, and saving it to disk. But our final Subscriber has no idea about any of that; it still just works with an ordinary Observable<String>. Encapsulation makes code simpler!
In the third part, we will go through other cool features of RxJava that have less to do with data manipulation: error handling and concurrency.

Go to the third part.


1 For example, error handling, multithreading, and unsubscribing in RxJava barely work at all with code like this. I will cover these topics in the third part.
2 You may also be thinking about the other side of callback hell: error handling. I will cover that in the third part.
3 If you want to write your own operators, you should look here. Some of the implementation details, however, will be hard to understand before reading the third part.
