ConcatMap in RxJS processes only the last item in the queue

I am currently implementing NgRx with RxJS. In the effects layer I use concatMap to queue my requests. However, once the current request finishes, I want to execute only the most recent item added to the queue instead of working through all the remaining ones one by one. Is this achievable?

I have experimented with mergeMap, switchMap, and other methods, but my requirement is to run the requests sequentially rather than concurrently. This specific necessity led me to stick to using concatMap (or any similar alternative).

    updateThing$ = createEffect(() =>
        this.actions$.pipe(
            ofType(updateThingRequest),
            concatMap((action) => {
                // After finishing this request, I want to execute only the last pending
                // request from the concatMap queue, discarding all other pending requests.
                return this.myService.update(action).pipe(
                    map(() => someActionComplete()),
                    catchError((err) => of(someActionError()))
                );
            })
        )
    );

Answer №1

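What you are describing is a backpressure problem, and the behaviour you want is the "latest" backpressure strategy: while one request is in flight, keep only the newest pending item and drop the rest.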
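To make the "latest" strategy concrete, here is a minimal, framework-free sketch of the bookkeeping involved. The class `LatestSlot` and its methods `offer` and `finish` are hypothetical names made up for this illustration; they are not part of RxJS or NgRx.

```typescript
// Hypothetical helper (not an RxJS API) that models the "latest" strategy:
// one item runs at a time and at most one newer item is parked.
class LatestSlot<T> {
  private busy = false;
  private parked: { value: T } | null = null;

  // Offer a new item. Returns true if the caller should start it right away,
  // false if it was parked (overwriting any previously parked item).
  offer(value: T): boolean {
    if (!this.busy) {
      this.busy = true;
      return true;
    }
    this.parked = { value };
    return false;
  }

  // Report that the running item finished. Returns the parked item to run
  // next (the slot stays busy), or null if nothing is waiting.
  finish(): { value: T } | null {
    const next = this.parked;
    this.parked = null;
    if (next === null) {
      this.busy = false;
    }
    return next;
  }
}

// Usage: items 1 and 2 are overwritten while 0 is running, so only 0 and 3 start.
function demo(): number[] {
  const slot = new LatestSlot<number>();
  const started: number[] = [];
  for (const item of [0, 1, 2, 3]) {
    if (slot.offer(item)) started.push(item);
  }
  const next = slot.finish();
  if (next !== null) started.push(next.value);
  return started; // [0, 3]
}
```

The point of the sketch is only that the "queue" never holds more than one pending item, which is the difference from a plain concatMap.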

RxJS is not the most convenient tool for handling backpressure. In RxJava it is one line of code, but in RxJS it is a bit more complicated: versions after 4.x have no built-in backpressure support, so you have to emulate it with other operators.

For more information, check out: https://reactivex.io/documentation/operators/backpressure.html#collapseRxJS

In your case, a custom operator is probably a good approach.
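One possible shape for such a custom operator is sketched below. This is an illustration under assumptions, not an established RxJS API: the name `concatMapLatest` is hypothetical, and the sketch only covers the basic case (one running inner observable, one parked value, errors passed straight through).

```typescript
import { Observable, ObservableInput, OperatorFunction, Subscription, from } from 'rxjs';

// Hypothetical operator (not part of RxJS): like concatMap, but while an inner
// observable is running only the most recent source value is kept for later.
function concatMapLatest<T, R>(
  project: (value: T) => ObservableInput<R>
): OperatorFunction<T, R> {
  return (source) =>
    new Observable<R>((subscriber) => {
      let busy = false; // an inner observable is currently running
      let hasPending = false; // a newer source value is parked
      let pending: T | undefined;
      let sourceDone = false;
      const subscriptions = new Subscription();

      const run = (value: T): void => {
        busy = true;
        let inner: Observable<R>;
        try {
          inner = from(project(value));
        } catch (err) {
          subscriber.error(err);
          return;
        }
        subscriptions.add(
          inner.subscribe({
            next: (result) => subscriber.next(result),
            error: (err) => subscriber.error(err),
            complete: () => {
              busy = false;
              if (hasPending) {
                // Run only the newest parked value; older ones were overwritten.
                const next = pending as T;
                hasPending = false;
                pending = undefined;
                run(next);
              } else if (sourceDone) {
                subscriber.complete();
              }
            },
          })
        );
      };

      subscriptions.add(
        source.subscribe({
          next: (value) => {
            if (busy) {
              pending = value; // overwrite whatever was parked before
              hasPending = true;
            } else {
              run(value);
            }
          },
          error: (err) => subscriber.error(err),
          complete: () => {
            sourceDone = true;
            if (!busy) subscriber.complete();
          },
        })
      );

      // Unsubscribing tears down both the source and any running inner observable.
      return subscriptions;
    });
}
```

In the effect from the question, this would take the place of `concatMap`, i.e. `concatMapLatest((action) => this.myService.update(action).pipe(...))`, with the inner `map` and `catchError` left unchanged.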

P.S. I also came up with a solution that does not need a custom operator, though it is fairly involved; see the edit below.

EDIT - RXJS Solution without Custom Operator

The code uses two streams: one carrying the requests and one signalling whether the previous request has completed, which is what lets the next request through:

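    import { BehaviorSubject, combineLatest, interval, of } from 'rxjs';
    import { concatMap, delay, distinctUntilChanged, filter, map, take, tap } from 'rxjs/operators';

    // 20 simulated requests, one every 200 ms
    const requests = interval(200).pipe(take(20));
    // Emits true when no request is in flight, false while one is being handled
    const requestCompletedStream = new BehaviorSubject(true);

    combineLatest([requests, requestCompletedStream])
      .pipe(
        tap(([requestNumber]) =>
          // Logs every combination, so this prints twice for executed requests
          console.log(`Request ${requestNumber} has been processed`)
        ),
        // Let a request through only when the previous one has completed
        filter(([_, requestComplete]) => requestComplete),
        map(([requestNumber]) => requestNumber),
        // Do not handle the same request twice
        distinctUntilChanged(),
        concatMap((requestNumber) => {
          console.log(`Handling async request for: ${requestNumber}`);
          requestCompletedStream.next(false);

          // Stand-in for the real asynchronous call; takes 2000 ms
          return of(1).pipe(
            delay(2000),
            tap(() => {
              requestCompletedStream.next(true);
            }),
            map(() => requestNumber)
          );
        })
      )
      .subscribe((val) =>
        console.log(`Client received completed request ${val}`)
      );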

You can test it here: https://stackblitz.com/edit/rxjs-backpressure-last-strategy?devtoolsheight=100&file=index.ts

The example emits 20 requests, 200 ms apart. Each request takes 2000 ms to handle, so requests that arrive while another one is in flight are dropped, and only requests 0, 10, and 19 are handled and delivered to the subscriber.

You can see the process in the console output below:

Request 0 has been processed
Handling async request for: 0
Request 0 has been processed
Request 1 has been processed
Request 2 has been processed
...
Client received completed request 0
Handling async request for: 10
Request 10 has been processed
...
Client received completed request 10
Handling async request for: 19
Request 19 has been processed
Request 19 has been processed
Client received completed request 19

Answer №3

The finalize operator might be a possible solution, although I cannot guarantee that it is the right one. Alternatively, you could look for a suitable operator using the RxJS operator decision tree.
