The decision on how to replace 'isStopped' depends entirely on your chosen approach.
- If a subject is stopped, it indicates that the subject has momentarily halted (paused) emitting values.
- On the other hand, when a subject is closed, it signifies that the subject has reached a definitive endpoint and will cease to emit any further values.
In most cases, marking a subject as stopped occurs immediately after calling complete()
, while marking it as closed is done internally by the subject itself upon invoking unsubscribe()
. It's possible to have a subject in a stopped state without being closed, resulting in no value emissions.
A workaround option:
You may opt for subject types that do not automatically stop upon completion if it aligns with your strategy. Prior to version 6 of Rxjs, ReplaySubject did not stop on completion (this changes from version 7 onwards).
import { ReplaySubject } from 'rxjs';
const replaySubject = new ReplaySubject<number>(2); // Keeps track of the last 2 emitted values
replaySubject.next(1);
replaySubject.next(2);
replaySubject.complete(); // Completes the ReplaySubject
replaySubject.next(3); // Emits a value after completion
const observable = replaySubject.asObservable();
observable.subscribe({
next: value => {
console.log(`Value received by Observable: ${value}`);
},
complete: () => console.log('Completed')
});
// Output:
// Value received by Observable: 2
// Value received by Observable: 3
// Completed
Another alternative:
If your strategy involves subjects with regular behavior, you can either subscribe directly to the subject to reset the 'stopped' flag to false, or assign a new subject to it since there are no observers actively watching it:
import { BehaviorSubject, Subject } from 'rxjs';
let behaviorSubject = new Subject<number>();
let observable = behaviorSubject.asObservable();
console.group('ACTIVE AND RUNNING');
let subscription = observable.subscribe({
next: value => console.log(`Subscriber 1 received value: ${value}`),
complete: () => console.log('Completion 1')
});
behaviorSubject.next(1); // Value is received by Subscription 1
subscription.unsubscribe(); // No impact on the subject; Subscription 1 simply won't receive more values
// At this point, behaviorSubject is neither closed nor stopped
console.log(`Stopped: ${behaviorSubject.isStopped}; Closed: ${behaviorSubject.closed}`);
console.groupEnd();
console.group('ACTIVE BUT STOPPED');
behaviorSubject.complete();
// Since previous completion, behaviorSubject is stopped but not closed
console.log(`Stopped: ${behaviorSubject.isStopped}; Closed: ${behaviorSubject.closed}`);
if(behaviorSubject.closed) {
// Any code here won't execute
} else {
behaviorSubject.next(2);
console.log('Value 2 won't be received as the subject is complete/stopped');
}
observable.subscribe({
next: value => console.log(`Subscriber 2 received value: ${value}`),
complete: () => console.log('Completion 2 anyway')
});
console.groupEnd();
console.group('CLOSED AND STOPPED');
behaviorSubject.unsubscribe(); // Closes the subject
try {
console.log("Subscription 3 won't get any value or completion notification, no error thrown either");
observable.subscribe({
next: value => console.log(`Subscriber 3 received value: ${value}`),
complete: () => console.log('Completion 3'),
error: err => console.log(err)
});
behaviorSubject.next(3);
} catch(err) {
console.log(`RxJs throws error: ${err}`);// ObjectUnsubscribedError: object unsubscribed
}
console.groupEnd();
console.group('NEW SUBJECT INSTANCE');
behaviorSubject = new BehaviorSubject<number>(4); // Fresh and open subject instance
observable = behaviorSubject.asObservable(); // Obtain an observable representation of the open subject
observable.subscribe({
next: value => console.log(`Subscriber 4 received value: ${value}`),
complete: () => console.log('Completion 4')
});
console.groupEnd();
Output: