I have a paginated external resource stored in a web service. My goal is to transform that paginated resource into a continuous flow of values, allowing the client to determine how many elements to utilize. Essentially, the client should be unaware that the original resource is paginated.
Here is the code I have developed so far:
import { from, Observable, of } from 'rxjs';
// share, shareReplay and take are used by the snippets further down
import { expand, mergeMap, share, shareReplay, take } from 'rxjs/operators';

interface Result {
  page: number;
  items: number[];
}

// Simulating an actual HTTP request
function fetchPage(page: number = 0): Observable<Result> {
  const items = [...Array(3).keys()].map((e) => e + 3 * page);
  console.log('Fetching page ' + page);
  return of({ page, items });
}

// Converting a paginated request into an infinite stream of numbers
function createInfiniteStream(): Observable<number> {
  return fetchPage().pipe(
    expand((res) => fetchPage(res.page + 1)),
    mergeMap((res) => from(res.items))
  );
}

const infinite$ = createInfiniteStream();
This approach works well, generating an endless lazy stream of numbers. The client can simply use infinite$.pipe(take(n)) to retrieve the first n elements without knowledge of the underlying pagination.
Now, my objective is to share these values when multiple subscribers are involved:
infinite$.pipe(take(10)).subscribe((v) => console.log('[1] received ', v));

// Assume new subscribers will come later
setTimeout(() => {
  infinite$.pipe(take(5)).subscribe((v) => console.log('[2] received ', v));
}, 1000);

setTimeout(() => {
  infinite$.pipe(take(4)).subscribe((v) => console.log('[3] received ', v));
}, 1500);
In this scenario, I expect many subscribers to the infinite stream and want to reuse previously emitted values so that as few additional fetchPage calls as possible are made. For example, once one client has requested 10 items (take(10)), any later client asking for fewer than 10 items (e.g., 5 items) should not trigger additional fetchPage calls, since those items have already been emitted.
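To make the expected behaviour concrete, here is a rough sketch of the semantics I am after, written in plain TypeScript without RxJS. It is only an illustration, not an Observable-based solution: the names SharedPages, takeFirst, fetchPageSync and fetchCount are hypothetical and exist only for this example.

```typescript
interface Result {
  page: number;
  items: number[];
}

// Counts page requests so the effect of the cache can be observed.
let fetchCount = 0;

// Hypothetical synchronous stand-in for the real HTTP request:
// every page holds 3 consecutive numbers, like fetchPage above.
function fetchPageSync(page: number): Result {
  fetchCount++;
  const items = Array.from({ length: 3 }, (_, i) => i + 3 * page);
  return { page, items };
}

// Hypothetical cache: remembers every item fetched so far and only
// requests further pages when a caller asks for more than it holds.
class SharedPages {
  private items: number[] = [];
  private nextPage = 0;

  takeFirst(n: number): number[] {
    while (this.items.length < n) {
      const res = fetchPageSync(this.nextPage++);
      for (const item of res.items) {
        this.items.push(item);
      }
    }
    return this.items.slice(0, n);
  }
}

const shared = new SharedPages();

const first = shared.takeFirst(10); // needs pages 0 to 3 (12 items cached)
const fetchesAfterFirst = fetchCount;

const second = shared.takeFirst(5); // already cached, no new page request
const third = shared.takeFirst(4);  // already cached, no new page request
const fetchesAfterAll = fetchCount;

console.log(first, second, third, fetchesAfterFirst, fetchesAfterAll);
```

In other words, the number of page requests should be driven by the largest number of items any client has asked for so far, not by the number of subscribers.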
I attempted the following solutions but did not achieve the desired outcome:
const infinite$ = createInfiniteStream().pipe(share())
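This does not work: share() does not replay earlier values, and once the subscriber count drops to zero it resets, so each late subscriber restarts the source and triggers new 'fetchPage' calls.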
const infinite$ = createInfiniteStream().pipe(shareReplay())
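This option keeps pulling values from the stream even when they are unnecessary (i.e., no client has requested them yet), because by default shareReplay() stays subscribed to the source after its subscribers have unsubscribed.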
If anyone has any suggestions, it would be greatly appreciated. To experiment with the code, visit: https://stackblitz.com/edit/n4ywfw