Introduction to RxJava: Subscription Life Cycle

One of the main ideas underlying Rx is that it is not known exactly when the sequence will give a new value or end. However, we do have the ability to control the time at which we will start or finish getting these values. In addition, if our subscribers use external resources, then we will probably want to free them at the end of a certain sequence.
Content:
- Part One - Introduction
- Part Two - Sequences
- Create a sequence
- Sequence filtering
- Study
- Aggregation
- Transformation
- Part Three - Sequence Management
- Part Four - Concurrency
Subscription
There are several overloaded Observable :: subscribe methods that perform the same function.
Subscription subscribe()
Subscription subscribe(Action1 onNext)
Subscription subscribe(Action1 onNext, Action1 onError)
Subscription subscribe(Action1 onNext, Action1 onError, Action0 onComplete)
Subscription subscribe(Observer observer)
Subscription subscribe(Subscriber subscriber)
subscribe () absorbs events, but by itself does not perform immediate actions. Its overloaded versions, which have at least one parameter of type Action , create a Subscriber object . If you do not pass functions to the onError and onCompleted events , they are simply ignored.
Subject s = ReplaySubject.create();
s.subscribe(
v -> System.out.println(v),
e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));
Conclusion
0
java.lang.Exception: Oops
If you do not pass a function to handle errors, an OnErrorNotImplementedException will be thrown at the place where s.onError is raised on the provider side . In this case, provider [1] and consumer [2] are in the same code block, which allows using traditional try-catch . However, in reality, the provider and the consumer may be in different places. In this case, if the consumer does not provide a function for handling errors, he will never know when and for what reason the sequence ended.
Unsubscribe
You can stop receiving data before the sequence ends. Mark method overload subscribe return an object interface Subscribtion , which has 2 methods:
boolean isUnsubscribed()
void unsubscribe()
An unsubscribe call will stop events from being sent to the observer.
Subject values = ReplaySubject.create();
Subscription subscription = values.subscribe(
v -> System.out.println(v),
e -> System.err.println(e),
() -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);
Conclusion
0
1
By unsubscribing one subscriber, we will not affect other subscribers of the same ovbservable.
Subject values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);
Conclusion
First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2
onError and onCompleted
onError and onCompleted mean completion of the sequence. A bona fide observable that follows Rx contracts will cease to issue values after one of these events occurs. This is something you should always remember when creating your own Observable .
Subject values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
v -> System.out.println("First: " + v),
e -> System.out.println("First: " + e),
() -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);
Conclusion
First: 0
First: 1
Completed
Release of resources
A subscription holds in memory the resources it is associated with. These resources will not be automatically released when the Subscription object goes out of scope. If, after calling the subscribe method, ignore the return value, then there is a risk of losing the only opportunity to unsubscribe. The subscription will continue to exist, while access to it will be lost, which may lead to a memory leak and undesirable actions.
There are exceptions. For example, when calling overloaded subscribe methods that implicitly construct a Subscriber object, a mechanism will be created that will automatically untie subscribers when the sequence ends. However, even in this case, one should keep in mind the infinite sequences. You will still need a Subscription object to stop receiving data from them at some point.
There are several implementations of the Subscription interface :
- Boolean subscription
- CompositeSubscription
- MultipleAssignmentSubscription
- RefCountSubscription
- SafeSubscriber
- Scheduler.Worker
- SerializedSubscriber
- SerialSubscription
- Subscriber
- Testsubscriber
We will meet with them in future articles. It is worth noting that Subscriber also implements Subscription , which means that we also have the opportunity to unsubscribe using the link to the Subscriber object .
With an understanding of the subscription life cycle, you gain control over the resources associated with them. This will allow your program to be more predictable, easy to maintain and extend, and hopefully less prone to bugs.
In the next part, we will learn how to create and process sequences using powerful library tools.
[1] The one who controls (creates) the observable - Note.
[2] One who uses values provided by observable - Note.