Merge rxjs streams, identify modifications, and yield a single result

In the context of using Angular with .net Core WebApi, let's consider the development of a time management application designed to monitor task durations. The user initiates a task on the front end which triggers a timer displaying elapsed time.

The concept involves retrieving the elapsed time data from the backend. When a user begins a task on the front end, a timer starts on the backend and sends the current elapsed time back to the front end for display. The goal is to ensure accurate time tracking even in cases of connection issues between the backend and frontend.

To illustrate this scenario: the backend stream sends out values every second (representing the elapsed time displayed to the user). However, if there is a temporary connection disruption after 6 seconds, causing a delay before sending "9" ("0:09"), it might confuse the user ("it was at 6 seconds and now suddenly 9?"). To address this, an interval is set up on the front end to emit a new value each second. This allows for checking whether the backend stream has sent a new value within that timeframe, and if not, the previous value can be adjusted to ensure the user sees the correct elapsed time.

bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9

What the user observes:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (pause) -> 00:09

Desired user experience:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (pause - frontend detects no new value and adds 1 second to the previous)
Resulting sequence:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09

Facing some challenges in determining where to begin this process.

Created a quick fiddle with two streams but struggling to figure out how to detect when bStream does not emit a new value.

https://stackblitz.com/edit/typescript-yookbj

Answer №1

Here is a potential solution:

const be$ = concat(
  of(1).pipe(delay(100)),
  of(2).pipe(delay(100)),
  of(3).pipe(delay(100)),
  of(4).pipe(delay(100)),
  of(5).pipe(delay(100)),
  of(6).pipe(delay(100)),

  of(10).pipe(delay(500)), // Wait after freeze
  of(11).pipe(delay(100)),
  of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));

// Skip(1) - the ReplaySubject used by shareReplay() will give us the latest value and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));

const fe$ = be$.pipe(
  mergeMap(v => merge(
    of(v),
    of(v).pipe(
      expand(v => timer(100).pipe(map(v1 => 1 + v))),
      takeUntil(beReady$),
    )
  )),
  distinctUntilChanged(),
  filter(v => v !== null)
).subscribe(console.log)

endWith(null) - to halt the recursion when the final value (12) is emitted, we need the source to emit something else

shareReplay - the source must be shared as there will be an additional subscriber (beReady$), apart from the main subscriber (fe$)

mergeMap(v => merge(
  of(v), // Pass on the current value
  of(v).pipe(
    // If be$ does not emit within the next 100ms, send the currentValue + 1
    // and continue this process until be$ finally emits
    expand(v => timer(100).pipe(map(v1 => 1 + v))),
    takeUntil(beReady$),
  )
)),

expand works similarly to mergeMap, with some differences:

  • it passes along the inner value
  • it creates another inner observable based on the last inner value, making it recursive
  • takeUntil(beReady$) is how the recursion can be terminated

StackBlitz

Answer №2

Kindly review the code snippet provided below. To simulate a synchronization issue, modify the code v * 5 to v * 4. This change will ensure that the counter aligns with the value obtained from the "backend" once it has been received.

// emit every 5s
const source = interval(5000).pipe(
    map(v1 => v1 + 1), 
    startWith(0), 

    map(v => v * 5),
);
// emit every 1s
const secondSource = interval(1000).pipe(
    delay(50), 
    map(v1 => v1 + 1), 
    startWith(0)
);

source.pipe( 
    switchMap(v => secondSource.pipe(
        map(v1 => v + v1), 
    )),
).subscribe(v => console.log(v));

The updated code now ensures smooth counting from 0 to N even in the presence of lags from the backend.

UPDATED

There is an even simpler method available, although it does not depend on the time when the backend responds and operates independently on a one-second period.

pipe(
  bufferTime(1000), 
  scan((a, b) => b.length ? a + 1 : b[0], 0), 
);

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

The dropdown menu repeatedly opens the initial menu only

My script fetches data from a database to populate a table with one row for each member. Each row contains a dropdown list with the same class and ID. Although I attempted to open and close the dropdowns using specific codes, I am facing an issue where onl ...

Utilize Angular Guards and RxJS to coordinate the flow of data between parent and child Guards. Ensure that the Child Guard waits for the parent

Is it possible to validate data in a child component's guard that was fetched from the parent's guard? I have parent components with employees and used a guard to fetch data from the server. Now, I want to verify this data in the child component& ...

Tips for implementing UI properties in React

Utilizing an external UI Library, I have access to a Button component within that library. My goal is to create a customized NewButton component that not only inherits all props from the library Button component but also allows for additional props such as ...

Can I create interactive stacked shapes with CSS and/or JavaScript?

Trying to explain this may be a challenge, so please bear with me. I need to create an "upvote" feature for a website. The number of upvotes is adjustable in the system settings. The upvote controls should resemble green chevrons pointing upwards. For exa ...

It ceases to function when transferred to another file

I have a function written in coffeescript that goes like this: _skip_version = (currentVersion, skippedVersions) -> if (currentVersion.indexOf(skippedVersions) == -1) return false return true This function is currently located in my archive.sp ...

We've encountered an issue with Redux and Typescript: 'Unable to find property 'prop' in type 'string[]'

When attempting to fetch data in redux and return only a portion of it, I encountered an issue with Typescript stating that "Property 'xxx' does not exist on type 'string[]'". I have reviewed the interface and initialState, but I am una ...

Changing table data using a switch in mui-datatables when my information is stored as boolean values

How can I update my boolean data in a Switch component for each row fetched from Firestore? The data is currently being displayed correctly, but when I click on the Switch to change it from true to false or vice versa, nothing happens. Can someone help me ...

A JavaScript function that smoothly scrolls an element into view while considering any scrollable or positioned parent elements

I needed a function that could smoothly scroll a specific element into view with some intelligent behavior: If the element is a descendant of a scrollable element, I wanted the ancestor to be scrolled instead of the body. If the element is within a posit ...

Retrieving the chosen value when there is a change in the select tag

Building a shop and almost done, but struggling with allowing customers to change product quantities in the cart and update prices dynamically. I've created a select-tag with 10 options for choosing quantities. I'd like users to be able to click ...

"Customizing the template of the Angular Material 2 datepicker: A step-by-step

Looking to make changes to the templates of the angular 2 material date-picker? These templates are located within various internal components in @angular/material/esm5/datepicker.es5.js. One option is to directly modify the template in the node package, ...

The then() function in Node.js is triggered before the promise is fully resolved

I'm struggling to get my Promise function working as intended. Here's what I need to accomplish: I am receiving file names from stdout, splitting them into lines, and then copying them. Once the copy operation is complete, I want to initiate oth ...

Set up specific vue.config.js configurations for unique environments in Vue

I am working on a multi-page app where I want certain pages to only show up in my development environment. Here's how my vue.config.js looks: module.exports = { productionSourceMap: false, pages: { index: "src/main.js", admin: { ...

Updating a table without the need for a button in PHP and AJAX with an Excel-inspired approach

I'm facing a scenario where I need to dynamically update the contents of a table row. To achieve this, I want the cell to transform into a text box when clicked. Here's how I implemented it: <td contenteditable></td> After making ch ...

Retrieve information from arrays within objects in a nested structure

I am working with structured data that looks like the example below: const arr = [{ id: 0, name: 'Biomes', icon: 'mdi-image-filter-hdr', isParent: true, children: [{ id: 1, name: 'Redwood forest& ...

Having trouble with the DataTables jQuery plugin? Seeing a blank page instead of the expected results?

I've been trying to set up the Datatables jquery plugin for my HTML table but it's not working as expected. I have linked to the Datatables CDN for CSS styling and script, along with Google's hosted jQuery plugin. Additionally, I have a loca ...

What is the best way to receive the information that was transmitted to me through a callback function?

While web development is not my strong suit, I have a question that might sound silly, so bear with me as I explain. Once the user selects their desired purchase, an API call is made to generate a trans_id and redirects them to the bank's payment pag ...

Sending parameters within ajax success function

To streamline the code, I started by initializing the variables for the selectors outside and then creating a function to use them. Everything was working fine with the uninitialized selector, but as soon as I switched to using the variables, it stopped wo ...

Utilizing React hooks to capture the checkbox value upon change and transfer it to the submitForm function

I've got a functioning hook that monitors the onChange event of input fields, then spreads them into a variable and sends them to a Lambda function for sending an email with SendGrid. Everything is working fine. However, when I add an input type of c ...

Plotting Polyline Path on Google Maps using Angular

I am attempting to create a simple polyline connecting two markers using Angular-google-maps. While I have successfully positioned my two markers, I am encountering some complexity when trying to draw a polyline between them. It seems that my logic may no ...

"An issue arises when using req.body and res.render as the values retrieved are

Encountering an unusual problem - when using req.body to transmit form input to another page, the data is properly displayed when a single word is used in the input field (e.g. "FullName"). However, when there is a space, such as in the example "Full Name" ...