I am seeking clarity on how Subjects behave when used with the resiliency operators, specifically retry and retryWhen.
The code samples below may differ slightly from the JSBin examples as I have used arrow functions and types for better understanding. This is based on version 4.0.0 - 4.0.7
My expected resiliency behavior is demonstrated in the following example:
Rx.Observable
.interval(1000)
.flatMap((count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.retry()
.take(5);
Output
// 0
// 1
// 2
// 3
// 0 <-- Retry means we start again from scratch (expected)
After the error occurs on the fourth notification, the entire stream restarts from scratch, maintaining a stateless architecture.
However, adding a multicast operator and an underlying Subject (in this case, a ReplaySubject with a buffer of 1) leads to some confusion, as shown in this example:
const consumer : Rx.Observable<number> = Rx.Observable
.interval(1000)
.flatMap((count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */
.retry()
.take(5);
const firstSubscriber : Rx.Disposable = consumer.subscribe((next:number) => {
console.log('first subscriber: ' + next);
});
setTimeout(() => {
firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */
const secondSubscriber : Rx.Disposable = consumer.subscribe((next) => {
console.log('second subscriber: ' + next);
});
}, 5000 );
Output (before error is thrown)
// "first subscriber: 0"
// "first subscriber: 1"
// "first subscriber: 2"
// "first subscriber: 3"
Output (after error is thrown)
// "first subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
Upon investigating a Subject, I found that when an error occurs, the subject is marked as inError and each future subscriber will receive the last notification before an onError call is made.
Based on this observation, it seems challenging to use a resilience operator after any other operator containing a Subject (such as shareReplay or publish).
One possible solution could be to create a new Subject whenever an error occurs and a node is disposed. The use of multicast with a factory/subjectSelector could be helpful:
.multicast(() => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source);
By utilizing a subjectSelector in multicast, a new ConnectableObservable will be created for each new subscription.
It remains uncertain whether sharing and disposing of Subjects will achieve multicasting to subscribers effectively.
In my exploration of this topic, I have even developed a RecoverableReplaySubject that removes the error state upon disposal. However, I recognize that the RxJS team likely has reasons for implementing error handling in their workflow.
Any insights or experiences regarding this issue are highly appreciated.
Thank you.