I have a substantial list of ordered observables that need to be executed in parallel. Each observable appends its result to a behavior subject upon completion, but I also require a specific function to be invoked when all observables have finished.
These observables are responsible for downloading images (including related metadata) from an API. Speed is crucial, and I must handle each result as it comes in, signaling the end with an empty value after all observables have completed. To accomplish this, they must run concurrently.
The initial implementation lacks a callback for completion:
const requests: Observable<ImageResponse>[] = getRequests();
requests.forEach(obs => obs.subscribe(res => {
  const currentImages = this.$images.value;
  currentImages.push(res);
  this.$images.next(currentImages);
}));
To add the desired callback functionality, I attempted the following approach:
const requests: Observable<ImageResponse>[] = getRequests();
const finishedTracker = new Subject<void>();
requests.forEach(obs => obs.subscribe(res => {
  const currentImages = this.$images.value;
  currentImages.push(res);
  this.$images.next(currentImages);
}));
forkJoin(requests).subscribe(() => {
  finishedTracker.next();
  finishedTracker.complete();
  console.log('requests done');
});
While this solution works, it feels cumbersome to separate the forkJoin and request subscriptions. Is there a more efficient way to achieve this outcome? I explored mergeMap but couldn't implement it successfully.
Edit: After receiving feedback, I realized that subscribing twice results in duplicate requests, so I revised my approach:
from(requests).pipe(
  mergeMap(o => {
    o.subscribe(res => {
      const currentImages = this.$images.value;
      currentImages.push(res);
      this.$images.next(currentImages);
    });
    return o;
  }, 10)
).subscribe(() => {
  finishedTracker.next();
  console.log('requests done');
});
I chose not to use the output of forkJoin because it waits for all requests to finish before emitting anything. Since there are many (albeit fast) requests, I want to process each result as soon as it arrives.
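The duplicate-request problem from the first attempt can be reproduced in isolation. The following is a dependency-free sketch, not RxJS itself: `ColdSource` and `fakeImageRequest` are hypothetical stand-ins that mimic how a cold observable behaves, on the assumption that the real request observables are cold (as HTTP request observables usually are). Every call to `subscribe` runs the producer again, so one direct subscription plus one made by something like forkJoin or mergeMap means two requests.

```typescript
// Hypothetical stand-in for a cold observable: each subscribe call
// re-runs the producer from scratch.
type ColdSource<T> = { subscribe(next: (value: T) => void): void };

// Counts how many times the "request" was actually started.
let requestCount = 0;

function fakeImageRequest(id: number): ColdSource<string> {
  return {
    subscribe(next) {
      requestCount++; // a real HTTP observable would start a network call here
      next(`image-${id}`);
    },
  };
}

const request = fakeImageRequest(1);
const received: string[] = [];

// First subscriber: the per-result handler.
request.subscribe(value => received.push(value));
// Second subscriber: plays the role of forkJoin or mergeMap subscribing again.
request.subscribe(() => {});

console.log(requestCount); // 2
```

The same reasoning applies to the mergeMap attempt above: calling `o.subscribe(...)` inside the projection and then returning `o` still leads to two subscriptions per request.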
Edit 2: Here's the final solution I implemented:
from(requests).pipe(
  mergeMap(request => request, 10),
  scan<ImageResponse, ImageResponse[]>((all, current, index) => {
    all = all.concat(current);
    this.$images.next(all);
    return all;
  }, [])
).subscribe({
  complete: () => {
    finishedTracker.next();
    console.log('requests done');
  }
});