Scenario:
I have come across a scenario where I need to add commands to a Subscription created through the rxjs Observable system after it has already been started.
The application I am working on passively listens to a push notification system. Various messages can be sent through this system, and my application needs to act on them. However, I can foresee a dynamically loaded view, added at some later point, needing to attach its own listener to the push notification system.
Query:
Considering that my app already has an existing Subscription, can I append another pipe after .subscribe(() => {}) has been called?
// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { /* commands */ });

this.something.pipe(
  map((something) => {
    // ...Commands to add to the subscription...
  })
);
...And if I attempt that, what would be the outcome, if any?
Solution:
The answers from @user2216584 and @SerejaBogolubov both helped me work out the answer to this question.
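As for the question as literally asked: pipe() does not modify an Observable in place. It returns a new Observable, so calling it after .subscribe() leaves the existing Subscription untouched, and the piped logic only runs once something subscribes to the returned Observable. A minimal sketch of that behavior, where something$ is a Subject standing in for this.something and the values are purely illustrative:

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

// Stand-in for this.something from the question (an assumption for the demo).
const something$ = new Subject<number>();

const original: number[] = [];
const piped: number[] = [];

// The existing subscription from the question.
const subscription = something$.subscribe((value) => original.push(value));

// pipe() returns a NEW Observable; it does not alter something$
// or the subscription created above.
const doubled$ = something$.pipe(map((value) => value * 2));

something$.next(1); // only the original subscriber sees this value

// The mapped logic runs only once the new Observable is subscribed to.
const pipedSubscription = doubled$.subscribe((value) => piped.push(value));

something$.next(2); // both subscribers see this value

subscription.unsubscribe();
pipedSubscription.unsubscribe();

console.log(original, piped);
```

In other words, the snippet in the question would have no visible effect, because the Observable returned by pipe() is never subscribed to.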
My overarching push notification listener service required two key components:
- To retain the subscription, and
- To be able to access a repository of listeners.
The challenge is that each listener needs to monitor a different message. To put it simply, if a message arrives on foo_DEV, the application should react differently than when a message is pushed on bar_DEV.
Here's what I devised:
import { Observable, Subject, Subscription } from 'rxjs';

// PushNotificationMessage, PushNotificationConnection, and
// PushNotificationConnectionManager are defined elsewhere in the codebase.

export interface PushNotificationListener {
  name: string;
  onMessageReceived: (msg: PushNotificationMessage) => any;
  // Stays null until the connection has been established.
  messageSubject$: Subject<PushNotificationMessage> | null;
}

export class PushNotificationListenerService {
  private connection$?: Observable<PushNotificationConnection>;
  private subscription$?: Subscription;
  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Establish the socket connection!
    this.connection$ = this.connectionManager.connect(
      // Details of setting up the websocket are irrelevant here.
      // The implementation specifics are not pertinent either.
    );
  }

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Assign listeners that the subscription to the primary connection
    // will utilize.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };
    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Monitor changes to the primary connection observable.
    if (!this.connection$) {
      throw new Error('connect() must be called before listen()');
    }
    this.subscription$ = this.connection$.subscribe(
      (connection: PushNotificationConnection) => {
        console.info('Push notification connection established');
        // Wire up every listener registered so far to its own channel.
        for (const listener of this.listeners) {
          listener.messageSubject$ = connection.subscribe(listener.name);
          listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
            listener.onMessageReceived(message);
          });
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      }
    );
  }
}
Upon examining the inner workings of my core push notification code, I realized we already had a higher-order Observable. The websocket code generates an observable (connectionManager.connect()) that must be cached in the service and subscribed to. Since this code is specific to my workplace, I can't share further details.
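To illustrate what "higher-order" means here: the outer observable emits connections, and each connection in turn yields a stream of messages. A common way to flatten that shape is rxjs' switchMap operator, which is a different technique from the nested subscribe calls in my service. The sketch below is only an illustration; Message, Connection, and fakeConnection are hypothetical stand-ins, since the real connection types can't be shown.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-ins for the real (unshown) push notification types.
interface Message { channel: string; body: string; }
interface Connection { subscribe(channel: string): Observable<Message>; }

const fooMessages$ = new Subject<Message>();

// A fake connection that serves the same stream for any channel name.
const fakeConnection: Connection = {
  subscribe: (_channel: string) => fooMessages$.asObservable(),
};

// The outer observable emits connections...
const connection$: Observable<Connection> = of(fakeConnection);

// ...and switchMap flattens each connection into its stream of messages.
// If connection$ emitted again, the previous inner stream would be dropped.
const received: string[] = [];
const sub = connection$
  .pipe(switchMap((connection) => connection.subscribe('foo_DEV')))
  .subscribe((message) => received.push(message.body));

fooMessages$.next({ channel: 'foo_DEV', body: 'hello' });

sub.unsubscribe();
fooMessages$.next({ channel: 'foo_DEV', body: 'ignored' }); // no longer delivered

console.log(received);
```

One subscription then covers both levels, so a single unsubscribe() tears down the connection listener and the message listener together.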
However, storing the listeners is also crucial! The subscribe callback in .listen() iterates through all registered listeners each time the connection observable emits, which lets me add listeners dynamically via .setListener(). Leveraging how rxjs' Observable system inherently works, coupled with having an in-scope list of listeners, allows me to configure listeners dynamically even if .connect() is invoked before any listeners are set up.
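One caveat with this approach: a listener registered after the connection has already emitted is only wired up on the next connection event. A possible way to close that gap is to remember the current connection and attach late listeners immediately. The sketch below is dependency-free and uses simplified, hypothetical types (Handler, Connection, ListenerRegistry, FakeConnection) rather than the real service classes, so treat it as an outline of the idea and not as the actual implementation.

```typescript
// Simplified, hypothetical stand-ins for the real push notification types.
type Handler = (msg: string) => void;

interface Connection {
  // Registers a handler for one channel on this connection.
  on(channel: string, handler: Handler): void;
}

class ListenerRegistry {
  private listeners: { name: string; handler: Handler }[] = [];
  private current: Connection | null = null;

  // Call this from the connection subscription whenever a connection is emitted.
  onConnection(connection: Connection): void {
    this.current = connection;
    for (const listener of this.listeners) {
      connection.on(listener.name, listener.handler);
    }
  }

  setListener(name: string, handler: Handler): void {
    this.listeners.push({ name, handler });
    // If a connection is already live, attach right away instead of
    // waiting for the next connection event.
    if (this.current) {
      this.current.on(name, handler);
    }
  }
}

// Tiny in-memory connection used only to exercise the registry.
class FakeConnection implements Connection {
  private handlers: { [channel: string]: Handler[] } = {};

  on(channel: string, handler: Handler): void {
    const list = this.handlers[channel] || [];
    list.push(handler);
    this.handlers[channel] = list;
  }

  push(channel: string, msg: string): void {
    for (const handler of this.handlers[channel] || []) {
      handler(msg);
    }
  }
}

const log: string[] = [];
const registry = new ListenerRegistry();
const connection = new FakeConnection();

registry.setListener('foo_DEV', (msg) => log.push(`foo:${msg}`)); // before connecting
registry.onConnection(connection);
registry.setListener('bar_DEV', (msg) => log.push(`bar:${msg}`)); // after connecting

connection.push('foo_DEV', 'a');
connection.push('bar_DEV', 'b');

console.log(log);
```

Both listeners receive their messages here, regardless of whether they were registered before or after the connection arrived.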
This code could likely benefit from some redesign and refactoring, but having something that works is a solid first step. Thank you all!