Is there a way to watch multiple observables and execute a function whenever any of them changes? I am looking for something similar to zip, but without requiring every observable to update its value. forkJoin isn't suitable either, as it only emits once all of the observed observables have completed.
For example, I would like the subscribe function to be triggered whenever the value of any of the observables one$, two$, or three$ changes, and to receive the current value of each observable. Currently, I am using BehaviorSubjects so that the value can be accessed using BehaviorSubject.getValue().
magicCombinator(
  one$,
  two$,
  three$,
).subscribe(([ one, two, three ]) => {
  console.log({ one, two, three });
});
Does a combinator with this functionality already exist?
Here is a link to a stackblitz containing example code, in case you need it.
The following code currently works by merging the observables into a single stream and caching the latest value of each one in a BehaviorSubject.
import { Observable, Subscriber, merge } from 'rxjs';
import { map } from 'rxjs/operators';

// Note: toBehavourSubject is a helper that is not shown here.
const magicCombinator = <T = any>(...observables: Observable<T>[]) =>
  new Observable((subscriber: Subscriber<T[]>) => {
    // convert the given Observables into BehaviorSubjects so we can synchronously get the values
    const cache = observables.map(toBehavourSubject);
    // combine all the observables into a single stream;
    // returning the subscription lets unsubscribing tear down the merged stream
    return merge(...observables)
      // map the stream output to the values of the BehaviorSubjects
      .pipe(map(() => cache.map(item => item.getValue())))
      // trigger the combinator Observable
      .subscribe(subscriber);
  });
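To make the behaviour I am after concrete without relying on the stackblitz, here is a minimal, dependency-free sketch of the same cache-and-notify idea. It is only a model of the semantics, not RxJS code: Cell is a hypothetical stand-in for a BehaviorSubject, and watchAny is a hypothetical stand-in for magicCombinator.

```typescript
// Hypothetical stand-in for a BehaviorSubject (not part of RxJS): it holds a
// current value and notifies its listeners on every update.
type Listener<T> = (value: T) => void;

class Cell<T> {
  private value: T;
  private listeners: Listener<T>[] = [];

  constructor(initial: T) {
    this.value = initial;
  }

  getValue(): T {
    return this.value;
  }

  next(value: T): void {
    this.value = value;
    // iterate over a copy so listeners may unsubscribe while being notified
    this.listeners.slice().forEach((listener) => listener(value));
  }

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

// Hypothetical combinator: calls `listener` with a snapshot of every cell's
// current value whenever any single cell changes.
function watchAny<T>(cells: Cell<T>[], listener: Listener<T[]>): () => void {
  const snapshot = () => cells.map((cell) => cell.getValue());
  const unsubscribers = cells.map((cell) =>
    cell.subscribe(() => listener(snapshot())),
  );
  return () => unsubscribers.forEach((unsubscribe) => unsubscribe());
}

const one = new Cell(1);
const two = new Cell(2);
const three = new Cell(3);

const seen: number[][] = [];
const stop = watchAny([one, two, three], (values) => {
  seen.push(values);
});

two.next(20);   // only `two` changes
three.next(30); // only `three` changes
stop();
one.next(10);   // ignored, because the watcher has been removed

// seen is now [[1, 20, 3], [1, 20, 30]]
console.log(seen);
```

Each update to a single cell produces one snapshot containing the latest value of all three, which is the behaviour I would like from the combinator.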