RxJS: Don't Unsubscribe

Hello, Habr! I present to you a translation of the article "RxJS: Don't Unsubscribe" by Ben Lesh.

Well... okay, just don't unsubscribe quite so much.

I am often asked to help debug problems in someone's RxJS code, or to work out how to structure an application that composes a lot of asynchronous code. When I do, I see the same thing over and over: people keeping handles to tons of subscriptions. A developer makes 3 HTTP requests with an Observable and then stores 3 subscription objects, each to be unsubscribed when some event occurs.

I know why this happens. People are used to calling `addEventListener` N times and then, when the listeners are no longer needed, calling `removeEventListener` N times. It feels natural to do the same with subscription objects, and for the most part that instinct is right. But there are better ways. Keeping too many subscription objects around is a sign that you are managing your subscriptions imperatively and not taking advantage of the power of Rx.

What imperative subscription management looks like


Take, for example, this fictional component (this is not specifically React or Angular, but just a general example):

class MyGenericComponent extends SomeFrameworkComponent {
 updateData(data) {
  // do something framework-specific to update your component here.
 }

 onMount() {
  this.dataSub = this.getData()
   .subscribe(data => this.updateData(data));

  const cancelBtn = this.element.querySelector('.cancel-button');
  const rangeSelector = this.element.querySelector('.rangeSelector');

  this.cancelSub = Observable.fromEvent(cancelBtn, 'click')
   .subscribe(() => {
    this.dataSub.unsubscribe();
   });

  this.rangeSub = Observable.fromEvent(rangeSelector, 'change')
   .map(e => e.target.value)
   .subscribe((value) => {
    if (+value > 500) {
      this.dataSub.unsubscribe();
    }
   });
 }

 onUnmount() {
  this.dataSub.unsubscribe();
  this.cancelSub.unsubscribe();
  this.rangeSub.unsubscribe();
 }
}

In the example above, you can see that I manually call `unsubscribe` on three subscription objects in the `onUnmount()` method. I also call `this.dataSub.unsubscribe()` on line 15 when the user clicks the cancel button, and on line 22 when the user sets the range selector above 500, which is some threshold at which I want to stop the data stream. (I don't know why, it's just a weird component.)

The downside of this approach is that I am manually managing unsubscription in several places, even in this fairly trivial example.

The only real benefit of this approach is performance. Since you use fewer abstractions to get the job done, it is likely to be slightly faster. However, this is unlikely to have a noticeable effect in most web applications, and I don't think it is worth worrying about.

In addition, you can always combine several subscriptions into one by creating a parent subscription and adding all the others as children. But in essence you would still be doing the same thing.
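To make the parent/child idea concrete, here is a minimal sketch of how a composite subscription could behave. `ToySubscription` is a hypothetical stand-in written for this illustration, not the RxJS implementation; RxJS's own `Subscription` exposes an `add` method that serves the same purpose.

```javascript
// Toy stand-in for a subscription object; NOT the RxJS implementation.
class ToySubscription {
  constructor(teardown) {
    this.closed = false;
    this.teardown = teardown;   // optional cleanup function
    this.children = [];
  }

  // Register a child so that it is torn down together with the parent.
  add(child) {
    if (this.closed) {
      child.unsubscribe();      // parent already closed: clean up immediately
    } else {
      this.children.push(child);
    }
    return child;
  }

  unsubscribe() {
    if (this.closed) return;    // calling it twice is harmless
    this.closed = true;
    if (this.teardown) this.teardown();
    this.children.forEach(child => child.unsubscribe());
  }
}

const log = [];
const parent = new ToySubscription();
parent.add(new ToySubscription(() => log.push('data')));
parent.add(new ToySubscription(() => log.push('cancel')));
parent.add(new ToySubscription(() => log.push('range')));

parent.unsubscribe();           // one call tears down all three children
parent.unsubscribe();           // second call is a no-op
console.log(log);               // [ 'data', 'cancel', 'range' ]
```

This shortens `onUnmount` to a single call, but the decision about when to unsubscribe is still made imperatively.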

Composing subscription management with takeUntil


Now let's do the same example, but using the `takeUntil` operator from RxJS:

class MyGenericComponent extends SomeFrameworkComponent {
 updateData(data) {
  // do something framework-specific to update your component here.
 }

 onMount() {
   const data$ = this.getData();
   const cancelBtn = this.element.querySelector('.cancel-button');
   const rangeSelector = this.element.querySelector('.rangeSelector');
   const cancel$ = Observable.fromEvent(cancelBtn, 'click');
   const range$ = Observable.fromEvent(rangeSelector, 'change')
                            .map(e => e.target.value);
   
   const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500));
   this.subscription = data$.takeUntil(stop$)
                            .subscribe(data => this.updateData(data));
 }

 onUnmount() {
  this.subscription.unsubscribe();
 }
}

The first thing you might notice is that there is less code. That is only one benefit. Another is that I have composed the events that stop the data stream into a single `stop$` stream. This means that when I decide to add another stop condition, such as a timer, I can simply merge a new observable into `stop$`. The next obvious thing is that I only have one subscription object to manage imperatively. There is no getting around that, since this is where functional programming meets the object-oriented world. JavaScript is an imperative language, and at some point we have to meet the rest of the world halfway.

Another advantage of this approach is that it actually completes the observable. This means a completion event fires whenever the stream is stopped, and you can handle it. If you simply call `unsubscribe` on the returned subscription object, you are not notified that the unsubscription happened. If you use `takeUntil` (or one of the other operators listed below), however, your completion handler tells you that the observable has stopped.
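The difference between the two ways of stopping can be sketched with a toy push-based stream. Everything here (`createSource`, `emit`, and this `takeUntil` function) is a hypothetical stand-in written for illustration, not the RxJS API; it only mimics the behavior described above.

```javascript
// Toy push-based source; a hypothetical stand-in, NOT the RxJS Observable.
function createSource() {
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    },
    emit(value) {
      // Copy first so observers may unsubscribe while being notified.
      [...observers].forEach(o => o.next && o.next(value));
    },
  };
}

// Sketch of takeUntil: mirror `source` until `notifier` emits, then complete.
function takeUntil(source, notifier) {
  return {
    subscribe(observer) {
      let sourceSub;
      let notifierSub;
      const stop = () => {
        sourceSub.unsubscribe();
        notifierSub.unsubscribe();
      };
      notifierSub = notifier.subscribe({
        next() {
          stop();
          if (observer.complete) observer.complete();
        },
      });
      sourceSub = source.subscribe({ next: v => observer.next(v) });
      return { unsubscribe: stop };
    },
  };
}

const events = [];

// Case 1: plain unsubscribe; the complete handler is never called.
const dataA = createSource();
const subA = dataA.subscribe({
  next: v => events.push(`A next ${v}`),
  complete: () => events.push('A complete'),
});
dataA.emit(1);
subA.unsubscribe();
dataA.emit(2);                       // ignored, nobody is listening

// Case 2: takeUntil; the consumer gets a completion notification.
const dataB = createSource();
const stop$ = createSource();
takeUntil(dataB, stop$).subscribe({
  next: v => events.push(`B next ${v}`),
  complete: () => events.push('B complete'),
});
dataB.emit(1);
stop$.emit('stop');
dataB.emit(2);                       // ignored, the stream has completed

console.log(events);
// [ 'A next 1', 'B next 1', 'B complete' ]
```

Only the second consumer ever learns that its stream has ended, which is the point made above.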

The last advantage I want to mention is that you actually "wire everything up" by calling `subscribe` in one place. With discipline, this makes it much easier to find where subscriptions start in your code. Remember that observables do nothing until you subscribe to them, so the subscription point is an important part of the code.

There is one downside in terms of RxJS semantics, but it is hardly worth worrying about given the other advantages. The semantic difference is that completing an observable is the producer telling the consumer that the work is done, while unsubscribing is the consumer telling the producer that it no longer needs the data.

There will also be a very small performance difference between this and the simple imperative call to `unsubscribe`. However, it is unlikely to be noticeable in most applications.

Other operators


There are many other ways to stop a stream in the Rx way. I would recommend looking at least at the following operators:

  • `take(n)`: emits N values and then completes the observable.
  • `takeWhile(predicate)`: tests each value passing through against the predicate; as soon as it returns false, the stream completes.
  • `first()`: emits the first value and completes.
  • `first(predicate)`: tests each value against the predicate function; when it returns true, the stream emits that value and completes.
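As a rough illustration of which values each of these operators lets through, here is a small model built on generator functions. It is pull-based and synchronous, so it is not how RxJS implements them, and error cases are left out; the function names simply mirror the operators, and the sample values are made up.

```javascript
// Pull-based model of the emission rules; NOT the RxJS operators themselves.

// take(n): let n values through, then stop.
function* take(n, source) {
  if (n <= 0) return;
  let count = 0;
  for (const value of source) {
    yield value;
    if (++count >= n) return;
  }
}

// takeWhile(predicate): stop at the first value that fails the predicate.
function* takeWhile(predicate, source) {
  for (const value of source) {
    if (!predicate(value)) return;
    yield value;
  }
}

// first(predicate): let the first matching value through, then stop.
// Without a predicate, every value matches, so the very first one wins.
function* first(source, predicate = () => true) {
  for (const value of source) {
    if (predicate(value)) {
      yield value;
      return;
    }
  }
}

const values = [100, 300, 700, 200];

const taken = [...take(3, values)];                        // [100, 300, 700]
const whileSmall = [...takeWhile(v => v <= 500, values)];  // [100, 300]
const firstValue = [...first(values)];                     // [100]
const firstBig = [...first(values, v => v > 500)];         // [700]
```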

Summary: Use takeUntil, takeWhile, etc.


You should probably be using operators such as `takeUntil` to manage your RxJS subscriptions. As a rule of thumb, if you see two or more subscriptions being managed in the same component, ask yourself whether you could compose them better. The composed approach:

  • is more composable
  • fires a completion event when you kill your stream
  • is usually less code
  • leaves less to manage
  • has fewer actual subscription points (since there are fewer `subscribe` calls)

Want to learn more about RxJS from the author? Visit rxworkshop.com!