RxJS with Angular 101

What do you need to know about RxJS operators in reactive programming? Keep reading, and let me introduce you to the most crucial operators in RxJS.

RxJS is a library for reactive programming with Observables that makes it easier to compose asynchronous or callback-based code. Angular and Angular Material use RxJS out of the box. Other JavaScript libraries can use RxJS as well, but they have to install it as a separate package.

The official documentation can be found on the RxJS website. Since the documentation is quite dense and the RxJS library is very large, this article covers the most important concepts used on a daily basis.

Observable

An Observable is basically a function that can return a stream of values to an observer over time, either synchronously or asynchronously. It is called an Observable because it is something that can be observed.
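
To make this more concrete, here is a minimal sketch that builds an Observable by hand with the Observable constructor. The names numbers$ and received are only illustrative and do not come from any real project. Nothing runs until subscribe() is called; the first two values then arrive synchronously and the third one arrives later.

```typescript
import { Observable } from 'rxjs';

// Hypothetical stream: two values are emitted synchronously, one asynchronously.
const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1); // delivered during the subscribe() call
  subscriber.next(2);
  const timer = setTimeout(() => {
    subscriber.next(3); // delivered later
    subscriber.complete();
  }, 100);
  // Teardown logic: runs when the subscription ends.
  return () => clearTimeout(timer);
});

const received: number[] = [];

numbers$.subscribe({
  next: (value) => received.push(value),
  complete: () => console.log('completed with', received),
});

console.log(received); // [1, 2] at this point, 3 arrives about 100 ms later
```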

Operators

Operators play a big role in RxJS; they are what makes reactive programming possible in practice. Simply put, operators are functions. There are two kinds: creation operators and pipeable operators. Creation operators are functions that create an Observable with some common predefined behavior or by joining other Observables. A pipeable operator is a pure function that takes one Observable as input and returns another Observable as output, while the input Observable stays unmodified. To make them easier to understand, the operators will also be shown with marble diagrams.
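
The difference between the two kinds can be sketched as follows, assuming RxJS 7.2 or later, where operators are exported directly from 'rxjs'. The double operator is a hypothetical helper written only for this illustration: it is just a function from one Observable to another, and the source Observable stays untouched.

```typescript
import { Observable, of, map } from 'rxjs';

// Creation operator: of() builds a new Observable from plain values.
const source$ = of(1, 2, 3);

// Hypothetical custom pipeable operator: takes an Observable, returns a new one.
const double = (input$: Observable<number>): Observable<number> =>
  input$.pipe(map((x) => x * 2));

const doubled: number[] = [];
const original: number[] = [];

source$.pipe(double).subscribe((value) => doubled.push(value));
source$.subscribe((value) => original.push(value)); // source$ itself is unchanged

console.log(doubled); // [2, 4, 6]
console.log(original); // [1, 2, 3]
```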

Async pipe and Observable

The async pipe subscribes to an Observable or Promise and returns the latest value it has emitted. That means that, with the async pipe, promises and observables can be used directly in the template, without subscribing manually and saving the result in a variable. When the async pipe is used, Angular subscribes to the observable automatically and unsubscribes when the component is destroyed.

Here is the example.

import { Observable } from 'rxjs'
...

    vehicles$: Observable<Vehicle[]>

...

ngOnInit() {
    this.vehicles$ = this.vehicleService.loadAllVehicles()
}

<!-- component template -->
<div *ngFor="let vehicle of (vehicles$ | async)">
    <p>{{vehicle.name}}</p>
    <p>{{vehicle.price}}</p>
</div>

of

of() is a creation operator that creates an Observable from the values passed to it as arguments: it emits them one after another and then completes. It is mostly used together with other operators. The next section shows an example of using of() with pipe(), map(), filter() and reduce().
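
On its own, of() can be sketched like this. The variable names are illustrative. Note that an array passed to of() is emitted as one single value, not element by element.

```typescript
import { of } from 'rxjs';

const log: string[] = [];

// Each argument becomes one emission, followed by completion.
of('a', 'b', 'c').subscribe({
  next: (value) => log.push(value),
  complete: () => log.push('complete'),
});

console.log(log); // ['a', 'b', 'c', 'complete']

const arrays: number[][] = [];

// A single array argument is emitted as one value.
of([1, 2, 3]).subscribe((value) => arrays.push(value));

console.log(arrays); // [[1, 2, 3]]
```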

[Marble diagram: of]

pipe

The pipe() method is used to improve the readability of the code. Its purpose is to chain together functions that each take an Observable and return an Observable. It starts with the source Observable and passes the result of each operator on to the next one in the chain. Because operators are standalone functions instead of methods on Observable.prototype, pipe() also keeps the prototype clean and makes the RxJS library more tree-shakeable.

Here is the example.

import { of, map, filter, reduce } from 'rxjs'

of(1, 2, 3, 4, 5)
  .pipe(
    map((x) => x * 2),
    filter((x) => x > 5),
    reduce((acc, current) => acc + current, 0)
  )
  .subscribe((value) => console.log(value));

// Logs:
// 24

map, filter, reduce

The most used pipeable operators in RxJS are map(), filter() and reduce(). They are similar to the JavaScript array methods of the same name, except that in RxJS they work on streams. Note that reduce() emits only once, when the source completes.

Here is the example.

import { of, map } from 'rxjs'

of(1, 2, 3)
  .pipe(map((x) => 10 * x))
  .subscribe((value) => console.log(value));

// Logs:
// 10
// 20
// 30

[Marble diagram: map]

import { of, filter } from 'rxjs'

of(2, 30, 22, 5, 60, 1)
  .pipe(filter((x) => x > 10))
  .subscribe((value) => console.log(value));

// Logs:
// 30
// 22
// 60

[Marble diagram: filter]

import { of, reduce } from 'rxjs'

of(1, 2, 3, 4, 5)
  .pipe(reduce((x, y) => x + y))
  .subscribe((value) => console.log(value));

// Logs:
// 15

[Marble diagram: reduce]

tap

The tap() operator performs actions or side effects transparently: the values pass through it unchanged. While it is possible to perform side effects inside, for example, map() or filter(), that would make their callback functions impure. tap() exists to keep side effects out of the other operators. Typical side effects are calling a function whose result is visible to the end user (showing a message or a dialog) or logging while debugging.

Here is the previous example extended with tap().

import { of, map, filter, reduce, tap } from 'rxjs'

of(1, 2, 3, 4, 5)
  .pipe(
    map((x) => x * 2),
    filter((x) => x > 5),
    reduce((acc, current) => acc + current, 0),
    tap((sum) => showResultInDialog(sum))
  )
  .subscribe();

forkJoin

forkJoin is one of the most popular combination operators. It is similar to Promise.all, but for observables. Use it when there is a group of observables and only the final value of each one matters. Once all observables complete, forkJoin emits the last value of each of them as one group, exactly once, and then completes. Be careful: it never emits if one of the inner observables never completes, and it errors as soon as one of the inner observables errors. If the result has to be delivered even when some of the inner observables fail, add catchError() to every inner observable, or just to the ones that can fail.

If one of the observables emits more than one item, an operator like combineLatest() may work better.

Here is the example.

import { forkJoin, of, catchError } from 'rxjs'
...

forkJoin({
  driveTypes: this.configuratorService.getDriveTypes(),
  vehicleTypes: this.configuratorService.getTypes(),
  vehicle: this.loadVehicle().pipe(
    catchError(() => of(undefined))
  ),
}).subscribe((responses) => {
  this.driveTypes = responses.driveTypes;
  this.vehicleTypes = responses.vehicleTypes;
  this.vehicle = responses.vehicle;
});
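
The same behavior can be tried without any services. The following is a small self-contained sketch, assuming RxJS 7.2 or later; the keys a, b and c and the simulated error are made up for the illustration. It shows that only the last value of each observable is kept and that catchError() lets the group emit even though one inner observable fails.

```typescript
import { forkJoin, of, throwError, catchError } from 'rxjs';

const emissions: unknown[] = [];

forkJoin({
  a: of(1, 2, 3), // only the last value, 3, is kept
  b: of('x'),
  // Simulated failing request, replaced with a fallback value.
  c: throwError(() => new Error('request failed')).pipe(
    catchError(() => of(undefined))
  ),
}).subscribe((values) => emissions.push(values));

console.log(emissions); // [{ a: 3, b: 'x', c: undefined }]
```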

combineLatest

This operator is best used when there are multiple long-lived observables that rely on each other. combineLatest() does not emit until each observable has emitted at least one value. After that, it emits an array of the latest values whenever any of the observables emits.

Depending on how the source observables emit, decide which operator is more appropriate: combineLatest() or forkJoin().

Here is the example.

import { combineLatest, map, startWith } from 'rxjs'
import { FormControl } from '@angular/forms';
...
search = new FormControl('');
searchString$ = this.search.valueChanges.pipe(startWith(''));

combineLatest([
  this.cpApi.getCustomerVehicles(this.customerId),
  this.searchString$,
])
  .pipe(
    map(([vehicles, searchString]) =>
      this.searchFilter(vehicles, searchString)
    )
  )
  .subscribe();

[Marble diagram: combineLatest]
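
To see the emission rules in isolation, here is a rough sketch that replaces the API call and the form control with subjects driven by hand. The names vehicles$, search$ and the vehicle names are hypothetical. A BehaviorSubject plays the role of valueChanges with startWith('').

```typescript
import { BehaviorSubject, Subject, combineLatest, map } from 'rxjs';

const vehicles$ = new Subject<string[]>(); // stands in for the API response
const search$ = new BehaviorSubject<string>(''); // has an initial value

const results: string[][] = [];

combineLatest([vehicles$, search$])
  .pipe(
    map(([vehicles, search]) =>
      vehicles.filter(
        (name) => name.toLowerCase().indexOf(search.toLowerCase()) !== -1
      )
    )
  )
  .subscribe((filtered) => results.push(filtered));

// Nothing has been emitted yet, because vehicles$ has no value so far.
vehicles$.next(['Audi A4', 'BMW i3', 'Audi Q5']); // emits all three
search$.next('audi'); // emits ['Audi A4', 'Audi Q5']
search$.next('q5'); // emits ['Audi Q5']

console.log(results);
```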

mergeMap

The mergeMap() operator is a combination of two operations: map and merge. The map part maps each value from the source observable to an inner observable. The merge part works like mergeAll(): it subscribes to every inner observable returned by the map step and emits their values concurrently in one output stream, which avoids nested subscriptions. This is useful when certain actions result in or initiate subsequent asynchronous operations whose results need to be brought back into the source observable. In the example, a single catchError() at the end of the pipe catches an error from any of the endpoints.

Here is the example.

import { tap, mergeMap, catchError, of } from 'rxjs'
...

this.vehicleService
  .quickEditStock(vehicleId, quickEditData)
  .pipe(
    mergeMap((quickEditStockResult) =>
      this.vehicleService.updateWarranty(
        quickEditStockResult.vehicleId,
        warrantyData
      )
    ),
    mergeMap((warrantyResult) =>
      this.vehicleService.updateVehicle(warrantyResult.id, vehicleData)
    ),
    mergeMap((configUnitResult) =>
      this.marketService.updateVehicleAds(
          configUnitResult.value, vehicleAdData)
    ),
    tap((vehicleAdsResult) => {
      this.refreshRemainingVehicleAdsData(vehicleAdsResult);

      this.showSnackBar(vehicleAdsResult);
    }),
    catchError(() => {
      this.showSnackbarError();
      return of(undefined);
    })
  )
  .subscribe();

[Marble diagram: mergeMap]
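
The concurrent part is easier to see with inner streams that are controlled by hand. In this sketch the subjects requests$ and responses are hypothetical stand-ins for real requests. Both inner streams stay subscribed, so their values are interleaved in the output.

```typescript
import { Subject, mergeMap } from 'rxjs';

const requests$ = new Subject<string>();

// Hypothetical inner streams, one per request key.
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};

const output: string[] = [];

requests$
  .pipe(mergeMap((key) => responses[key]))
  .subscribe((value) => output.push(value));

requests$.next('a'); // subscribes to inner stream a
requests$.next('b'); // subscribes to inner stream b, a stays active
responses.b.next('b1');
responses.a.next('a1'); // still delivered, mergeMap does not cancel
responses.b.next('b2');

console.log(output); // ['b1', 'a1', 'b2']
```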

debounceTime

The debounceTime() operator delays the values emitted by a source for the given due time. If a new value arrives within this time, the pending value is dropped and the timer is reset. In this way, debounceTime() keeps track of the most recent value and emits it once the due time has passed without another emission. The time is given in milliseconds. It is most commonly used for user input in forms.

Here is the example.

import { debounceTime } from 'rxjs/operators'
...

this.formGroup
  .get("mileage")
  .valueChanges.pipe(debounceTime(500))
  .subscribe((value) => {
    this.prepareForm(value);
  });

[Marble diagram: debounceTime]

distinctUntilChanged

The distinctUntilChanged() operator emits a value only when it is different from the previous one. It is most commonly used together with debounceTime().

Here is the example.

import { debounceTime, distinctUntilChanged } from 'rxjs/operators'

this.formGroup
  .get("search")
  .valueChanges.pipe(debounceTime(500), distinctUntilChanged())
  .subscribe((value) => {
    this.appraisalService.findVehicleByVin(value);
  });

[Marble diagram: distinctUntilChanged]
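
Without a form, the filtering itself can be sketched with of(). The sample values are made up. By default the comparison uses strict equality, so for objects a comparator function is usually needed, as in the second part of the sketch.

```typescript
import { of, distinctUntilChanged } from 'rxjs';

const values: number[] = [];

// Only consecutive duplicates are dropped; the later 1 is emitted again.
of(1, 1, 2, 2, 2, 1, 3, 3)
  .pipe(distinctUntilChanged())
  .subscribe((value) => values.push(value));

console.log(values); // [1, 2, 1, 3]

const vins: string[] = [];

// Objects are compared by reference by default, so compare a field instead.
of({ vin: 'A' }, { vin: 'A' }, { vin: 'B' })
  .pipe(distinctUntilChanged((previous, current) => previous.vin === current.vin))
  .subscribe((vehicle) => vins.push(vehicle.vin));

console.log(vins); // ['A', 'B']
```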

switchMap

The switchMap() operator maps each value to an inner Observable and emits the values of the most recent inner Observable only. The most important difference between switchMap() and other flattening operators (mergeMap, concatMap...) is the canceling effect: on each source emission, the previous inner Observable is unsubscribed and the new one is subscribed.

Here is the example.

import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs'

this.formGroup
  .get('search')
  .valueChanges.pipe(
    debounceTime(500),
    distinctUntilChanged(),
    switchMap((value) => this.appraisalService.findVehicleByVin(value))
  )
  .subscribe();

[Marble diagram: switchMap]
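
The canceling effect can be demonstrated with hand-driven subjects. This is only a sketch: searchTerm$ and lookups are hypothetical stand-ins for the form control and the lookup requests. The result for the first term arrives after the second term was entered, so it is ignored.

```typescript
import { Subject, switchMap } from 'rxjs';

const searchTerm$ = new Subject<string>();

// Hypothetical pending lookups, one inner stream per search term.
const lookups: Record<string, Subject<string>> = {
  ab: new Subject<string>(),
  abc: new Subject<string>(),
};

const results: string[] = [];

searchTerm$
  .pipe(switchMap((term) => lookups[term]))
  .subscribe((result) => results.push(result));

searchTerm$.next('ab'); // subscribes to lookups.ab
searchTerm$.next('abc'); // unsubscribes from lookups.ab, subscribes to lookups.abc
lookups.ab.next('stale result for ab'); // ignored
lookups.abc.next('result for abc');

console.log(results); // ['result for abc']
```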

Other operators that can be useful

  • iif() - Conditionally create a stream with an if/else approach
  • groupBy() - Group emitted values into separate observables according to the criteria you set
  • count() - Count the number of emissions and emit that number when the source completes
  • max() - Emit the item with the largest value
  • first() - Emit only the first event of a stream
  • last() - Emit only the last event of a stream
  • skip() - Skip a certain number of events
  • throttleTime() - Emit at most one event per time period
  • concat() - Append the events of one observable sequence after another in order
  • takeUntil() - Emit events until a second (notifier) observable emits
  • takeWhile() - Emit values so long as each value satisfies the given predicate
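
A few of the operators from this list can be tried quickly on a synchronous source. This is a rough sketch, assuming RxJS 7.2 or later; the sample numbers and the collect helper are made up for the illustration.

```typescript
import { of, concat, iif, count, max, first, last, skip, takeWhile } from 'rxjs';

const source$ = of(3, 8, 1, 9, 4);

// Hypothetical helper that stores the emitted values under a name.
const out: Record<string, number[]> = {};
const collect = (name: string) => (value: number) => {
  (out[name] = out[name] || []).push(value);
};

source$.pipe(first()).subscribe(collect('first')); // 3
source$.pipe(last()).subscribe(collect('last')); // 4
source$.pipe(skip(2)).subscribe(collect('skip')); // 1, 9, 4
source$.pipe(count()).subscribe(collect('count')); // 5
source$.pipe(max()).subscribe(collect('max')); // 9
source$.pipe(takeWhile((value) => value < 9)).subscribe(collect('takeWhile')); // 3, 8, 1
concat(of(1, 2), of(3)).subscribe(collect('concat')); // 1, 2, 3
iif(() => true, of(1), of(2)).subscribe(collect('iif')); // 1

console.log(out);
```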

Conclusion

In this article, we have gone through what I consider the most important parts of RxJS. Once you have learned them, you are on your way to mastering RxJS. It may take time to get used to RxJS at first, but it will pay off!

Thank you for taking the time to read this blog! Feel free to share your thoughts about this topic and drop us an email at hello@prototyp.digital.