My TypeScript application consists of multiple projects, each with its own set of microservices:
server
: A REST API that publishes messages to NATS for other services to update a (mongo) DB
synchronizer
: A process that consumes NATS messages and updates the DB.
The issue I am facing is running multiple instances of the same synchronizers. This results in having multiple consumers of the same type bound to the same stream.
I aim to have my pull consumers consume the same streams but different messages within those streams.
Currently, my `Consumer` class is structured like this:
import { AckPolicy, Codec, ConsumerConfig, DeliverPolicy, JetStreamClient, JetStreamManager, JsMsg, JSONCodec, ReplayPolicy, StringCodec } from "nats";
interface IEvent<T extends any> {
subject: string;
data: T
}
const createConsumerOptions = (serviceName: string): ConsumerConfig => {
return {
ack_policy: AckPolicy.Explicit,
deliver_policy: DeliverPolicy.New,
replay_policy: ReplayPolicy.Instant,
ack_wait: 1000,
flow_control: true,
max_ack_pending: 5,
max_waiting: 1,
durable_name: serviceName // <- Example: "Auth-Service"
};
};
export const initConsumer = async (js: JetStreamClient, serviceName: string, subject: string) => {
const jsm = await js.jetstreamManager();
if (!jsm) throw new Error("JetstreamManager Error");
await initStream(jsm, subject);
const options = createConsumerOptions(serviceName);
await jsm.consumers.add(subject, options).catch((ex) => console.log(ex));
let result = await js.consumers.get(subject, options);
if (!result) throw new Error("Consumer Error");
return result;
};
export const initStream = async (jsm: JetStreamManager, subject: string) => {
const stream = await jsm.streams.find(subject).catch((ex) => console.log(ex));
if (!stream) await jsm.streams.add({ name: subject.toString(), subjects: [subject] }).catch((ex) => console.log(ex));
};
export abstract class Consumer<T extends IEvent<any>> {
abstract subject: T["subject"];
abstract onMessage(data: T["data"], msg: JsMsg): Promise<any>;
protected _client: Client;
constructor(client: Client) {
this._client = client;
}
async listen() {
const js = await this._client.Connection?.jetstream();
if (!js) throw new Error("Jetstream Error");
const consumer = await initConsumer(js, this._client.ServiceName, this.subject);
const msgs = await consumer.consume();
for await (const msg of msgs) {
const data = JSONCodec().decode(msg.data)
try {
await this.onMessage(data, msg);
msg.ack();
} catch (err) {
msg.nack();
}
}
}
}