What is the process of branching a stream with highland.js?

I have a stream called sourceStream that contains objects of type BaseData.

My goal is to split this stream into n different streams, each filtering and transforming the BaseData objects according to their specific criteria.

Ultimately, I want to end up with n streams, each containing only one specific data type. The length of these forked streams may vary as data could be added or removed in the future.

I attempted to achieve this using the fork method:

import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

My expectation was to have two streams after running the code. The foo stream should contain two elements:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

And the bar stream should have one element:

{ id: 'bar', data: 'narf' }

However, instead of the desired outcome, an error occurs:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

The question remains: How can I successfully split a stream into multiple streams?


I also experimented with chaining, but I didn't get the expected results:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

The output was:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

Instead of the expected:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}

It's possible I misunderstood how the fork function works. According to its documentation:

Stream.fork() Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.

NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value foo before any will process a second value bar. It does not guarantee the order in which the forks process foo.

TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.

Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call fork on it.

Therefore, my revised question would be: How do I duplicate a highland stream dynamically into separate streams?

Answer №1

partnerStream.filter() gives you a fresh stream. Then, you are once again utilizing partnerStream by using partnerStream.each(), but without invoking fork() or observe(). To handle this properly, either chain the partnerStream.filter().each() calls together or store the return value of partnerStream.filter() in a variable and then call .each() on that.

Answer №2

It is important to remember not to start using a divided stream until all the divisions have been made. If you start using a divided stream prematurely, both the divided stream and its "parent" will be affected, causing any subsequent division to come from an empty stream.

const partnerStreams: Array<Stream<BaseData>> = [];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
         }
    );

    partnerStreams.push(partnerStream);
});

partnerStreams.forEach((stream, index) => {
    console.log(index, stream);
    stream.toArray((foo) => {
        console.log(index, foo);
    });
});

The output is as follows:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

Creating a Query String in a web URL address using the state go method in Angular State Router

On my product list page, there is a list of products. When I click on a particular product, a function is called that uses state.go. Issue with dynamic functionality: $state.go('home.product.detail', { 'productID': "redminote4", &apo ...

What is the best technique for creating a preloader that can seamlessly fill the background of a vector image?

I am looking for guidance on creating a CSS3 preloader using a vector image. My goal is to have the logo start off transparent with only the border visible, and as the loading occurs, fill in from bottom to top with the background color. Thank you to ever ...

Implementing functions on React component classes

Recently, I decided to convert a slideshow from w3s schools into a React component. The process involved changing all the functions into methods on the class and setting the initial state to display the first slide. However, upon clicking the buttons or do ...

Which is more effective: coding with just plain JavaScript and CSS, or utilizing frameworks?

As a student, is it more beneficial to focus on utilizing pure JavaScript & CSS or frameworks? And which approach is best suited for the professional field? ...

Switch from using getElementById to useRef in React components

There is a requirement to update a functional component that currently uses getElementById to instead utilize the useRef hook. The original code snippet is as follows: import React, { useState, useEffect, useRef } from 'react'; import { createPo ...

What could be causing my JSON to update for one API but not the other?

My frustration with the below snippet is preventing me from following the usual advice of "Go to bed and you'll see the error in the morning." The code snippet below contains various tables with an NVD3 chart and some plain p tags that hold data from ...

How can I get an object returned from a Mongoose find method in NodeJS?

I am currently working on developing a node.js API with Mongoose, However, for a specific task, I need to retrieve the object as a variable from my find operation. This is what I have so far: exports.get_info = function(_id) { Session.findById(_id, f ...

Steps to generate a new page when submitting a form:

When a form is submitted, I am aiming to generate a fresh page each time. This new page will serve as an order status tracker that will be updated regularly. Essentially, my goal is for users to see a confirmation page for the form submission and have acce ...

Display or conceal certain HTML form elements based on the selection made in the previous form element

I need assistance with a function that can dynamically show or hide certain HTML form elements based on the user's previous selection using JavaScript. For example, if a user selects "Bleached" from the Dyingtype drop-down menu, there is no need to di ...

Google Maps does not support markers appearing on the map

I have created a basic web application that displays markers from a MySQL database on Google Maps using a table called markers_titik. In order to process this data, I have written a simple PHP script named map_process.php. Here is the code: <?php //PH ...

Google Chrome extension with Autofocus functionality

I developed a user-friendly Chrome extension that allows users to search using a simple form. Upon opening the extension, I noticed that there is no default focus on the form, requiring an additional click from the user. The intended behavior is for the ...

Incorporating a Script into Your NextJS Project using Typescript

I've been trying to insert a script from GameChanger () and they provided me with this code: <!-- Place this div wherever you want the widget to be displayed --> <div id="gc-scoreboard-widget-umpl"></div> <!-- Insert th ...

Is there a way to locate and refine the blogs associated with a specific user?

Check out the following blog entries stored in the Database: [ { _id: 5fec92292bbb2c32acc0093c, title: 'Boxing ring', author: 'T. Wally', content: 'boxing stuff', likes: 0, user: { _id: 5fd90181 ...

trim() function acting strangely

There seems to be an unexpected occurrence with the trim() function, as it is removing the á character. https://i.stack.imgur.com/whZBN.png This particular code snippet is typically used in various JavaScript projects without any issues. However, a clie ...

Unable to activate the on('click') event when the button is loaded via AJAX

I am facing an issue with the on('click') event. I have a button that is loaded dynamically via ajax and has a click event attached to it. However, when I try clicking it, nothing happens even though the expected output should be showing an alert ...

Array Scope Lost During Click Event Loop

This Question May Have Been Asked Before: A Practical Example of Javascript Closure inside Loops I am faced with an issue involving an array of 4 objects, each containing a .t property which is a jQuery element. My goal is to assign an event to each t ...

Deny access to the viewing feature for unauthorized users

Objective: To restrict access to the profile page only for logged-in users. The authentication is done through mongodb and passport-local. Existing Code: Below is the express route used to verify if the request is authenticated. app.get('/loggedin ...

Discover the exact location of an HTML element within an iframe

I am currently attempting to determine the position of an element that is within an iframe. I have written the following code for this purpose: // Custom function to calculate the position of an element on the page function getElementPosition(elem){ var ...

Looking to clear a textfield when focused if it contains zero, and when unfocused if it is empty, then automatically insert zero?

What is the best way to clear a text field when it contains zero on focus, and set it back to zero if it's empty on focus out? How can this be implemented globally for every text field by adding a common class to all text fields? ...

Guidelines on Sharing Redux Store with Client during Routing in Redux

I'm currently learning Next.js and facing an issue with maintaining the dispatched state on a newly built page after routing. Can anyone provide guidance on how to retain the state? Specifically, I have a sidebar where I want to preserve the state of ...