When using forkJoin
, it's important to note that you should pass in Observables
instead of Subscriptions
. Calling subscribe
on an Observable
signifies the completion of the pipeline, resulting in a cancellation primitive (i.e., Subscription) being returned.
The subscribe
block should only be used once at the end. If you need to perform intermediate processing in forkJoin
, remember that it accepts an optional selector argument as well.
let observables:any[] = [];
observables.push(this.getValue1());
observables.push(this.getValue2());
Observable.forkJoin(observables, ([value1, value2]) => {
//Do something with value1 and value2
return [value1, value2];
}).subscribe(
data => {
console.log('initialized');
},
err => console.error(err)
);
Edit
Since forkJoin
waits for all Observables
to complete, you must ensure the sources are completed (and handle any errors). Therefore, your getValue1
function should look like this:
getValue1(): any {
return new Observable(observer => {
this.asyncFunc(arg1, (err, value1)=> {
if (err)
return observer.error(err);
observer.next(value1);
observer.complete();
}
}
}
An alternative approach is to use the bindNodeCallback
operator.
This would simplify the code:
this.getValue1 = Observable.bindNodeCallback(this.asyncFunc);
this.getValue2 = Observable.bindNodeCallback(this.asyncFunc);
Allowing you to call these functions uniformly:
let observables:any[] = [this.getValue1(), this.getValue2()];
To update progress on completion, you can utilize a secondary stream when using bindNodeCallback
.
// Incorporate length into the stream
Observable.of(observables.length)
.flatMap(len => Observable.merge(observables),
(len, _, inner, outer) => outer / len * 100)
.subscribe(percentCompleted => /*Update progress bar*/);
If opting not to use bindNodeCallback
, you'd need to do more work since each stream is cold. You can make them ConnectableObservables to mitigate this issue.
// Convert to ConnectableObservables
let observables:any[] = [this.getValue1().publishLast(), this.getValue2().publishLast()];
// Set up additional streams
Observable.of(...);
Observable.forkJoin(...);
// Activate Observables
observables.forEach(c => c.connect());