Wind down with RxJS - Distribute the boundless flow generated by the 'expand' method

I have a paginated external resource stored in a web service. My goal is to transform that paginated resource into a continuous flow of values, allowing the client to determine how many elements to utilize. Essentially, the client should be unaware that the original resource is paginated.

Here is the code I have developed so far:

import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';

interface Result {
  page: number;
  items: number[];
}

// Simulating an actual HTTP request
function fetchPage(page: number = 0): Observable<Result> {
  let items = [...Array(3).keys()].map((e) => e + 3 * page);
  console.log('Fetching page ' + page);
  return of({ page, items });
}

// Converting a paginated request into an infinite stream of numbers
function createInfiniteStream(): Observable<number> {
  return fetchPage().pipe(
    expand((res) => fetchPage(res.page + 1)),
    mergeMap((res) => from(res.items))
  );
}

const infinite$ = createInfiniteStream();

This approach functions effectively, generating an endless lazy stream of numbers. The client can simply use infinite$.pipe(take(n)) to retrieve the first n elements without knowledge of the underlying pagination.

Now, my objective is to share these values when multiple subscribers are involved:

infinite$.pipe(take(10)).subscribe((v) => console.log('[1] received ', v));

// Assume new subscribers will come later
setTimeout(() => {
  infinite$.pipe(take(5)).subscribe((v) => console.log('[2] received ', v));
}, 1000);

setTimeout(() => {
  infinite$.pipe(take(4)).subscribe((v) => console.log('[3] received ', v));
}, 1500);

In this scenario, we anticipate having numerous subscribers to the infinite stream and aim to reuse previously emitted values to minimize the necessity for additional fetchPage calls. For example, once one client requests 10 items (take(10)), any subsequent clients asking for fewer than 10 items (e.g., 5 items) should not trigger additional fetchPage calls since those items have already been emitted.


I attempted the following solutions but did not achieve the desired outcome:

const infinite$ = createInfiniteStream().pipe(share())

This method fails as late subscribers lead to multiple 'fetchPage' calls.

const infinite$ = createInfiniteStream().pipe(shareReplay())

This option demands all stream values, even if they are unnecessary (i.e., no client has requested all items yet).


If anyone has any suggestions, it would be greatly appreciated. To experiment with the code, visit: https://stackblitz.com/edit/n4ywfw

Answer №1

It appears that we need to establish a method for storing certain state information, such as the last page number read and the fetched items, and ensuring that this state is preserved when new subscriptions are received.

One potential approach could involve utilizing closures in the following manner.

To begin with, you can create an infiniteStreamFactoryGenerator function that generates a function holding some state - specifically the lastPage and the itemsRead, using closures. This state is then maintained and updated within the function returned by the infiniteStreamFactoryGenerator.

function infiniteStreamFactoryGenerator() {
  let lastPage = 0;
  let itemsRead = [];
  return () => {
    // If there are previously read items, they are returned first before initiating
    // the infinite stream
    return concat(
      from(itemsRead),
      fetchPage(lastPage).pipe(
        expand((res) => {
          return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
        }),
        tap((res) => {
          // State update occurs here
          lastPage++;
          itemsRead = [...itemsRead, ...res.items];
        }),
        mergeMap((res) => from(res.items))
      )
    );
  };
}

We then call the infiniteStreamFactoryGenerator to generate the actual factory function, and use this function to instantiate different streams, like so:

const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();

infiniteStreamFactory$()
  .pipe(take(10))
  .subscribe((v) => console.log("[1] got ", v));

// Assuming subsequent subscribers will join later
setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(5))
    .subscribe((v) => console.log("[2] got ", v));
}, 1000);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(4))
    .subscribe((v) => console.log("[3] got ", v));
}, 1500);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(20))
    .subscribe((v) => console.log("[4] got ", v));
}, 2000);

In addition, a fourth subscriber that necessitates loading extra pages has been included.

Based on the demonstration provided in this stackblitz, it seems that this solution is effective.

To be honest, the concept of creating a factory generator function gives me the impression that there might be a simpler way to address this intriguing issue, though I have not yet discovered it.

Answer №2

If you want the ahing of the http request, it is necessary to move shareReplay within your fetchPage method in order for it to function correctly

return fetchPage().pipe(
 expand((res) => fetchPage(res.page + 1)),
 mergeMap((res) => from(res.items)),
 shareReplay()
);

Check out the updated code here

Here is another example that has been revised for you, demonstrating that the call was only made once

View the new example here

Answer №3

In order to arrive at a solution, I decided to create a stateful Observable using closures. Essentially, I crafted a custom version of the share functionality that is integrated directly into the Observable.

The only modification made was to the mkInfiniteStream function compared to the original code:

function mkInfiniteStream(): Observable<number> {
  let lastPage = 0;
  let itemsRead = [];
  return defer(() =>
    from(itemsRead).pipe(
      concatWith(
        fetchPage(lastPage).pipe(
          expand((res) => fetchPage(res.page + 1)),
          tap((res) => {
            lastPage++;
            itemsRead = [...itemsRead, ...res.items];
          }),
          mergeMap((res) => from(res.items))
        )
      )
    )
  );
}

// The usage remains unchanged:
const infinite$ = mkInfiniteStream();

The concept behind this approach is to internally keep tabs on all items and the most recent page that has been fetched. Whenever there is a request for an item that hasn't been fetched yet, the Observable expands by fetching items from the subsequent page. This action updates the list of available items (

itemsRead = itemsRead = [...itemsRead, ...res.items]
), enabling new subscribers to utilize previously fetched items stored in itemsRead before triggering further expansion of the Observable.

Applying share doesn't have any effect on this particular Observable as the data is already inherently shared within the implementation. It is not recommended to use shareReplay here since it would compel the stream to be infinite!

I am hopeful for a more "pure" implementation, so if you happen to devise one, please do share!


A special thanks goes out to @Picci for the initial concept.

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Using TypeScript to utilize an enum that has been declared in a separate file

Imagine I have defined an enum in one file (test1.ts): export enum Colors{ red=1, blue=2, green=3 } Then in another file (test2.ts), I am creating a class with a method. One of the parameters for that method is a Color from the Colors enum: ...

The safeguarding of Angular2 and Laravel against CSRF vulnerabilities

I have come across some interesting articles that I've read recently. The issue I am facing revolves around this particular section of code: <meta property="csrf-token" name="csrf-token" content="{{ csrf_token() }}"> I am utilizing Angular2 a ...

Having trouble connecting my chosen color from the color picker

Currently, I am working on an angularJS typescript application where I am trying to retrieve a color from a color picker. While I am successfully obtaining the value from the color picker, I am facing difficulty in binding this color as a background to my ...

Tips for properly waiting for an AngularFire2 subscription to complete before executing the subsequent lines of code within Angular2

Having an issue with my Angular2 Type Script code. The goal is to access Questions from a Firebase Database by subscribing to a FirebaseListObserver: this.af.list('/questions').subscribe( val => { this.questions = val console.log(th ...

Deceleration of an Angular 5 application

In an attempt to improve the performance of an Angular 5 application, I am currently profiling it to identify the cause of its slow loading when dealing with certain objects. To provide more context, the application displays a grid where each cell contains ...

Exploring the contrast between string enums and string literal types in TypeScript

If I want to restrict the content of myKey in an object like { myKey: '' } to only include either foo, bar, or baz, there are two possible approaches. // Using a String Literal Type type MyKeyType = 'foo' | 'bar' | &ap ...

What specific category does the enum object fall under?

How can I create a wrapper class for a collection of elements in an enumeration? export class Flags<ENUMERATION> { items = new Set<ENUMERATION>(); enu; // what type ? constructor(enu) { // what type ? ...

Combining Angular 6-powered web pages with an existing ASP.NET legacy web portal

Currently, I have a legacy Web Portal built on .NET 4.0 using plain ASP.Net. With new requirements surfacing, I am considering incorporating Angular 6 into the mix. However, the thought of completely rewriting the existing .NET app to Angular 6 seems daunt ...

Having trouble with uploading images on Angular 6 platform

Utilizing 'ngx-image-cropper' for image cropping and sending the base64 value of the image to a server has been causing occasional null value issues. Despite implementing 'DOMSanitizer' in Angular to upload and securely mark images, the ...

Can class properties be automatically generated based on its generic type?

Is it possible in TypeScript to automatically create a class with properties based on its generic type? For example: class CustomClass<T> { // Code to extract properties from T should go here } interface Characteristics { quality: string; pri ...

Initiate and terminate server using supertest

I've developed a server class that looks like this: import express, { Request, Response } from 'express'; export default class Server { server: any; exp: any; constructor() { this.exp = express(); this.exp.get('/' ...

Is there a way to view Deno's transpiled JavaScript code while coding in TypeScript?

As I dive into Typescript with Deno, I am curious about how to view the JavaScript result. Are there any command line options that I may have overlooked in the documentation? P.S. I understand that Deno does not require a compilation step, but ultimately ...

Is it necessary for Eslint to require props fields to be read-only in React components?

Consider the code snippet below: interface MyProps { label: string; } function MyComponent(props: MyProps) { return ( <p> {props.label} </p> ); } Surprisingly, neither Eslint nor the TypeScript compiler will throw a warni ...

Hiding Clear button in Autocomplete to display only text

Exploring react-virtualization and Autocomplete features here. I currently have a list set up to display multiple texts when a checkbox is selected. Here's the code snippet: https://codesandbox.io/s/material-demo-forked-1qzd3?file=/demo.tsx The goal ...

Access Select without needing to click on the child component

I am curious to learn how to open a Select from blueprint without relying on the click method of the child component used for rendering the select. <UserSelect items={allUsers} popoverProps={{ minimal: false }} noResults={<MenuItem disabled={ ...

determine the values of objects based on their corresponding keys

Still on the hunt for a solution to this, but haven't found an exact match yet. I've been grappling with the following code snippet: interface RowData { firstName: string; lastName: string; age: number; participate: boolean; } c ...

When the value is empty, MUI Autocomplete will highlight all items

I have encountered a specific issue. I am working on developing a custom Autocomplete Component for filtering purposes. However, I recently came across the following Warning. MUI: The value provided to Autocomplete is invalid. None of the options matc ...

What is the best way to adjust the border-radius of a mat-form-field in Angular?

<mat-form-field class="new-inputs" style="border-radius: 10px;"> <mat-label>Ime</mat-label> <input matInput type="text" maxlength="15" id="firstName" fo ...

Mastering the TypeScript syntax for executing the MongoDB find method

Having trouble properly typing the find method of MongoDB in my TypeScript project that involves MongoDB. Here's the snippet I'm working on: import { ReitsType } from '@/app/api/types/reits'; import { NextRequest, NextResponse } from &a ...

The Expo React Native project encountered an Android build failure

I have a project that was created using expo 49 and typescript 5.1.3, and now I need to prepare it for submission to Google Play. To achieve this, I followed a series of steps in the VS Code terminal: 1) Ran npm install -g eas-cli 2) Executed eas login ...