How I Created the NPM Module nestjs-azure-service-bus
Creating Node.js modules is a fantastic way to package and distribute reusable functionality. Today, I’ll be explaining how I created nestjs-azure-service-bus, a NestJS module to seamlessly interact with Azure Service Bus.
What Is nestjs-azure-service-bus?
It’s a module designed for NestJS applications to easily communicate with Azure Service Bus. Azure Service Bus is a fully managed enterprise integration message broker. This module allows NestJS applications to send and receive messages through Azure Service Bus.
The Module Structure
I created two files for this module: azure-service-bus.decorator.ts and azure-service-bus.module.ts.
- azure-service-bus.decorator.ts contains two decorator functions, Sender() and Receiver(). They are used to inject Azure Service Bus sender and receiver clients into your services or controllers.
- azure-service-bus.module.ts is the heart of this module, where the module is configured, and providers for the Service Bus client, sender, and receiver are created.
A Walkthrough of azure-service-bus.module.ts
DynamicModule
In NestJS, a DynamicModule is a module created based on runtime inputs, making it a perfect fit for modules that require configuration. In nestjs-azure-service-bus, I used DynamicModule to create a custom configured Azure Service Bus client.
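To make the idea concrete, here is a minimal, dependency-free sketch of the pattern: a static method that takes runtime options and returns a module description built from them. The names SketchProvider, SketchDynamicModule and GreetingModule are invented for illustration, and the interfaces are simplified stand-ins; the real DynamicModule type from @nestjs/common has more fields.

```typescript
// Simplified stand-ins for the NestJS types (hypothetical, for illustration only).
interface SketchProvider {
  provide: string;
  useValue: unknown;
}

interface SketchDynamicModule {
  module: new () => object;
  providers: SketchProvider[];
  exports: string[];
}

class GreetingModule {
  // The module description is computed from the options passed at call time.
  static forRoot(options: { greeting: string }): SketchDynamicModule {
    return {
      module: GreetingModule,
      providers: [{ provide: 'GREETING', useValue: options.greeting }],
      exports: ['GREETING'],
    };
  }
}

// The same class yields differently configured modules.
const english = GreetingModule.forRoot({ greeting: 'hello' });
const french = GreetingModule.forRoot({ greeting: 'bonjour' });
console.log(english.providers[0]?.useValue, french.providers[0]?.useValue);
```

AzureServiceBusModule follows the same shape, except that the configured value is a Service Bus client rather than a string.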
The AzureServiceBusModule Class
This class exposes several static methods that return DynamicModules, providing the flexibility to configure the Azure Service Bus client in different ways:
- forRoot(options: AzureSBOptions): DynamicModule: This function accepts an options object that contains either a connection string for the Azure Service Bus or the fullyQualifiedNamespace for the Service Bus instance. It creates a client using either the connection string or the Azure Default Credential and the namespace.
- forRootAsync(options: {...}): DynamicModule: Similar to forRoot, but allows for asynchronous creation of the options, useful for cases where the Azure Service Bus configuration is loaded asynchronously.
- forFeature(options: AzureSBSenderReceiverOptions): DynamicModule: This function accepts an options object containing the senders and receivers arrays. It creates a provider for every sender and receiver defined in the options object.
- forFeatureAsync(options: {...}): DynamicModule: Similar to forFeature, but allows for asynchronous creation of the options, useful for cases where the sender and receiver configurations are loaded asynchronously.
// azure-service-bus.module.ts
import { DynamicModule, Module, Provider, Global } from '@nestjs/common';
import { ServiceBusClient } from '@azure/service-bus';
import { DefaultAzureCredential } from '@azure/identity';
import { ConfigService } from '@nestjs/config';

export type AzureSBOptions =
  | { connectionString: string }
  | { fullyQualifiedNamespace: string };

export type AzureSBSenderReceiverOptions = {
  senders?: string[];
  receivers?: string[];
};

@Global()
@Module({})
export class AzureServiceBusModule {
  static forRoot(options: AzureSBOptions): DynamicModule {
    let clientProvider: Provider;
    if ('connectionString' in options) {
      clientProvider = {
        provide: 'AZURE_SERVICE_BUS_CONNECTION',
        useValue: new ServiceBusClient(options.connectionString),
      };
    } else {
      const credential = new DefaultAzureCredential();
      clientProvider = {
        provide: 'AZURE_SERVICE_BUS_CONNECTION',
        useValue: new ServiceBusClient(
          options.fullyQualifiedNamespace,
          credential,
        ),
      };
    }
    return {
      module: AzureServiceBusModule,
      providers: [clientProvider],
      exports: [clientProvider],
    };
  }

  static forRootAsync(options: {
    imports?: any[];
    useFactory: (
      configService: ConfigService,
    ) => Promise<AzureSBOptions> | AzureSBOptions;
    inject?: any[];
  }): DynamicModule {
    const clientProvider: Provider = {
      provide: 'AZURE_SERVICE_BUS_CONNECTION',
      useFactory: async (
        configService: ConfigService,
      ): Promise<ServiceBusClient> => {
        const clientOptions = await options.useFactory(configService);
        if ('connectionString' in clientOptions) {
          return new ServiceBusClient(clientOptions.connectionString);
        } else {
          const credential = new DefaultAzureCredential();
          return new ServiceBusClient(
            clientOptions.fullyQualifiedNamespace,
            credential,
          );
        }
      },
      inject: options.inject || [],
    };
    return {
      module: AzureServiceBusModule,
      imports: options.imports || [],
      providers: [clientProvider],
      exports: [clientProvider],
    };
  }

  static forFeature(options: AzureSBSenderReceiverOptions): DynamicModule {
    const senderProviders =
      options.senders?.map((queue) => ({
        provide: `AZURE_SB_SENDER_${queue.toUpperCase()}`,
        useFactory: (client: ServiceBusClient) => client.createSender(queue),
        inject: ['AZURE_SERVICE_BUS_CONNECTION'],
      })) || [];
    const receiverProviders =
      options.receivers?.map((queue) => ({
        provide: `AZURE_SB_RECEIVER_${queue.toUpperCase()}`,
        useFactory: (client: ServiceBusClient) => client.createReceiver(queue),
        inject: ['AZURE_SERVICE_BUS_CONNECTION'],
      })) || [];
    return {
      module: AzureServiceBusModule,
      providers: [...senderProviders, ...receiverProviders],
      exports: [...senderProviders, ...receiverProviders],
    };
  }

  static forFeatureAsync(options: {
    imports?: any[];
    useFactory: (
      configService: ConfigService,
    ) => Promise<AzureSBSenderReceiverOptions> | AzureSBSenderReceiverOptions;
    inject?: any[];
  }): DynamicModule {
    const optionsProvider: Provider = {
      provide: 'AZURE_SB_OPTIONS',
      useFactory: options.useFactory,
      inject: options.inject || [],
    };
    const senderProviders = {
      provide: 'AZURE_SB_SENDERS',
      useFactory: (
        client: ServiceBusClient,
        options: AzureSBSenderReceiverOptions,
      ) => options.senders?.map((queue) => client.createSender(queue)),
      inject: ['AZURE_SERVICE_BUS_CONNECTION', 'AZURE_SB_OPTIONS'],
    };
    const receiverProviders = {
      provide: 'AZURE_SB_RECEIVERS',
      useFactory: (
        client: ServiceBusClient,
        options: AzureSBSenderReceiverOptions,
      ) => options.receivers?.map((queue) => client.createReceiver(queue)),
      inject: ['AZURE_SERVICE_BUS_CONNECTION', 'AZURE_SB_OPTIONS'],
    };
    return {
      module: AzureServiceBusModule,
      imports: options.imports || [],
      providers: [optionsProvider, senderProviders, receiverProviders],
      exports: [senderProviders, receiverProviders],
    };
  }
}
What Is azure-service-bus.decorator.ts?
This file contains two decorators: Sender and Receiver. They inject specific senders and receivers into NestJS services or controllers, which simplifies sending messages to, or receiving messages from, a specific queue.
// azure-service-bus.decorator.ts
import { Inject } from '@nestjs/common';

export const Sender = (queue: string) =>
  Inject(`AZURE_SB_SENDER_${queue.toUpperCase()}`);

export const Receiver = (queue: string) =>
  Inject(`AZURE_SB_RECEIVER_${queue.toUpperCase()}`);
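Reading the decorators next to the module code, Sender('orders') and forFeature({ senders: ['orders'] }) both derive the token AZURE_SB_SENDER_ORDERS, which is why injection lines up. forFeatureAsync, as listed above, instead registers two array-valued tokens (AZURE_SB_SENDERS and AZURE_SB_RECEIVERS). The sketch below mimics both behaviours without any NestJS or Azure dependency; fakeClient, senderToken, perQueueTokens and sendersArray are illustrative names that are not part of the package, and the sketch defaults a missing senders list to an empty array for simplicity.

```typescript
type SenderReceiverOptions = { senders?: string[]; receivers?: string[] };

// Hypothetical stand-in for ServiceBusClient: returns a label, not a real sender.
const fakeClient = {
  createSender: (queue: string): string => `sender:${queue}`,
};

// Same expression the Sender decorator and forFeature use to build a token.
const senderToken = (queue: string): string =>
  `AZURE_SB_SENDER_${queue.toUpperCase()}`;

// forFeature style: one injection token per queue name.
function perQueueTokens(options: SenderReceiverOptions): string[] {
  return (options.senders ?? []).map(senderToken);
}

// forFeatureAsync style: a single token, 'AZURE_SB_SENDERS', whose value is
// an array of senders in the same order as options.senders.
function sendersArray(options: SenderReceiverOptions): string[] {
  return (options.senders ?? []).map((queue) => fakeClient.createSender(queue));
}

const featureOptions: SenderReceiverOptions = { senders: ['orders', 'invoices'] };
const tokens = perQueueTokens(featureOptions);
const senders = sendersArray(featureOptions);
console.log(tokens, senders);
```

One consequence, inferred from the listing rather than from any documented behaviour: with the async variant, the Sender and Receiver decorators would have no matching token, and the arrays would presumably be injected with something like @Inject('AZURE_SB_SENDERS') instead.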
Using and Installing the nestjs-azure-service-bus Module
To begin using this module in your project, you need to install it first.
Installation
You can install nestjs-azure-service-bus using NPM:
npm install nestjs-azure-service-bus
Once the module is installed, you can then import and configure it in your application module.
Basic Configuration
Here’s how to configure the Azure Service Bus client using the forRoot method:
import { Module } from '@nestjs/common';
import { AzureServiceBusModule } from 'nestjs-azure-service-bus';

@Module({
  imports: [
    AzureServiceBusModule.forRoot({
      connectionString: 'your-azure-service-bus-connection-string',
    }),
  ],
})
export class AppModule {}
Async Configuration
If you want to load the Azure Service Bus configuration asynchronously, you can use forRootAsync:
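import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AzureServiceBusModule } from 'nestjs-azure-service-bus';

@Module({
  imports: [
    AzureServiceBusModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => {
        return {
          connectionString: configService.get("CONNECTION_STRING"),
        };
      },
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}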
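A configuration lookup can come back undefined when a variable is missing, so it may be worth validating inside the factory before the client is built. The following is a minimal sketch of that idea, not part of the module: resolveOptions is a hypothetical helper, the get parameter only mimics the shape of a config lookup, and SERVICE_BUS_NAMESPACE is an assumed variable name (only CONNECTION_STRING appears in the example above).

```typescript
type AzureSBOptions =
  | { connectionString: string }
  | { fullyQualifiedNamespace: string };

// `get` mimics a config lookup that may return undefined for a missing key.
function resolveOptions(
  get: (key: string) => string | undefined,
): AzureSBOptions {
  const connectionString = get('CONNECTION_STRING');
  if (connectionString) {
    return { connectionString };
  }
  // Hypothetical key name, used here only for illustration.
  const fullyQualifiedNamespace = get('SERVICE_BUS_NAMESPACE');
  if (fullyQualifiedNamespace) {
    return { fullyQualifiedNamespace };
  }
  // Fail at startup with a clear message instead of passing undefined along.
  throw new Error('Set CONNECTION_STRING or SERVICE_BUS_NAMESPACE');
}

const env: Record<string, string | undefined> = {
  SERVICE_BUS_NAMESPACE: 'example.servicebus.windows.net',
};
const resolved = resolveOptions((key) => env[key]);
console.log(resolved);
```

Inside forRootAsync, such a helper could be called from useFactory, for example as resolveOptions((key) => configService.get<string>(key)).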
Creating Senders and Receivers
To create senders and receivers, use the forFeature method:
@Module({
  imports: [
    AzureServiceBusModule.forFeature({
      senders: ['queue1', 'queue2'],
      receivers: ['queue3', 'queue4'],
    }),
  ],
})
export class QueueModule {}
Again, there’s an async variant if you want to load your senders and receivers asynchronously:
@Module({
  imports: [
    AzureServiceBusModule.forFeatureAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => {
        return {
          senders: [configService.get("SENDERS_1")],
          receivers: [configService.get("RECEIVERS_1")],
        };
      },
      inject: [ConfigService],
    }),
  ],
  providers: [QueueSenderService, QueueReceiverService],
  exports: [QueueSenderService],
})
export class QueueModule {}
Injecting Senders and Receivers
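Finally, you can inject senders and receivers into your services or controllers using the Sender and Receiver decorators. The queue name passed to a decorator must match a name registered through forFeature, because both derive the injection token from it: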
import { ServiceBusSender } from '@azure/service-bus';
import { Injectable } from '@nestjs/common';
import { Sender } from 'nestjs-azure-service-bus';

@Injectable()
export class QueueSenderService {
  constructor(
    @Sender('test-queue') private readonly sender: ServiceBusSender,
  ) {}

  async sendMessage(body: string) {
    await this.sender.sendMessages({ body });
  }
}
import { ServiceBusReceiver } from '@azure/service-bus';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Receiver } from 'nestjs-azure-service-bus';

@Injectable()
export class QueueReceiverService implements OnModuleInit {
  constructor(
    @Receiver('test-queue') private readonly receiver: ServiceBusReceiver,
  ) {}

  onModuleInit() {
    this.receiver.subscribe({
      processMessage: async (message) => {
        console.log(`message.body: ${message.body}`);
      },
      processError: async (args) => {
        console.log(
          `Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
          args.error,
        );
      },
    });
  }
}
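The log line in processMessage interpolates message.body directly, which works for the string bodies sent by QueueSenderService. If a sender passes an object as the body, the same template would print [object Object]. A small sketch of a formatter that handles both cases follows; formatBody is a hypothetical helper, not something the module or the Azure SDK provides.

```typescript
// Hypothetical helper: turn a message body of unknown shape into a loggable string.
function formatBody(body: unknown): string {
  if (typeof body === 'string') {
    return body;
  }
  try {
    // JSON.stringify yields undefined at runtime for values such as undefined,
    // so fall back to String() in that case.
    return JSON.stringify(body) ?? String(body);
  } catch {
    // JSON.stringify throws on circular structures; fall back to String().
    return String(body);
  }
}

console.log(`message.body: ${formatBody('hello')}`);
console.log(`message.body: ${formatBody({ id: 1 })}`);
```

In the receiver above, the call would replace the direct interpolation, as in console.log(`message.body: ${formatBody(message.body)}`).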
Conclusion
The nestjs-azure-service-bus module is a powerful tool for NestJS applications to interact with Azure Service Bus. It provides an easy-to-use API to create senders and receivers and integrates smoothly with NestJS's dependency injection system.
Creating a module like this takes some knowledge of both the NestJS and Azure Service Bus APIs. But once created, it can significantly streamline your application’s interaction with Azure Service Bus.
This module is open-source and available on NPM. Feel free to use, contribute, or open issues on the GitHub page.