It is a well-known behavior described in this article
The fundamental issue is outlined in the provided link, but below is the relevant code snippet (excerpted from the link)
// This will exhibit unusual behavior
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
If an error occurs in a subscription, it will terminate the entire source stream.
The solution for an Observable is to use .observeOn(Rx.Scheduler.asap);
I am new to reactive programming and finding it challenging to apply this solution to my Subject
, as subjects do not support observeOn
.
However, I require a Subject
because I need to add new values to the stream.
How can I overcome this issue or incorporate observeOn
with a Subject
?
observeOn
returns an Observable
. However, I am unsure how to integrate observeOn
with my Subject
.
How can I utilize observeOn while still being able to push values to my subject?
Here is the simplified version of my current code
export class MyClass{
private messages: Subject<Message> = new Subject<Message>();
dispatchMessage(message: Message) {
this.messages.next(message);
}
Any ideas?
P.S.
For those using Angular (like myself), observeOn may have unintended side effects. https://github.com/angular/angular/issues/14316
This information is provided as supplementary for future reference.