Here are some tips to improve message-processing speed:
- autoCompleteMessages should be switched off so that you settle each message yourself. Make sure to complete messages promptly after processing; otherwise unsettled messages build up and slow down the consumer.
- Utilize the receiver's prefetchCount attribute. This determines the maximum number of messages the receiver can request in one go.
- The lockDuration setting specifies how long a message is locked for processing by a consumer.
receiveMode: "peekLock",
maxConcurrentCalls: 30,
autoCompleteMessages: false,
CODE:
const { ServiceBusClient } = require("@azure/service-bus");
/**
 * Starts a peek-lock consumer on a Service Bus topic subscription.
 *
 * Builds a ServiceBusClient, creates a receiver for the topic/subscription and
 * registers the message and error handlers with `receiver.subscribe`.
 *
 * @param {string} connectionString - Service Bus connection string.
 * @param {string} topic - Name of the topic to consume from.
 * @param {string} subscriptionName - Name of the subscription under the topic.
 * @returns {Promise<object|null>} The receiver once the handlers are
 *   registered, or `null` when the client or the receiver could not be created.
 */
async function startConsumer(connectionString, topic, subscriptionName) {
  let sbClient;
  try {
    sbClient = new ServiceBusClient(connectionString);
    console.log("Azure connection established successfully for topic:", topic);
  } catch (err) {
    console.error("Error in connecting to Azure Service Bus:", err);
    // Same failure value as the receiver-creation path below.
    return null;
  }

  let receiver;
  try {
    // Only receiver-level options belong here; concurrency and auto-complete
    // are subscribe() options and are passed further down.
    receiver = sbClient.createReceiver(topic, subscriptionName, {
      receiveMode: "peekLock",
    });
    console.log(`Receiver for ${topic} connected successfully.`);
  } catch (err) {
    console.error(`Error in creating receiver for ${topic}:`, err);
    try {
      // Await the close so the client is released before reporting failure.
      await sbClient.close();
    } catch (closeErr) {
      console.error("Error in closing Service Bus client:", closeErr);
    }
    return null;
  }

  // Completes the message when processData reports success, abandons it
  // otherwise so it becomes available for redelivery.
  const processMessage = async (brokeredMessage) => {
    const input = brokeredMessage.body.toString();
    const result = await processData(input);
    if (result) {
      await receiver.completeMessage(brokeredMessage);
    } else {
      await receiver.abandonMessage(brokeredMessage);
    }
  };

  // Logs every error and reacts according to the error code.
  const processError = async (args) => {
    console.error(`Error from source ${args.errorSource} occurred:`, args.error);
    switch (args.error.code) {
      case "MessagingEntityDisabled":
      case "MessagingEntityNotFound":
      case "UnauthorizedAccess":
        console.error("An unrecoverable error occurred. Stopping processing.", args.error);
        try {
          await receiver.close();
        } finally {
          // The caller never receives the client, so it has to be released
          // here once processing stops for good.
          await sbClient.close();
        }
        break;
      case "MessageLockLost":
        console.error("Message lock lost for message", args.error);
        break;
      case "ServiceBusy":
        // Back off briefly before the receiver continues.
        await customDelay(1000);
        break;
      default:
        console.error("Error in processing message", args);
    }
  };

  receiver.subscribe(
    { processMessage, processError },
    {
      maxConcurrentCalls: 30,
      // Messages are settled explicitly in processMessage.
      autoCompleteMessages: false,
    },
  );
  return receiver;
}
/**
 * Placeholder message processor: logs the payload and reports success.
 *
 * @param {string} input - Message payload to process.
 * @returns {Promise<boolean>} Always resolves to `true`.
 */
async function processData(input) {
  const succeeded = true;
  console.log("Processing message:", input);
  return succeeded;
}
/**
 * Returns a promise that resolves after the given delay.
 *
 * @param {number} ms - Delay in milliseconds, handed to `setTimeout`.
 * @returns {Promise<void>} Resolves once the timer fires.
 */
function customDelay(ms) {
  const timer = (done) => {
    setTimeout(done, ms);
  };
  return new Promise(timer);
}
// Connection settings for the consumer started below.
const connectionString = "Endpoint=sb:";
const topic = "sam";
const subscriptionName = "sampath";

// Start the consumer and report whether a receiver was obtained.
(async () => {
  try {
    const receiver = await startConsumer(connectionString, topic, subscriptionName);
    const outcome = receiver
      ? "Consumer started successfully."
      : "Failed to start the consumer.";
    console.log(outcome);
  } catch (err) {
    console.error("An error occurred while starting the consumer:", err);
  }
})();
Output: