Here is a working demo on StackBlitz.
Instead of using a store, I created a mock class called SushiState that returns observables.
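import { BehaviorSubject, Observable } from 'rxjs';

class SushiState {
  private _gas = new BehaviorSubject(1);
  private _starbucks = new BehaviorSubject(1);

  // Read-only view of the gas station counter.
  public get gas(): Observable<number> {
    return this._gas.asObservable();
  }

  // Read-only view of the starbucks counter.
  public get starbucks(): Observable<number> {
    return this._starbucks.asObservable();
  }

  // Bumps both counters with two separate next() calls.
  public increaseSushi(n = 1) {
    this._gas.next(this._gas.value + n);
    this._starbucks.next(this._starbucks.value + n);
  }

  public static compareSushi(g: number, s: number): string {
    return `gas is ${g}, starbucks is ${s}`;
  }
}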
Below you can find the code for the component.
import { OnInit } from '@angular/core';
import { Observable, combineLatest, zip } from 'rxjs';
import { map, tap } from 'rxjs/operators';

export class AppComponent implements OnInit {
  state: SushiState;
  gasSushi: Observable<number>;
  sbSushi: Observable<number>;
  combined: string;
  combinedTimes = 0;
  zipped: string;
  zippedTimes = 0;

  ngOnInit() {
    this.state = new SushiState();
    this.gasSushi = this.state.gas;
    this.sbSushi = this.state.starbucks;

    // Emits whenever either counter changes, using the latest value of each.
    const combined = combineLatest(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('combined', sushi);
        this.combinedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    combined.subscribe(result => this.combined = result);

    // Emits only when both counters have produced a new value.
    const zipped = zip(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('zipped', sushi);
        this.zippedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    zipped.subscribe(result => this.zipped = result);
  }

  increaseSushi() {
    this.state.increaseSushi();
  }
}
If you run it on stackblitz and monitor the console, you will see the following activity:
https://i.sstatic.net/slFto.png
With combineLatest, every emission from either observable is combined with the latest value of the other, so a single click that updates both counters results in 2 calls to console.log.
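The double emission can be reproduced outside Angular. The following is a minimal sketch, assuming RxJS 6.5 or later is installed; the two BehaviorSubjects stand in for the counters inside SushiState, and the names combinedLog and combinedSub are made up for the illustration. It uses the array form of combineLatest, which behaves the same as the positional form used in the component.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';

// Stand-ins for the two sushi counters (both start at 1, as in SushiState).
const gas = new BehaviorSubject<number>(1);
const starbucks = new BehaviorSubject<number>(1);

// Record every pair that combineLatest emits.
const combinedLog: Array<[number, number]> = [];
const combinedSub = combineLatest([gas, starbucks]).subscribe(([g, s]) => {
  combinedLog.push([g, s]);
});

// One "Increase Both" click is really two separate next() calls.
gas.next(gas.value + 1);
starbucks.next(starbucks.value + 1);

// Initial pair, then one emission per next() call.
console.log(combinedLog); // [[1, 1], [2, 1], [2, 2]]

combinedSub.unsubscribe();
```

The intermediate pair [2, 1] is the extra emission: it exists only between the two next() calls.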
An alternative approach would be to use zip, which waits for both observables to emit before producing an output. This seems suitable for our "Increase Both" button. However, if the starbucksSushi gets incremented separately (perhaps from another section of the application), the 'zipped' version will wait for the gas station sushi to also update.
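The stalling behaviour of zip can be seen in a small sketch, again assuming RxJS is installed. The subject names (gasCount, starbucksCount) and the zippedLog array are hypothetical and only mirror the counters from the example above.

```typescript
import { BehaviorSubject, zip } from 'rxjs';

const gasCount = new BehaviorSubject<number>(1);
const starbucksCount = new BehaviorSubject<number>(1);

// Record every pair that zip emits.
const zippedLog: Array<[number, number]> = [];
const zippedSub = zip(gasCount, starbucksCount).subscribe(([g, s]) => {
  zippedLog.push([g, s]);
});

// Both counters change: zip pairs the two new values into a single emission.
gasCount.next(2);
starbucksCount.next(2);

// Only starbucks changes: zip buffers the 3 and emits nothing yet.
starbucksCount.next(3);
const afterSoloUpdate = zippedLog.length;

// The buffered value is released once gas produces its own third value.
gasCount.next(3);

console.log(afterSoloUpdate); // 2
console.log(zippedLog); // [[1, 1], [2, 2], [3, 3]]

zippedSub.unsubscribe();
```

Because zip pairs values by position, a counter that is updated on its own stays invisible to subscribers until the other counter catches up.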
A third option is to use combineLatest to combine the sushi counters, then apply the debounceTime operator to delay emitting the output for a set number of milliseconds.
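const debounced = combineLatest(
  this.gasSushi,
  this.sbSushi,
).pipe(
  // tap runs before debounceTime, so it still sees every raw emission.
  tap((sushi) => {
    console.log('debounced', sushi);
    this.debouncedTimes++;
  }),
  map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
  // Only the last value of a burst reaches the subscriber.
  debounceTime(100),
);
debounced.subscribe(result => this.debounced = result);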
This approach reacts to changes in all sources, but it emits only after the sources have been quiet for 100ms, so a burst of updates produces a single result.
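To check the debounced behaviour without waiting on real timers, here is a sketch that passes a VirtualTimeScheduler as the optional second argument of debounceTime. That scheduler is an assumption made only so the snippet runs synchronously; application code would normally omit it. The names virtualTime, gasLevel, starbucksLevel and debouncedLog are hypothetical.

```typescript
import { BehaviorSubject, VirtualTimeScheduler, combineLatest } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Virtual clock: time only advances when flush() is called.
const virtualTime = new VirtualTimeScheduler();

const gasLevel = new BehaviorSubject<number>(1);
const starbucksLevel = new BehaviorSubject<number>(1);

const debouncedLog: Array<[number, number]> = [];
const debouncedSub = combineLatest([gasLevel, starbucksLevel])
  .pipe(debounceTime(100, virtualTime))
  .subscribe(([g, s]) => {
    debouncedLog.push([g, s]);
  });

// A burst of updates, all within the same (virtual) instant.
gasLevel.next(2);
starbucksLevel.next(2);

// Nothing has been emitted yet: the 100ms quiet period has not elapsed.
const beforeQuietPeriod = debouncedLog.length;

// Advance virtual time past the quiet period.
virtualTime.flush();

console.log(beforeQuietPeriod); // 0
console.log(debouncedLog); // [[2, 2]]

debouncedSub.unsubscribe();
```

The initial pair and the intermediate pair are both dropped; only the final state of the burst reaches the subscriber.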
The need for first() was due to...
forkJoin merges the observables only after they complete (which happens only once, making it unsuitable for continuous streams) and is better suited to promise-like tasks such as HTTP calls or process completions. Additionally, when you take just one element from a stream, the resulting stream completes after that single emission.
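The interaction between forkJoin and first() can be sketched as follows, assuming RxJS is installed. The subjects gasStream and starbucksStream and the two log arrays are hypothetical stand-ins; the point is only that a BehaviorSubject never completes on its own, while first() forces completion after one value.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { first } from 'rxjs/operators';

const gasStream = new BehaviorSubject<number>(1);
const starbucksStream = new BehaviorSubject<number>(1);

// Without first(): neither source completes, so forkJoin never emits.
const neverLog: number[][] = [];
const neverSub = forkJoin([gasStream, starbucksStream]).subscribe((pair) => {
  neverLog.push(pair);
});
gasStream.next(2);

// With first(): each source completes after its current value,
// so forkJoin emits exactly once and then completes.
const events: string[] = [];
forkJoin([gasStream.pipe(first()), starbucksStream.pipe(first())]).subscribe({
  next: ([g, s]) => {
    events.push(`next ${g},${s}`);
  },
  complete: () => {
    events.push('complete');
  },
});

// Later updates are ignored by the completed forkJoin.
gasStream.next(3);

console.log(neverLog.length); // 0
console.log(events); // ['next 2,1', 'complete']

neverSub.unsubscribe();
```

This is why forkJoin fits one-shot work but not a counter that keeps changing.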
P.S.
It is recommended to use the async pipe for handling observables, as demonstrated with properties such as
gasSushi: Observable<number>;
sbSushi: Observable<number>;
and within the template like so
<div>
  <h3>starbucks sushi</h3>
  <p>{{sbSushi | async}}</p>
</div>
instead of manually subscribing as in
result => this.zipped = result
In this example, both methods are used for comparison. In my experience, working with observables becomes much simpler once you stop subscribing to them prematurely and let the async pipe handle subscriptions.
Furthermore, if you do use subscribe in your component, make sure to unsubscribe when the component is destroyed. This is straightforward, but if you avoid explicit subscriptions and leave them to the async pipe, it handles the cleanup for us :)
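For the cases where a manual subscribe is kept, one common way to clean up is to collect subscriptions in a Subscription and release them in ngOnDestroy. The sketch below assumes RxJS is installed and uses a plain class, SushiCounter, as a hypothetical stand-in for an Angular component (a real component would implement OnInit and OnDestroy and have its hooks called by Angular).

```typescript
import { BehaviorSubject, Subscription } from 'rxjs';

// Hypothetical component-like class; the lifecycle hooks are called by hand here.
class SushiCounter {
  latest = 0;
  private readonly source: BehaviorSubject<number>;
  // Parent subscription that owns every manual subscription of this class.
  private readonly subscriptions = new Subscription();

  constructor(source: BehaviorSubject<number>) {
    this.source = source;
  }

  ngOnInit(): void {
    this.subscriptions.add(
      this.source.subscribe((n) => {
        this.latest = n;
      }),
    );
  }

  ngOnDestroy(): void {
    // Unsubscribes everything that was added above.
    this.subscriptions.unsubscribe();
  }
}

const sushi = new BehaviorSubject<number>(1);
const counter = new SushiCounter(sushi);

counter.ngOnInit();
sushi.next(2); // received
counter.ngOnDestroy();
sushi.next(3); // ignored: the subscription is gone

console.log(counter.latest); // 2
```

With the async pipe none of this bookkeeping is needed, which is the main argument for preferring it.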