When I receive a stream of data in response from a service (grpc), it comes in the form of 5 streams within 2 seconds. Each stream is an observable object that I subscribe to. The processing logic for each stream involves handling heavy JSON objects with base64 encoded byte strings, complex type conversions, and conditional logic. Due to the fast processing speed of these streams (almost concurrent), I often miss verifying other streams while one is being processed. To address this issue, I need to aggregate all these Y streams after listening for X seconds into an observable array.
MyTestService.ts:
@Injectable()
export class MyTestService {
client: GrpcMyTestServiceClient;
public readonly coreData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.coreData$ = this.listCoreStream();
}
listCoreStream(): Observable<TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
//Get stream data from service(grpc)
const stream = this.client.getCoreUpdates(req);
stream.on('data', (message: any) => {
obs.next(message.toObject() as TestReply.AsObject);
});
});
}
}
MyComponent.ts
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: MyTestService) {
this._subscription = new Subscription();
}
ngOnInit(): void {
this._subscription = this._MyTestService.coreData$.subscribe((data) => {
if (data) {
let obj = JSON.parse(data);
//processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
}
});
}
Due to the rapid influx of data, not all records are processed, leading to synchronization issues where the last satisfying stream overwrites previous ones. To ensure all streams are processed sequentially, I aim to merge them into an array to iterate through inside the subscribing component without considering stream data ordering.
I've attempted using rxjs operators like timer
, mergeMap
, concatMap
, scan
, merge
, but being new to these concepts, I'm struggling to implement them correctly. Below is my attempt using scan
, which didn't yield the desired results, leaving the array empty and uncertain how to access it via console.log
. Any guidance or suggestions would be greatly appreciated.
Attempted Solution:
let temp: TestReply.AsObject[] = [];
let test = this._MyTestService.coreData$
.pipe(
mergeMap(_ => timer(5000)),
scan<any>((allResponses, currentResponse) =>
[...allResponses, currentResponse], this.temp),
).subscribe(console.log);