About Publish, Connect, RefCount, and Share in RxSwift

Hi, Habr. I present to you a translation of the article Understanding Publish, Connect, RefCount and Share in RxSwift.

The original article uses Swift 2 and the corresponding version of RxSwift. I took the liberty of rewriting the code below for Swift 3.
I also want to note that the terms Observable and Sequence can be treated as the same thing. The same goes for Observer and Subscriber.

I also recommend reading about share(), shareReplay(), and shareReplayLatestWhileConnected() in RxSwift.


In this article I will try to explain the operators for working with Connectable Observables in RxSwift: publish, connect, refCount, and share. They are used together in various combinations. It is very important to understand the difference between:

  • publish().connect()
  • and publish().refCount() (or just share())

Active and Passive Observables


Before I get to the point, I would like to say a few words about hot and cold Observables. To me, the concepts of hot and cold Observables are a bit blurry.

Let's call a hot Observable an Active Sequence and a cold Observable a Passive Sequence.

  • An Active Sequence emits elements all the time, regardless of whether anyone has subscribed to it
  • A Passive Sequence begins emitting elements only on request, that is, when someone subscribes

An example of a Passive Sequence is a network request, which starts only when we subscribe to the sequence. Examples of an Active Sequence are a web-socket connection, timer events, or the text produced by a UITextField.

That's all. Think in terms of active and passive sequences. The terms hot / cold / warm / cool Observables are too easy to mix up.
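
The distinction can be sketched without a network or a timer. In this minimal, illustrative example the names passive, active, producerRuns, and received are hypothetical and not part of the original article. A sequence built with Observable.create plays the passive role, and a PublishSubject plays the active one.

```swift
import RxSwift

// Passive (cold): the closure below runs again for every new subscriber.
var producerRuns = 0
let passive = Observable<Int>.create { observer in
    producerRuns += 1
    observer.onNext(producerRuns)
    observer.onCompleted()
    return Disposables.create()
}

// Active (hot): a PublishSubject emits whether or not anyone is listening.
let active = PublishSubject<String>()
active.onNext("missed")              // no subscribers yet, so this element is lost

var received: [String] = []
let subscription = active.subscribe(onNext: { received.append($0) })
active.onNext("seen")                // delivered to the subscriber above

_ = passive.subscribe(onNext: { print("first subscriber got \($0)") })
_ = passive.subscribe(onNext: { print("second subscriber got \($0)") })

print(producerRuns)                  // 2: the producer ran once per subscription
print(received)                      // ["seen"]
subscription.dispose()
```

The passive sequence does its work once per subscriber, while the active one simply drops whatever is emitted before anyone subscribes.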

Multiple Subscriptions per Observable


If you have ever subscribed twice (or more) to the same Observable, you may have been surprised by the results.

Take a look at the following piece of code:

import RxSwift
import RxCocoa  // provides the .rx extension on URLSession

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
requestObservable.subscribe(onNext: {
    print($0)
})
requestObservable.subscribe(onNext: {
    print($0)
})

Looking at the console, we see two HTTP responses. The Observable performed the request twice, which is not what we expected.



share() to the rescue


Obviously, this is not what we want from a normal HTTP request. But we can change this behavior and execute only one request. We just need to apply the share() operator to our Observable.

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
    .share()
requestObservable.subscribe(onNext: {
    print($0)
})
requestObservable.subscribe(onNext: {
    print($0)
})

As expected, only one HTTP request was executed.
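
The same effect can be checked without touching the network. The sketch below is an illustration under stated assumptions: source is a hypothetical stand-in for the request, and subscriptionCount is a made-up counter that records how many times the underlying work is started. The source deliberately never completes, so the two subscriptions overlap in time, as they do while an HTTP response is still pending.

```swift
import RxSwift

var subscriptionCount = 0

// Hypothetical stand-in for a request: counts how often the work is started.
let source = Observable<String>.create { _ in
    subscriptionCount += 1
    // No onCompleted here, so concurrent subscriptions overlap.
    return Disposables.create()
}

// With share(): both subscribers use a single underlying subscription.
let shared = source.share()
let first = shared.subscribe()
let second = shared.subscribe()
print(subscriptionCount)             // 1

// Without share(): every subscriber starts the work again.
let third = source.subscribe()
let fourth = source.subscribe()
print(subscriptionCount)             // 3

[first, second, third, fourth].forEach { $0.dispose() }
```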


In essence, the share() operator is just a wrapper over publish().refCount().
Stop, stop, stop! What is publish(), and what is refCount()?

publish() and its friend connect()


When the publish() operator is applied, the Observable is transformed into a Connectable Observable. The ReactiveX documentation says:
A Connectable Observable is similar to a regular Observable, with one exception: it begins emitting elements not when it is subscribed to, but only when connect() is called on it.

let myObservable = Observable.just(1).publish()
print("Subscribing")
myObservable.subscribe(onNext: {
    print("first = \($0)")
})
myObservable.subscribe(onNext: {
    print("second = \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Calling connect after 3 seconds")
    myObservable.connect()
}
/* Output:
Subscribing
Calling connect after 3 seconds
first = 1
second = 1
*/

In the example above, the Observers subscribe to myObservable immediately after it is created, but they receive elements only 3 seconds later, when connect() is called. Simply put, connect() activates the Connectable Observable and starts delivering elements to its subscribers.

An interesting thing is how resources are cleaned up. Take a look at this code.

let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
myObservable.connect()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6)  {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}
// Output:
/* 
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 6
Next: 7
Next: 8
Next: 9
...
*/

Even after all subscribers have unsubscribed from our Observable, it stays alive and keeps producing events under the hood.

Translator's Note
The connect() method returns a Disposable. You can therefore stop the production of elements by calling dispose() on that Disposable, or by handing it to a DisposeBag.
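
A minimal sketch of this note follows. It assumes a PublishSubject as a hypothetical active source; the names source, connectable, connection, and received are illustrative only.

```swift
import RxSwift

let source = PublishSubject<Int>()   // hypothetical active source
let connectable = source.publish()

var received: [Int] = []
let subscription = connectable.subscribe(onNext: { received.append($0) })

source.onNext(1)                     // not connected yet: nothing is delivered
let connection = connectable.connect()
source.onNext(2)                     // connected: reaches the subscriber
connection.dispose()                 // tears down the link to the source
source.onNext(3)                     // no longer reaches the subscriber

print(received)                      // [2]
subscription.dispose()
```

Adding connection to a DisposeBag instead would disconnect in the same way once the bag is deallocated.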

Now let's compare this with publish().refCount().

Difference between publish().connect() and publish().refCount()


You can think of the refCount() operator as magic that manages the connection for you as Observers come and go. refCount() calls connect() automatically when the first Observer subscribes, so there is no need to do it yourself.

let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
    .refCount()
print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6)  {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}
// Output:
/* 
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 0
Next: 1
Next: 2
Next: 3
...
*/

Pay attention to this. When we subscribed again, the Observable started emitting elements from the beginning.

Conclusion


Feel the difference now? publish().connect() and publish().refCount() (or share()) handle unsubscription from Observables in different ways.

When you use publish().connect(), you have to manage the cleanup of your Observable's resources manually (see the translator's note above). Your sequence behaves as an active one and produces elements all the time, regardless of subscriptions.

On the other hand, publish().refCount() / share() keeps track of how many Observers are subscribed to the Observable and stays connected to the source as long as there is at least one subscriber. In other words, when the subscriber count drops to zero, the Observable "dies" and stops producing elements.
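
The two policies can be compared side by side without timers. This is only a sketch: source is a hypothetical cold sequence, and starts is a made-up counter for how often the underlying work begins.

```swift
import RxSwift

var starts = 0

// Hypothetical cold source: counts how often the underlying work begins.
let source = Observable<Int>.create { _ in
    starts += 1
    return Disposables.create()
}

// publish().refCount(): connects on the first subscriber, disconnects on the last.
let counted = source.publish().refCount()
counted.subscribe().dispose()        // first connection, torn down right away
counted.subscribe().dispose()        // the count was back at zero, so it connects again
print(starts)                        // 2
let startsWithRefCount = starts

// publish() + connect(): one connection, independent of subscribers.
starts = 0
let connectable = source.publish()
let connection = connectable.connect()
connectable.subscribe().dispose()
connectable.subscribe().dispose()
print(starts)                        // 1
connection.dispose()                 // cleanup is our responsibility here
```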

If something is not completely clear, please let me know in the comments. Thanks.
