I'm developing an Observable that polls an external service for updates and emits each new update as it becomes available:
this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    let shouldLoop = true;
    while (shouldLoop)
    {
        if (!this._client)
            throw new Error("This client is not initialised.");
        const update = this._lib.receiveSync(this._client, 5);
        if (!update)
            continue;
        if (update._ === "error")
            this.emit("error", update);
        else
            this.emit("update", update);
        subscriber.next(update);
    }
    // The while loop prevents reaching this point, causing blocking when subscribing to this Observable
    // Cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;
this._loopSubscription = this._loop.connect();
However, the subscribe function blocks as soon as I call connect(): the while loop never returns, so the cancellation function is never handed back. How can I change this so that subscribing is non-blocking?
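One direction that might work, shown as a minimal sketch and not a definitive fix: replace the synchronous while loop with ticks that are rescheduled on the event loop, so the subscribe function can return its teardown immediately. The names startPolling, Schedule, Cancel and viaTimeout are hypothetical helpers invented for this illustration; they are not part of RxJS or of the library behind receiveSync.

```typescript
// Hypothetical helper types: not part of RxJS or the receiveSync library.
type Cancel = () => void;
type Schedule = (tick: () => void) => Cancel;

// Default scheduler: run the next tick on a later turn of the event loop.
const viaTimeout: Schedule = tick =>
{
    const handle = setTimeout(tick, 0);
    return () => clearTimeout(handle);
};

// Polls `receive` once per tick and forwards non-empty results to `onUpdate`.
// Returns right away with a function that stops the polling.
function startPolling<T>(
    receive: () => T | null | undefined,
    onUpdate: (update: T) => void,
    schedule: Schedule = viaTimeout
): Cancel
{
    let stopped = false;
    let cancelPending: Cancel | null = null;

    const tick = (): void =>
    {
        if (stopped)
            return;
        const update = receive();
        if (update !== null && update !== undefined)
            onUpdate(update);
        // onUpdate may have cancelled the polling, so check again.
        if (!stopped)
            cancelPending = schedule(tick);
    };

    // Nothing is polled synchronously; the first tick is only scheduled.
    cancelPending = schedule(tick);

    return () =>
    {
        stopped = true;
        if (cancelPending)
            cancelPending();
    };
}
```

Inside the Observable, the subscribe function could then return startPolling(() => this._lib.receiveSync(this._client, 5), update => subscriber.next(update)) so that the teardown reaches RxJS before any polling happens. Each receiveSync call would still block the thread for up to its timeout, so this only helps if a short timeout is acceptable here, or if an asynchronous variant of the call exists; both are assumptions on my part.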