Build an RxJS pipeline that merges consecutive events of the same type before processing them asynchronously with concatMap

In my code, there's an eventStream that deals with different types of events and sends them to the server via HTTP.

import { from, Observable } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type Update = number[];

interface Event {
  type: 'add' | 'delete' | 'update';
  data: Update;
}

const eventStream = from([
  { type: 'update', data: [1] },
  { type: 'update', data: [2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [4] },
  { type: 'update', data: [5] },
  { type: 'delete', data: [6] },
  { type: 'update', data: [7] },
  // ... other events
] as Event[]); // assert the element type so 'type' is not widened to string

function postEvent(event: Event): Observable<any> {
  // ... posting event to server
}

eventStream.pipe(concatMap(event => postEvent(event)))

I want to reduce the number of HTTP requests by combining consecutive update events that have not been processed yet into a single event, using the function combineUpdates, while preserving the order of events.

/*
Combines multiple consecutive update events into a single update event:
 { type: 'update', data: [1] }, { type: 'update', data: [2] } => { type: 'update', data: [1, 2] }
*/
function combineUpdates(updates: Event[]): Event {
  return { type: 'update', data: updates.map(e => e.data).flat()};
}

For example: eventStream should be transformed into the following series of postEvent() calls:

  { type: 'update', data: [1, 2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [4, 5] },
  { type: 'delete', data: [6] },
  { type: 'update', data: [7] },
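
As a reference for the expected result, the same grouping can be sketched over a plain array, without RxJS. This is only an illustration of the merge rule, not a streaming solution; StreamEvent and mergeConsecutiveUpdates are hypothetical names (StreamEvent mirrors the Event interface above and is renamed only to avoid clashing with the DOM's global Event type).

```typescript
// Hypothetical names for illustration; StreamEvent mirrors the question's Event.
type Update = number[];

interface StreamEvent {
  type: 'add' | 'delete' | 'update';
  data: Update;
}

// Merge each run of consecutive 'update' events into a single update,
// leaving every other event untouched and preserving order.
function mergeConsecutiveUpdates(events: StreamEvent[]): StreamEvent[] {
  const merged: StreamEvent[] = [];
  for (const event of events) {
    const last = merged[merged.length - 1];
    if (last !== undefined && last.type === 'update' && event.type === 'update') {
      // Extend the previous update instead of adding a new entry.
      merged[merged.length - 1] = { type: 'update', data: [...last.data, ...event.data] };
    } else {
      // Copy the event so the input array is never mutated.
      merged.push({ type: event.type, data: [...event.data] });
    }
  }
  return merged;
}

const sample: StreamEvent[] = [
  { type: 'update', data: [1] },
  { type: 'update', data: [2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [4] },
  { type: 'update', data: [5] },
  { type: 'delete', data: [6] },
  { type: 'update', data: [7] },
];

const mergedSample = mergeConsecutiveUpdates(sample);
console.log(JSON.stringify(mergedSample));
```

For the sample input this yields the five events listed above, in the same order.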

Answer №1

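One approach is to buffer consecutive edit tasks in an array inside concatMap and send them as one combined task when a different kind of task arrives. The example below renames the question's types: Event becomes Task, update becomes edit, and add becomes create.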

import { from, Observable, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type Modification = number[];

interface Task {
  action: 'create' | 'edit' | 'delete';
  data: Modification;
}

const taskStream = from(<Array<Task>>[
  { action: 'edit', data: [1] },
  { action: 'edit', data: [2] },
  { action: 'create', data: [3] },
  { action: 'edit', data: [4] },
  { action: 'edit', data: [5] },
  { action: 'delete', data: [6] },
  { action: 'edit', data: [7] },
  // ... other tasks
]);

function performTask(task: Task): Observable<any> {
  console.log('performing task', task);
  return of(true);
}

function combineModifications(mods: Task[]): Task {
  return { action: 'edit', data: mods.map((t) => t.data).flat() };
}

// Buffer of consecutive 'edit' tasks that have not been sent yet.
let modArray: Array<Task> = [];
const streamActions$ = taskStream.pipe(
  concatMap((task: Task) => {
    if (task.action === 'edit') {
      // Hold edits back until a different kind of task arrives.
      modArray.push(task);
      return from([]); // emits nothing and completes immediately
    }
    // Send the buffered edits as one combined task, then the current task.
    const toRun: Task[] = modArray.length
      ? [combineModifications(modArray), task]
      : [task];
    modArray = [];
    // The inner concatMap subscribes to each request in order, one at a time.
    return from(toRun).pipe(concatMap((t) => performTask(t)));
  })
);

streamActions$.subscribe({
  complete: () => {
    if (modArray.length) {
      const data = combineModifications(modArray);
      modArray = [];
      performTask(data).subscribe();
    }
  },
});

// Tasks performed, in order:
// { action: 'edit', data: [1, 2] }
// { action: 'create', data: [3] }
// { action: 'edit', data: [4, 5] }
// { action: 'delete', data: [6] }
// { action: 'edit', data: [7] }

Try this on StackBlitz

Answer №2

To streamline the process, I suggest creating a bespoke operator that consolidates a sequence of updates into a single update operation while leaving all other operations unaffected.

Below is an example of how this custom operator could be implemented:

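// combineUpdates defines a custom operator that merges each run of
// consecutive 'update' events into one and passes other events through.
// Assumes Observable is imported from 'rxjs' and Event is the question's interface.
function combineUpdates() {
  return (source: Observable<Event>) =>
    new Observable<Event>((subscriber) => {
      // Per-subscription buffer, so separate subscribers do not share state.
      let updateArray: number[] = [];

      // Emit the buffered updates (if any) as a single update event.
      const flush = () => {
        if (updateArray.length > 0) {
          subscriber.next({ type: 'update', data: updateArray });
          updateArray = [];
        }
      };

      const subscription = source.subscribe({
        next: (value) => {
          if (value.type === 'update') {
            updateArray.push(...value.data);
            return;
          }
          flush();
          subscriber.next(value);
        },
        error: (err) => subscriber.error(err),
        // Flush on completion so a trailing run of updates is not lost.
        complete: () => {
          flush();
          subscriber.complete();
        },
      });
      return () => subscription.unsubscribe();
    });
}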

Once the custom operator is defined, it can be used as follows:

const streamLogic$ = eventStream.pipe(
  combineUpdates()
);

streamLogic$.subscribe({
  next: console.log,
});
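
The buffering rule inside an operator like this can also be isolated from RxJS as a small state machine, which makes one detail easy to see: a run of updates at the very end of the stream is only released if the buffer is flushed when the source completes. The sketch below is an assumption-laden illustration, not part of the answer; UpdateBuffer and StreamEvent are hypothetical names (StreamEvent mirrors the question's Event interface).

```typescript
// Hypothetical names for illustration; StreamEvent mirrors the question's Event.
type Update = number[];

interface StreamEvent {
  type: 'add' | 'delete' | 'update';
  data: Update;
}

// Buffers consecutive updates and releases them when a non-update event
// arrives or when flush() is called at the end of the stream.
class UpdateBuffer {
  private buffered: number[] = [];

  // Feed one event; returns the events that are ready to be sent, in order.
  next(event: StreamEvent): StreamEvent[] {
    if (event.type === 'update') {
      this.buffered.push(...event.data);
      return [];
    }
    return [...this.flush(), event];
  }

  // Call when the source completes so a trailing run of updates is not lost.
  flush(): StreamEvent[] {
    if (this.buffered.length === 0) {
      return [];
    }
    const combined: StreamEvent = { type: 'update', data: this.buffered };
    this.buffered = [];
    return [combined];
  }
}

const buffer = new UpdateBuffer();
const sent: StreamEvent[] = [];
const incoming: StreamEvent[] = [
  { type: 'update', data: [1] },
  { type: 'update', data: [2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [7] },
];

for (const event of incoming) {
  sent.push(...buffer.next(event));
}
const sentBeforeFlush = sent.length; // the trailing update is still buffered here
sent.push(...buffer.flush());
console.log(sentBeforeFlush, JSON.stringify(sent));
```

In an operator, next() corresponds to the source's next handler and flush() belongs in its complete handler, before completing the subscriber.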

For a working demo, you can visit this stackblitz link.

For further insight into custom operators, check out this informative article.
