#
rxjs
#
Observable
A producer of multiple values, "pushing" them to observers (consumers). An observable represents a stream, or source of data that can arrive over time.
Not often used, just for example:
const observable = new Observable((subscriber) => {
subscriber.next(10); // emits data
subscriber.complete(10); // indicate complete
});
#
Observer
A consumer of vaulues delivered by an observable.
const observer = {
next: (value) => { console.log(value); }, // get next value
error: (error) => { console.log(error) }, // catch error
complete: () => { console.log('completed'); }// completed
}
observable.subscribe(observer);
Any time there is error or complete, any other emissions after will be ignored!
#
Subject and BehaviourSubject
#
Subject
A special type of observable that allows values to be multicasted to many observers.
const sub = new Subject();
sub.next(1);
sub.subscribe(x => {
console.log('Subscriber A', x);
});
#
BehaviourSubject
Stores the current value. Able to have an initial value.
const subject = new BehaviorSubject(123);
subject.subscribe(console.log);
subject.next(456);
#
When to use Subject and BehaviourSubject
- Use Subject when you have multiple observers and you want updates as they come.
- Use BehaviourSubject when you have multiple observers and you want the latest updates as soon as they subscribe.
#
Pipe and operator
Values will now go through pipe first before reaching observer, which contains operators.
const observable = new Observable((subscriber) => {
subscriber.next(data); // emits data
}).pipe(
map((value) => {
return value.data;
}),
map((value) => {
return value.filter((v) => v.status === 'active');
}),
map((value) => {
return value.reduce((sum, user) => sum + user.age, 0); // reduce array into something
}),
map((value) => {
if (value < 18) throw new Error('error'); // to test error
else return value;
}),
);
#
Convert promise to observable
const promise = new Promise((resolve, reject) => {
setTimeout(() => {
resolve('resolved');
}, 1000);
})
const obvsPromise = fromPromise(promise);
Can also convert observable back to promise by calling to promise on it.
#
Operators
#
Creation
#
timer
After given duration, emit numbers in sequence every specified duration.
//emit 0 after 1 second then complete, since no second argument is supplied
const source = timer(1000);
//output: 0
const subscribe = source.subscribe(val => console.log(val));
#
interval
Emit numbers in sequence based on provided timeframe.
//emit value in sequence every 1 second
const source = interval(1000);
//output: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));
#
of
Emit variable amount of values in a sequence and then emits a complete notification.
//emits values of any type
const source = of({ name: 'Brian' }, [1, 2, 3], function hello() {
return 'Hello';
});
//output: {name: 'Brian'}, [1,2,3], function hello() { return 'Hello' }
const subscribe = source.subscribe(val => console.log(val));
#
from
Turn an array, promise, or iterable into an observable.
const arraySource = from([1, 2, 3, 4, 5]);
const subscribe = arraySource.subscribe(val => console.log(val));
#
Transformation
#
map
Apply projection with each value from source.
//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//add 10 to each value
const example = source.pipe(map(val => val + 10));
#
switchMap
Map to observable, complete previous inner observable, emit values. On each emission the previous inner observable (the result of the function you supplied) is cancelled and the new observable is subscribed.
fromEvent(document, 'click')
.pipe(
// restart counter on every click
switchMap(() => interval(1000))
)
.subscribe(console.log);
#
Filtering
#
filter
Emit values that pass the provided condition.
const example = source.pipe(filter(num => num % 2 === 0));
#
debounceTime
Discard emitted values that take less than the specified time between output
💡 This operator is popular in scenarios such as type-ahead where the rate of user input must be controlled!
this.myFormControl.valueChanges.pipe(
debounceTime(300)
).subscribe(value => {
// handle the value after 300ms of inactivity
});
#
trottleTime
Emit first value then ignore for specified duration
/*
emit the first value, then ignore for 5 seconds. repeat...
*/
const example = source.pipe(throttleTime(5000));
#
scan
Reduce over time.
const source = of(1, 2, 3);
// basic scan example, sum over time starting with zero
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
#
takeUntil
Emit values until provided observable emits.
private destroy$ = new Subject<void>();
observable$
.pipe(takeUntil(this.destroy$))
.subscribe(data => console.log(data));
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
#
takeWhile
Emit values until provided expression is false.
//allow values until value from source is greater than 4, then complete
source$
.pipe(takeWhile(val => val <= 4))
// log: 1,2,3,4
.subscribe(val => console.log(val));
#
first/last
Emit the first/last value or first/last to pass provided expression.
const example = source.pipe(first());
const example = source.pipe(last());
#
Combination
#
zip
After all observables emit, emit values as an array. This operator is ideal when you want to combine values from multiple observables in a pairwise fashion.
//emit every 1s
const source = interval(1000);
//when one observable completes no more values will be emitted
const example = zip(source, source.pipe(take(2)));
//output: [0,0]...[1,1]
const subscribe = example.subscribe(val => console.log(val));
#
forkJoin
When all observables complete, emit the last emitted value from each.
This operator is best used when you have a group of observables and only care about the final emitted value of each.
forkJoin(
// as of RxJS 6.5+ we can use a dictionary of sources
{
google: ajax.getJSON('https://api.github.com/users/google'),
microsoft: ajax.getJSON('https://api.github.com/users/microsoft'),
users: ajax.getJSON('https://api.github.com/users')
}
)
// { google: object, microsoft: object, users: array }
.subscribe(console.log);
#
Error handling
#
catch/catchError
Gracefully handle errors in an observable sequence.
const source = throwError('This is an error!');
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
#
retry
Retry an observable sequence a specific number of times should an error occur.
const example = source.pipe(
mergeMap(val => {
//throw error for demonstration
if (val > 5) {
return throwError('Error!');
}
return of(val);
}),
//retry 2 times on error
retry(2)
);
#
Utility
#
do/tap
Transparently perform actions or side-effects, such as logging.
const source = of(1, 2, 3, 4, 5);
// transparently log values from source with 'tap'
const example = source.pipe(
tap(val => console.log(`BEFORE MAP: ${val}`)),
map(val => val + 10),
tap(val => console.log(`AFTER MAP: ${val}`))
);