I am currently exploring ways to halt an observable. After studying blesh's post on github at https://github.com/ReactiveX/rxjs/issues/1542, I believe I am making progress. However, when I apply a switchMap() from my pauser Subject to the takeWhile() on my observer, it seems that the takeWhile() is being disregarded.
The following section functions correctly:
export class CompositionService {
cursor = -1;
pauser = new Subject();
interval;
init = (slides) => {
let waitUntil = 0;
return this.interval = Observable
.range(0, slides.length)
.mergeMap((i) => {
let next = Observable.of(i).delay(waitUntil);
waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0;
return next;
})
.scan((cursor) => {
return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1;
}, this.cursor)
.map(cursor => slides[cursor])
.takeWhile((slide) => {
return !!slide;
});
};
// these methods are not called for this sample
play = () => {
this.pauser.next(false);
};
pause = () => {
this.pauser.next(true);
};
};
This part works as expected when executed like this:
it("should subscribe to init", (done) => {
slides.forEach((slide, i) => {
if (slide.duration) {
slide.duration = slide.duration / 100;
}
});
composition.init(slides).subscribe(
(slide) => {
console.log(slide);
},
(err) => {
console.log("Error: " + err);
},
() => {
done();
});
});
Although the previous example operates as intended, the interval Observer never terminates when I introduce some additional logic:
export class CompositionService2 {
cursor = -1;
pauser = new Subject();
interval;
init = (slides) => {
let waitUntil = 0;
this.interval = Observable
.range(0, slides.length)
.mergeMap((i) => {
let next = Observable.of(i).delay(waitUntil);
waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0;
return next;
})
.scan((cursor) => {
return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1;
}, this.cursor)
.map(cursor => slides[cursor])
.takeWhile((slide) => {
return !!slide;
});
return this.pauser
// leaving commented for clarity of the end game
// .switchMap( paused => paused ? Observable.never() : this.interval );
// however, not even a straight forward switchMap is yeilding the expected results
.switchMap( paused => this.interval );
};
play = () => {
this.pauser.next(false);
};
pause = () => {
this.pauser.next(true);
};
};
When used in this manner:
r should subscribe to init", (done) => {
slides.forEach((slide, i) => {
if (slide.duration) {
slide.duration = slide.duration / 100;
}
});
composition.init(slides).subscribe(
(slide) => {
console.log(slide);
},
(err) => {
console.log("Error: " + err);
},
() => {
//I never get here!!!!!
done();
});
// kickstart my heart!
composition.play();
});
Can anyone provide insight into what might be going wrong here?