To achieve this, you can create a small wrapper class that encapsulates the initial AsyncSubject.
import { AsyncSubject, Subject, Observable, Subscription } from 'rxjs/Rx';
class SingleSubscriberObservable<T> {
  // Emits whenever a new subscriber arrives, which ends the previous subscription.
  private newSubscriberSubscribed = new Subject<void>();
  constructor(private sourceObservable: Observable<T>) {}
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    // Signal first, so that only earlier subscriptions are completed.
    this.newSubscriberSubscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
  }
}
You can then test it in your example like so:
const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as);
let fired = false;
function setFired(label: string) {
  return () => {
    if (fired) throw new Error("Multiple subscriptions executed");
    console.log("FIRED", label);
    fired = true;
  };
}
function logDone(label: string) {
  return () => {
    console.log(`${label} Will stop subscribing to source observable`);
  };
}
const subscription1 = single.subscribe(setFired('First'), () => {}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), () => {}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), () => {}, logDone('Third'));
setTimeout(() => {
  as.next(undefined);
  as.complete();
}, 500);
The key aspect is seen here:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
  this.newSubscriberSubscribed.next();
  return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
On each call to subscribe, we first trigger the newSubscriberSubscribed subject by calling this.newSubscriberSubscribed.next(). Because every subscription to the underlying Observable is made through takeUntil(this.newSubscriberSubscribed), that signal completes the previous subscription. The signal is sent before the new subscription is created, so only earlier subscribers are affected.
This ensures that whenever a new subscriber arrives, the prior subscription ends, which is the desired outcome.
The expected output would be:
First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable
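To see the mechanism in isolation, here is a minimal dependency-free sketch that models the same "newest subscriber wins" idea without RxJS. The names LatestOnlySource, Listener and emitAndComplete are made up for this illustration and are not part of RxJS. It only imitates what takeUntil does in the wrapper above, under the assumption that the source delivers a single value and then completes.

```typescript
// Hypothetical, dependency-free model of the wrapper's behaviour (not RxJS itself).
type Listener<T> = { next: (value: T) => void; complete: () => void };

class LatestOnlySource<T> {
  private current: Listener<T> | null = null;

  // Registering a new listener completes the previous one,
  // which is the role takeUntil(newSubscriberSubscribed) plays in the wrapper.
  subscribe(listener: Listener<T>): void {
    const previous = this.current;
    this.current = listener;
    if (previous) previous.complete();
  }

  // Deliver one value and then complete, roughly like an AsyncSubject that completes.
  emitAndComplete(value: T): void {
    const listener = this.current;
    this.current = null;
    if (listener) {
      listener.next(value);
      listener.complete();
    }
  }
}

const log: string[] = [];
const source = new LatestOnlySource<string>();
for (const label of ['First', 'Second', 'Third']) {
  source.subscribe({
    next: () => log.push(`FIRED ${label}`),
    complete: () => log.push(`${label} done`),
  });
}
source.emitAndComplete('value');
console.log(log.join('\n'));
```

The log shows First and Second completing without ever firing, and only Third receiving the value, which mirrors the output listed above.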
UPDATE:
If you wish for the first subscriber to remain subscribed while subsequent subscribers immediately get a complete signal, blocking any further subscriptions until the primary subscriber unsubscribes, you can modify the implementation as follows:
class SingleSubscriberObservable<T> {
  // True while a subscriber holds the single available slot.
  private isSubscribed: boolean = false;
  constructor(private sourceObservable: Observable<T>) {}
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    if (this.isSubscribed) {
      // Someone is already subscribed: complete this subscriber immediately.
      return Observable.empty<T>().subscribe(next, error, complete);
    }
    this.isSubscribed = true;
    const subscription = this.sourceObservable.subscribe(next, error, complete);
    // Wrap the inner subscription so that unsubscribing also frees the slot.
    return new Subscription(() => {
      subscription.unsubscribe();
      this.isSubscribed = false;
    });
  }
}
We use a flag, this.isSubscribed, to track whether there is an active subscriber, and we return a custom Subscription that resets the flag on unsubscription. Note that the flag is reset only when that returned Subscription is unsubscribed, not when the source completes on its own.
If someone tries to subscribe while another subscriber is active, we subscribe them to an empty Observable instead, which completes immediately. The resulting output is:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable