I am working on creating a function that merges all event streams generated from an array of data channels and emits any event received from them.
Below is the initial function:
private dataChannels: BehaviorSubject<RTCDataChannel[]> = new BehaviorSubject([]);
// The datachannels array will be populated at some point
...
public on(): Observable<Event> {
const eventStreams = this.dataChannels.value.map((channel) => fromEvent(channel, 'message'));
return merge(...eventStreams);
}
However, I realized that the issue with the current function is that it does not utilize new values emitted to the data channels BehaviorSubject.
As a result, I started developing the following function:
const allEvents = this.dataChannels.pipe(map((channels) => channels.map((channel) => fromEvent(channel, 'message'))));
return merge(...allEvents);
The problem arises when allEvents
ends up being of type
Observable<Observable<Event>[]>
, which is not compatible with the merge
function. How can I convert the observable to type Observable<Event>[]
?