What is RxJS? Levelling up your skills

A collection of important stuff you may not know about RxJS

In this article, we go over a quick RxJS refresher, practical tips for using RxJS, the most common operators we use, and things to look out for.

What is RxJS?

To quote the official documentation:

RxJS (or reactive extensions for javascript) is a library for composing asynchronous and event-based programs by using observable sequences

One of the best things about RxJS is readability. You can mix asynchronous and synchronous logic, as well as use hundreds of out-of-the-box operators that are also well documented and tested, allowing you to do all sorts of asynchronous operations super quickly. There are also debugging tools available to debug and improve on your observable piping.

Other cool features include: ✍ The ability to be cancellable, 🚩 The error handling is quite neat , ⚡ The power to listen to the result of an observable in multiple places.

What’s all this talk about pipes?

Let’s get the terminology out of the way:

  • In the Pull Model , the consumer of data has all the power — the consumer decides when they will receive the data.
  • In the Push model , the producer overlords decide when you will receive your data. (refer to the example below)
  • Streams are a sequence of data values over time
  • Observables are streams of data that can return a stream of values to an observer over a period of time. You can also view Observables as lazy push collections.
  • Observers are consumers of Observables
  • Operators are functions that change or create observables. There are two main types of operators; Pipeable operators are functions that return another observable and Creation operators are standalone functions that create fresh observables.

// Pull Model

function dataGetter(){
   return ‘The consumer decides when to get the data’;
}

// Push Model

function dataGetter(){
   return new Promise((resolve, reject) => {
       fetch("http://www.google.com")
       .then(response => resolve(response))
   }
} 

Implementing Observables

Let’s go through the lifetime of an Observable and explain the common and important things to know:

Creating

In RxJS, we most commonly use creation operators like of , from , and interval to create an Observable. Check out learnrxjs.io for a list of ways on how to create an Observable.

Subscribing

In order to get any data from Observables, you need to subscribe to them. The official documentation describes subscribing as follows:

“Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.” … “it is simply a way to start an Observable execution”

One key difference to note here between Observables and the native addEventListener, is that Observables do not maintain a list of attached Observers.

Executing

To send fresh data to an Observable, we can use one of the following notifications:

  • next(1) — used to send fresh data to the stream
  • error() — notifies Observers an error has occurred. Terminates the stream.
  • complete() — notifies Observers that the observable is finished. Terminates the stream.

import { Observable } from 'rxjs';

const observableA = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.error('Err');
});

const observableB = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(4); // Is not delivered because it would violate the contract
});

When the stream is terminated, it means subscribers stop getting information from the Observable. In practical terms, this means that if you have three components subscribed to an Observable getting data from an API, which returns HTTP 403 error on a resource unless you handle it properly, those components will no longer be getting any more data from this stream.

Disposing

In order to avoid subscription and memory leaks, it is important to kill rogue subscriptions and free up your memory. This can be done with a .unsubscribe, however, we suggest using take, takeUntilor ng-neat/until-destroy which provides a nice and clean way to unsubscribe using decorators. The following snippet shows all 3 possible ways to destroy subscriptions:

// Option A:
import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();


// Option B:
import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable
.pipe(
  take(3)
)
.subscribe(x => console.log(x));


// Option C:
import {from, Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

const observable = from([10, 20, 30]);
const destroy = new Subject();
const subscription = observable
  .pipe(
    takeUntil(destroy)
  )
  .subscribe(x => console.log(x));
// Later:
destroy.next();

The last option is quite useful as it integrates well with the onDestroy function in the component lifecycle.

Losing your marbles with marble diagrams

Marble diagrams can look intimidating, but they are not as complicated as they may look. Check out the Rx Visualiser project ) to get an interactive understanding of marble diagrams.

After you play around with the site, head on back here and look at the following neat illustration from the official docs:

A simple marble diagram with lots of text. IT’S SIMPLER THAN IT LOOKS

What is the difference between Subjects and Observables?

You may have noticed that one of my earlier examples made use of a Subject to destroy the Observable.

Relevant Thanos Reference

Simply put, Subjects are special Observables that allow you to push values after they have been created.

Our quick tip on subjects, is that when you use them, expose only the Observable, not the entire subject:

 const sampleSubject = new Subject();
 return sampleSubject.asObservable();

Exposing the entire subject is the equivalent of a public variable anyone can change — anyone can write to this stream. If you expose only the observable, it’s effectively read-only and you can only listen.

The problem you may face using subjects is that they don’t “remember” a value. This is why you probably see BehaviourSubjects used more often. BehaviorSubjects further add to subjects by maintaining a value. Whenever you subscribe to it, it immediately returns the last value. If you want it to give you a value when it has one, use this:

private userSubject = new BehaviorSubject<User>(null);
public user$ = this.userSubject.filter((user) => user !== null);

BehaviorSubjects are widely used to maintain app state in small to mid-sized applications. However, if you’re building something more complex, we recommend using NgRx with facades .

Useful operators

The documentation includes a comprehensive list of operators with examples, but these can be quite overwhelming for beginners — here are what we believe are the essentials to get you started:

Familiarise yourself with the essential map and filter operators — these are incredibly useful operators which you will end up using on a day to day basis. FlatMap is good for when you want to reduce the observable nesting to one level (thus flattening the nesting of observables).

Testing often requires observables which never emit, emit an empty value or emit an error. For these cases, you should learn to use Never Empty , and Throw

tap or do can easily be used in pipes to debug and find issues (more on this in a bit).

Reducing pipe execution can result in significant performance gains for both the backend and the frontend. debounceTime and auditTime are commonly on search boxes as they remove executions which are very close to each other (time-wise) and also reduce bandwidth use on the frontend. distinctUntilChanged can also be useful for many cases, such as listening to browser events like scrolling and mouse movement.

Higher order operators

Higher order operators are used when we need to store Observables within Observables. This might sound like an edge case, but is in-fact quite a common occurrence. An example of this can be seen when we need to call 2 APIs in sequence (the second API call depends on the output of the first). For example: Authenticate a user, and then fetch this user’s information.

As a general rule of the thumb, nested subscriptions are an anti-pattern.

Nested subscriptions can always be avoided by using higher order operators like switchMap, mergeMap/flatMap, combineLatest.

Let’s quickly gloss over some of the more common HOOs:

concat and concatMap

This operator combines 2 streams sequentially. Complete needs to be called on the first stream before the second stream can start returning values.

Concat diagram from the docs

SwitchMap

SwitchMap negates the need for nested subscribtions by switching to the inner observable .

SwitchMap marble Diagram courtesy from github

CombineLatest

CombineLatest emits all values from it’s observables whenever one of the streams has a change:

CombineLatest marble diagram from rxjs-dev

What is the difference between hot and cold observables?

Cold observables create the data themselves, while hot observables rely on external factors. Let’s use an example for this:

import {BehaviorSubject, interval, of} from 'rxjs';
import {map} from 'rxjs/operators';


export class HotVsCold {

  //  cold observable demo
  coldObservable$ = of(['The', 'data', 'is', 'all', 'here']);

  coldObservableDemo() {
    //  each subscription re-creates the data
    this.coldObservable$.subscribe((streamData) => {
      console.log(streamData);
    });

    this.coldObservable$.subscribe((streamData) => {
      console.log(streamData);
    });
  }

  //  hot observable demo
  hotObservable$ = new BehaviorSubject(1);

  hotObservableDemo() {
    // increment the hotobservable number every second
    interval(1000).pipe(
      map(() => this.hotObservable$.getValue()),
    ).subscribe((data) => {
      this.hotObservable$.next(data + 1);
    });

    //  each subscription listens to the same set of data
    this.hotObservable$.subscribe((streamData) => {
      console.log(streamData);
    });

    this.hotObservable$.subscribe((streamData) => {
      console.log(streamData);
    });
  }
}

Improving your pipes

Fixing piping issues causes headaches for RxJS plumbers all around the world. You can use do/tap to find issues in the code, however, there are better ways to debug issues.

One such way is by using the rxjs-spy library. You can add tags to your pipes, and log only those tags when needed:

import {create} from 'rxjs-spy';
import {interval} from 'rxjs';
import {map} from 'rxjs/operators';
import {tag} from 'rxjs-spy/cjs/operators';

...

spy = create();

constructor() {
  const cartoons = interval(2000).pipe(
    map(() => ['Tom & Jerry', 'Mr. Bean']),
    map((names) => names.map(name => [name, name.length])),
    tag('people')
  ).subscribe();
  
  this.spy.show();
  this.spy.log('people');
}

Logger result

The library also makes subscription tracking easier by showing you the number of subscriptions the tagged observables have:

This makes it ideal for debugging subscription leaks.

Closing thoughts

RxJS is a very powerful library which can simplify code and make it more readable. There is also additional tooling one could install to give you even more power over your data. Once you understand marble diagrams, they are also an excellent way to explain operators.




Tags

RxJS
menu_icon
Gabriel Stellini

4th September 2020