A Modern Approach to Concurrency on Android: Coroutines in Kotlin

Original author: Geoffrey Métais
Hello, Habr!

A reminder: pre-orders are already open for the long-awaited Kotlin book from the well-known Big Nerd Ranch Guides series. Today we bring you a translation of an article about Kotlin coroutines and about working correctly with threads on Android. The topic is being discussed very actively, so for completeness we also recommend this article from Habr and this detailed post from the Axmor Software blog.

The current concurrency framework in Java/Android leads to callback hell and to blocking states, because Android lacks a reasonably simple way to guarantee thread safety.

Kotlin coroutines are a very effective and complete toolkit that makes managing concurrency much easier and more productive.

Suspension and blocking: what is the difference

Coroutines do not replace threads; rather, they provide a framework for managing them. The philosophy of coroutines is to define a context that lets you wait for background operations to complete without blocking the main thread.

The goal of coroutines here is to do away with callbacks and simplify concurrency.
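The difference between suspending and blocking can be observed outside Android. The sketch below is not from the original article; it assumes kotlinx.coroutines is on the classpath, and the function name interleaving is made up for illustration. Two coroutines share the single thread of runBlocking, and because delay suspends instead of blocking, the second coroutine makes progress while the first one is waiting.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which two coroutines
// running on the same thread make progress.
fun interleaving(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            events.add("A: start")
            delay(100) // suspends A; the thread is free for B
            events.add("A: end")
        }
        launch {
            events.add("B: start")
            delay(10)
            events.add("B: end")
        }
    } // coroutineScope returns once both children are done
    events
}

fun main() {
    // prints [A: start, B: start, B: end, A: end]
    println(interleaving())
}
```

Had the first coroutine called Thread.sleep(100) instead of delay(100), the only thread would have been blocked and B could not have started until A had finished.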

The simplest example

To begin with, let's take the simplest example: we launch a coroutine in the Main context (the main thread). In it, we fetch an image on the IO dispatcher and send that image back to Main for processing.

launch(Dispatchers.Main) {
    val image = withContext(Dispatchers.IO) { getImage() } // fetched in the IO context
    imageView.setImageBitmap(image) // back on the main thread
}

The code is as simple as a single-threaded function. Moreover, while getImage runs in the dedicated IO thread pool, the main thread is free and can take on any other task. The withContext function suspends the current coroutine while its block (getImage()) is running. As soon as getImage() returns and the main thread's looper becomes available, the coroutine resumes on the main thread and calls imageView.setImageBitmap(image).

Second example: now we need two background tasks to finish before their results can be used. We use the async/await pair to run the two tasks in parallel and use their results on the main thread as soon as both are ready:

val job = launch(Dispatchers.Main) {
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}
job.join() // suspends the current coroutine until the job completes

async is similar to launch, but returns a Deferred (the Kotlin equivalent of a Future), so its result can be obtained with await(). Called without parameters, it runs in the default context of the current scope.

Again, the main thread remains free while we wait for our two values.
As you can see, launch returns a Job, which can be used to wait until the operation completes; this is done with the join() function. It works as in any other language, with the caveat that it only suspends the coroutine and does not block the thread.

Dispatching

Dispatching is a key concept when working with coroutines. This action allows you to "jump" from one thread to another.

Consider the Java equivalent of dispatching to Main, that is, runOnUiThread:

public final void runOnUiThread(Runnable action) {
    if (Thread.currentThread() != mUiThread) {
        mHandler.post(action); // dispatching
    } else {
        action.run(); // immediate execution
    }
}

The Main context implementation for Android is a dispatcher based on a Handler. So this really is the matching implementation:

launch(Dispatchers.Main) { ... }
        vs
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) { ... }
// Since kotlinx.coroutines 0.26:
launch(Dispatchers.Main.immediate) { ... }

launch(Dispatchers.Main) posts a Runnable to a Handler, so its code is not executed immediately.

launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) executes its lambda immediately in the current thread.

Dispatchers.Main guarantees that when the coroutine resumes, it is dispatched to the main thread; in addition, a Handler is used here as the native Android implementation for posting to the application event loop.

The exact implementation looks like this:

val Main: HandlerDispatcher = HandlerContext(mainHandler, "Main")

Here is a good article to help you understand the intricacies of dispatching in Android:
Understanding Android Core: Looper, Handler, and HandlerThread.
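The difference between a dispatched and an undispatched start, described above for Dispatchers.Main, can be reproduced without Android. The following is a rough sketch rather than the article's code: it assumes kotlinx.coroutines, uses the event loop of runBlocking in place of the main looper, and the name startOrder is invented for the example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows whether the coroutine body runs before or
// after the code that follows the launch call.
fun startOrder(start: CoroutineStart): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch(start = start) { events.add("inside coroutine") }
    events.add("after launch")
    job.join()
    events
}

fun main() {
    // DEFAULT: the body is queued on the event loop and runs later
    println(startOrder(CoroutineStart.DEFAULT))      // [after launch, inside coroutine]
    // UNDISPATCHED: the body runs right away in the calling thread
    println(startOrder(CoroutineStart.UNDISPATCHED)) // [inside coroutine, after launch]
}
```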

Coroutine context

A coroutine context (also known as a coroutine dispatcher) determines on which thread its code will be executed and what to do if an exception is thrown, and it refers to the parent context to propagate cancellation.

val job = Job()
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
    whatever(throwable)
}
launch(Dispatchers.Default + exceptionHandler + job) { ... }

job.cancel() cancels all coroutines whose parent is job. The exceptionHandler receives all exceptions thrown in these coroutines.
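Both behaviours can be checked in a small program. This is a minimal sketch under stated assumptions: it relies on kotlinx.coroutines, and handledMessage and cancelledByParent are hypothetical names that do not come from the article.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The handler stored in the context receives the uncaught exception.
fun handledMessage(): String? = runBlocking {
    val caught = CompletableDeferred<Throwable>()
    val job = Job()
    val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
        caught.complete(throwable)
    }
    val scope = CoroutineScope(Dispatchers.Default + exceptionHandler + job)
    scope.launch { throw IllegalStateException("boom") }
    caught.await().message
}

// Cancelling the parent job cancels the coroutines launched under it.
fun cancelledByParent(): Boolean = runBlocking {
    val job = Job()
    val child = CoroutineScope(Dispatchers.Default + job).launch {
        delay(Long.MAX_VALUE) // would wait forever if not cancelled
    }
    job.cancel()
    child.join()
    child.isCancelled
}

fun main() {
    println(handledMessage())    // boom
    println(cancelledByParent()) // true
}
```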

Scope

The coroutineScope builder simplifies error handling:
if any of its child coroutines fails, the entire scope and all of its child coroutines fail as well.

In the async example, if one value could not be retrieved while the other task kept running, we are left with a broken state and have to deal with it.

With coroutineScope, the useValues function is called only if both values were retrieved successfully. Also, if deferred2 fails, deferred1 is cancelled.

coroutineScope { 
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}
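To see that a failing child really cancels its sibling, here is a small self-contained sketch. It assumes kotlinx.coroutines; the function name siblingCancelled and the simulated failure are illustrative and not part of the original example.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Returns true if the first task was cancelled because the second one failed.
fun siblingCancelled(): Boolean = runBlocking {
    val cancelled = CompletableDeferred<Boolean>()
    try {
        coroutineScope {
            val deferred1 = async {
                try {
                    delay(Long.MAX_VALUE) // simulates a long computation
                    1
                } catch (e: CancellationException) {
                    cancelled.complete(true) // observed the cancellation
                    throw e
                }
            }
            val deferred2 = async<Int> {
                delay(10)
                throw IllegalStateException("second value failed")
            }
            deferred1.await() + deferred2.await() // never completes normally
        }
    } catch (e: IllegalStateException) {
        // coroutineScope rethrows the failure of its child
    }
    cancelled.await()
}

fun main() {
    println(siblingCancelled()) // true
}
```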

You can also "scope" an entire class to set a default CoroutineContext for it and use it.

An example of a class that implements the CoroutineScope interface:

open class ScopedViewModel : ViewModel(), CoroutineScope {
    protected val job = Job()
    override val coroutineContext = Dispatchers.Main+job
    override fun onCleared() {
        super.onCleared()
        job.cancel()
    }
}

Launching a coroutine in a CoroutineScope:

The default dispatcher for launch or async is now the dispatcher of the current scope.

launch {
    val foo = withContext(Dispatchers.IO) { … }
    // the lambda runs in the CoroutineContext of the scope
    …
}
launch(Dispatchers.Default) {
    // the lambda runs in the default thread pool
    …
}

Standalone launch of a coroutine (outside of any CoroutineScope):

GlobalScope.launch(Dispatchers.Main) {
    // the lambda runs on the main thread
    …
}

You can even define a scope for the whole application, with Main as the default dispatcher:

object AppScope : CoroutineScope by GlobalScope {
    override val coroutineContext = Dispatchers.Main.immediate
}

Remarks

  • Coroutines limit interoperability with Java
  • Limit mutability to avoid locks
  • Coroutines are designed to wait, not to organize threads
  • Avoid I/O in Dispatchers.Default (and Main...); this is what Dispatchers.IO is for
  • Threads are resource-intensive, and so are single-threaded contexts
  • Dispatchers.Default is based on ForkJoinPool, which appeared in Android 5+
  • Coroutines can be used through channels

Getting rid of locks and callbacks with channels

Channel definition from the JetBrains documentation:

A Channel is conceptually very similar to BlockingQueue. The key difference is that instead of a blocking put operation it has a suspending send (or a non-blocking offer), and instead of a blocking take operation it has a suspending receive.
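The definition above can be illustrated with a producer and a consumer that share one thread. This is a minimal sketch assuming kotlinx.coroutines; collectSquares is a name made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a producer sends five values, the consumer
// collects them until the channel is closed.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>() // rendezvous channel: no buffer
    launch {
        for (x in 1..5) channel.send(x * x) // suspends until received
        channel.close()                     // ends the consumer's loop
    }
    val received = mutableListOf<Int>()
    for (value in channel) received.add(value) // suspends while empty
    received
}

fun main() {
    println(collectSquares()) // [1, 4, 9, 16, 25]
}
```

Neither side blocks the thread: send and the iteration over the channel suspend, which is why both coroutines can take turns on the single thread of runBlocking.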


Actors

Consider a simple tool for working with channels: the actor.

An actor, again, is very similar to a Handler: we define a coroutine context (that is, the thread on which actions will be executed) and process actions in sequential order.

The difference, of course, is that coroutines are used here: you can specify a capacity, and the executed code can suspend.

In essence, an actor forwards every command to a coroutine channel. It guarantees that commands are executed in order and confines operations to its context. This approach helps greatly in getting rid of synchronized calls and keeps all threads free!

protected val updateActor by lazy {
    actor<Update>(capacity = Channel.UNLIMITED) {
        for (update in channel) when (update) {
            Refresh -> updateList()
            is Filter -> filter.filter(update.query)
            is MediaUpdate -> updateItems(update.mediaList as List<T>)
            is MediaAddition -> addMedia(update.media as T)
            is MediaListAddition -> addMedia(update.mediaList as List<T>)
            is MediaRemoval -> removeMedia(update.media as T)
        }
    }
}
// usage
fun filter(query: String?) = updateActor.offer(Filter(query))
// or
suspend fun filter(query: String?) = updateActor.send(Filter(query))

In this example, we use Kotlin sealed classes to choose which action to perform.

sealed class Update
object Refresh : Update()
class Filter(val query: String?) : Update()
class MediaAddition(val media: Media) : Update()

Moreover, all these actions are queued and are never executed in parallel. This is a convenient way to limit mutability.
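The sequential processing described above can be tried with a small counter actor. The sketch assumes kotlinx.coroutines (where actor is marked as an obsolete API, hence the opt-in); the message classes and function names are illustrative and unrelated to the VLC code.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical messages understood by the actor.
sealed class CounterMsg
object Increment : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

// The state lives inside the actor, so it needs no lock.
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) when (msg) {
        is Increment -> counter++
        is GetCounter -> msg.response.complete(counter)
    }
}

fun countWithActor(): Int = runBlocking {
    val counter = counterActor()
    // 100 coroutines on a thread pool send 10 increments each
    withContext(Dispatchers.Default) {
        repeat(100) { launch { repeat(10) { counter.send(Increment) } } }
    }
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    val result = response.await()
    counter.close() // lets the actor finish so runBlocking can return
    result
}

fun main() {
    println(countWithActor()) // 1000
}
```

Although the increments are sent from many threads, the actor handles them one at a time, so no update is lost.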

Android Life Cycle + Coroutines

Actors can also be very useful for managing the Android UI: they simplify cancelling tasks and prevent overloading the main thread.
Let's implement this and call job.cancel() when the activity is destroyed.

class MyActivity : AppCompatActivity(), CoroutineScope {
    protected val job = SupervisorJob() // the Job instance for this activity
    override val coroutineContext = Dispatchers.Main.immediate+job
    override fun onDestroy() {
        super.onDestroy()
        job.cancel() // cancel the job when the activity is destroyed
    }
}

The SupervisorJob class is similar to a regular Job, with the one exception that cancellation propagates only downwards.

Therefore, we do not cancel all the coroutines in the Activity when one of them fails.
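The contrast with a regular Job can be checked outside Android. This is a sketch under assumptions: kotlinx.coroutines is available, and siblingSurvives is a hypothetical helper that is not part of the article.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if a sibling keeps running after another child has failed.
fun siblingSurvives(parent: Job): Boolean = runBlocking {
    // Swallow the exception so it is not printed in this demo
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)
    val sibling = scope.launch { delay(50) }
    val failing = scope.launch { throw IllegalStateException("boom") }
    joinAll(failing, sibling)
    !sibling.isCancelled
}

fun main() {
    println(siblingSurvives(SupervisorJob())) // true: failure stays local
    println(siblingSurvives(Job()))           // false: failure cancels the parent
}
```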

Things get even better with an extension property that gives access to this CoroutineContext from any View inside a CoroutineScope.

val View.coroutineContext: CoroutineContext?
    get() = (context as? CoroutineScope)?.coroutineContext

Now we can combine all of this: the setOnClick function creates a conflated actor to manage its onClick actions. With multiple taps, intermediate actions are ignored, which prevents ANR (Application Not Responding) errors, and the actions are executed in the scope of the Activity. Therefore, when the activity is destroyed, all of this is cancelled.

fun View.setOnClick(action: suspend () -> Unit) {
    // launch one actor as a child of the context's job
    val scope = (context as? CoroutineScope) ?: AppScope
    val eventActor = scope.actor<Unit>(capacity = Channel.CONFLATED) {
        for (event in channel) action()
    }
    // install a listener that triggers this actor
    setOnClickListener { eventActor.offer(Unit) }
}

In this example, we set the Channel capacity to CONFLATED to ignore some events if there are too many of them. You can replace it with Channel.UNLIMITED if you prefer to queue events without losing any of them, but still want to protect the application from ANR errors.

You can also combine the coroutines and Lifecycle frameworks to automate the cancellation of tasks related to the user interface:

val LifecycleOwner.untilDestroy: Job get() {
    val job = Job()
    lifecycle.addObserver(object: LifecycleObserver {
        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun onDestroy() { job.cancel() }
    })
    return job
}
// usage
GlobalScope.launch(Dispatchers.Main, parent = untilDestroy) {
    /* amazing things happen here! */
}

Simplifying the situation with callbacks (Part 1)

Here is how to transform the use of a callback-based API with the help of a Channel.

The API works like this:

  1. requestBrowsing(url, listener) starts parsing the folder located at url.
  2. The listener receives onMediaAdded(media: Media) for every media file found in this folder.
  3. listener.onBrowseEnd() is called once the folder has been parsed.

Here is the old refresh function in the VLC browser provider:

private val refreshList = mutableListOf<Media>()
fun refresh() = requestBrowsing(url, refreshListener)
private val refreshListener = object : EventListener {
    override fun onMediaAdded(media: Media) {
        refreshList.add(media)
    }
    override fun onBrowseEnd() {
        val list = refreshList.toMutableList()
        refreshList.clear()
        launch {
            dataset.value = list
            parseSubDirectories()
        }
    }
}

How to improve it?

We create a channel that is set up in refresh. The browser callbacks now only forward media to this channel and then close it.

The refresh function is now easier to follow. It creates the channel, calls the VLC browser, then builds the list of media files and processes it.

Instead of the consumeEach or select functions, we can use a for loop to wait for media; the loop ends as soon as the browserChannel channel is closed.

private lateinit var browserChannel : Channel<Media>
override fun onMediaAdded(media: Media) {
    browserChannel.offer(media)
}
override fun onBrowseEnd() {
    browserChannel.close()
}
suspend fun refresh() {
    browserChannel = Channel(Channel.UNLIMITED)
    val refreshList = mutableListOf<Media>()
    requestBrowsing(url)
    // suspends on every iteration while waiting for media
    for (media in browserChannel) refreshList.add(media)
    // the channel is closed
    dataset.value = refreshList
    parseSubDirectories()
}
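The same transformation can be tried without VLC. The sketch below replaces the browser with a hypothetical stand-in: EventListener, requestBrowsing and the file names are invented for illustration, and media items are plain strings. It assumes a kotlinx.coroutines version that provides trySend, the non-suspending call that newer releases offer in place of offer.

```kotlin
import kotlin.concurrent.thread
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical callback API standing in for the VLC browser.
interface EventListener {
    fun onMediaAdded(media: String)
    fun onBrowseEnd()
}

// Fake browser: reports three files from a background thread.
fun requestBrowsing(url: String, listener: EventListener) {
    thread {
        listOf("a.mp3", "b.mkv", "c.ogg").forEach { listener.onMediaAdded("$url/$it") }
        listener.onBrowseEnd()
    }
}

suspend fun refresh(url: String): List<String> {
    val browserChannel = Channel<String>(Channel.UNLIMITED)
    requestBrowsing(url, object : EventListener {
        // Callbacks only forward items to the channel, then close it
        override fun onMediaAdded(media: String) { browserChannel.trySend(media) }
        override fun onBrowseEnd() { browserChannel.close() }
    })
    val refreshList = mutableListOf<String>()
    // Suspends on each iteration; the loop ends when the channel is closed
    for (media in browserChannel) refreshList.add(media)
    return refreshList
}

fun main() {
    println(runBlocking { refresh("dir") }) // [dir/a.mp3, dir/b.mkv, dir/c.ogg]
}
```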

Simplifying the situation with callbacks (Part 2): Retrofit

The second approach: we do not use kotlinx.coroutines at all, only the core coroutine framework.

See how coroutines actually work!

The retrofitSuspendCall function wraps a Retrofit Call request to turn it into a suspend function.

With suspendCoroutine we call Call.enqueue and suspend the coroutine. The callback supplied this way calls continuation.resume(response) to resume the coroutine with the server's response as soon as it arrives.

Then we only need to wrap our Retrofit functions in retrofitSuspendCall to return request results through it.

suspend inline fun <T> retrofitSuspendCall(request: () -> Call<T>
) : Response<T> = suspendCoroutine { continuation ->
    request.invoke().enqueue(object : Callback<T> {
        override fun onResponse(call: Call<T>, response: Response<T>) {
            continuation.resume(response)
        }
        override fun onFailure(call: Call<T>, t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}
suspend fun browse(path: String?) = retrofitSuspendCall {
    ApiClient.browse(path)
}
// usage (in the Main coroutine context)
livedata.value = Repo.browse(path)

Thus, the blocking network call is made on Retrofit's dedicated thread, the coroutine is simply there to wait for the server's response, and using it in the application could not be simpler!

This implementation is inspired by the gildor/kotlin-coroutines-retrofit library.

There is also JakeWharton/retrofit2-kotlin-coroutines-adapter, another implementation that gives a similar result.
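For readers without Retrofit at hand, the suspendCoroutine technique can be tried on a stand-in API. Everything named FakeCall and Callback below is hypothetical and only mimics the shape of an enqueue-style call; runBlocking from kotlinx.coroutines is used solely to drive the example from main.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Hypothetical callback interface, loosely shaped like a network callback.
interface Callback<T> {
    fun onResponse(value: T)
    fun onFailure(t: Throwable)
}

// Hypothetical asynchronous call that answers on its own thread.
class FakeCall(private val path: String?) {
    fun enqueue(callback: Callback<String>) {
        thread {
            if (path == null) callback.onFailure(IllegalArgumentException("no path"))
            else callback.onResponse("listing of $path")
        }
    }
}

// Suspends the coroutine until one of the callbacks resumes it.
suspend fun FakeCall.await(): String = suspendCoroutine { continuation ->
    enqueue(object : Callback<String> {
        override fun onResponse(value: String) = continuation.resume(value)
        override fun onFailure(t: Throwable) = continuation.resumeWithException(t)
    })
}

fun main() {
    println(runBlocking { FakeCall("/music").await() }) // listing of /music
    val failure = runCatching { runBlocking { FakeCall(null).await() } }
    println(failure.exceptionOrNull() is IllegalArgumentException) // true
}
```

Note that suspendCoroutine does not react to cancellation; kotlinx.coroutines provides suspendCancellableCoroutine for cases where the pending call should be aborted together with the coroutine.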

Epilogue

Channel can be used in many other ways; check out BroadcastChannel for more powerful implementations you might find useful.

You can also create channels with the produce function.

Finally, channels are a convenient way to organize communication between UI components: an adapter can pass click events to its fragment or activity through a Channel or, for example, through an actor.
