RxJS

functional event processing

Lexicography

  • Observable
  • Subscription
  • Pipe
  • Operator

Observable (the marble notation)

--a---(bc)---X---|->

  • a, b, c are emitted values
  • X is an error
  • | is the 'completed' signal
  • ---> is the timeline
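
As a rough illustration of the three kinds of signals in the diagram, the sketch below builds an observable by hand. It assumes RxJS 6 or later; the names `stream` and `signalLog` are made up for this example. Parentheses in marble notation group values emitted at the same instant, and once an error is delivered the stream is finished, so the later `complete()` call has no effect.

```typescript
import { Observable } from 'rxjs';

// Emits a, then b and c back to back, then an error (the X in the diagram).
const stream = new Observable<string>(subscriber => {
  subscriber.next('a');
  subscriber.next('b');
  subscriber.next('c');
  subscriber.error(new Error('X'));
  subscriber.complete(); // ignored: the error already terminated the stream
});

const signalLog: string[] = [];
stream.subscribe({
  next: value => signalLog.push(`next ${value}`),
  error: (err: Error) => signalLog.push(`error ${err.message}`),
  complete: () => signalLog.push('complete'),
});

console.log(signalLog);
```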

Subscription

  1. Observables are lazy: nothing runs until something subscribes
  2. Subscribing executes the whole chain, from the source through every operator
  3. Further reading: hot and cold observables
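
A minimal sketch of laziness, assuming RxJS 6 or later. The counter `producerRuns` and the other names are invented for the example. The producer function does not run when the observable is created, and for a cold observable like this one it runs once per subscription.

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0;

// A cold observable: the function below is the producer.
const cold = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(producerRuns);
  subscriber.complete();
});

// Nothing has executed yet, because nobody has subscribed.
const runsBeforeSubscribe = producerRuns;

const received: number[] = [];
cold.subscribe(value => received.push(value));
cold.subscribe(value => received.push(value));

console.log(runsBeforeSubscribe, producerRuns, received);
```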

Pipeline

pipe() handles the data flow like any data pipeline: each operator receives the output of the previous one
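
A small pipeline might look like the sketch below, assuming RxJS 6 or later with pipeable operators imported from 'rxjs/operators'. The variable `piped` is made up for the example.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const piped: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 === 1), // keep odd values only
    map(n => n * 10)          // then scale what is left
  )
  .subscribe(value => piped.push(value));

console.log(piped);
```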

Operator

  1. creation
  2. combination
  3. error handling
  4. filtering
  5. multicasting
  6. transformation
  7. conditional
  8. utility

Comparison

Observable vs Promise
import { Observable, interval } from 'rxjs';
import { share } from 'rxjs/operators';

/*
 * Promise - eager: the executor runs as soon as the promise is created
 * Observable - lazy: the producer runs only on subscribe()
 */
const promise = new Promise(() => console.log(`Promise is called`));

const observable = new Observable(() => console.log(`Observable is called`));
observable.subscribe();

/*
 * Promise - Not cancellable (native API)
 */
const cancellable = interval(1000);
const subscription = cancellable.subscribe(value => console.log(value));
subscription.unsubscribe();

/*
 * Promise - Not sharable
 */
const observableToShare = interval(1000);
const sharedObservable = observableToShare.pipe(share());
sharedObservable.subscribe(() => console.log(`subscription1 at ${new Date()}`));
sharedObservable.subscribe(() => console.log(`subscription2 at ${new Date()}`));

/*
 * Promise - Always asynchronous
 */
const asyncPromise = new Promise(resolve => resolve(5));
asyncPromise.then(value => console.log(`Always async promise ${value}!`));
console.log('And now we are here with promise');

/*
 * Observable - possibly asynchronous
 */
const possibleAsyncObservable = new Observable(observer => observer.next(5));
possibleAsyncObservable.subscribe(value => console.log(`Possible Async observable ${value} !`));
console.log('And now we are here with observables');

Going deeper into the life cycle

https://stackblitz.com/edit/rxjs-af7iiu

Operators (creation)

  1. ajax
  2. create
  3. defer
  4. empty
  5. from
  6. fromEvent
  7. generate
  8. interval
  9. of
  10. range
  11. throwError
  12. timer
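
A few of the creation functions can be tried synchronously, as in this sketch (assuming RxJS 6 or later; `created` is a name invented for the example).

```typescript
import { from, of, range } from 'rxjs';

const created: Array<number | string> = [];

of('x', 'y').subscribe(value => created.push(value)); // fixed list of arguments
from([1, 2]).subscribe(value => created.push(value)); // from an array (or other iterable)
range(5, 3).subscribe(value => created.push(value));  // 3 integers starting at 5

console.log(created);
```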

Operators (combination)

  1. combineAll
  2. combineLatest
  3. concat
  4. concatAll
  5. endWith
  6. forkJoin
  7. merge
  8. mergeAll
  9. pairwise
  10. race
  11. startWith
  12. withLatestFrom
  13. zip
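
The sketch below contrasts three of the combination operators on synchronous sources. It assumes RxJS 6 or later, and the variable names are made up for the example.

```typescript
import { concat, of, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';

const concatenated: number[] = [];
const zipped: Array<[number, string]> = [];
const prefixed: number[] = [];

// concat: subscribe to the second source only after the first completes.
concat(of(1, 2), of(3, 4)).subscribe(value => concatenated.push(value));

// zip: pair values by index; stops when the shorter source runs out.
zip(of(1, 2, 3), of('a', 'b')).subscribe(pair => zipped.push(pair));

// startWith: emit an initial value before the source values.
of(2, 3).pipe(startWith(1)).subscribe(value => prefixed.push(value));

console.log(concatenated, zipped, prefixed);
```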

Operators (error handling)

  1. catchError
  2. retry
  3. retryWhen
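
One way to see retry and catchError working together is a deliberately flaky source, sketched below under the assumption of RxJS 6 or later. The `flaky` observable and the `attempts` counter are hypothetical and exist only for this illustration.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Fails on the first two subscriptions, succeeds on the third.
const flaky = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next('ok');
    subscriber.complete();
  }
});

const outcomes: string[] = [];

// retry(2) resubscribes up to two times, so the third attempt succeeds.
flaky
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe(value => outcomes.push(value));

// With only one retry the error gets through and catchError substitutes a value.
attempts = 0;
flaky
  .pipe(retry(1), catchError(() => of('fallback')))
  .subscribe(value => outcomes.push(value));

console.log(outcomes);
```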

Operators (filtering)

  1. audit
  2. auditTime
  3. debounce
  4. debounceTime
  5. distinct
  6. distinctUntilChanged
  7. distinctUntilKeyChanged
  8. filter
  9. find
  10. first
  11. ignoreElements
  12. last
  13. sample
  14. single
  15. skip
  16. skipUntil
  17. skipWhile
  18. take
  19. takeLast
  20. takeUntil
  21. takeWhile
  22. throttle
  23. throttleTime
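
A short sketch chaining three filtering operators, assuming RxJS 6 or later (`kept` is a name invented for the example). The time-based operators in the list (debounceTime, throttleTime and so on) need a scheduler or real timers and are left out here.

```typescript
import { of } from 'rxjs';
import { distinctUntilChanged, filter, take } from 'rxjs/operators';

const kept: number[] = [];

of(1, 1, 2, 2, 3, 4, 5)
  .pipe(
    distinctUntilChanged(), // drop consecutive duplicates: 1, 2, 3, 4, 5
    filter(n => n !== 3),   // drop by predicate:           1, 2, 4, 5
    take(3)                 // complete after three values: 1, 2, 4
  )
  .subscribe(value => kept.push(value));

console.log(kept);
```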

Operators (multicasting)

  1. publish
  2. multicast
  3. share
  4. shareReplay
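
The effect of share can be sketched by counting how often the producer runs. This assumes RxJS 6 or later; `shareProducerRuns`, `emit` and the other names are hypothetical helpers for the example. Two subscribers receive the same values while the producer is started only once.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

let shareProducerRuns = 0;
let emit: (value: number) => void = () => {};

// The producer hands out a way to push values from the outside.
const source = new Observable<number>(subscriber => {
  shareProducerRuns++;
  emit = value => subscriber.next(value);
});

const shared = source.pipe(share());

const first: number[] = [];
const second: number[] = [];
shared.subscribe(value => first.push(value));
shared.subscribe(value => second.push(value)); // reuses the running producer

emit(1);
emit(2);

console.log(shareProducerRuns, first, second);
```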

Operators (transformation)

  1. buffer
  2. bufferCount
  3. bufferTime
  4. bufferToggle
  5. bufferWhen
  6. concatMap
  7. concatMapTo
  8. exhaustMap
  9. expand
  10. groupBy
  11. map
  12. mapTo
  13. mergeMap
  14. mergeScan
  15. partition
  16. reduce
  17. scan
  18. switchMap
  19. switchMapTo
  20. toArray
  21. window
  22. windowCount
  23. windowTime
  24. windowToggle
  25. windowWhen
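
The difference between scan and reduce, plus a buffering operator, in a short sketch (assuming RxJS 6 or later; variable names are invented for the example).

```typescript
import { of } from 'rxjs';
import { bufferCount, reduce, scan } from 'rxjs/operators';

const runningTotals: number[] = [];
const finalTotal: number[] = [];
const pairs: number[][] = [];

// scan emits every intermediate accumulation.
of(1, 2, 3, 4)
  .pipe(scan((sum, n) => sum + n, 0))
  .subscribe(value => runningTotals.push(value));

// reduce emits only the last accumulation, when the source completes.
of(1, 2, 3, 4)
  .pipe(reduce((sum, n) => sum + n, 0))
  .subscribe(value => finalTotal.push(value));

// bufferCount groups values into arrays of the given size.
of(1, 2, 3, 4)
  .pipe(bufferCount(2))
  .subscribe(value => pairs.push(value));

console.log(runningTotals, finalTotal, pairs);
```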

Operators (and many more...)

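Many operator names follow a convention: the prefix names the flattening strategy (merge, switch, concat, exhaust) and the suffix names the base operator (map).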

map, mergeMap, switchMap, concatMap, exhaustMap
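
To make the four flattening strategies concrete, the sketch below feeds the same sequence of events through each operator. It assumes RxJS 6.5 or later; the helper `run` and the inner subjects `A` and `B` are hypothetical and exist only for this comparison. Subjects are used so the timing can be driven by hand instead of by timers.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Replays one fixed event sequence through the given flattening operator.
function run(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const outer = new Subject<string>();
  const inner: Record<string, Subject<string>> = {
    A: new Subject<string>(),
    B: new Subject<string>(),
  };
  const seen: string[] = [];
  outer.pipe(flatten(key => inner[key])).subscribe(value => seen.push(value));

  outer.next('A');      // inner A starts
  inner.A.next('a1');
  outer.next('B');      // inner B requested while A is still active
  inner.A.next('a2');
  inner.B.next('b1');
  inner.A.complete();
  inner.B.next('b2');
  return seen;
}

const merged = run(project => mergeMap(project));      // both inners run side by side
const switched = run(project => switchMap(project));   // B replaces A
const queued = run(project => concatMap(project));     // B waits until A completes
const exhausted = run(project => exhaustMap(project)); // B is ignored while A is active

console.log({ merged, switched, queued, exhausted });
```

Because the inner sources here are subjects, values emitted before an operator subscribes to them are simply missed, which is why concatMap does not see `b1`.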

Custom operator (giving a name to a section of your pipe)

import { OperatorFunction } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
import { autocompleteFilterById } from '../../../../core/filter/autocomplete/property-filter';
import { FilterEvent } from '../../../../material/model/DataTableFilter';
import { City } from '../../../model/city.model';

export const countryEvents = (): OperatorFunction<FilterInputs, FilterEvent<City>> => {
  return filterInputs =>
    filterInputs.pipe(
      map(filter => filter.country),
      distinctUntilChanged(),
      map(country => ({
        filterId: 'country',
        filterValue: country,
        filterFunction: autocompleteFilterById,
        accessorFunction: (city: City) => city.country
      }))
    );
};
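
The operator above depends on project-specific types, so here is a self-contained sketch of the same pattern. It assumes RxJS 6 or later; the `Person` interface and the `cityChanges` operator are hypothetical stand-ins, not part of the original code base.

```typescript
import { OperatorFunction, of } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

// Hypothetical model, used only for this illustration.
interface Person {
  name: string;
  city: string;
}

// A custom operator is a function that returns a function from Observable to Observable.
const cityChanges = (): OperatorFunction<Person, string> => source =>
  source.pipe(
    map(person => person.city),
    distinctUntilChanged()
  );

const cities: string[] = [];

of<Person>(
  { name: 'Ann', city: 'Oslo' },
  { name: 'Bob', city: 'Oslo' },
  { name: 'Cy', city: 'Rome' }
)
  .pipe(cityChanges())
  .subscribe(city => cities.push(city));

console.log(cities);
```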

Testing

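import { TestScheduler } from 'rxjs/testing';
// RunHelpers is exported from 'rxjs/testing' in RxJS 7;
// in RxJS 6 it lives in 'rxjs/internal/testing/TestScheduler'.
import { RunHelpers } from 'rxjs/testing';

// Wraps a marble test so it can be passed directly to it().
// expect() comes from the test runner (Jest or Jasmine).
export const scheduled = (testingFunction: (runHelpers: RunHelpers) => any): any => () => {
  const testScheduler = new TestScheduler((actual, expected) => {
    expect(actual).toEqual(expected);
  });
  testScheduler.run(testingFunction);
};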

Testing operator

describe('toInitialZoom', () => {
  it(
    'continent',
    scheduled(({ hot, expectObservable }) => {
      const zoomLevel = hot('a', { a: true }).pipe(toInitialZoom());
      expectObservable(zoomLevel).toBe('(aa)', { a: ZOOM_LEVEL_CONTINENT });
    })
  );

  it(
    'stop',
    scheduled(({ hot, expectObservable }) => {
      const zoomLevel = hot('a', { a: false }).pipe(toInitialZoom());
      expectObservable(zoomLevel).toBe('(ab)', { a: ZOOM_LEVEL_CONTINENT, b: ZOOM_LEVEL_STOP });
    })
  );
});
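
Marble tests can also be tried outside a test runner. The sketch below assumes RxJS 6 or later and swaps the runner's `expect` for a plain JSON comparison, which is an assumption made to keep the example self-contained. The counter `comparisons` is a made-up name. Note that in test marbles an error is written as `#`, not `X`.

```typescript
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

let comparisons = 0;

// The callback receives the actual and expected notification lists.
const testScheduler = new TestScheduler((actual, expected) => {
  comparisons++;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('marble sequences differ');
  }
});

testScheduler.run(({ cold, expectObservable }) => {
  const source = cold('-a-b-|', { a: 1, b: 2 });
  const scaled = source.pipe(map(n => n * 10));
  // Same timing as the source, with mapped values.
  expectObservable(scaled).toBe('-a-b-|', { a: 10, b: 20 });
});

console.log(`comparisons made: ${comparisons}`);
```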

React examples

  1. https://stackblitz.com/edit/react-ts-tbffzd
  2. https://stackblitz.com/edit/rxjs-hooks-beer