In my code, there's an eventStream that deals with different types of events and sends them to the server via HTTP.
import { from, Observable } from 'rxjs';
import { concatMap } from 'rxjs/operators';
type Update = number[];
interface Event {
  type: 'add' | 'delete' | 'update';
  data: Update;
}
// Annotate the array so `type` keeps its literal union instead of widening to string.
const events: Event[] = [
  { type: 'update', data: [1] },
  { type: 'update', data: [2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [4] },
  { type: 'update', data: [5] },
  { type: 'delete', data: [6] },
  { type: 'update', data: [7] },
  // ... other events
];
const eventStream = from(events);
function postEvent(event: Event): Observable<any> {
  // ... posting event to server
}
eventStream.pipe(concatMap(event => postEvent(event)))
I am looking for a way to optimize the processing of consecutive unprocessed update events by combining them into one using the function combineUpdates, in order to reduce the number of HTTP requests while maintaining event order.
/*
Combines multiple consecutive update events into a single update event:
{ type: 'update', data: [1] }, { type: 'update', data: [2] } => { type: 'update', data: [1, 2] }
*/
function combineUpdates(updates: Event[]): Event {
  return { type: 'update', data: updates.map(e => e.data).flat() };
}
For example, eventStream should be transformed into the following series of postEvent() calls:
{ type: 'update', data: [1, 2] },
{ type: 'add', data: [3] },
{ type: 'update', data: [4, 5] },
{ type: 'delete', data: [6] },
{ type: 'update', data: [7] },
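To make the expected grouping concrete, here is a rough sketch of the same transformation on a plain array. It only illustrates the desired output: it ignores timing and the "unprocessed" condition, so it is not the RxJS solution being asked for. StreamEvent and groupConsecutiveUpdates are hypothetical names; StreamEvent stands in for the Event interface above, renamed so the snippet compiles on its own without clashing with the DOM's global Event type.

```typescript
type Update = number[];

// Stand-in for the Event interface from the question (renamed for this sketch only).
interface StreamEvent {
  type: 'add' | 'delete' | 'update';
  data: Update;
}

// Hypothetical helper: merges each run of adjacent 'update' events into one event,
// leaving 'add' and 'delete' events untouched and preserving overall order.
function groupConsecutiveUpdates(events: StreamEvent[]): StreamEvent[] {
  const result: StreamEvent[] = [];
  for (const event of events) {
    const last = result[result.length - 1];
    if (last !== undefined && last.type === 'update' && event.type === 'update') {
      // Extend the current run of updates with this event's data.
      result[result.length - 1] = { type: 'update', data: last.data.concat(event.data) };
    } else {
      // Start a new entry; copy the data so the input is not mutated later.
      result.push({ type: event.type, data: event.data.slice() });
    }
  }
  return result;
}

const pending: StreamEvent[] = [
  { type: 'update', data: [1] },
  { type: 'update', data: [2] },
  { type: 'add', data: [3] },
  { type: 'update', data: [4] },
  { type: 'update', data: [5] },
  { type: 'delete', data: [6] },
  { type: 'update', data: [7] },
];

const grouped = groupConsecutiveUpdates(pending);
console.log(JSON.stringify(grouped));
```

What I am after is the streaming equivalent of this: the same merging, but applied only to update events that are still waiting while an earlier postEvent() request is in flight.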