ConcatMap in RxJS processes only the last item in the queue

Currently, I am implementing NGRX with RXJS. Within the effects layer, I am utilizing a concatMap to organize my requests in a queue fashion. However, once the latest request is finished, I aim to execute the most recent item added to the queue instead of processing all the remaining ones simultaneously. Is this scenario achievable?

I have experimented with mergeMap, switchMap, and other methods, but my requirement is to run the requests sequentially rather than concurrently. This specific necessity led me to stick to using concatMap (or any similar alternative).

    updateThing$ = createEffect(() =>
    this.actions$.pipe(
        ofType(updateThingRequest),
        concatMap((action) => {

            return this.myService
            //After finishing this request, I want to execute only the last pending request from the concatMap queue,
            //discarding all other pending requests.
                .update(action)
                .pipe(
                    map(() => {

                        return someActionComplete();
                    }),
                    catchError((err) => {
                        return of(
                            someActionError()
                        );
                    })
                );
        })
    )
);

Answer №1

If you're facing backpressure issues, the best strategy to use is the "latest" backpressure strategy.

RXJS may not be the most efficient solution for handling backpressure - while in RxJava it's just one line of code, in RXJS it's a bit more complicated. In versions after 4.x of RXJS, there isn't built-in support for backpressure, but you can mimic it using certain operators.

For more information, check out: https://reactivex.io/documentation/operators/backpressure.html#collapseRxJS

In your case, creating a custom operator might be a good approach...

P.S. I was able to come up with a solution without using a custom operator, though it's quite complex. If you're interested, I'll share the code here.

EDIT - RXJS Solution without Custom Operator

This code involves using two streams - one for requests and another for triggering new requests:

const requests = interval(200).pipe(take(20));
const requestCompletedStream = new BehaviorSubject(true);

combineLatest(requests, requestCompletedStream)
  .pipe(
    tap(([requestNumber]) =>
      // This will print twice for executed requests
      console.log(`Request ${requestNumber} has been processed`)
    ),
    filter(([_, requestComplete]) => requestComplete), 
    map(([requestNumber]) => requestNumber),
    distinctUntilChanged(), 
    concatMap((requestNumber) => {
      console.log(`Handling async request for: ${requestNumber}`);
      requestCompletedStream.next(false);

      return of(1).pipe(
        delay(2000), 
        tap(() => {
          requestCompletedStream.next(true);
        }),
        map(() => requestNumber)
      );
    })
  )
  .subscribe((val) =>
    console.log(`Client received completed request ${val}`)
  );

You can test it here: https://stackblitz.com/edit/rxjs-backpressure-last-strategy?devtoolsheight=100&file=index.ts

The example sends 20 requests with a 200ms delay between them. Each request takes 2000ms to handle, so only requests 0, 10, and 19 are successfully processed and sent to the subscriber.

You can see the process in the console output below:

Request 0 has been processed
Handling async request for: 0
Request 0 has been processed
Request 1 has been processed
Request 2 has been processed
...
Client received completed request 0
Handling async request for: 9
Request 10 has been processed
...
Client received completed request 10
Handling async request for: 19
Request 19 has been processed
Request 19 has been processed
Client received completed request 19

Answer №3

Perhaps considering using the finalize operator could be a potential solution, although I cannot guarantee its correctness. Alternatively, you may want to explore the appropriate operator using therxjs decision tree.

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Mastering the Art of Ag-Grid Integration in Angular 11

Can the Ag-Grid enterprise or community version be utilized with Angular 11? I am currently working with Angular 11 in my application, but have been unable to find any information confirming that AG-GRID supports this version. ...

Error: Missing provider for MatBottomSheetRef

While experimenting in this StackBlitz, I encountered the following error message (even though the MatBottomSheetModule is imported): ERROR Error: StaticInjectorError(AppModule)[CountryCodeSelectComponent -> MatBottomSheetRef]: S ...

What is the best way to provide JSON data instead of HTML in Angular?

Is it possible to output processed data as json instead of html? I want to process backend data and output it as json for a specific url. How can I prepare a component to do this? Currently, the app serves html pages where components process backend data ...

Getting the button element in Angular when submitting a form

My web page contains multiple forms, each with a set of buttons. I want to incorporate a loading spinner on buttons after they are clicked. When using a regular click event, I am able to pass the button element: HTML <button #cancelButton class="butto ...

Encountered issue: The type 'Teacher[]' cannot be assigned to the type 'Teacher'

I am currently working on enhancing my angular application by adding a new service. However, I have encountered an error that I need help fixing: The error message states: Type 'Teacher[]' is not assignable to type 'Teacher'. Property ...

Namespace remains ambiguous following compilation

I'm currently developing a game engine in TypeScript, but I encountered an issue when compiling it to JavaScript. Surprisingly, the compilation process itself did not throw any errors. The problem arises in my main entry file (main.ts) with these ini ...

Tips for changing a value in an ngIf template

Hi there, I'm fairly new to Angular and I am trying to make some changes in the ngIf template. I have created a component called research-list and I want to display the research data defined in research-list.ts file. However, when I try to use modify( ...

Angular HttpClient Catch Return Value

In my quest to develop a universal service for retrieving settings from the server, I've encountered an issue. When errors arise, I want to intercept them and provide a default value (I have a predetermined configuration that should be utilized when e ...

Error in Next.js when trying to use Firebase Cloud Messaging: ReferenceError - navigator is not defined in the Component.WindowMessagingFactory instanceFactory

Currently, I am in the process of setting up push notifications with Firebase in my next.js application. I have been following a guide from the documentation which you can find here: https://firebase.google.com/docs/cloud-messaging/js/receive?hl=es-419 Ho ...

Having trouble accessing undefined properties? Facing issues with the latest Angular version?

Why am I encountering an error and what steps can be taken to resolve it? Currently using the latest version of Angular. ERROR TypeError: Cannot read properties of undefined (reading 'id') Here is the JSON data: { "settings": [ { ...

Tips for simulating a Ref

I have a Vue3 component where, within the setup(), I have defined the following function: const writeNote(note: Ref<Note>) => { console.log(`note ${note.id}`) } This function takes a Ref<Note>, with Note being an Interface. There are two s ...

The Next.js app's API router has the ability to parse the incoming request body for post requests, however, it does not have the

In the process of developing an API using the next.js app router, I encountered an issue. Specifically, I was successful in parsing the data with const res = await request.json() when the HTTP request type was set to post. However, I am facing difficulties ...

What is the importance of always catching errors in a Promise?

In my project, I have implemented the @typescript-eslint/no-floating-promises rule. This rule highlights code like this - functionReturningPromise() .then(retVal => doSomething(retVal)); The rule suggests adding a catch block for the Promise. While ...

CSS code for a fixed fullscreen menu with scrolling

I have implemented a full-screen menu that covers the entire screen, excluding the header and menu tab bar. You can view it here: https://i.stack.imgur.com/h5cQa.png On smaller screens, the entire menu cannot be displayed at once, as shown here: https://i. ...

Having an excess of 32 individual byte values

My current project involves developing a permission system using bitwise operators. A question came up regarding the limitation of having only 32 permissions in place: enum permissions { none = 0, Founder = 1 << 0, SeeAdmins = 1 << ...

Guide to highlighting manually selected months in the monthpicker by utilizing the DoCheck function in Angular

I'm facing an issue and I could really use some assistance. The problem seems quite straightforward, but I've hit a roadblock. I have even created a stackblitz to showcase the problem, but let me explain it first. So, I've developed my own t ...

Launching the Skeleton feature in NextJS with React integration

I have been working on fetching a set of video links from an Amazon S3 bucket and displaying them in a video player component called HoverVideoPlayer. However, during the loading process, multiple images/videos scale up inside a Tailwind grid component, ca ...

Having issues with parameterized URL integration between Django2 and Angular2

I am encountering an issue with integrating a URL containing parameters in Angular and Django. When making a call to the url, Django expects a slash at the end while Angular appends a question mark before the parameters. How can this be resolved? Below is ...

Arranging elements within an outer array by the contents of their inner arrays

I need help organizing an array based on the alphabetical order of a specific value within the inner arrays. For example: I want to sort this array by the prefix "old," so old A, old B, etc. const array = [ { personName: "Vans", personTags: ["young", " ...

Retrieve GPS data source details using Angular 2

In my Angular 2 application, I am looking to access the GPS location of the device. While I am aware that I can utilize window.geolocation.watchposition() to receive updates on the GPS position, I need a way to distinguish the source of this information. ...