How I replaced RxJava with Corutin in my project and why you probably should also do it

Hi, Habr! I present to you the translation of the article by Paulo Sato on the topic of using Kotlin Coroutines instead of RxJava in my Android projects.

RxJava as a bazooka, most applications do not use half of its firepower. The article will discuss how to replace it with korutin (coroutine) Kotlin.

I worked with RxJava for several years. This is definitely one of the best libraries for any Android project, which is still in good shape, especially if you are programming in Java. If you use Kotlin, then we can say that the city has a new sheriff.

Most use RxJava only to control threads and to prevent callback hell (if you don’t know what it is, consider yourself lucky and here’s why). The fact is that we must bear in mind that the real power of RxJava is reactive programming and backpressure. If you use it to control asynchronous requests, you use a bazooka to kill the spider. She will do her job, but this is overkill.

One notable drawback of RxJava is the number of methods. It is huge and tends to crawl all over the code. In Kotlin, you can use Korutin to implement most of the behavior you previously created using RxJava.

But ... what are korutins?

Korutin is a way to handle competitive tasks in a stream. The thread will continue to work until it is stopped and the context will change for each quortine without creating a new stream.
The korutins in Kotlin are still experimental, but they are included in Kotlin 1.3, so I wrote below a new class UseCase (for clean architecture) that uses them. In this example, a call for corutin is encapsulated in a single file. Thus, the other layers will not be affected by the coroutines being executed, providing a more disconnected architecture.

/**
 * (C) Copyright 2018 Paulo Vitor Sato Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */package com.psato.devcamp.interactor.usecase
import android.util.Log
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.android.UI
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Abstract class for a Use Case (Interactor in terms of Clean Architecture).
 * This interface represents a execution unit for different use cases (this means any use case
 * in the application should implement this contract).
 * <p>
 * By convention each UseCase implementation will return the result using a coroutine
 * that will execute its job in a background thread and will post the result in the UI thread.
 */abstractclassUseCase<T> {
    protectedvar parentJob: Job = Job()
    //var backgroundContext: CoroutineContext = IOvar backgroundContext: CoroutineContext = CommonPool
    var foregroundContext: CoroutineContext = UI
    protectedabstractsuspendfunexecuteOnBackground(): T
    funexecute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) {
        parentJob.cancel()
        parentJob = Job()
        launch(foregroundContext, parent = parentJob) {
            try {
                val result = withContext(backgroundContext) {
                    executeOnBackground()
                }
                onComplete.invoke(result)
            } catch (e: CancellationException) {
                Log.d("UseCase", "canceled by user")
            } catch (e: Exception) {
                onError(e)
            }
        }
    }
    protectedsuspendfun<X>background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> {
        return async(context, parent = parentJob) {
            block.invoke()
        }
    }
    fununsubscribe() {
        parentJob.cancel()
    }
}

First of all, I created a parent task. This is the key to undo all korutinov that were created in the class UseCase. When we call execution, it is important that the old tasks are canceled, to be sure that we have not missed a single coroutine (this will also happen if we unsubscribe from the given UseCase).

Also, I trigger the launch (UI). This means that I want to create a corutin that will be executed in the UI thread. After that, I call the background method that creates async in CommonPool (this approach, in fact, will have poor performance). In turn, async will return Deffered, and then, I will call its wait method. He is waiting for the completion of the background quorutine, which will bring a result or error.

This can be used to implement most of what we did with RxJava. Below are some examples.

Map


I downloaded the searchShow results and changed them to return the name of the first show.
RxJava code:
publicclassSearchShowsextendsUseCase{
    private ShowRepository showRepository;
    private ResourceRepository resourceRepository;
    private String query;
    @InjectpublicSearchShows(ShowRepository showRepository, ResourceRepository resourceRepository){
        this.showRepository = showRepository;
        this.resourceRepository = resourceRepository;
    }
    publicvoidsetQuery(String query){
        this.query = query;
    }
    @Overrideprotected Single<String> buildUseCaseObservable(){
        return showRepository.searchShow(query).map(showInfos -> {
            if (showInfos != null && !showInfos.isEmpty()
                    && showInfos.get(0).getShow() != null) {
                return showInfos.get(0).getShow().getTitle();
            } else {
                return resourceRepository.getNotFoundShow();
            }
        });
    }
}

Code on korutinah:

classSearchShows@Injectconstructor(privateval showRepository: ShowRepository, privateval resourceRepository: ResourceRepository) :
        UseCase<String>() {
    var query: String? = nulloverridesuspendfunexecuteOnBackground(): String {
        query?.let {
            val showsInfo = showRepository.searchShow(it)
            val showName: String? = showsInfo?.getOrNull(0)?.show?.title
            return showName ?: resourceRepository.notFoundShow
        }
        return""
    }
}

ZIP


Zip will take two issues from the Observer and put them together in a new issue. Note that with RxJava you must specify to make the call in parallel using subscribeOn in each Single. We want to get both at the same time and return them together.

RxJava code:

publicclassShowDetailextendsUseCase{
    private ShowRepository showRepository;
    private String id;
    @InjectpublicSearchShows(ShowRepository showRepository){
        this.showRepository = showRepository;
    }
    publicvoidsetId(String id){
        this.id = id;
    }
    @Overrideprotected Single<Show> buildUseCaseObservable(){
      Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io());
      Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io());
      return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner));
}

Code on korutinah:

classSearchShows@Injectconstructor(privateval showRepository: ShowRepository, privateval resourceRepository: ResourceRepository) :
        UseCase<Show>() {
    var id: String? = nulloverridesuspendfunexecuteOnBackground(): Show {
        id?.let {
            val showDetail = background{
                    showRepository.showDetail(it)
            }
            val showBanner = background{
                    showRepository.showBanner(it)
            }
            return Show(showDetail.await(), showBanner.await())
        }
        return Show()
    }
}

Flatmap


In this case, I am looking for shows that have a query string and for each result (limited to 200 results), I also get the rating of the show. At the end, I return the list of shows with the corresponding ratings.

RxJava code:

publicclassSearchShowsextendsUseCase{
    private ShowRepository showRepository;
    private String query;
    @InjectpublicSearchShows(ShowRepository showRepository){
        this.showRepository = showRepository;
    }
    publicvoidsetQuery(String query){
        this.query = query;
    }
    @Overrideprotected Single<List<ShowResponse>> buildUseCaseObservable() {
        return showRepository.searchShow(query).flatMapPublisher(
                (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable)
                .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>)
                                showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt())
                                        .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating
                                                .getRating())).subscribeOn(Schedulers.io()),
                        false, 4).toList();
    }
}

Code on korutinah:

classSearchShows@Injectconstructor(privateval showRepository: ShowRepository) :
        UseCase<List<ShowResponse>>() {
    var query: String? = nulloverridesuspendfunexecuteOnBackground(): List<ShowResponse> {
        query?.let { query ->
            return showRepository.searchShow(query).map {
                background {
                    val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!)
                    ShowResponse(it.show.title!!, rating.rating)
                }
            }.map {
                it.await()
            }
        }
        return arrayListOf()
    }
}

Let me explain. Using RxJava, my repository returns single List emissions, so I need several emissions, one for each ShowInfo. To do this, I called flatMapPublisher. For each issue, I have to highlight ShowResponse, and at the end collect all of them in the list.

We end up with this construct: List foreach → (ShowInfo → ShowRating → ShowResponse) → List.

With Corutin, I made a map for each List element to convert it to List <Deffered>.

As you can see, most of what we did with RxJava is easier to implement with synchronous calls. Korutin can even handle flatMap, which I believe are some of the most complex functions in RxJava.

It is well known that cortinae can be lightweight ( hereexample), but the results puzzled me. In this example, RxJava ran about 3.1 seconds, while the cortinas took about 5.8 seconds to run on CommonPool.

These results put before me the question that there could be something inappropriate in them. Later, I found it. I used retrofit Call, which blocked the stream.

There are two ways to fix this error; the choice depends on which version of Android Studio you are using. In the version of Android Studio 3.1, we need to make sure that we are not blocking the background thread. For this, I used this library:
implementation 'ru.gildor.coroutines: kotlin-coroutines-retrofit: 0.12.0'

This code creates an extension to the function of retrofit Call to suspend the flow:

publicsuspendfun<T : Any> Call<T>.await(): T {
    return suspendCancellableCoroutine { continuation ->
        enqueue(object : Callback<T> {
            overridefunonResponse(call: Call<T>?, response: Response<T?>) {
                if (response.isSuccessful) {
                    val body = response.body()
                    if (body == null) {
                        continuation.resumeWithException(
                            NullPointerException("Response body is null: $response")
                        )
                    } else {
                        continuation.resume(body)
                    }
                } else {
                    continuation.resumeWithException(HttpException(response))
                }
            }
            overridefunonFailure(call: Call<T>, t: Throwable) {
                // Don't bother with resuming the continuation if it is already cancelled.if (continuation.isCancelled) return
                continuation.resumeWithException(t)
            }
        })
        registerOnCompletion(continuation)
    }
}

In Android Studio 3.2, you can update the corutin library to version 0.25.0. This version has CoroutineContext IO (you can see the corresponding comment in my class UseCase).

Running on CommonPool without a blocking call took 2.3 seconds and 2.4 seconds with IO and blocking calls.

image

I hope this article will inspire you to use Corutin, a more lightweight and, perhaps, faster alternative to RxJava and make it a little easier to understand that you are writing synchronized code that is executed asynchronously.

Also popular now: