Tips for transferring data between connected observers

Challenging Scenario: Whenever a source Observable emits an event, it triggers a sequence of API calls and Angular services. Some of these invocations rely on previous results.

For instance, the source Observable startUpload$ initiates a series of interdependent calls.

This can be achieved using destructuring as shown below:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()

While this approach resembles imperative programming, it comes with its own set of challenges:

  • The long and repetitive destructuring chain
    { event, headers, id, pdfId, cloudId }
    throughout the code
  • Methods like generateUploadId(event, headers) need to receive all previous values, even if they don't require them, just to pass them along to the next pipe stage
  • Inner Observables within methods are needed to map the values for further stages of the pipe:

_

private closePdf(cloudId, event, headers, id, pdfId) {
    return this.httpClient.post(..., { headers } )
        .pipe(
             //...,
             map(() => ({ event, headers, id, pdfId, cloudId }))
        )
}

Wouldn't it be ideal if the compiler could handle this boilerplate, similar to how async await simplifies asynchronous code? The desired code structure would eliminate the mentioned issues and look something like this:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  }

Is there a way to seamlessly pass results between chained observables without encountering the previously discussed challenges? Are there any concepts in rxjs that address this issue?

Answer №1

Avoid passing unnecessary parameters to your methods!

Regarding your main question:

How can you transfer results between chained observables without encountering the issues mentioned?

Utilize a single scope (nested pipes)

The provided code is similar to your original code but eliminates the need to pass irrelevant properties. Values returned previously can still be accessed in subsequent function calls within the chain:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();

Notice how event and headers remain accessible downstream without the need to pass them unnecessarily.

Have you overlooked any rxjs concepts?

Possibly... But not really! :-)

The key is adding a .pipe to group operators effectively so they all have access to input parameters.

Typically, we aim to keep the code flat within the .pipe:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );

However, this structure is essentially the same as:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

In the latter case, line #4 has access to both name and id, whereas in the former it only has access to name.

Observe the signature of the first example:

userId$.pipe(switchMap(), map(), map(), tap())

While the second one is: userId$.pipe(switchMap(), tap()).

Answer №2

Your methods should not rely on specific contexts or mapping results to a fixed shape.

RxJS focuses on functional programming, where there is a concept called Adapting Arguments to Parametersref

This pattern allows for separating method signatures from contexts.

To achieve this, you can create context-specific versions of operators like map, contentMap, mergMap. The final solution may look like:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

Note the underscore _ after those operators.

Stackblitz Example

The goal of these custom operators is to pass parameters through a projection function and add the result to the original parameters object.

(Function code examples provided in the original text)

In some cases, over-engineering may not be necessary. Stick to simpler implementations if complexity is not required in your existing codebase.

Doesn't it look like over-engineering?

In most cases, follow the YAGNI (You aren't gonna need it) principle to avoid unnecessary complexity. Consider simple ways to share parameters between operators when needed instead.

(Code example given in the original text)

There are alternative methods that require changing the way you use RxJS operators:

Answer №3

Here are some ways you can optimize your actions using observables:

  • Assign the outcome of each action to an observable

  • Link subsequent function calls based on previous results

  • You can reuse those outcomes in later actions by using withLatestFrom

  • To prevent earlier functions from re-executing due to subsequent withLatestFrom subscriptions, utilize shareReplay

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    

The functions strictly take only the necessary parameters without any additional pass-through data.

Check out a demo example here: https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

This process can be streamlined by creating a custom RxJS operator (note that further refinements, including typing, can be implemented):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}

You can apply this as follows:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // To maintain the observable chain, return cloudId$ instead of uploadDone$
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}

Explore the working demo here: https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

Answer №4

Is it possible to utilize an object for storing the data set? Perhaps something along these lines:

Data Structure:

export interface Packet {
  event: string;
  headers?: string;
  id?: number;
  pdfId?: number;
  cloudId?: number;
}

Subsequently, in the code, consider implementing a setup like this:

Service Logic:

  this.startUploadEvent$.pipe(
    concatMap(packet => this.doTaskOne(packet)),
    map(packet => this.doTaskTwo(packet)),
    tap(packet => this.doTaskThree(packet)),
    // ...
  );

This method allows each function to access the necessary components of the object and pass on the remainder. However, it will be essential to adjust each function to accept and process the object accordingly.

Answer №5

It seems like your main concern is readability and avoiding carrying the payload from method to method.

Have you considered converting an Observable to a Promise? The key point here is that the observables need to complete for the promise to be fulfilled and resolved (similar to completion but specific to promises).

Following your suggestion mentioned above (such as using async await), I have come up with this proposal.

private async startUpload(event: StartUploadEvent) {
    const headers = await this.getAuthenticationHeaders(event).toPromise();
    const id = await this.generateUploadId().toPromise();
    
    this.emitUploadStartEvent(id, event);
    
    const pdfId = await this.createPdfDocument(event, headers, id).toPromise();
    await this.uploadBilderForPdf(event, pdfId, headers, id).toPromise();
    
    const cloudId = await this.closePdf(headers, pdfId).toPromise();
    this.emitUploadDoneEvent(id, event, cloudId)
    
    return cloudId
}

Information: You can find out what happens when you convert an observable into a promise without completing the observable here: Why converted promise from Subject (Observable) does not work as expected

Note: I am meeting your expectations accordingly.

There might be other solutions to the issue that do not go against common best practices.

Answer №6

It is evident that the code poses certain challenges, and the key solution lies in shifting the responsibility of combining results and passing correct arguments from methods to the pipe.

A number of simple improvements can be implemented without much difficulty. tap operator does not alter the value, allowing us to remove unnecessary properties during destructuring. map simply transforms the result, so instead of

map(({ event, headers }) => this.generateUploadId(event, headers)),

We can now write

map(({ event, headers }) => ({
  event,
  headers,
  id: this.generateUploadId(event, headers)
}))

The method this.generateUploadId no longer needs to return an object.

When it comes to high-order mapping operators, several options come to mind. Most 'xMap' operators support a result selector as the final argument, which serves our purpose effectively - combining source value with the result. Result selectors have been deprecated, so nested pipes are currently the preferred approach. However, let's explore how this could look using a result selector

Option 0. Result Selector (deprecated)

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event),
      (event, headers) => ({ event, headers }) // <-- Result Selector
    )
  );

Option 1. Nested Pipes (aka "use closures")

This is quite similar to Option 0, but the event is preserved within the closure rather than the inner observable.

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event)
        .pipe(map(headers => ({ event, headers })))
    )
  );

Option 2. Custom Operator (Closures here as well)

A custom operator can be created to achieve a syntax resembling Result Selectors

function withResultSelector(operator, transformer) {
  let sourceValue;
  return pipe(
    tap(value => (sourceValue = value)),
    operator,
    map(value => transformer(sourceValue, value))
  );
}

Usage:

this.startUploadEvent$
  .pipe(
    withResultSelector(
      concatMap(event => this.getAuthenticationHeaders(event)),
      (event, headers) => ({ event, headers })
    )
  );

Furthermore, it is feasible to extract repetitive elements and enhance overall functionality:

const mergeAs = propName => (a, b) => ({ ...a, [propName]: b });
const opAndMergeAs = (operator, propName) => withResultSelector(operator, mergeAs(propName));

this.startUploadEvent$
  .pipe(
    opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)), "headers")
  );

While defining proper types for these operations might pose some challenges, it presents a separate issue

Here is the playground I utilized while crafting this response.

Answer №7

Your observation about the concerns and issues mentioned is accurate, but the challenge here lies in transitioning your mindset from an imperative approach to a Reactive/Functional one. Let's first examine the imperative code:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
}

In this code snippet, you can see that having the 'event' parameter allows for passing only necessary data to subsequent functions. The goal is to transition this code to a more Reactive/Functional approach.

From my perspective, the main issue is that functions lose their context. For instance, 'getAuthenticationHeaders' should solely return 'headers' without involving 'event.' This often arises when working with RxJS (Reactive Approach), maintaining purity by ensuring that 'pure' operators handle data within the same pipeline, avoiding side effects for predictability.

A potential solution lies in utilizing 'nested pipes,' which I find to be the most effective approach:

concatMap(event => this.getAuthenticationHeaders(event).pipe(
    map(headers => this.generateUploadId(event, headers).pipe())
))

This technique is prevalent in RxJS backend libraries like Marble.js. Other suggestions provided could enhance functionality while retaining code clarity.

While converting to an 'async/await' approach is possible, it sacrifices the reactivity offered by RxJS. I suggest delving into reactive programming concepts and exploring related resources such as CycleJS and Functional Programming materials, including the informative books Mostly adequate guide to FP (in javascript) and Composing Software.

I highly recommend watching the insightful talk on RxJS Recipes as it can revolutionize your approach to using RxJS effectively.

Additional Resources Worth Exploring:

Answer №8

Contained within this answer are four different methods, each utilizing four consecutive streams (allowing backend calls to make use of the previous stream's response).

(While some methods may have been previously mentioned in other answers, this comparison is presented using the same input assumptions - where the streams represent HTTP calls with a single emission, thereby allowing operators like mergeMap, concatMap, or switchMap to be used.)

Each method comes with its own set of advantages and disadvantages.

Before delving into the methods, here is the common preparation code:

import { of, map, Observable, exhaustMap } from 'rxjs';

const a$ = of('a');
const b$ = of('b');
const c$ = of('c');
const d$ = of('d');

interface Result {
  a: string;
  b: string; 
  c: string;
  d: string;
} 

Method 1: Nested Pipe Using Map/Previously Known as Result Selector

This approach involves two levels of nested pipes, which may make it slightly less readable and more prone to errors.

function makeBeCalls(): Observable<Result> {
  return a$
      .pipe(
          exhaustMap(a =>
              b$.pipe(
                  map(b => ({b, a}))
              )
          ),
          exhaustMap(({b, a}) =>
              c$.pipe(
                  map(c => ({b, a, c}))
              )
          ),
          exhaustMap(({a, b, c}) =>
              d$.pipe(
                  map(d => ({a, b, c , d}))
              )
          )
      );
}

makeBeCalls().subscribe(v => console.log(v))

Method 2: Using a Shared Variable for Previous Values

In this method, an additional temporary variable is utilized to store subsequent emissions - resulting in a side effect. However, the solution appears shorter and more readable.

function makeBeCalls1(): Observable<Result> {
  const events: Result = { a: null, b: null, c: null, d: null };
  return a$
      .pipe(
          exhaustMap(a => (events.a = a) && b$),
          exhaustMap(b => (events.b = b) && c$),
          exhaustMap(c => (events.c = c) && d$),
          map(d => (events.d = d) && events) 
      );
}

Method 3: Using Nested Pipes/Closure Scope

This particular approach nests each stream, reminiscent of callback hell. It is considered the least readable and potentially more error-prone, especially when dealing with multiple levels of nesting.

function makeBeCalls2(): Observable<Result> {
  return a$
      .pipe(
          exhaustMap(a => b$
              .pipe(
                  exhaustMap(b => c$
                      .pipe(
                          exhaustMap(c => d$
                              .pipe(
                                  map(d => ({a, b, c, d}))
                              )
                          )
                      )
                  )
              )
          )
      );
}

makeBeCalls2().subscribe(v => console.log(v))

Method 4: Utilizing Promises with Async/Await

This approach, while not fully reactive, offers a straightforward and readable solution - especially beneficial for HTTP calls.

async function makeBeCalls3(): Promise<Result> {
  const a = await lastValueFrom(a$);
  const b = await lastValueFrom(b$);
  const c = await lastValueFrom(c$);
  const d = await lastValueFrom(d$);
  
  return {a, b, c, d}
}

makeBeCalls3().then(v => console.log(v))

Visit StackBlitz for a working example.

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Utilizing an Angular Service within the main.ts script

My main.ts file currently has the following code snippet: declare const require; const translations = require("raw-loader!./locale/messages.de.xlf"); platformBrowserDynamic().bootstrapModule(AppModule, { providers: [ { provide: TRANSLATIONS, useVa ...

A guide to resolving the Angular 11 error of exceeding the maximum call stack size

I am currently working with 2 modules in Angular 11 CustomerModule AccountingModule I have declared certain components as widget components in these modules that are used interchangeably: CustomerModule -> CustomerBlockInfoWidget AccountingModule -&g ...

Practical strategy for developing and launching a TypeScript/Node.js application

I'm currently developing a node.js app using Typescript, which requires compilation to JS before running. As someone with a background in java/jvm, I'm hesitant about the deployment process where code is pushed to git, built/compiled on the serve ...

Run JavaScript code whenever the table is modified

I have a dynamic table that loads data asynchronously, and I am looking for a way to trigger a function every time the content of the table changes - whether it's new data being added or modifications to existing data. Is there a method to achieve th ...

You are unable to assign to 'total' as it is a constant or a property that cannot be modified

Upon running ng build --prod in my project, I encountered the following error: src\app\components\xxxx\xxxx.component.html(116,100): : Cannot assign to 'total' because it is a constant or a read-only property. The proble ...

Leverage the TypeScript Compiler API to verify whether an interface property signature permits the value of undefined (for example, prop1?:

Currently, I am utilizing the TypeScript Compiler API to extract Interface information in order to generate database tables. The process is functioning effectively, however, I am seeking a method to determine if certain fields are nullable, or as it is phr ...

What is the best way to leverage TypeScript for destructuring from a sophisticated type structure?

Let's consider a scenario where I have a React functional component that I want to implement using props that are destructured for consistency with other components. The component in question is a checkbox that serves two roles based on the amount of ...

Change a nested for-loop into an Observable that has been transformed using RxJS

Currently, the following function is operational, but I consider it a temporary solution as I'm extracting .value from a BehaviorSubject instead of maintaining it as an observable. Existing Code Snippet get ActiveBikeFilters(): any { const upd ...

What is the best way to shift focus to the next input field once the character limit has been reached, especially when the input is contained

My challenge lies in having six input fields arranged side by side in a single row: In my component.html file: onDigitInput(event: any) { let element; if (event.code !== 'Backspace') element = event.srcElement.nextElementSibling; consol ...

What could be causing my Bootstrap accordion to not expand or collapse within my Angular application?

Struggling with my Accordion Angular component that utilizes Bootstrap - the collapsing and expanding feature isn't functioning properly. I've simply copied the bootstrap code into my accordion.component.html. <div class="accordion accord ...

Angular 2 - Error: Unexpected token < in Http request

I encountered a JavaScript error in my browser while working with the following code: Uncaught SyntaxError: Unexpected token < The error disappears when I remove constructor(private _http:Http) { } from image.service.ts. Could it be that I am incorre ...

Using TypeScript to pass parameter in a chained function

When using node.js, I am calling a script with `exec` from the `child_process` module. In the closed state, I need to verify if there was any error so that I can handle it properly. However, I am facing an issue where I cannot access the `error` parameter ...

Validate the data type based on the property

I have a CARD type that can be either a TEXT_CARD or an IMAGE_CARD: declare type TEXT_CARD = { type: "paragraph" | "h1" | "h2"; text: string; }; declare type IMAGE_CARD = { type: "img"; src: string; orient ...

Why doesn't angular generate ng-reflect-_opened="false" in its production build?

We are currently utilizing the following technologies (which cannot be changed in the near future): Angular 2 RC5 Angular CLI 1.0.0-beta.10 Material Design Side Nav Control Node 6.9.1 npm 3.10.8 Windows 10 When we compile the code (using ng serve with d ...

Struggling with Dependency Problems in Two Separate tsconfig Files

Here's a situation where I have come up with a solution using two ts config files. The structure of the solution looks like this. The main tsconfig.json at the root level is as follows: { "compilerOptions": { "declaration": true, ...

Utilize the fetch function within a React functional component

I have been experimenting with different methods to fetch data only once before rendering, but I am encountering some challenges: It is not possible to call dispatch in componentDidMount as there is a restriction that it can only be done in Functional c ...

How can I bypass a unit test suite in Angular?

Is there a method to skip certain unit tests that should not be executed currently, without resorting to using fdescribe on the ones I do want to run? ...

Manipulate the name of a property or field using TypeScript

I am in the process of creating a bilingual blog using Nuxt.js and TypeScript. This application interacts with a REST API to retrieve data that is structured like this: { "Headline_de": "Mein erster Blogpost", "Headline_en" ...

Verify the presence of data in Firebase using Angular

Currently, I am working on a web project that involves Angular connected with Firebase console. In my service class, I have defined a function to verify if a certain value exists in the database before saving it. However, whenever I call this function in m ...

Encountering a d3.js runtime issue following the transition to Angular 8

I've been experimenting with upgrading my Angular 6 application to Angular 8. It compiles fine, but I'm facing a runtime error as soon as it runs: "d3.js:8 Uncaught TypeError: Cannot read property 'document' of undefined". The specific ...