Messages are received from an observable, sometimes out of order but with timestamps for sorting.
The goal is to deliver elements chronologically despite the unordered arrival and uncertain end time of the stream. A slight delay in processing is acceptable.
To achieve this ordering, buffering and sorting of items is necessary.
The approach involves maintaining a buffer of about three entries, sorting them, releasing the earliest one, and repeating the process after a set time or upon receiving a new item.
An example stream could look like:
//Time item appears: example item:
14:01:01 {time: '14:00:00', name: 'olga'}
14:01:02 {time: '14:00:03', name: 'peter'}
14:01:03 {time: '14:00:02', name: 'ouma'}
14:01:05 {time: '14:00:06', name: 'kat'}
14:01:06 {time: '14:00:05', name: 'anne'}
//... more to come
The desired output should be:
//Time item appears: example item:
14:01:05 {time: '14:00:00', name: 'olga'}
14:01:06 {time: '14:00:02', name: 'ouma'}
14:01:07 {time: '14:00:03', name: 'peter'}
14:01:08 {time: '14:00:05', name: 'anne'}
14:01:09 {time: '14:00:06', name: 'kat'}
// ... more to come
In this scenario, three elements are collected, sorted, and the earliest one (e.g., olga) is released. Subsequently, as kat arrives, it joins the sorting array, releases the next oldest element (ouma), then waits for the next event or passage of time.
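To make the intended behaviour concrete, here is a minimal, library-agnostic sketch of the buffering step. The names Message and ReorderBuffer are hypothetical, and it assumes the time field is always a zero-padded 'HH:mm:ss' string on the same day, so plain string comparison gives chronological order. It holds back up to three items and releases the earliest one as soon as a fourth arrives.

```typescript
// Hypothetical item shape, taken from the example stream above.
interface Message {
  time: string; // assumed 'HH:mm:ss', zero-padded, same day
  name: string;
}

// Hypothetical helper: holds back up to `capacity` items and releases the
// earliest one whenever a new arrival would overflow the buffer.
class ReorderBuffer {
  private items: Message[] = [];
  private capacity: number;

  constructor(capacity: number = 3) {
    this.capacity = capacity;
  }

  // Add an item; returns the items released as a result (zero or one).
  push(item: Message): Message[] {
    this.items.push(item);
    this.items.sort((a, b) => (a.time < b.time ? -1 : a.time > b.time ? 1 : 0));
    return this.items.length > this.capacity ? [this.items.shift()!] : [];
  }

  // Release everything still buffered, earliest first
  // (intended for an idle timeout or for completion of the source).
  flush(): Message[] {
    const rest = this.items;
    this.items = [];
    return rest;
  }
}

// The example stream, in arrival order.
const arrivals: Message[] = [
  { time: '14:00:00', name: 'olga' },
  { time: '14:00:03', name: 'peter' },
  { time: '14:00:02', name: 'ouma' },
  { time: '14:00:06', name: 'kat' },
  { time: '14:00:05', name: 'anne' },
];

const buffer = new ReorderBuffer(3);
const released: string[] = [];
for (const m of arrivals) {
  for (const out of buffer.push(m)) released.push(out.name);
}
for (const out of buffer.flush()) released.push(out.name);

console.log(released.join(', ')); // olga, ouma, peter, anne, kat
```

An item that arrives more than three positions late would still come out of order, so the capacity is a trade-off between delay and tolerance for lateness. Inside an observable pipeline, push could be called from next and flush from complete.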
While the buffer and bufferCount operators may be useful, their implementation for this specific case remains unclear. Perhaps there is a more suitable operator available.
A potential solution involves storing results in an array and using setInterval with a delay function to periodically sort the array and release the oldest three elements. Any idle interval would be flagged so that the remaining sorted elements are sent before the process terminates, and each new item would trigger the interval if it is not already active.
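The idle-detection part of that idea can be sketched roughly as follows. Note that this swaps setInterval for a resettable setTimeout, which is a different technique from the one described above: every new item restarts the countdown, and the remaining items are sorted and sent only once nothing has arrived for idleMs. The names IdleTimer, Schedule, onItem and the 2000 ms value are all hypothetical, and the time field is assumed to be a zero-padded 'HH:mm:ss' string.

```typescript
// Hypothetical item shape, as in the example stream.
interface Message {
  time: string; // assumed 'HH:mm:ss', zero-padded, same day
  name: string;
}

type Cancel = () => void;
// Hypothetical scheduler signature, so the timer source can be swapped out.
type Schedule = (fn: () => void, ms: number) => Cancel;

// Default scheduler built on setTimeout/clearTimeout.
const timeoutSchedule: Schedule = (fn, ms) => {
  const id = setTimeout(fn, ms);
  return () => clearTimeout(id);
};

// Calls `onIdle` once no `touch()` has happened for `idleMs` milliseconds.
class IdleTimer {
  private cancel: Cancel | null = null;
  private idleMs: number;
  private onIdle: () => void;
  private schedule: Schedule;

  constructor(idleMs: number, onIdle: () => void, schedule: Schedule = timeoutSchedule) {
    this.idleMs = idleMs;
    this.onIdle = onIdle;
    this.schedule = schedule;
  }

  // Restart the countdown; intended to be called for every new item.
  touch(): void {
    this.stop();
    this.cancel = this.schedule(() => {
      this.cancel = null;
      this.onIdle();
    }, this.idleMs);
  }

  // Cancel a pending countdown, if any.
  stop(): void {
    if (this.cancel) {
      this.cancel();
      this.cancel = null;
    }
  }
}

// Items waiting to be sent.
const pending: Message[] = [];

// After 2000 ms without new items (an assumed value), send whatever is left, sorted.
const idleTimer = new IdleTimer(2000, () => {
  pending.sort((a, b) => (a.time < b.time ? -1 : a.time > b.time ? 1 : 0));
  for (const m of pending.splice(0)) console.log(m);
});

// Intended to be called from the subscription's next handler.
function onItem(item: Message): void {
  pending.push(item);
  idleTimer.touch();
}
```

This only covers the final flush on idle; releasing the earliest element while items are still arriving would need to be handled separately.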
For my suggested solution, refer to: https://stackblitz.com/edit/typescript-xtjuu8?file=index.ts
Is there a more efficient method to achieve this?