I have a stream called `sourceStream` that contains objects of type `BaseData`.
My goal is to split this stream into `n` different streams, each filtering and transforming the `BaseData` objects according to its own criteria. Ultimately, I want to end up with `n` streams, each containing only one specific data type. The number of forked streams may vary, since data types could be added or removed in the future.
I attempted to achieve this using the `fork` method:
```typescript
import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});
```
My expectation was to have two streams after running the code. The `foo` stream should contain two elements:

```
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
```

And the `bar` stream should have one element:

```
{ id: 'bar', data: 'narf' }
```
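Stated without streams, the expected result above is a partition of the source items by `id`, with one bucket per entry in `partners`. The plain TypeScript sketch below pins down that target behaviour; it works on arrays rather than Highland streams, and the helper name `partitionByPartner` is hypothetical. The number of outputs simply follows the partner list passed in.

```typescript
// Plain-array statement of the expected result (no streams involved).
interface BaseData {
  id: string;
  data: string;
}

const items: BaseData[] = [
  { id: 'foo', data: 'poit' },
  { id: 'foo', data: 'fnord' },
  { id: 'bar', data: 'narf' },
];

// Hypothetical helper: one bucket per partner, built from whatever partner
// list is passed in, so adding or removing a partner changes the output count.
function partitionByPartner(
  source: BaseData[],
  partners: string[],
): Record<string, BaseData[]> {
  const buckets: Record<string, BaseData[]> = {};
  for (const partner of partners) {
    buckets[partner] = source.filter((item) => item.id === partner);
  }
  return buckets;
}

const expected = partitionByPartner(items, ['foo', 'bar']);
console.log(expected['foo']); // the two 'foo' items
console.log(expected['bar']); // the single 'bar' item
```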
However, instead of the desired outcome, an error occurs:

```
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)
```
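The error message names the rule being broken: without `fork()` or `observe()`, a Highland stream accepts only one consumer. In the first attempt, `partnerStream.filter(...)` already consumes `partnerStream` and returns a new stream that is never used, so the later `partnerStream.each(...)` is a second consumer; the trace above follows exactly that path (`each` → `consume` → `_addConsumer`). The sketch below models only this rule with a hypothetical `OneConsumerStream` class in plain TypeScript. It is an illustration, not Highland's implementation, and it evaluates eagerly where Highland is lazy.

```typescript
// Hypothetical model of the one-consumer rule; this is not Highland's code.
class OneConsumerStream<T> {
  private consumed = false;
  private readonly values: T[];

  constructor(values: T[]) {
    this.values = values;
  }

  // Any transform or terminal call claims the single consumer slot.
  private claim(): void {
    if (this.consumed) {
      throw new Error('Stream already being consumed, you must either fork() or observe()');
    }
    this.consumed = true;
  }

  // Like a transform: consumes this stream and returns a new stream.
  filter(pred: (value: T) => boolean): OneConsumerStream<T> {
    this.claim();
    return new OneConsumerStream(this.values.filter(pred));
  }

  // Like a terminal each(): consumes this stream and visits every value.
  each(fn: (value: T) => void): void {
    this.claim();
    for (const value of this.values) fn(value);
  }
}

// Pattern from the first attempt: the stream returned by filter() is
// discarded, then each() is called on the already-consumed stream.
const partnerStream = new OneConsumerStream(['poit', 'narf']);
partnerStream.filter((v) => v === 'poit');
let errorMessage = '';
try {
  partnerStream.each(console.log);
} catch (e) {
  errorMessage = (e as Error).message;
}
console.log(errorMessage);

// Chaining consumes each stream exactly once, so no error is thrown.
const seen: string[] = [];
new OneConsumerStream(['poit', 'narf'])
  .filter((v) => v === 'poit')
  .each((v) => seen.push(v));
console.log(seen);
```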
The question remains: How can I successfully split a stream into multiple streams?
I also experimented with chaining, but I didn't get the expected results:
```typescript
partners.forEach((partner: string) => {
    console.log(partner);

    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});
```
The output was:

```
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
```

Instead of the expected:

```
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{ id: 'bar', data: 'narf' }
```
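One plausible reading of this output, based on the shared back-pressure described in the documentation quoted below: inside the loop, `.each()` starts the `foo` fork before the `bar` fork exists. At that moment `foo` is the only consumer, so nothing holds the source back, and the synchronous array source is drained into it; when the `bar` fork is attached afterwards, no values are left. The self-contained sketch below models this with two hypothetical classes, `ForkableSource` and `Fork`, in which the source emits only once every fork that currently exists has been started. It is a simplified model under that assumption, not Highland's actual fork logic.

```typescript
// Hypothetical model of fork() with shared back-pressure; not Highland's code.
type Handler<T> = (value: T) => void;

class Fork<T> {
  handler: Handler<T> | undefined = undefined;
  private readonly parent: ForkableSource<T>;

  constructor(parent: ForkableSource<T>) {
    this.parent = parent;
  }

  // Starting a fork (like each()) may let the shared source flow.
  each(handler: Handler<T>): void {
    this.handler = handler;
    this.parent.tryFlow();
  }
}

class ForkableSource<T> {
  private readonly pending: T[];
  private readonly forks: Fork<T>[] = [];

  constructor(values: T[]) {
    this.pending = values.slice();
  }

  fork(): Fork<T> {
    const f = new Fork<T>(this);
    this.forks.push(f);
    return f;
  }

  // Values flow only once every fork that exists right now has started.
  tryFlow(): void {
    if (this.forks.some((f) => f.handler === undefined)) return;
    while (this.pending.length > 0) {
      const value = this.pending.shift() as T;
      for (const f of this.forks) (f.handler as Handler<T>)(value);
    }
  }
}

interface BaseData {
  id: string;
  data: string;
}

const data: BaseData[] = [
  { id: 'foo', data: 'poit' },
  { id: 'foo', data: 'fnord' },
  { id: 'bar', data: 'narf' },
];
const partners = ['foo', 'bar'];

// Filtering happens inside each handler to keep the model small.
function collect(partner: string, into: Record<string, BaseData[]>): Handler<BaseData> {
  into[partner] = [];
  return (item) => {
    if (item.id === partner) into[partner].push(item);
  };
}

// Second attempt: fork and start consuming in the same loop iteration.
const consumedImmediately: Record<string, BaseData[]> = {};
const source1 = new ForkableSource(data);
for (const partner of partners) {
  source1.fork().each(collect(partner, consumedImmediately));
}

// Alternative order: create every fork first, then start them all.
const forksFirst: Record<string, BaseData[]> = {};
const source2 = new ForkableSource(data);
const forks = partners.map(() => source2.fork());
forks.forEach((f, i) => f.each(collect(partners[i], forksFirst)));

console.log(consumedImmediately); // bar ends up empty
console.log(forksFirst); // foo has two items, bar has one
```

If the model is right, the Highland counterpart of the second order would be to map `partners` to `sourceStream.fork().filter(...)` in one loop and call `.each()` on the resulting streams in a second loop. That translation is an assumption drawn from the documented back-pressure behaviour, not something verified here.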
It's possible I misunderstood how the `fork` function works. According to its documentation:
> Stream.fork()
>
> Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.
>
> NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value `foo` before any will process a second value `bar`. It does not guarantee the order in which the forks process `foo`.
>
> TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.
>
> Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call `fork` on it.
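The TIP in the documentation follows from every fork receiving the same object reference. The sketch below shows the effect in plain TypeScript with no Highland involved: `runForks` and the three handler functions are hypothetical stand-ins for forks, run in a fixed order (real forks give no ordering guarantee, which makes the bug harder to spot). Mutating the shared object in one handler changes what the next one sees, while spreading into a copy first does not. Note that the spread is a shallow copy; nested objects would still be shared.

```typescript
interface BaseData {
  id: string;
  data: string;
}

// Stand-in for forks: every "fork" receives the very same object reference.
function runForks(value: BaseData, forks: Array<(v: BaseData) => string>): string[] {
  return forks.map((fork) => fork(value));
}

// Modifies the shared object, so later forks see the change.
const mutating = (v: BaseData): string => {
  v.data = v.data.toUpperCase();
  return v.data;
};

// Modifies a shallow copy, leaving the shared object untouched.
const copying = (v: BaseData): string => {
  const copy = { ...v, data: v.data.toUpperCase() };
  return copy.data;
};

// Reads the value as it currently is.
const reader = (v: BaseData): string => v.data;

const leaked = runForks({ id: 'foo', data: 'poit' }, [mutating, reader]);
const safe = runForks({ id: 'foo', data: 'poit' }, [copying, reader]);
console.log(leaked); // [ 'POIT', 'POIT' ]
console.log(safe); // [ 'POIT', 'poit' ]
```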
Therefore, my revised question is: how do I dynamically duplicate a Highland stream into separate streams?
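For comparison, a different technique avoids `fork()` entirely: consume the source once and dispatch each item to a per-partner handler looked up by `id`, building the handler table from the partner list at runtime so the number of outputs can change freely. The sketch below is plain TypeScript over an array standing in for `sourceStream`; `route`, `Sink`, and the `received` buckets are hypothetical names. Unlike forks, each item reaches only one output, and all outputs advance together at the pace of the single consumer.

```typescript
interface BaseData {
  id: string;
  data: string;
}

// Hypothetical stand-in for the contents of sourceStream.
const source: BaseData[] = [
  { id: 'foo', data: 'poit' },
  { id: 'foo', data: 'fnord' },
  { id: 'bar', data: 'narf' },
];

type Sink = (item: BaseData) => void;

// Single consumer: one pass over the source, dispatching each item to the
// sink registered for its id.
function route(items: BaseData[], sinks: Record<string, Sink>): void {
  for (const item of items) {
    if (Object.prototype.hasOwnProperty.call(sinks, item.id)) {
      sinks[item.id](item);
    }
    // Items whose id has no sink are silently dropped.
  }
}

// Build the sinks dynamically from the partner list.
const partners = ['foo', 'bar'];
const received: Record<string, string[]> = {};
const sinks: Record<string, Sink> = {};
for (const partner of partners) {
  received[partner] = [];
  sinks[partner] = (item) => received[partner].push(item.data);
}

route(source, sinks);
console.log(received); // foo gets poit and fnord, bar gets narf
```

In Highland, the same dispatch could presumably live inside a single `sourceStream.each(...)` call, which keeps the source at one consumer; treat that as an untested suggestion rather than a confirmed recipe.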