Using RxJS to aggregate data streams from multiple observables (such as a gRPC service) into an array after a specified time delay with the scan or mergeMap operators

When I receive a stream of data in response from a gRPC service, it arrives as 5 streams within 2 seconds. Each stream is an observable that I subscribe to. Processing each stream means handling heavy JSON objects with base64-encoded byte strings, complex type conversions, and conditional logic. Because the streams arrive almost concurrently, I often miss other streams while one is still being processed. To address this, I need to listen for X seconds and then aggregate all Y streams into a single observable array.

MyTestService.ts:

@Injectable()
export class MyTestService {    
    client: GrpcMyTestServiceClient;
    public readonly coreData$: Observable<TestReply.AsObject>;

    constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.coreData$ = this.listCoreStream();
    }

    listCoreStream(): Observable<TestReply.AsObject> {    
        return new Observable(obs => {       
            const req = new SomeRequest(); 
            //Get stream data from service(grpc)
            const stream = this.client.getCoreUpdates(req);

            stream.on('data', (message: any) => {        
                obs.next(message.toObject() as TestReply.AsObject);
            });
        });
    }
}

MyComponent.ts:

public testReply?: TestReply.AsObject;   
private _subscription: Subscription;

constructor(private readonly _MyTestService: MyTestService) {     
    this._subscription = new Subscription();
}

ngOnInit(): void {   
    this._subscription = this._MyTestService.coreData$.subscribe((data) => {
        if (data) {
            let obj = JSON.parse(data);
            //processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
        }
    });
}

Because the data arrives so quickly, not all records are processed, and I get synchronization issues where the last stream that satisfies my conditions overwrites the previous ones. To make sure every stream is processed, I want to merge them into an array that I can iterate over inside the subscribing component. The order of the stream data does not matter.

I've tried RxJS operators such as timer, mergeMap, concatMap, scan, and merge, but I'm new to these concepts and am struggling to use them correctly. Below is my attempt with scan. It did not give the desired result: the array stays empty, and I am not sure how to inspect it with console.log. Any guidance or suggestions would be greatly appreciated.

Attempted Solution:

let temp: TestReply.AsObject[] = [];
let test = this._MyTestService.coreData$
    .pipe(
        mergeMap(_ => timer(5000)),
        scan<any>((allResponses, currentResponse) =>
            [...allResponses, currentResponse], this.temp),
    ).subscribe(console.log);
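
A note on the attempt above: `mergeMap(_ => timer(5000))` swaps every message for the `0` that `timer(5000)` emits, so the original payload is lost before `scan` sees it, and the seed `this.temp` is not the local `temp` declared on the first line. The following is only a minimal sketch of `scan` on its own, assuming a hypothetical `Reply` type in place of `TestReply.AsObject` and `from(messages)` in place of `coreData$`:

```typescript
import { from } from 'rxjs';
import { scan } from 'rxjs/operators';

// Hypothetical stand-in for TestReply.AsObject.
interface Reply {
  id: number;
}

// Stand-in for the messages that coreData$ would emit.
const messages: Reply[] = [{ id: 1 }, { id: 2 }, { id: 3 }];

// Every array that scan emits is recorded here for inspection.
const snapshots: Reply[][] = [];

from(messages)
  .pipe(
    // scan emits the accumulated array after each message, starting from an empty seed.
    scan((all: Reply[], current: Reply) => [...all, current], [] as Reply[]),
  )
  .subscribe(all => snapshots.push(all));

// snapshots now holds one growing array per message received.
console.log(snapshots.map(all => all.length));
```

Because `scan` emits on every message, the subscriber sees a growing array each time rather than a single final array.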

Answer №1

My solution collects every emitted message into an array in the next callback of subscribe, and runs the final processing in the complete callback. For complete to fire, the service has to call obs.complete() when the gRPC stream emits its 'end' event.

Here is how I implemented it in MyTestService.ts:

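import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
// GrpcMyTestServiceClient, SomeRequest, TestReply and environment come from the project's own modules.

@Injectable()
export class MyTestService {
    client: GrpcMyTestServiceClient;
    public readonly coreData$: Observable<TestReply.AsObject>;

    constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.coreData$ = this.listCoreStream();
    }

    listCoreStream(): Observable<TestReply.AsObject> {
        return new Observable(obs => {
            const req = new SomeRequest();
            // Retrieve stream data from the service (grpc)
            const stream = this.client.getCoreUpdates(req);

            // Forward each message to subscribers as a plain object
            stream.on('data', (message: any) => {
                obs.next(message.toObject() as TestReply.AsObject);
            });

            // Complete the observable once the server has finished streaming
            stream.on('end', () => {
                obs.complete();
            });
        });
    }
}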
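
The service above does not forward stream errors or release the stream when the subscriber unsubscribes. The following is only a sketch of one way to do both, not the generated client's actual API: `StreamLike`, `fromStream` and `FakeStream` are hypothetical names, and the `cancel()` method is an assumption about the stream object that should be checked against the client in use.

```typescript
import { Observable } from 'rxjs';

// Hypothetical minimal shape of a server-streaming call; the real client type may differ.
interface StreamLike {
  on(event: 'data' | 'error' | 'end', handler: (payload?: any) => void): unknown;
  cancel(): void;
}

// Wraps a stream in an Observable that forwards data, errors and completion.
function fromStream<M>(open: () => StreamLike): Observable<M> {
  return new Observable<M>(obs => {
    const stream = open();
    stream.on('data', (message: M) => obs.next(message));
    stream.on('error', (err: unknown) => obs.error(err));
    stream.on('end', () => obs.complete());
    // Teardown: runs on unsubscribe, and also after error or completion.
    return () => stream.cancel();
  });
}

// In-memory fake used only to exercise the wrapper.
class FakeStream implements StreamLike {
  private handlers: Record<string, (payload?: any) => void> = {};
  cancelled = false;

  on(event: 'data' | 'error' | 'end', handler: (payload?: any) => void): this {
    this.handlers[event] = handler;
    return this;
  }

  emit(event: 'data' | 'error' | 'end', payload?: any): void {
    const handler = this.handlers[event];
    if (handler) {
      handler(payload);
    }
  }

  cancel(): void {
    this.cancelled = true;
  }
}

const fake = new FakeStream();
const received: number[] = [];
let completed = false;

fromStream<number>(() => fake).subscribe({
  next: value => received.push(value),
  complete: () => { completed = true; },
});

fake.emit('data', 1);
fake.emit('data', 2);
fake.emit('end');
```

In the service, the same three handlers and the returned teardown function would go inside listCoreStream().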

In MyComponent.ts:

public testReply?: TestReply.AsObject;
public dataArray: TestReply.AsObject[] = [];
private _subscription: Subscription;

constructor(private readonly _MyTestService: MyTestService) {
    this._subscription = new Subscription();
}

ngOnInit(): void {
    this._subscription = this._MyTestService.coreData$.subscribe({
        next: (data) => {
            if (data) {
                // data is already a plain object (the service calls message.toObject()),
                // so it is stored as-is rather than passed through JSON.parse
                this.dataArray.push(data);
            }
        },
        complete: () => {
            // Final data processing logic can be implemented here, iterating over this.dataArray
            // This may include condition checks, filtering based on child types,
            // dynamic binding of styles, etc.
        },
    });
}
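
The push-then-complete pattern above can also be expressed with operators. As a minimal sketch, assuming a hypothetical `Reply` type in place of `TestReply.AsObject`: `toArray` emits one array once the source completes, and `bufferTime` emits an array for each time window, which is closer to the "after X seconds" requirement when the stream never ends. The helper names `collectUntilComplete` and `collectEvery` are illustrative only.

```typescript
import { Observable, from } from 'rxjs';
import { bufferTime, filter, toArray } from 'rxjs/operators';

// Hypothetical stand-in for TestReply.AsObject.
interface Reply {
  id: number;
}

// Emits a single array containing every message once the source completes.
function collectUntilComplete<T>(source$: Observable<T>): Observable<T[]> {
  return source$.pipe(toArray());
}

// Emits an array of the messages received in each window of windowMs milliseconds.
// Windows in which nothing arrived are skipped.
function collectEvery<T>(source$: Observable<T>, windowMs: number): Observable<T[]> {
  return source$.pipe(
    bufferTime(windowMs),
    filter(batch => batch.length > 0),
  );
}

// Stand-in for the messages that coreData$ would emit.
const messages: Reply[] = [{ id: 1 }, { id: 2 }, { id: 3 }];

let collected: Reply[] = [];
collectUntilComplete(from(messages)).subscribe(all => {
  collected = all;
});

console.log(collected.length);
```

In the component, something like `collectEvery(this._MyTestService.coreData$, 5000).subscribe(batch => ...)` would hand the processing logic one array per 5-second window. The window length is an assumption to tune against how quickly the service actually sends messages.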
