streamlining the process of buffering and organizing items in a flow

Messages are received from an observable, sometimes out of order but with timestamps for sorting.

The goal is to deliver elements chronologically despite the unordered arrival and uncertain end time of the stream. A slight delay in processing is acceptable.

To achieve this ordering, buffering and sorting of items is necessary.

The approach involves maintaining a buffer of about three entries, sorting them, releasing the earliest one, and repeating the process after a set time or upon receiving a new item.

An example stream could look like:

//Time item appears:   example item:
14:01:01               {time: '14:00:00', name: 'olga'}
14:01:02               {time: '14:00:03', name: 'peter'}
14:01:03               {time: '14:00:02', name: 'ouma'}
14:01:05               {time: '14:00:06', name: 'kat'}
14:01:06               {time: '14:00:05', name: 'anne'}
//... more to come

The desired output should be:

//Time item appears:   example item:
14:01:05               {time: '14:00:00', name: 'olga'}
14:01:06               {time: '14:00:02', name: 'ouma'}
14:01:07               {time: '14:00:03', name: 'peter'}
14:01:08               {time: '14:00:05', name: 'anne'}
14:01:09               {time: '14:00:06', name: 'kat'}
// ... more to come

In this scenario, three elements are collected, sorted, and the earliest one (e.g., olga) is released. Subsequently, as kat arrives, it joins the sorting array, releases the next oldest element (ouma), then waits for the next event or passage of time.

While the buffer and bufferCount operators may be useful, their implementation for this specific case remains unclear. Perhaps there is a more suitable operator available.

A potential solution involves storing results in an array, using setInterval with a delay function for periodic sorting and release of the oldest three elements, flagging any idle intervals to send the remaining sorted elements before terminating the process, and triggering the interval on each new item if not already active.

For my suggested solution, refer to: https://stackblitz.com/edit/typescript-xtjuu8?file=index.ts

Is there a more efficient method to achieve this?

Answer №1

This was a challenging puzzle!
I experimented with the buffer operator, and here's what I came up with:

// assuming a stream of events like { order: number, name: string }
const event$ = streamOrders().pipe(share())

event$.pipe(
  buffer(event$.pipe(
    map(({ order }) => order),
    startWith(null),
    exhaustMap(order => event$.pipe(order === null ? take(1) : filter(({ order: latest }) => latest === order + 1), take(1))),
  )),
  mergeMap(events => events.sort(({ order: a }, { order: b }) => a - b)),
).subscribe(console.log)

If streamOrders provides events according to the following timeline:

Time item appears example item
14:01:01 { order: 1, name: 'olga' }
14:01:02 { order: 3, name: 'peter' }
14:01:03 { order: 2, name: 'ouma' }
14:01:05 { order: 5, name: 'kat' }
14:01:06 { order: 4, name: 'anne' }

... then the resulting order would look like this:

Time item appears example item
14:01:01 { order: 1, name: 'olga' }
14:01:03 { order: 2, name: 'ouma' }
14:01:03 { order: 3, name: 'peter' }
14:01:06 { order: 4, name: 'anne' }
14:01:06 { order: 5, name: 'kat' }

The logic behind this solution involves buffering events until the expected one arrives, followed by sorting and merging them in the right order.
It's efficient for the given input data!

However, there is an issue where an early arrival of an order disrupts the sequence, as shown by the example: [1, 10, 3, 4, 5, 2]. This results in [1, 2, 3, 4, 5, 10], with unexpected out-of-order items.


To address this problem, I implemented a different strategy using a companion Subject<number> to synchronize each order received based on its position in the sequence.

const expected$ = new BehaviorSubject<number | null>(null)
const event$ = streamOrders().pipe(
  delayWhen(({ order }) => expected$.pipe(filter(next => next === null || next === order))),
  share(),
)

event$.pipe(
  map(({ order }) => order + 1), 
  observeOn(asyncScheduler)
).subscribe(expected$)

event$.subscribe(console.log)

Test it out here on StackBlitz.com

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Error: doc.data().updatedAt?.toMillis function is not defined in this context (NextJs)

I encountered an error message while trying to access Firestore documents using the useCollection hook. TypeError: doc.data(...)?.updatedAt?.toMillis is not a function Here is my implementation of the useCollection Hook: export const useCollection = (c, q ...

A guide to sketching the ellipsoid with three.js

Despite Three.js offering functions for drawing ellipses, I am in need of assistance to draw an ellipsoid instead. Can someone please help me? I have a specific requirement to draw an ellipsoid using three.js. ...

Utilizing TypeScript Variadic Tuple Types for an efficient Cartesian Product function

With the introduction of Variadic Tuple Types in TypeScript 4.0, a new type construct that allows for interesting possibilities such as concantenation functions has been made available. An example from the documentation illustrates this: type Arr = readonl ...

Navigating through JSON data to retrieve specific values and executing repetitive actions

When the form is submitted, I am making an AJAX request to PHP code and this is the response I receive. var data = { "empty":{ "game_sais_no":"Season cannot contain empty value", "game_sc_no":"Category cannot contain empty value", ...

Error Alert: Invalid type specified in React.createElement - Electron-React-Boilerplate

As a designer looking to expand my skills into coding for personal projects, I decided to start with the navigation UI in the [electron-react-boilerplate][1] that I cloned. However, I encountered the following error message: Error Warning: React.createEle ...

How can one change the data type specified in an interface in TypeScript?

I am currently diving into TypeScript and looking to integrate it into my React Native application. Imagine having a component structured like this: interface Props { name: string; onChangeText: (args: { name: string; value: string }) => void; s ...

What could be causing the content in my select box to change only when additional select boxes are introduced?

When working with a form in next.js and using select boxes from material UI, I encountered an issue. The number of select boxes should change based on user input, but when I modify the value inside a select box, the displayed text does not update until I a ...

What is the best way to effectively clear memory in THREE.js?

After successfully rendering the model, rotating and zooming work perfectly. However, when attempting to clear the scene by clicking the button#clear, issues arise. The expectation is to traverse through the scene, add its children to an array, iterate ov ...

Is there a way to simultaneously click on a link on one page and alter the position of a particular class on another page?

I have been working on designing two pages for a website. I am looking to use JavaScript to ensure that when a link on page 1 is clicked and the user transitions to page 2, the second tab button (btn) will have an id of "default" and receive the activate c ...

Jasmine and Karma: Unidentifiable variable runs

Is a special plugin or library required to use runs() and waits() with Jasmine? I checked the Jasmine wiki page for async tests at https://github.com/pivotal/jasmine/wiki/Asynchronous-specs, but there's no mention of needing additional libraries. Bas ...

Issues encountered when trying to modify the Content-Type of POST requests using ngResource versions 1.0.6 and 1.1.4

Despite numerous attempts and trying various solutions found online, I am still unable to solve this issue. Similar questions have been asked in the following places: How to specify headers parameter for custom Angular $resource action How can I post da ...

Error: The 'data' property cannot be found in the Redux action due to type 'void' restrictions

While attempting to carry out this action, I am encountering an issue where the data variable is returning as undefined, even though the backend is providing the correct data in response. Additionally, a TypeScript error is displayed stating Property &apos ...

Removing a hyperlink and adding a class to a unordered list generated in JavaScript - here's how!

In my JavaScript code, I have the following implementation: init: function() { var html = []; $.each(levels, function(nr) { html.push('<li><a href="#">'); html.push(nr+1); ...

JQuery horizontal navbar hover animations

Looking to design a simple navigation bar that displays a submenu when hovering over a link. The issue I'm facing is that the submenu disappears when moving from the link to the submenu itself, which is not the desired behavior. How can this be fixed ...

"Searching for existing data in MongoDB upon click event from the client side: A step-by-step guide

I have developed an express app that allows users to search for movies and add them to lists. If a movie is already added to the list, I want to show 'Already added' instead of 'Added to list'. How can I achieve this functionality from ...

Create a function that can dynamically filter an array of objects and extract specific properties into a new array

Here is the input that I have: [ { "metadata": { "id": 1071, "name": "First" }, "languages": [ { "name": "Alpha", "details": [ { "city" ...

Tips for displaying lesser-known checkboxes upon clicking a button in Angular

I have a form with 15 checkboxes, but only 3 are the most popular. I would like to display these 3 by default and have an icon at the end to expand and collapse the rest of the checkboxes. Since I'm using Angular for my website, I think I can simply ...

What could be the reason for eq not functioning properly in jQuery when using the after method

Why is "eq" not working in jQuery when using "after"? I am inserting my html using after on a button click. Here is the code snippet I tried: $(function(){ $('#btn').click(function(){ $(".topics-content").children("ul").eq(0).find("li").eq ...

Retrieving the newly inserted Id using nested queries in Express.js

I'm facing an issue with mysql nested queries in my express.js app where I am trying to retrieve the inserted id of all queries, but I am only getting the insertId of the global query. The error message I received is: TypeError: Cannot read property ...

retrieve the URL of a freshly opened page following a click

I am currently in the process of developing a test that involves clicking on a button to open a new tab and redirecting to a different website. My objective is to retrieve the value of this website for parsing purposes, specifically after the rfp code in t ...