I'm seeking a way to detect if an inner observable was not successfully completed (due to everyone unsubscribing) and then emit a value in that scenario. Something akin to defaultIfEmpty
, but the current solution isn't effective.
- A trigger exists (interval).
- There's complex logic that can consume significant time (timer).
- When the trigger is activated again, causing cancellation of the complex logic (switchMap), I want to receive notification within the stream (to emit a value).
An example on stackblitz.com can be found here: https://stackblitz.com/edit/angular-pf8z5z?file=src%2Fapp%2Fapp.component.ts
The code snippet:
import {Component, OnDestroy, OnInit} from '@angular/core';
import * as rxjs from 'rxjs';
import * as operators from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit, OnDestroy {
name = 'Angular';
protected destroy$ = new rxjs.Subject();
ngOnInit(): void {
rxjs.interval(1000).pipe(
operators.map(() => `${Math.random()}`),
operators.switchMap(taskId => rxjs.timer(Math.random() * 1000 + 500).pipe(
operators.mapTo([taskId, 'executed']),
// some complex operation that can be cancelled
// TASK: Need to notify the stream upon cancellation
// start indicator, functioning correctly
operators.startWith([taskId, 'start']),
// end indicator, TODO: Ensure it works even if the pipe is cancelled
// currently working when completed
operators.endWith([taskId, 'end']),
operators.catchError(() => rxjs.of([taskId, 'end'])),
// this part functions properly but does not emit a value
operators.finalize(() => console.log(taskId, 'end')),
)),
// additional code for illustration purposes
// checking active tasks
operators.scan((activeTasks, [taskId, action]) => {
if (action === 'start') {
activeTasks.push(taskId);
}
if (action === 'end') {
activeTasks.splice(activeTasks.indexOf(taskId), 1);
}
if (action === 'cancelled') {
activeTasks.splice(activeTasks.indexOf(taskId), 1);
}
return [...activeTasks];
}, []),
// Only one active task should be displayed at all times
// since others were completed or cancelled
operators.tap(console.log),
operators.takeUntil(this.destroy$),
).subscribe();
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
UPDATED
Potential solution using traditional functions.
operators.switchMap(function(taskId, idx) {
const self = this;
return rxjs.timer(Math.random() * 1000 + 500).pipe(
operators.mapTo([taskId, 'executed']),
operators.startWith([taskId, 'start']),
// SOLUTION: Works correctly with slightly messy access to switchMap source
operators.finalize(() => {
self.destination.next([taskId, 'end']);
}),
);
}),