The key to subscribing only once to an element from AsyncSubject in the consumer pattern

When working with rxjs5, I encountered a situation where I needed to subscribe to an AsyncSubject multiple times, but only one subscriber should be able to receive the next() event. Any additional subscribers (if still active) should automatically receive the complete() event without receiving the next() event.

For example:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// Note that subscription1/2 could be unsubscribed in the future
// Still, only one subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

Answer №1

To easily achieve this functionality, you can create a small wrapper class that encapsulates the initial AsyncSubject.

import { AsyncSubject, Subject, Observable, Subscription } from 'rxjs/RX';

class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

You can then test it in your example like so:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as);

let fired = false;

function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    };
}

function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    };
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500);

The key aspect is seen here:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

On each call to subscribe, we trigger the newSubscriberSubscribed subject.

By using

takeUntil(this.newSubscriberSubscribed)

when subscribing to the underlying Observable,

each time a new subscriber calls:

this.newSubscriberSubscribed.next()

the previous subscription will be completed.

This approach ensures that whenever a new subscriber arises, the prior subscription will end, meeting the desired outcome.

The expected output would be:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

UPDATE:

If you wish for the first subscriber to remain subscribed while subsequent subscribers immediately get a complete signal, blocking any further subscriptions until the primary subscriber unsubscribes, you can modify the implementation as follows:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

We utilize a flag this.isSusbscribed to track if there is an active subscriber. Additionally, we return a custom subscription for updating the flag upon unsubscription.

In case of a subscription attempt when someone else is already subscribed, we instead subscribe them to an empty Observable, which completes instantly. The resulting output will display:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

Answer №2

To simplify the process, you can encapsulate your AsyncSubject within a wrapper object that manages the task of only notifying one subscriber. If you specifically want to trigger the first subscriber only, the following code snippet serves as a solid starting point

let as = new AsyncSubject();

const createSingleSubscriberAsyncSubject = as => {
    let subscribers = [];

    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');

        subscribers.push(callback);

        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };

    const mainSubscriber = (...args) => {
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };

    as.subscribe(mainSubscriber);

    return {
        subscribe,
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};

// implementation

const singleSub = createSingleSubscriberAsyncSubject(as);

// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));

// remove first subscriber
unsub1();

setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Revolutionary Approach to Efficiently Handle Multiple rows with Jquery

Greetings to everyone, I am currently in the process of developing an application that retrieves data from a database via AJAX by calling a .php file. Within this app, I have a table with 4 columns. The first two columns consist of dropdown menus, the thi ...

What is the best way to implement autoplay sound on a JavaScript webpage?

I'm currently working on a web system aimed at monitoring values from a database, and I have the requirement to trigger a sound alert when a specific range of values is received. Despite trying various examples found online, none of them have been suc ...

Every file downloaded through the iframe is automatically stored in a designated server folder

Is it possible for my website to have an iframe that enables users to browse the web, and when they click on a download button on any external website, the file can be saved directly to a specific folder on my server instead of their own computer? I'm ...

What is the process for generating a new object by outputting key-value pairs?

I have an array that I'm trying to use to create a new object with specific keys and values. const arr = [ { name: 'ab', key: '584577', }, { name: 'cd', key: '344926', }, { name: &a ...

Guide on displaying multiple views along with their respective models fetched through AJAX in Backbone

Hey there! I'm currently working on painting a screen with multiple models and associated views in backbone. To achieve this, I have separate ajax calls to fetch data for these views. Initially, I thought using the jQuery function $when(ajaxcall1, aja ...

Transferring files with Node.js via UDP connections

I'm currently working on setting up a basic Node.js server that is designed to receive files from clients through UDP. The challenge I'm facing is that whenever I attempt to send a large file (anything over 100kb), the server appears unresponsive ...

When transitioning from another page, the header will cover a portion of the section

I am facing a specific issue that I need help with. My goal is to have the navigation bar link to different sections of a single page when clicked. For example, clicking on "Contact" should take the user to the contact section, and then if they click on "A ...

Dynamically Loading CSS files in a JQuery plugin using a Conditional Test

I'm trying to figure out the optimal way to dynamically load certain files based on specific conditions. Currently, I am loading three CSS files and two javascript files like this: <link href="core.min.css" rel="stylesheet" type="text/css"> & ...

I'm stumped trying to understand why I keep getting this syntax error. Any thoughts on how to fix it?

Our team is currently working on creating a dynamic SELECT box with autocomplete functionality, inspired by the Standard Select found at this link: We encountered an issue where the SELECT box was not populating as expected. After further investigation, ...

Displaying URLs stylishly with Pills Bootstrap

When a pill is selected from the list, I need to display a specific URL that can be used elsewhere. However, there is an href="#pills-something" attribute that directs to an ID below. I am looking for something like: mysite.com/myspecificpill or ...

Show a Pair of Images Upon Submission Utilizing Ajax

Imagine having two separate div containers displayed as shown below: What I achieved was clicking the submit button would upload the image to the designated area. However, my goal is for a single click on the button to load the image in both containers. ...

Custom control unable to display MP3 file

Hey, I came across this awesome button that I am really interested in using: https://css-tricks.com/making-pure-css-playpause-button/ I'm currently facing two issues with it. First, I can't seem to play the sound. I've placed the mp3 file ...

Developing an SQL table for a website using JavaScript

My command is not functioning as expected and I am struggling to identify the issue. The database opens successfully, however, it fails to process the table creation. var createtable2 = 'CREATE TABLE IF NOT EXISTS offlineCabinDefects (id INTEGER PRIM ...

The remaining visible portion of a viewport is equivalent to the height of an element

Is there a way to dynamically set a div's height so that it expands from its starting position to the end of its parent div, which is 100% of the viewport minus 20 pixels? Here is an example of how you can achieve this using jQuery: $(document).read ...

Issue with relative templateUrl in Angular 2 not resolving paths

As I embark on creating my first Angular 2 app, I'm faced with the task of setting my template in an HTML file using `templateUrl`. Currently, both the component.ts and view.html are stored in the same folder under src/ directory. Here's what I ...

DailyCodingChallenge: Discover two elements in an array that add up to a specified value

As someone who is relatively new to coding, I recently signed up for the daily coding problem mailing list and received the following question: If given a list of numbers and a specific number k, can you determine whether any two numbers from the list a ...

Why does IntelliJ IDEA 2016 show an error for using the [ngSwitch] attribute in this Angular2 template?

Every time I run precommit inspections, I receive a barrage of warnings even though everything seems to be functioning properly. The warnings include: Warning:(8, 58) Attribute [ngSwitch] is not allowed here Warning:(9, 42) Attribute [attr.for] is not all ...

Accessing a distinct element of e.parameter

I've implemented a Google App Script on my website that automatically writes form data to a spreadsheet upon submission. With multiple forms on the site, I want all their data to go to the same Spreadsheet but different 'Sheets' (tabs at th ...

Troubleshooting MySQL Database Insertion Errors caused by Dynamic Forms

<body> <?php $con = mysqli_connect('localhost','root','','cash'); $query = "SELECT DISTINCT category FROM cash"; $result = mysqli_query($con,$query); $dropDownList = &apo ...

Exploring the dynamics of Kendo UI's MVVM approach using J

Is there a way to make an ajax call in Kendo without involving the grid? I am new to Kendo and struggling to populate a span element with data fetched from one of my controller methods. The data is present as I can see it in the alert message, but it' ...