Trigger on the cancellation or completion of a nested observable

I'm seeking a way to detect if an inner observable was not successfully completed (due to everyone unsubscribing) and then emit a value in that scenario. Something akin to defaultIfEmpty, but the current solution isn't effective.

  1. A trigger exists (interval).
  2. There's complex logic that can consume significant time (timer).
  3. When the trigger is activated again, causing cancellation of the complex logic (switchMap), I want to receive notification within the stream (to emit a value).

An example on stackblitz.com can be found here: https://stackblitz.com/edit/angular-pf8z5z?file=src%2Fapp%2Fapp.component.ts

The code snippet:

import {Component, OnDestroy, OnInit} from '@angular/core';
import * as rxjs from 'rxjs';
import * as operators from 'rxjs/operators';

@Component({
    selector: 'my-app',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit, OnDestroy {
    name = 'Angular';

    protected destroy$ = new rxjs.Subject();

    ngOnInit(): void {
        rxjs.interval(1000).pipe(
            operators.map(() => `${Math.random()}`),
            operators.switchMap(taskId => rxjs.timer(Math.random() * 1000 + 500).pipe(
                operators.mapTo([taskId, 'executed']),

                // some complex operation that can be cancelled
                // TASK: Need to notify the stream upon cancellation

                // start indicator, functioning correctly
                operators.startWith([taskId, 'start']),

                // end indicator, TODO: Ensure it works even if the pipe is cancelled
                // currently working when completed
                operators.endWith([taskId, 'end']),
                operators.catchError(() => rxjs.of([taskId, 'end'])),

                // this part functions properly but does not emit a value
                operators.finalize(() => console.log(taskId, 'end')),
            )),

            // additional code for illustration purposes

            // checking active tasks
            operators.scan((activeTasks, [taskId, action]) => {
                if (action === 'start') {
                    activeTasks.push(taskId);
                }
                if (action === 'end') {
                    activeTasks.splice(activeTasks.indexOf(taskId), 1);
                }
                if (action === 'cancelled') {
                    activeTasks.splice(activeTasks.indexOf(taskId), 1);
                }
                return [...activeTasks];
            }, []),

            // Only one active task should be displayed at all times
            // since others were completed or cancelled
            operators.tap(console.log),
            operators.takeUntil(this.destroy$),
        ).subscribe();
    }

    ngOnDestroy(): void {
        this.destroy$.next();
        this.destroy$.complete();
    }
}

UPDATED

Potential solution using traditional functions.

operators.switchMap(function(taskId, idx) {
    const self = this;
    return rxjs.timer(Math.random() * 1000 + 500).pipe(
        operators.mapTo([taskId, 'executed']),
        operators.startWith([taskId, 'start']),

        // SOLUTION: Works correctly with slightly messy access to switchMap source
        operators.finalize(() => {
            self.destination.next([taskId, 'end']);
        }),
    );
}),

Answer №1

Here is a potential solution:

rxjs.interval(1000).pipe(
  operators.tap(() => console.log('start')),
  operators.switchMap((_, idx) => rxjs.timer(1500).pipe(
    idx > 0 ? operators.startWith('completed') : rxjs.identity,
  )),
  operators.tap(() => console.log('end')),
  operators.takeUntil(this.destroy$),
).subscribe(console.log, console.log);

The switchMap function provides access to the index as well. Refer to this link for more information on the index.

If idx === 0, it indicates that there is no active inner observable to unsubscribe from.

However, if the current inner observable has not completed yet and a new outer value arrives, the active inner observable will be unsubscribed, as indicated by startWith.

Explanation of why 'endWith' did not work

operators.switchMap(() => rxjs.timer(1500).pipe(
  operators.finalize(() => console.log('this works')),
  operators.endWith('cancelled'),
)),

In this case, using endWith did not work as expected due to the behavior of mergeMap and its handling of inner observables.

To handle cases where the inner observable completes, consider the following approach:

  ngOnInit(): void {
    rxjs.interval(1000).pipe(
      operators.tap(() => console.log('start')),
      operators.switchMap(function (_, idx) { 
        return rxjs.timer(Math.random() * 1500).pipe(
        operators.tap(null, null, () => console.warn("complete!")),
        idx > 0 && this.innerSubscription ? operators.startWith(true) : rxjs.identity,
      )}),
      operators.tap(() => console.log('end')),
      operators.takeUntil(this.destroy$),
    ).subscribe(console.log, console.log);
  }

When dealing with cases where the inner observable may complete, you can also utilize a Subject to handle unsubscriptions:

const interruptSubject = new Subject();

src$ = src$.pipe(
  switchMap(
    (/* ... */) => timer(/* ... */)
      .pipe(
        /* ... */
        finalize(() => interruptSubject.next(/* ...your action here... */))
      )
  )
);

merge(src$, interruptSubject)
  .subscribe(/* ... */)

Answer №2

If you're facing an issue, consider using the catchError operator:

ngOnInit(): void {
    rxjs.interval(1000).pipe(
      operators.tap(() => console.log('start')),
      operators.switchMap(() => rxjs.timer(1500).pipe(
        operators.finalize(() => console.log('this works')),

        // If the timer observable above encounters a failure, it will emit 'cancelled'
        operators.catchError((e) => rxjs.of('cancelled')),
      )),
      operators.takeUntil(this.destroy$),
    ).subscribe(console.log);
  }

Answer №3

If I understand correctly, the issue at hand involves emitting a message when the trigger is activated. This message should indicate that the previous execution has been terminated and a new one has commenced.

To address this scenario, the focus should be on the function passed to switchMap. This function must return an Observable that promptly emits the cancellation message, followed by the results of the extensive processing once it is completed. An Observable structured in this manner would resemble the following:

merge(of([prevTaskId, "cancelled"]), doComplexStuff(taskId))

The function doComplexStuff contains the code for your complex operations, as demonstrated below:

function doComplexStuff(taskId: string) {
  prevTaskId = taskId;
  return timer(1500).pipe(
    mapTo([taskId, "executed"]),
    
    // intricate operations that can be cancelled
    // TASK: Notification in case of cancellation required
    
    startWith([taskId, "start"]),
    
    endWith([taskId, "end"]), // Completion indicator - needs refinement for cancellation handling
    catchError(() => of([taskId, "end"]))
  );
}

A potential solution could take the form of:

let prevTaskId;
interval(1000)
  .pipe(
    take(10),
    map(() => `${Math.random()}`),
    switchMap((taskId) =>
      merge(of([prevTaskId, "cancelled"]), doComplexStuff(taskId))
    ),
    
    // Additional code unrelated to task but highlighting console issue

    scan((activeTasks, [taskId, action]) => {
      if (action === "start") {
        console.log("Start:", taskId);
        activeTasks.push(taskId);
      }
      if (action === "end") {
        console.log("End:", taskId);
        activeTasks.splice(activeTasks.indexOf(taskId), 1);
      }
      if (action === "cancelled") {
        console.log("Cancelled:", taskId);
        activeTasks.splice(activeTasks.indexOf(taskId), 1);
      }
      return [...activeTasks];
    }, []),
    
    tap(console.log) // Displaying only one active task after completion or cancellation
  )
  .subscribe();

A working example of this code is available on StackBlitz.

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

flushMicrotasks does not function properly in conjunction with the image.onload event

Working on an Angular project, I'm currently developing an object with an image field. The method responsible for loading the image returns a promise that resolves in the onload function of the image. When trying to test this method using the flushMi ...

Verifying the legitimacy of the elements in an n-dimensional array

In my journey to create my own Tensor class (n-dimensional arrays) in typescript, I have devised a structure where the data is stored in a 1D array property, and the shape of the data is stored separately for indexing purposes. My goal is to develop a fun ...

Grouping Columns in an HTML Table using Angular 4

I'm currently faced with the task of retrieving flat data from an API and presenting it in an HTML table using Angular 4. I'm a bit unsure about how to iterate over the data, possibly using a for-each loop. I have attempted to utilize ngFor but I ...

What is the method for retrieving an attribute's value from an object that does not have key-value pairs?

My current project involves working with dynamoose and running a query that produces the following output: [ Document { cost: 100 }, lastKey: undefined, count: 1, queriedCount: undefined, timesQueried: 1 ] When I use typeof(output), it returns O ...

Guide on testing a function with a dependency in Angular through unit testing

Attempting to dive into unit testing, I have grasped some of the basics. However, my current challenge lies in testing a method within my code. This particular method involves calling a function from oidc-client.js that handles user sign-ins. My spec fi ...

Guide on installing an Angular8 application on a personal computer in a secure manner

Exploring two intriguing queries that may appear theoretical. Despite scouring the internet, a definitive answer remains elusive. Query one: The aspiration is to craft an application in MEAN stack (Mongo, Angular8, NodeJS server) sans reliance on a centr ...

Angular synchronous observables are designed to provide real-time data

API integration is a crucial part of my process for obtaining information. However, the data retrieval can be inconsistent at times; I may receive all the necessary information, only portions of it, or the data might not be in the correct order. getData(s ...

Does the class effectively implement the interface even if the method of a member variable has undefined arguments?

Let's take a closer look at my code, which lacks proper descriptions. Here is the interface: interface IModel<T = any> { effects: { [key: string]: (getState: () => T) => void; }; } interface IState { name: string; age: numbe ...

Access an array to filter by specific key-value pairs

How can I efficiently filter an array using key and value pairs from within nested arrays? I am in need of a method to filter the elements of an array based on specific key-value pairs nested within another array. The key will always contain boolean value ...

The error TS2300 arises from the duplicate identifier 'PropertyKey' found in the node_modules/@types/core-js/index.d.ts file

I am encountering errors in the file node_modules/@types/core-js/index.d.ts while using Visual Studio Code IDE: After running npm start to serve the application, the following errors are displayed: (list of errors here) This is how my tsconfig.json ...

Issues with loading AddMarker on initial launch in Ionic 2

Can someone help me figure out what's causing the issue in my code? When I try to load a google map in my ionic 2 app, the marker doesn't show up the first time. It only appears when I reload the map for the second time or later. I also need ass ...

Guide on utilizing a provider's variable in another provider within Ionic 2/3

In my code, I have a boolean variable called isConnection that is used to monitor network connection status. This variable is defined in a provider named 'network' and can be set to either true or false in different functions. Now, I need to acc ...

The Angular CLI suddenly decided to stop providing me with useful lines (without sourcemaps) in the browser console, but interestingly the terminal continues

I recently noticed a change in my Angular project that is using Angular CLI. Instead of receiving error lines from my code, I am getting errors from compiled files like main.js and vendor.js. The 'normal' error messages in my terminal are pointin ...

Issues with naming in Gulp, Angular2 Final, and Visual Studio: "Cannot find name" and "Duplicate identifier" errors

Recently, I updated my project to utilize the final release of Angular 2 and also upgraded Visual Studio to use TypeScript 2.0.3 from the Tool -> Extensions and Updates manager. I compile my TypeScript using Gulp, and it compiles without any errors. Ho ...

Troubleshooting error messages with Angular 2 HttpClient response payload

Currently, I am implementing the latest version (4.3) of HttpClient in angular to handle data POST requests to my backend server: this.httpClient.post<View>(`/path`, data).subscribe( (view: View) => console.log("Success"), (error: HttpErrorRe ...

Guide on setting an instance property through a callback function triggered by the instance

I am looking to set an attribute of an angular service using the result from a callback function. Here is my Angular service: @Injectable() export class SkillsManagerService { private skill: any; private event: any; constructor() { ...

Angular 4 CanActivateChild fails to function

I'm attempting to limit access to my route system by using the CanActivateChild feature. However, I've encountered an issue where the RouteGuard only works with the CanActivate function and not CanActivateChild. Here's a snippet of my routin ...

Dependencies between libraries within a workspace

Recently, I set up a brand new Angular 6 workspace that includes an application along with two libraries: library1 and library2. library2 depends on a module from library1, as shown below import {Library1Module} from "library1" I successfully compiled li ...

Toggle the visibility of a modal in code across various components in an Angular 4 project using Typescript

As I was working on my university App, I encountered an issue while attempting to open a Bootstrap modal in code from a different component. Opening a component in code from the same component posed no problems for me as I use JQuery and it functions perfe ...

Angular2 material dropdown menu

Currently, I am studying angular2 with its material design. One of the modules I am using is md-select for material and here is a snippet of my code: <md-select> <md-option value="1">1</md-option> <md-option value="2">2< ...