Android background tutorial. Part 4: RxJava

Event handling is a loop.
In the previous part, we talked about using thread pool executors for background work in Android. The problem with that approach is that the code dispatching the event also has to know how the result should be processed. Now let's see what RxJava offers.
Disclaimer: this is not an article on how to use RxJava in Android. The Internet already has plenty of those. This one is about the details of the library's implementation.
Generally speaking, RxJava is not a tool specifically for background work; it is a tool for processing streams of events.
Background work is just one aspect of that processing. The general idea behind the approach is to use a Scheduler. Let's look directly at the code of this class:
public abstract class Scheduler {
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) { ... }

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... }

    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... }

    @NonNull
    public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... }
}
Pretty complicated, right? The good news is that you don't have to implement it yourself! The library already ships with a number of schedulers: Schedulers.io(), Schedulers.computation(), and so on. All that is required of you is to pass a scheduler instance to the subscribeOn() / observeOn() method of your Rx chain:
apiClient.login(auth)
    // some code omitted
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
RxJava then does the rest for you: it takes the lambdas that you pass to the operators and executes them on the desired scheduler.
For example, if you want your observers to update the user interface, all you have to do is pass AndroidSchedulers.mainThread() to observeOn(). And it's in the bag: no more tight coupling, no platform-specific code, pure bliss. Of course, AndroidSchedulers is not part of the core RxJava library and comes as a separate dependency, but that is just one more line in your build.gradle.
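To make the Scheduler class shown earlier a little less abstract, here is a minimal sketch in plain Java of what a scheduler boils down to: something that accepts a Runnable, runs it somewhere else, and returns a handle for cancelling it. MiniScheduler, MiniDisposable, MiniSchedulerDemo and the thread name "mini-io" are made up for this illustration; RxJava's real schedulers are built around workers and are considerably more involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RxJava's Disposable: a handle for cancelling a scheduled task.
interface MiniDisposable {
    void dispose();
}

// Hypothetical, heavily simplified scheduler backed by an ExecutorService.
final class MiniScheduler {
    private final ExecutorService executor;

    MiniScheduler(ExecutorService executor) {
        this.executor = executor;
    }

    // Same shape as scheduleDirect(Runnable): submit the task, return a cancel handle.
    MiniDisposable scheduleDirect(Runnable run) {
        final Future<?> future = executor.submit(run);
        return () -> future.cancel(true);
    }

    void shutdown() {
        executor.shutdown();
    }
}

final class MiniSchedulerDemo {
    // Schedules one task and returns the name of the thread that actually ran it.
    static String runOnWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io"));
        MiniScheduler scheduler = new MiniScheduler(pool);
        String[] threadName = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        scheduler.scheduleDirect(() -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return threadName[0];
    }
}
```

Calling MiniSchedulerDemo.runOnWorker() returns the name of the worker thread rather than that of the caller: the task was handed over, not executed in place.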
So what is the catch with threads? The trick is that you can't just place subscribeOn() / observeOn() anywhere in your Rx chain (which would be convenient, right?). Instead, you have to consider how these operators get their schedulers. To begin with, note that each time you call map, flatMap, filter, or any other operator, you get a new object.
For instance:
private fun attemptLoginRx() {
    showProgress(true)
    apiClient.login(auth)
        .flatMap { user ->
            apiClient.getRepositories(user.repos_url, auth)
        }
        .map { list ->
            list.map { it.full_name }
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doFinally { showProgress(false) }
        .subscribe(
            { list -> showRepositories(this, list) },
            { error -> Log.e("TAG", "Failed to show repos", error) }
        )
}
So here almost every line creates a new object:
// new SingleFlatMap
val flatMap = apiClient.login(auth)
    .flatMap { apiClient.getRepositories(it.repos_url, auth) }
// new SingleMap
val map = flatMap
    .map { list -> list.map { it.full_name } }
// new SingleSubscribeOn
val subscribeOn = map
    .subscribeOn(Schedulers.io())
// new SingleObserveOn
val observeOn = subscribeOn
    .observeOn(AndroidSchedulers.mainThread())
// new SingleDoFinally
val doFinally = observeOn
    .doFinally { showProgress(false) }
// new ConsumerSingleObserver
val subscribe = doFinally
    .subscribe(
        { list -> showRepositories(this@LoginActivity, list) },
        { error -> Log.e("TAG", "Failed to show repos", error) }
    )
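The "every operator is a new object" idea can be sketched in plain Java without RxJava at all. The classes below (TinySingle, TinyFromCallable, TinyMap) are hypothetical miniatures invented for this example, not the library's classes; they only show that calling an operator wraps the previous object and does no work until someone subscribes.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical miniature of Single: only the decorator idea, none of the real machinery.
abstract class TinySingle<T> {
    // Each concrete operator decides what subscribing means for it.
    abstract void subscribeActual(Consumer<? super T> observer);

    final void subscribe(Consumer<? super T> observer) {
        subscribeActual(observer);
    }

    static <T> TinySingle<T> fromCallable(Supplier<T> supplier) {
        return new TinyFromCallable<>(supplier);
    }

    // map() does no work itself: it only wraps `this` in a new object.
    final <R> TinySingle<R> map(Function<? super T, ? extends R> mapper) {
        return new TinyMap<>(this, mapper);
    }
}

final class TinyFromCallable<T> extends TinySingle<T> {
    private final Supplier<T> supplier;

    TinyFromCallable(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    @Override
    void subscribeActual(Consumer<? super T> observer) {
        observer.accept(supplier.get()); // the source lambda runs only here
    }
}

final class TinyMap<T, R> extends TinySingle<R> {
    private final TinySingle<T> source;
    private final Function<? super T, ? extends R> mapper;

    TinyMap(TinySingle<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribeActual(Consumer<? super R> observer) {
        // Subscribe to the wrapped object and transform its value on the way down.
        source.subscribe(value -> observer.accept(mapper.apply(value)));
    }
}
```

With these definitions, TinySingle.fromCallable(...).map(...) returns a TinyMap that merely holds a reference to the TinyFromCallable; the supplier is not invoked until subscribe() is called on the outermost object.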
SingleMap, for example, gets its scheduler through a chain of calls that begins with the .subscribe() call at the end of our chain:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> subscriber) {
    ObjectHelper.requireNonNull(subscriber, "subscriber is null");
    subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
    ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
    try {
        subscribeActual(subscriber);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}
subscribeActual is implemented for each Single operator roughly like this:
source.subscribe()
where source is the operator preceding the current one. This forms the chain we work with, and following it leads us to the first Single that was created. In our case, that is Single.fromCallable:
override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable {
    val response = get("https://api.github.com/user", auth = auth)
    if (response.statusCode != 200) {
        throw RuntimeException("Incorrect login or password")
    }
    val jsonObject = response.jsonObject
    with(jsonObject) {
        return@with GithubUser(getString("login"), getInt("id"),
            getString("repos_url"), getString("name"))
    }
}
Inside this lambda we make our network calls.
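The order in which things happen during subscription can be traced with a small plain-Java model. TracedSingle and SubscribeOrderDemo are hypothetical names used only here, and the "octocat" value is an arbitrary placeholder; the sketch assumes nothing about RxJava beyond the source.subscribe() pattern described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical miniature: a "single" is just something you can subscribe to.
interface TracedSingle<T> {
    void subscribe(Consumer<? super T> observer);
}

final class SubscribeOrderDemo {
    // Head of the chain: the only place where the user's callable is invoked.
    static <T> TracedSingle<T> fromCallable(Supplier<T> callable, List<String> log) {
        return observer -> {
            log.add("fromCallable.subscribe");
            observer.accept(callable.get());
        };
    }

    // Operator: logs, then subscribes to `source`, the operator preceding it.
    static <T, R> TracedSingle<R> map(TracedSingle<T> source, Function<T, R> mapper, List<String> log) {
        return observer -> {
            log.add("map.subscribe");
            source.subscribe(value -> observer.accept(mapper.apply(value)));
        };
    }

    // Builds a two-step chain, subscribes to its end and returns the recorded order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TracedSingle<String> login = fromCallable(() -> {
            log.add("callable runs");
            return "octocat";
        }, log);
        TracedSingle<Integer> length = map(login, String::length, log);
        length.subscribe(value -> log.add("observer got " + value));
        return log;
    }
}
```

The log returned by run() starts at the last operator and works its way back to the source before any value flows forward: map.subscribe, fromCallable.subscribe, callable runs, observer got 7.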
But where is our scheduler? Here, inside SingleSubscribeOn:
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
    s.onSubscribe(parent);
    Disposable f = scheduler.scheduleDirect(parent);
    parent.task.replace(f);
}
In this case, the scheduler is the one we passed to the subscribeOn() method.
Taken together, this code shows how the scheduler we passed into the chain ends up running the code we passed into the operator lambdas.
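Stripped of disposables and plugin hooks, the subscribeOn mechanism can be approximated in a few lines of plain Java. This is a sketch under simplifying assumptions: SoSingle and SubscribeOnDemo are hypothetical names, and a single-thread ExecutorService named "sketch-io" stands in for Schedulers.io().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical miniature, not RxJava's API.
interface SoSingle<T> {
    void subscribe(Consumer<? super T> observer);
}

final class SubscribeOnDemo {
    static <T> SoSingle<T> fromCallable(Supplier<T> callable) {
        return observer -> observer.accept(callable.get());
    }

    // The upstream subscription itself is wrapped in a task and handed to the
    // executor, so everything above this operator runs on the executor's thread.
    static <T> SoSingle<T> subscribeOn(SoSingle<T> source, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> source.subscribe(observer));
    }

    // Returns {thread that ran the callable, thread that called subscribe()}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        CountDownLatch done = new CountDownLatch(1);
        String[] threads = new String[2];
        threads[1] = Thread.currentThread().getName();
        SoSingle<String> source = fromCallable(() -> Thread.currentThread().getName());
        subscribeOn(source, io).subscribe(name -> {
            threads[0] = name;
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return threads;
    }
}
```

In this model the callable reports "sketch-io" even though subscribe() was called from another thread, because the call to source.subscribe() is what got scheduled.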
Also pay attention to the observeOn() method. It creates an instance of a class (in our case, SingleObserveOn), whose subscribeActual by now looks trivial to us:
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
    source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}
But ObserveOnSingleObserver is much more interesting here:
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
    this.actual = actual;
    this.scheduler = scheduler;
}

@Override
public void onSuccess(T value) {
    this.value = value;
    Disposable d = scheduler.scheduleDirect(this);
    DisposableHelper.replace(this, d);
}
When observeOn is used, the observer is called on the scheduler's thread, which in turn makes it possible to switch threads right inside the Rx chain: you can fetch data from the server on Schedulers.io(), then perform resource-intensive calculations on Schedulers.computation(), update the UI, compute something else, and then simply move on to the code in subscribe.
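The thread hop performed by observeOn can be modelled the same way in plain Java. Everything here is an assumption made for illustration: HopSingle and ObserveOnDemo are hypothetical names, and two single-thread executors named "sketch-io" and "sketch-main" stand in for Schedulers.io() and AndroidSchedulers.mainThread() (the real main-thread scheduler posts to the Android main looper instead).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical miniature, not RxJava's API.
interface HopSingle<T> {
    void subscribe(Consumer<? super T> observer);
}

final class ObserveOnDemo {
    static <T> HopSingle<T> fromCallable(Supplier<T> callable) {
        return observer -> observer.accept(callable.get());
    }

    // Moves the upstream subscription (and thus the callable) onto the executor.
    static <T> HopSingle<T> subscribeOn(HopSingle<T> source, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> source.subscribe(observer));
    }

    // Receives the value on whatever thread upstream used, then re-delivers it
    // to the downstream observer through the executor.
    static <T> HopSingle<T> observeOn(HopSingle<T> source, ExecutorService scheduler) {
        return observer -> source.subscribe(value -> scheduler.execute(() -> observer.accept(value)));
    }

    // Returns {thread that produced the value, thread that observed it}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        CountDownLatch done = new CountDownLatch(1);
        String[] threads = new String[2];
        HopSingle<String> source = fromCallable(() -> Thread.currentThread().getName());
        observeOn(subscribeOn(source, io), ui).subscribe(producer -> {
            threads[0] = producer;
            threads[1] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }
}
```

In this model the value is produced on "sketch-io" and observed on "sketch-main". Inserting further observeOn calls with other executors would move each subsequent step again, which is the thread-switching flexibility described above.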
RxJava is rather complex under the hood, and it is a very flexible and powerful tool for handling events (and, as a result, for managing background work). But, in my opinion, this approach has its drawbacks:
- RxJava takes a lot of time to learn
- There are many operators to learn, and the differences between them are not obvious
- RxJava stack traces have almost nothing to do with the code you write yourself
What's next? Kotlin coroutines, of course!
From the author: the Mobius conference takes place tomorrow and the day after, and I will be talking there about coroutines in Kotlin. If this series of articles interests you and you want more, it's not too late to decide to attend!