Consuming NATS JetStream with multiple threads

My TypeScript application consists of multiple projects, each with its own set of microservices:

server: A REST API that publishes messages to NATS so that other services can update a (Mongo) DB.

synchronizer: A process that consumes NATS messages and updates the DB.

The problem appears when I run multiple instances of the same synchronizer: I end up with several consumers of the same type bound to the same stream.

I want these pull consumers to read from the same stream but split the messages between them, so that each message is handled by only one instance.

Currently, my `Consumer` class is structured like this:

import { AckPolicy, Codec, ConsumerConfig, DeliverPolicy, JetStreamClient, JetStreamManager, JsMsg, JSONCodec, ReplayPolicy, StringCodec } from "nats";

interface IEvent<T extends any> {
  subject: string;
  data: T
}

const createConsumerOptions = (serviceName: string): ConsumerConfig => {
  return {
    ack_policy: AckPolicy.Explicit,
    deliver_policy: DeliverPolicy.New,
    replay_policy: ReplayPolicy.Instant,
    ack_wait: 1000,
    flow_control: true,
    max_ack_pending: 5,
    max_waiting: 1,
    durable_name: serviceName // <- Example: "Auth-Service"
  };
};

export const initConsumer = async (js: JetStreamClient, serviceName: string, subject: string) => {
  const jsm = await js.jetstreamManager();
  if (!jsm) throw new Error("JetstreamManager Error");

  await initStream(jsm, subject);

  const options = createConsumerOptions(serviceName);

  await jsm.consumers.add(subject, options).catch((ex) => console.log(ex));
  let result = await js.consumers.get(subject, options);

  if (!result) throw new Error("Consumer Error");

  return result;
};

export const initStream = async (jsm: JetStreamManager, subject: string) => {
  const stream = await jsm.streams.find(subject).catch((ex) => console.log(ex));
  if (!stream) await jsm.streams.add({ name: subject.toString(), subjects: [subject] }).catch((ex) => console.log(ex));
};

export abstract class Consumer<T extends IEvent<any>> {
  abstract subject: T["subject"];
  abstract onMessage(data: T["data"], msg: JsMsg): Promise<any>;

  protected _client: Client;

  constructor(client: Client) {
    this._client = client;
  }

  async listen() {
    const js = await this._client.Connection?.jetstream();
    if (!js) throw new Error("Jetstream Error");

    const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
    const msgs = await consumer.consume();
    for await (const msg of msgs) {
      const data = JSONCodec().decode(msg.data)

      try {
        await this.onMessage(data, msg);
        msg.ack();
      } catch (err) {
        msg.nack();
      }
    }
  }
}

Answer №1

For now, I have replaced the pull consumers with push consumers. The push API (js.subscribe) is considered deprecated, but so far it seems to be the only way to make this work.

I have included an example here for anyone who may encounter the same issue.

(Feel free to suggest a better solution if you have one)

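I replaced the listen() method with the version below. Every instance subscribes to the same durable with the same queue group, so each message is delivered to only one instance: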

  // Requires: import { consumerOpts, createInbox } from "nats";
  // parseMessage, onAccept and onReject are helpers defined elsewhere in the project.
  async listen() {
    const js = await this._client.Connection?.jetstream();
    if (!js) throw new Error("Jetstream Error");

    // All instances share one durable and one queue group (the service name).
    const options = consumerOpts()
      .ackExplicit()
      .consumerName(this._client.ServiceName)
      .deliverAll()
      .deliverGroup(this._client.ServiceName)
      .deliverTo(createInbox())
      .ackWait(1000)
      .durable(this._client.ServiceName)
      .queue(this._client.ServiceName);

    const subscription = await js.subscribe(this.subject, options); // js.subscribe IS DEPRECATED

    for await (const msg of subscription) {
      const data = parseMessage(msg);

      try {
        const details = await this.onMessage(data, msg);
        this.onAccept(msg, data, details);
      } catch (err) {
        this.onReject(msg, data, err);
      }
    }
  }
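
A pull-based variant may also be possible. As far as I understand JetStream, instances that bind to the same durable pull consumer by name share one delivery cursor, so the messages are split between them. The sketch below only models that idea in memory. MsgLike, ConsumerLike, ConsumersLike and FakeConsumers are hypothetical stand-ins written for this example, not the real nats types, and the stream name "orders" is made up.

```typescript
// Hypothetical stand-ins for the client API (NOT the real nats types),
// so the sketch runs without a NATS server.
interface MsgLike {
  seq: number;
  ack(): void;
}

interface ConsumerLike {
  // Resolves to the next undelivered message, or null when none is left.
  next(): Promise<MsgLike | null>;
}

interface ConsumersLike {
  // Binding is done by durable NAME, not by passing a config object.
  get(stream: string, durableName: string): Promise<ConsumerLike>;
}

// In-memory model of one stream whose durable consumers are shared by name.
class FakeConsumers implements ConsumersLike {
  private cursors = new Map<string, { next: number }>();
  private messageCount: number;
  readonly acked: number[] = [];

  constructor(messageCount: number) {
    this.messageCount = messageCount;
  }

  async get(stream: string, durableName: string): Promise<ConsumerLike> {
    const key = `${stream}/${durableName}`;
    // Same stream + same durable name => same delivery cursor.
    let cursor = this.cursors.get(key);
    if (!cursor) {
      cursor = { next: 1 };
      this.cursors.set(key, cursor);
    }
    const shared = cursor;
    return {
      next: async () => {
        if (shared.next > this.messageCount) return null;
        const seq = shared.next++;
        return { seq, ack: () => { this.acked.push(seq); } };
      },
    };
  }
}

// One synchronizer instance: bind to the shared durable and drain it.
async function runWorker(
  consumers: ConsumersLike,
  stream: string,
  durableName: string,
): Promise<number[]> {
  const consumer = await consumers.get(stream, durableName);
  const handled: number[] = [];
  for (let msg = await consumer.next(); msg !== null; msg = await consumer.next()) {
    handled.push(msg.seq);
    msg.ack();
  }
  return handled;
}

// Two instances of the same service sharing one durable.
async function demo(): Promise<{ a: number[]; b: number[]; acked: number[] }> {
  const consumers = new FakeConsumers(10);
  const [a, b] = await Promise.all([
    runWorker(consumers, "orders", "Auth-Service"),
    runWorker(consumers, "orders", "Auth-Service"),
  ]);
  return { a, b, acked: consumers.acked };
}
```

With the real client, the equivalent call would presumably be js.consumers.get(stream, durableName) with the durable name as a string. I believe passing an options object there requests a separate ordered consumer per instance instead, but please check this against your library version. Two settings in the original config may also be worth revisiting: ack_wait in ConsumerConfig is expressed in nanoseconds, so 1000 is a far shorter wait than ackWait(1000) in the builder above, and max_waiting: 1 likely prevents more than one instance from having a pull request outstanding.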
