Executing RxJS calls sequentially while allowing inner calls to run in parallel

I want to use RxJS to make a group of HTTP calls that depend on the value returned by a previous call. The inner calls should run in parallel, and the value from the first call should still be emitted as soon as it is available. I also need to subscribe to the result and handle errors. Importantly, an error in one inner call should not stop its sibling calls.

The current code I have works fine but waits to return the initial value until all inner calls are complete due to using forkJoin:

public create(customer: Customer, groupIds: number[]): Observable<Customer> {
    const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
        (created) => {
            return forkJoin([
                of(created),
                forkJoin(groupIds.map(groupId => {
                    const membership = new Membership(created.Id, groupId);
                    return this.http.post<Membership>('/membership', membership);
                }))
            ]);
        }
    )).pipe(map(a => a[0]));

    return response;
}

Is there a way to operate such that the created customer is returned without waiting for inner calls to finish? Could the above code be written more succinctly?

(NB: this.http refers to type HttpClient from Angular, which returns an Observable<T>)

Answer №1

If you keep the client in a property of your class, you can do something along these lines:

public add(client: Client, projectIds: number[]): Observable<Client> {
    const response = this.http.post<Client>('/client', client).pipe(
      tap(added => this.client = added), // <---
      mergeMap((added) => {
            return forkJoin([
                of(added),
                forkJoin(projectIds.map(projectId => {
                    const project = new Project(added.Id, projectId);
                    return this.http.post<Project>('/project', project);
                }))
            ])
        }
    )).pipe(map(a => a[0]));

    return response;
}

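Adding a tap before the mergeMap stores the client in a class property (this.client) as soon as the first call returns, so the newly created client is accessible before the other operations finish. Note that the returned Observable itself still emits only once the forkJoin completes.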

I demonstrated this using stackblitz at: https://stackblitz.com/edit/angular-create-project-johndoe

(Please note that it uses different terminology but follows a similar concept.)

Alternatively

You might also consider the following approach:

  private taskAddedSubject = new BehaviorSubject<Task>(null);
  taskAddedAction$ = this.taskAddedSubject.asObservable();

  private projectSubject = new BehaviorSubject<number[]>([5, 10]);
  projectAction$ = this.projectSubject.asObservable();

  task$ = this.taskAddedAction$.pipe(
    mergeMap(task =>
      this.http.post<Task>(this.taskUrl, task, { headers: this.headers })
    )
  );

  projects$ = combineLatest([this.task$, this.projectAction$]).pipe(
    switchMap(([added, projectIds]) =>
      forkJoin(
        projectIds.map(projectId => {
          const project = {
            userId: added.userId,
            title: "Project:" + projectId,
            description: added.title
          } as Project;
          return this.http
            .post<Project>(this.projectUrl, project, { headers: this.headers })
            .pipe(tap(project => console.log(project)));
        })
      )
    )
  );

This setup defines two distinct streams. task$ mirrors your client and becomes instantly accessible after being posted.

projects$ is akin to your projects.

A respective stackblitz for this style can be found here: https://stackblitz.com/edit/angular-create-declarative-project-johndoe

Answer №2

If the parallel calls do not require the returned values:

public createCustomer(customer: Customer, groupIds: number[]): Observable<Customer> {
  const response = this.http.post<Customer>('/customer', customer)
    .pipe(
      // Fire-and-forget: start each membership call without waiting for it
      tap(created => groupIds.forEach(groupId => {
        const membership = new Membership(created.Id, groupId);
        this.http.post<Membership>('/membership', membership).subscribe();
      }))
    );

  return response;
}

If you need to use the data, consider using startWith with forkJoin:

// Emits the customer first, then the array of memberships
public createCustomer(customer: Customer, groupIds: number[]): Observable<Customer | Membership[]> {
    const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
        (created) => {
            return forkJoin(groupIds.map(groupId => {
                    const membership = new Membership(created.Id, groupId);
                    return this.http.post<Membership>('/membership', membership);
            })).pipe(startWith(created));
        }
    ));

    return response;
}
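
The question also asks that a failing inner call should not stop its siblings. With forkJoin, an error in any inner Observable errors the whole join, so one way to isolate failures is to put a catchError on each inner call. Below is a minimal sketch of that idea combined with startWith. The postCustomer and postMembership functions are hypothetical stand-ins for the this.http.post calls (group 2 is made to fail on purpose), and mapping a failure to null is just one possible choice.

```typescript
import { Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, mergeMap, startWith } from 'rxjs/operators';

interface Customer { id: number; name: string; }
interface Membership { customerId: number; groupId: number; }

// Hypothetical stand-in for this.http.post<Customer>('/customer', ...)
function postCustomer(name: string): Observable<Customer> {
  return of({ id: 1, name });
}

// Hypothetical stand-in for the membership POST; group 2 always fails
function postMembership(customerId: number, groupId: number): Observable<Membership> {
  return groupId === 2
    ? throwError(() => new Error(`membership ${groupId} failed`))
    : of({ customerId, groupId });
}

function createCustomer(
  name: string,
  groupIds: number[]
): Observable<Customer | (Membership | null)[]> {
  return postCustomer(name).pipe(
    mergeMap(created =>
      forkJoin(
        groupIds.map(groupId =>
          postMembership(created.id, groupId).pipe(
            // Replace a failed call with null so forkJoin still completes
            catchError(() => of(null))
          )
        )
        // Emit the customer before waiting for the membership calls
      ).pipe(startWith(created))
    )
  );
}

// Collect everything the stream emits
const emissions: unknown[] = [];
createCustomer('Ada', [1, 2, 3]).subscribe(value => emissions.push(value));
console.log(emissions);
```

The subscriber first receives the customer, then a single array with one entry per group, where the failed call shows up as null. If you need the error details, you could map to an object carrying the error instead of null.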

Answer №3

It appears that you are looking to create an Observable that:

  1. Emits the customer as soon as the initial call returns it.
  2. Makes several parallel HTTP calls using the customer data to save memberships, emitting once all memberships have been saved.

If I understand correctly, here is how I would approach this:

First, a function can be created to handle the parallel calls using `forkJoin`:

function createMemberships(customerId: string, groupIds: number[]) {
  return forkJoin(groupIds.map(gId => 
    createMembershipHttpSimulation(customerId, gId))
  )
}

Then, another function can be created to return the desired Observable:

function create(customer: Customer, groupIds: number[]) {
  const createCustomerObs = createCustomerHttpSimulation(customer).pipe(
      shareReplay(1)
  )
  const createMembershipsObs = createCustomerObs.pipe(
    concatMap(customer => createMemberships(customer.id, groupIds))
  )
  return concat(createCustomerObs, createMembershipsObs)
}

This function first creates an Observable for creating the Customer and ensures it is only called once using `shareReplay(1)`. Then, it sets up another Observable for creating memberships in parallel with the use of `concatMap` and `forkJoin`, followed by concatenating both Observables. This means the result will emit when the Customer is created and then again when all memberships are saved in parallel. Using `shareReplay` helps prevent redundant API calls.

You can view an implementation of this approach on StackBlitz, where delays demonstrate the sequential emission of Customer and Memberships.
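
For reference, here is a self-contained sketch of the approach described above. The two simulation functions are assumptions standing in for the real HTTP calls (they emit synchronously via `of`, with no delays), and the `customerPosts` counter exists only to show that `shareReplay(1)` keeps the customer call from running twice.

```typescript
import { Observable, concat, defer, forkJoin, of } from 'rxjs';
import { concatMap, shareReplay } from 'rxjs/operators';

interface Customer { id: string; name: string; }
interface Membership { customerId: string; groupId: number; }

// Counts how often the simulated customer POST is actually executed
let customerPosts = 0;

// Hypothetical simulation of the customer POST
function createCustomerHttpSimulation(customer: Customer): Observable<Customer> {
  return defer(() => {
    customerPosts++;
    return of(customer);
  });
}

// Hypothetical simulation of a membership POST
function createMembershipHttpSimulation(customerId: string, groupId: number): Observable<Membership> {
  return of({ customerId, groupId });
}

function createMemberships(customerId: string, groupIds: number[]): Observable<Membership[]> {
  return forkJoin(groupIds.map(gId => createMembershipHttpSimulation(customerId, gId)));
}

function create(customer: Customer, groupIds: number[]): Observable<Customer | Membership[]> {
  // shareReplay(1) caches the customer so the second subscription reuses it
  const createCustomerObs = createCustomerHttpSimulation(customer).pipe(shareReplay(1));
  const createMembershipsObs = createCustomerObs.pipe(
    concatMap(created => createMemberships(created.id, groupIds))
  );
  // First the customer, then the array of saved memberships
  return concat(createCustomerObs, createMembershipsObs);
}

const emitted: (Customer | Membership[])[] = [];
create({ id: 'c1', name: 'Ada' }, [5, 10]).subscribe(value => emitted.push(value));
console.log(emitted.length, customerPosts); // logs: 2 1
```

The subscriber has to distinguish the two emissions itself (for example with `Array.isArray`), since the stream carries both a `Customer` and a `Membership[]`.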
