Skip to main content

RxJS

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

Installing RxJS

To install RxJS, run the following command in your terminal:

npm install rxjs

Observable

An Observable is a representation of any set of values over any amount of time.

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});

console.log('just before subscribe');

observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});

console.log('just after subscribe');

Subject

A Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});

subject.next(1);
subject.next(2);

Operators

Operators are functions. There are two kinds of operators:

  • Pipeable Operators are the kind that can be piped to Observables using the syntax observableInstance.pipe(operator()). These include, filter(...), and mergeMap(...). When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.
  • Creation Operators are the other kind of operator, which can be called as standalone functions to create a new Observable. For example, of(1, 2, 3) creates an Observable that will emit 1, 2, and 3, one right after another.
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

from([1, 2, 3, 4])
.pipe(filter((n) => n % 2 === 0))
.subscribe((x) => console.log(x)); // 2, 4

Subscription

A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

Scheduler

A Scheduler controls when a subscription starts and when notifications are delivered.

import { asyncScheduler } from 'rxjs';

console.log('before');
asyncScheduler.schedule(() => console.log('async'));
// schedule 5 seconds in the future
asyncScheduler.schedule(() => console.log('async'), 5000);
console.log('after');

Reference