RxJS 4:
I am attempting to capture and emit values after a certain interval of time has passed since the first value was received from a websocket. Essentially, once the first value is received, a timer will start to store subsequent incoming values and then emit them after x seconds. Once the values are emitted, the timer stops and no more values are emitted until a new one comes in from the websocket, at which point the cycle begins again.
The motivation behind this setup is that currently in my application, data is being emitted almost constantly (at a nanosecond level) via the websocket, impacting performance. I want to aggregate as many values as possible over a set amount of time, then emit them together for batch processing.
Despite my efforts, the code I have attempted doesn't seem to be working as intended.
public testObs = new Observable<any>();
public bufferStarted = false;
private subject = new Subject<any>();
webSocket.onmessage = ((event: any) => {
this.subject.next(event.data);
if(!bufferStarted) {
bufferStarted = true;
// Start the buffer now
const startInterval = Observable.timer();
// Emit value after 1s and close buffer
const closingInterval = val => {
console.log(`Buffer is open! Emitting value after 1s`)
bufferStarted = false;
return Observable.interval(1000);
}
this.testObs = this.subject.bufferToggle(startInterval, closingInterval);
}
}
In the component, I subscribe to
testObs.subscribe((e) => ... )
. For example, if a value is sent through the websocket, triggering a timer to open a buffer for 1 second. Within that second, 50 additional values are received. I expected to receive an array of 51 values in the component. However, the observable appears to be undefined. Any help would be greatly appreciated.