About Publish, Connect, RefCount, and Share in RxSwift
Hi, Habr. Here is my translation of the article Understanding Publish, Connect, RefCount and Share in RxSwift.
The original article uses Swift 2 and the matching version of RxSwift. I took the liberty of rewriting the code below for Swift 3.
I would also like to note that the concepts Observable and Sequence can be treated as the same thing. The same goes for Observer and Subscriber.
I also recommend reading about share(), shareReplay(), and shareReplayLatestWhileConnected() in RxSwift.
In this article I will try to explain the operators for working with Connectable Observables in RxSwift: publish, connect, refCount, and share. They are used together in various combinations. It is very important to understand the difference between publish().connect() and publish().refCount() (or simply share()).
Active and Passive Observables
Before getting to the point, I would like to say a few words about hot and cold Observables. To my mind, the notions of hot and cold Observables are somewhat blurry.
Let's call a hot Observable an Active Sequence and a cold one a Passive Sequence.
- An Active Sequence emits items all the time, regardless of whether anyone has subscribed to it
- A Passive Sequence starts emitting items on request, that is, when someone subscribes
An example of a Passive Sequence is a network request, which starts only when we subscribe to the sequence. Examples of an Active Sequence are a web-socket connection, timer events, or the text produced by a UITextField.
And that's all. Think in terms of active and passive sequences. The notions of hot / cold / warm / cool Observables are too confusing.
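The distinction can be sketched without any networking. This is a minimal illustration, not code from the original article: the names workStarted, passive, and active are made up for the example, Observable.create stands in for a passive sequence, and a PublishSubject driven by hand stands in for an active one.

```swift
import RxSwift

// Passive sequence: the closure below runs once per subscription.
var workStarted = 0
let passive = Observable<String>.create { observer in
    workStarted += 1
    observer.onNext("response #\(workStarted)")
    observer.onCompleted()
    return Disposables.create()
}

// Nothing has run yet: workStarted is still 0 at this point.
let passiveA = passive.subscribe(onNext: { print("A got \($0)") })  // prints "A got response #1"
let passiveB = passive.subscribe(onNext: { print("B got \($0)") })  // prints "B got response #2"

// Active sequence: the subject emits whether or not anyone listens.
let active = PublishSubject<Int>()
active.onNext(1)  // no subscribers yet, so this element is simply lost

var received: [Int] = []
let activeA = active.subscribe(onNext: { received.append($0) })
active.onNext(2)  // delivered: received is now [2]
```

Each subscription to the passive sequence triggers its own run of the work, while the active sequence carries on regardless and late subscribers only see what is emitted after they join.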
Multiple Subscriptions per Observable
If you ever subscribed twice (or more) to the same Observable, you might be surprised at the results.
Take a look at the following piece of code:
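import Foundation
import RxSwift
import RxCocoa  // provides the rx.data(request:) extension on URLSession

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))

// Each subscription below starts its own request.
requestObservable.subscribe(onNext: {
    print($0)
})
requestObservable.subscribe(onNext: {
    print($0)
})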
Looking at the console, we see two HTTP responses. The Observable performed the request twice, contrary to what we expected.
share() to the rescue
Obviously, this is not what we want from an ordinary HTTP request. But we can change this behavior and perform only one request. We just need to apply the share() operator to our Observable.
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
    .share()  // both subscribers now share a single subscription to the request

requestObservable.subscribe(onNext: {
    print($0)
})
requestObservable.subscribe(onNext: {
    print($0)
})
As expected, only one HTTP request was executed.
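The same effect can be checked without a network connection. The sketch below is an illustration under assumptions, not the article's code: source is a hypothetical stand-in for the request that counts how many times it is started and never completes, and subscriptionsToSource is a made-up counter.

```swift
import RxSwift

var subscriptionsToSource = 0

// Hypothetical stand-in for the HTTP request: counts how often it is started.
let source = Observable<Int>.create { observer in
    subscriptionsToSource += 1
    observer.onNext(subscriptionsToSource)
    return Disposables.create()
}

let shared = source.share()

let first = shared.subscribe(onNext: { print("first = \($0)") })    // prints "first = 1"
let second = shared.subscribe(onNext: { print("second = \($0)") })  // joins the existing subscription

print("source started \(subscriptionsToSource) time(s)")  // prints "source started 1 time(s)"
```

Note that the second subscriber prints nothing here: the stand-in emits synchronously during the first subscription, before the second subscriber has joined, and share() does not replay past elements. The HTTP request in the article responds asynchronously, after both subscriptions are in place, so both subscribers see the response.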
In essence, the share() operator is just a wrapper around publish().refCount(). Wait, wait, wait! What is publish(), and what is refCount()?
publish() and its friend connect()
When the publish() operator is applied, the Observable is transformed into a Connectable Observable. The ReactiveX documentation says:
A Connectable Observable resembles an ordinary Observable, except for one thing: it begins emitting items not when it is subscribed to, but only when the connect() operator is applied to it.
let myObservable = Observable.just(1).publish()

print("Subscribing")
myObservable.subscribe(onNext: {
    print("first = \($0)")
})
myObservable.subscribe(onNext: {
    print("second = \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Calling connect after 3 seconds")
    _ = myObservable.connect()  // connect() returns a Disposable
}

/* Output:
Subscribing
Calling connect after 3 seconds
first = 1
second = 1
*/
In the example above, the Observers subscribe to myObservable immediately after it is created, but they receive nothing until connect() is called 3 seconds later. Simply put, connect() activates the Connectable Observable and starts delivering elements to its subscribers.
An interesting point is how resources are cleaned up. Take a look at this code.
// The element type must be spelled out: Observable<Int>.
// RxSwift 3/4 signature; RxSwift 5 and later expect .seconds(1) instead of 1.
let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
_ = myObservable.connect()  // the returned Disposable is ignored here

print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}

// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 6
Next: 7
Next: 8
Next: 9
...
*/
Even if all subscribers unsubscribed from our Observable, the latter still lives and continues to produce events under the hood.
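The connection itself can be torn down explicitly, because connect() returns a Disposable. Here is a minimal sketch of that, assuming a PublishSubject driven by hand in place of the timer so that the result does not depend on timing; the names source, published, connection, and received are illustrative only.

```swift
import RxSwift

let source = PublishSubject<Int>()      // hand-driven stand-in for the timer
let published = source.publish()
let connection = published.connect()    // Disposable that represents the connection

var received: [Int] = []
let subscription = published.subscribe(onNext: { received.append($0) })

source.onNext(1)          // delivered: received is [1]
subscription.dispose()    // the only subscriber leaves, the connection stays
source.onNext(2)          // still connected, but nobody is listening

// A later subscriber needs no new connect(): the old connection is still alive.
let late = published.subscribe(onNext: { received.append($0) })
source.onNext(3)          // delivered: received is [1, 3]

connection.dispose()      // manual cleanup: disconnect from the source
source.onNext(4)          // no longer forwarded to subscribers

print(received)           // prints "[1, 3]"
```

Disposing a subscriber only removes that subscriber. The published sequence stops forwarding elements only once the Disposable returned by connect() is disposed.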
Translator's Note
The connect() method returns a Disposable. You can therefore stop the production of elements by calling dispose() on that Disposable, or by handing it over to a DisposeBag.
Now let's compare this with publish().refCount().
Difference between publish().connect() and publish().refCount()
You can think of the refCount() operator as magic that takes care of connecting and disconnecting for you. refCount() calls connect() automatically when the first Observer subscribes, so there is no need to do it yourself.
// The element type must be spelled out: Observable<Int>.
// RxSwift 3/4 signature; RxSwift 5 and later expect .seconds(1) instead of 1.
let myObservable = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
    .refCount()

print("Starting at 0 seconds")
let mySubscription = myObservable.subscribe(onNext: {
    print("Next: \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Disposing at 3 seconds")
    mySubscription.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
    print("Subscribing again at 6 seconds")
    myObservable.subscribe(onNext: {
        print("Next: \($0)")
    })
}

// Output:
/*
Starting at 0 seconds
Next: 0
Next: 1
Next: 2
Disposing at 3 seconds
Subscribing again at 6 seconds
Next: 0
Next: 1
Next: 2
Next: 3
...
*/
Note what happened: when we subscribed again, the Observable started emitting elements from the beginning.
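The counting behind this can be made visible with a source that records when it is started and when it is disposed. This is a rough sketch under assumptions rather than the article's code: starts and stops are made-up counters, and the source emits a single element synchronously instead of ticking on a timer.

```swift
import RxSwift

var starts = 0   // how many times the source has been subscribed to
var stops = 0    // how many times the source subscription has been disposed

let source = Observable<Int>.create { observer in
    starts += 1
    observer.onNext(0)
    return Disposables.create { stops += 1 }
}

let counted = source.publish().refCount()

let a = counted.subscribe(onNext: { print("a: \($0)") })  // first subscriber: connects, prints "a: 0"
let b = counted.subscribe(onNext: { print("b: \($0)") })  // second subscriber: reuses the connection
// Here starts is 1 and stops is 0.

a.dispose()   // one subscriber remains, so the source keeps running
b.dispose()   // the count drops to zero: the source subscription is disposed, stops is 1

let c = counted.subscribe(onNext: { print("c: \($0)") })  // connects again from scratch, prints "c: 0"
// Here starts is 2.
```

The source is started once for the first group of subscribers, released when the last of them leaves, and started afresh for the next subscriber, which is why the interval in the article begins again from 0.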
Conclusion
Feel the difference now? publish().connect() and publish().refCount() (or share()) manage unsubscription from Observables in different ways.
When you use publish().connect(), you have to manage the cleanup of your Observable's resources manually (as mentioned in the translator's note above). Your sequence behaves as an active one and produces elements all the time, regardless of subscriptions.
publish().refCount() / share(), on the other hand, keeps track of how many Observers are subscribed to the Observable and stays connected to the source as long as there is at least one subscriber. In other words, when the subscriber count drops to zero, the Observable "dies" and stops producing elements.
If anything is unclear, please let me know in the comments. Thanks.