What is the correct way to forkJoin a series of observables or their subscriptions?

I have a substantial list of ordered observables that need to be executed in parallel. Each observable appends its result to a behavior subject upon completion, but I also require a specific function to be invoked when all observables have finished.

These observables are responsible for downloading images (including related metadata) from an API. Speed is crucial, and I must handle each result as it comes in, signaling the end with an empty value after all observables have completed. To accomplish this, they must run concurrently.

The initial implementation lacks a callback for completion:

const requests: Observable[] = getRequests();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

To add the desired callback functionality, I attempted the following approach:

const requests: Observable[] = getRequests();
const finishedTracker = new Subject<void>();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

forkJoin(requests).subscribe(() => {
    finishedTracker.next();
    finishedTracker.complete();
    console.log('requests done');
});

While this solution works, it feels cumbersome to separate the forkJoin and request subscriptions. Is there a more efficient way to achieve this outcome? I explored mergeMap but couldn't implement it successfully.

Edit Upon receiving feedback, I realized that subscribing twice results in duplicate requests. Hence, I revised my strategy:

from(requests).pipe(
    mergeMap(o => {
        o.subscribe(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        }
        return o;
    }, 10)
).subscribe(() => {
    finishedTracker.next();
    console.log('requests done');
})

Choosing not to use the output from forkJoin since it waits for all requests to finish before providing results, I opted for quick processing of individual outcomes due to the numerous, albeit fast, requests.

Edit 2 Here's the final solution I implemented:

from(requests).pipe(
    mergeMap(request => request, 10),
    scan<ImageResponse, ImageResponse[]>((all, current, index) => {
        all = all.concat(current);
        this.$images.next(all);
        return all;
    }, [])
).subscribe({
    complete: () => {
    finishedTracker.next();
    console.log('requests done');
}});

Answer №1

There is no need to subscribe within the mergeMap function. Actually, it leads to a double subscription since mergeMap internally subscribes to the observable returned by the function you provide.

To handle responses as they occur, you can simply use a pipe and include your handling logic inside. If you are performing a side effect (something that does not alter the current stream's output), using the appropriate tap operator:

from(requests).pipe(
    mergeMap(o => o.pipe(
        tap(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        }),
    }, 10)
).subscribe(() => {
    finishedTracker.next();
    console.log('requests done');
})

While this approach is functional, it seems like you might be over complicating the observable flow. Without knowing the exact use case, it appears that Subjects may not be necessary at all. If your objective is to emit an accumulated array of results during processing, you can achieve this using scan without involving any Subject or BehaviorSubject. Additionally, for specific actions upon completion of all requests, you can utilize a partial Observer, specifying only the complete callback (instead of the implicit next callback with subscribe()):

from(requests).pipe(
    mergeMap(request => request, 10),
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.log('requests done')
});

EDIT: @AdrianBrand suggested using merge instead of from/mergeMap:

merge(...requests, 10).pipe(
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.log('requests done')
})

Answer №2

How about implementing a straightforward combine latest method?

import { of, Observable, combineLatest, startWith } from 'rxjs';

declare const getRequests: () => Observable<any>[];

const requests: Observable<any>[] = getRequests();


combineLatest(requests.map(r => r.pipe(startWith(null)))).subscribe(result => {

  if (!result.some(r => r === null)) {
    // everything loaded
  } else {
    // not finished
  }
})

Playground

Answer №3

Alright, so if I understand correctly, you're looking to keep track of multiple asynchronous HTTP requests individually and handle them as soon as they complete, while also ensuring that all requests have finished before proceeding. Your current approach with forEach and forkJoin seems suitable, but be cautious as subscribing twice to an HTTP request may result in double server calls, which is not ideal. One alternative could be creating a dedicated Observable array solely for monitoring the completion of all events.

const requests: Observable[] = getRequests();
const finishTracker: Observable[] = [];

requests.forEach(obs => {
    const innerTracker = new Subject();
    finishTracker.push(innerTracker.asObservable());

    obs.subscribe(res => {
        innerTracker.next();
        innerTracker.complete();

        // Handle the response accordingly
        doSomethingWithRes(res);
    });
});

forkJoin(finishTracker).subscribe(res => console.log('all tasks completed'));

NOTE! Keep in mind that calling subscribe within a forEach loop, due to the asynchronous nature of HTTP requests, might cause unexpected behavior where forkJoin either completes prematurely or does not execute at all. To mitigate this issue, consider initializing a separate array of objects before subscribing.

const requests: Observable[] = getRequests();
const additionalArray: any[] = [];
const finishTracker: Observable[] = [];

requests.forEach(obs => {
    const innerTracker = new Subject();
    finishTracker.push(innerTracker.asObservable());
    additionalArray.push({myObs: obs, tracker: innerTracker});
});

forkJoin(finishTracker).subscribe(res => console.log('all tasks completed'));

additionalArray.forEach(myObj => {
    myObj.myObs.subscribe(
        res => {
            myObj.tracker.next();
            myObj.tracker.complete();

            // Handle the response accordingly
            doSomethingWithRes(res);
        },
        err => {
            doSomethingWithErr(err)
        }
    )
});

You can integrate this initialization process into your getRequests() function. While perhaps not the most elegant solution, it should serve its purpose effectively.

Answer №4

Here is a code snippet that demonstrates using an accumulator with an array of requests and responses.

const { of, expand, takeWhile, map, delay } = rxjs;

const requests = Array.from({ length: 10 }, (_, i) => of(`Response ${i + 1}`).pipe(delay(Math.random() * 500)));

const responses$ = of({ requests, responses: [] })
  .pipe(
    expand((acc) =>
      acc.requests[0].pipe(
        map((response) => {
          const [, ...remainingRequests] = acc.requests;
          return { requests: remainingRequests, responses: [...acc.responses, response] };
        })
      )
    ),
    takeWhile((acc) => acc.requests.length, true),
    map((acc) => acc.responses)
  );
  
  
responses$.subscribe((responses) => {
  console.log(responses);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Is it possible to capture and generate an AxiosPromise inside a function?

I am looking to make a change in a function that currently returns an AxiosPromise. Here is the existing code: example(){ return api.get(url); } The api.get call returns an object of type AxiosPromise<any>. I would like to modify this function so ...

What is the best way to specify a function parameter as a Function type in TypeScript?

I'm currently delving into the world of TypeScript and I am unsure about how to specify a function parameter as a function type. For instance, in this piece of code, I am passing a setState function through props to a child component. const SelectCity ...

How to toggle code block visibility in Angular's ngOnInit() method

I'm currently analyzing different design patterns to optimize the number of REST calls when implementing a 'save while typing feature'. To provide a general overview, in my ngOnInit() function, I have included the following code snippet (wit ...

Navigating through the nested object values of an Axios request's response can be achieved in React JS by using the proper

I am attempting to extract the category_name from my project_category object within the Axios response of my project. This is a singular record, so I do not need to map through an array, but rather access the entire object stored in my state. Here is an ex ...

Encountering an issue with testing CKEditor in Jest

Testing my project configured with vite (Typescript) and using jest showed an error related to ckeditor. The error is displayed as follows: [![enter image description here][1]][1] The contents of package.json: { "name": "test-project" ...

Finding the size of an array in Typescript by accessing its elements

I am working with a specific data type type Vehicle={ [key: string]: string; }; let allCars:Vehicle = {Ford: "Ka", GM:"Sonic", Audi:"A4"}; Is there a way to determine the length of this object? It seems like using allCars ...

The error message indicates that the property 'current' is not found in the type '[boolean, Dispatch<SetStateAction<boolean>>]'

During my React/Typescript project, I encountered an issue involving cursor animations. While researching the topic, I stumbled upon a CodePen (Animated Cursor React Component) that functioned perfectly. However, when attempting to convert it into a Types ...

Angular 2 file upload encountering CORS issue leading to 401 unauthorized error

Encountered a "401 Unauthorized" error in Chrome and Firefox while attempting to upload files using angular 2 CLI to an apache2-server with a PHP backend. Despite trying three different node modules, the issue persists from the OPTIONS-preflight stage, ...

Removing empty options from a select dropdown in Angular 9

In the process of working with Angular 9, I am currently in the process of constructing a dropdown menu that contains various options. However, I have encountered an issue where there is a blank option displayed when the page initially loads. How can I eli ...

"Encountering a problem during the installation of the Angular

After investing numerous hours, I am still unable to identify the issue with running an angular based project. node version: v12.16.1 I executed npm install -g @angular/[email protected] However, upon entering the command ng build --prod, I enco ...

Leveraging symbols as object key type in TypeScript

I am attempting to create an object with a symbol as the key type, following MDN's guidance: A symbol value may be used as an identifier for object properties [...] However, when trying to use it as the key property type: type obj = { [key: s ...

The Standalone Component does not appear for debugging in webpack:source when utilizing an incompatible version of Node

I have developed two components: https://i.sstatic.net/fSNqa.png However, after running ng serve, I am only able to see one component in the source of the Chrome browser: https://i.sstatic.net/VzdDS.png How can I troubleshoot this standalone component? ...

How can you verify the correctness of imports in Typescript?

Is there a way to ensure the validity and usage of all imports during the build or linting phase in a Typescript based project? validity (checking for paths that lead to non-existent files) usage (detecting any unused imports) We recently encountered an ...

Dealing with a multi-part Response body in Angular

When working with Angular, I encountered an issue where the application was not handling multipart response bodies correctly. It seems that the HttpClient in Angular is unable to parse multipart response bodies accurately, as discussed in this GitHub issue ...

Ensuring that a function in Typescript returns the same data type it was

Is there a way to ensure that a function returns a value with the same type as the argument given to it, while still maintaining broader argument type restrictions than the original value? For instance: I need to trim a string value, but if the value is n ...

Issue with nullable return types in Typescript not being implemented correctly

Upon reviewing this snippet: export interface ICollectionService { get(id: string): Promise<Collection | null>; } const collection = await collectionService.get(collectionAddress); I noticed that the collection variable in my IDE is now displayin ...

TS Intellisense in Visual Studio Code for JavaScript Events and File Types

Whilst attempting a similar action, onDragOver(event: Event): void in Visual Studio Code, an error is thrown by IntelliSense: [ts] Cannot find name 'Event'. The same issue arises when trying something like this: let file: File = new File() ...

Is it possible to specify the timing for executing Typescript decorators?

One issue I've encountered is that when I define a parameterized decorator for a method, the decorator runs before the method itself. Ideally, I'd like the decorator to run after the method has been called. function fooDecorator(value: boolean) ...

Refine the list by filtering out multiple search criteria using a nested array

I am working with an Array of objects that looks like this: data = [{ cities: [ {name: 'TATUÍ', federatedUnit: 'SP'}, {name: 'BOITUVA', federatedUnit: 'SP'}, {name: 'PORTO FELIZ', federatedUnit: ...

Continue running the code after a function has completed its execution in Angular 4

My function uses web3.js to generate a new account and retrieve the account address. Here is my service implementation: @Injectable() export class ContractsService { private web3: any; public acc_no: any; public createAccount(): any { this.we ...