I am working on a feature in my Angular application that needs to handle multiple concurrent fetches of the same data while making only one HTTP request. That request should be shared among all subscribers asking for the data at the same time: the first subscriber triggers the request immediately, and any subscribers that arrive while it is still pending share the same ongoing request. Unlike typical caching, once a request completes, a subsequent fetch triggers a fresh HTTP request for up-to-date data rather than reusing the previous result; caching itself is handled separately elsewhere in the application.
The current implementation I have is as follows (this is a simplified version):
import { Observable, Subject, mergeMap, share } from 'rxjs';

class DataLoader<T> {
  private readonly requestInitiator = new Subject<void>();
  private readonly requests$: Observable<T>;

  constructor(requestCreator: () => Observable<T>) {
    this.requests$ = this.requestInitiator.pipe(
      mergeMap(requestCreator, 1), // concurrency set to 1
      share(),
    );
  }

  load(): Observable<T> {
    return new Observable<T>(observer => {
      const subscription = this.requests$.subscribe(observer);
      this.requestInitiator.next();
      return subscription; // tear down the inner subscription on unsubscribe
    });
  }
}
This loader can be incorporated into an Angular singleton service like so:
import { Injectable } from '@angular/core';

@Injectable({ providedIn: 'root' })
export class SomeService {
  private readonly loader = new DataLoader<Something>(
    () => this.getHttp().get('/api/something'),
  );

  getSomething(): Observable<Something> {
    if (this.cache.isSomethingCached()) {
      return this.cache.getSomething();
    }
    return this.loader.load()
      .pipe(/* implement caching if necessary */);
  }
}
When multiple components invoke SomeService.getSomething() concurrently, they all share the same HTTP request seamlessly. While this setup seems to work correctly, I am puzzled by its inner workings. Specifically, I am unsure why mergeMap() behaves the way it does in this context. I initially tried exhaustMap(), which worked most of the time but failed under certain conditions, leaving some observers stuck without ever emitting a value or completing. In this scenario, mergeMap() with a concurrency of 1 seems to behave more like concatMap(), triggering one HTTP request per new observer, yet a new request is only sent when there is no request already outstanding. This behavior matches my requirements exactly.
It appears that the addition of share() alters the observable's behavior in a way that eludes me. Can someone shed light on why mergeMap() with a concurrency of 1 works in this situation, while exhaustMap() does not?