A robust, type-safe, and production-ready Google Pub/Sub module for NestJS.
A fully-featured NestJS module for Google Cloud Pub/Sub that provides a declarative, decorator-based way to subscribe to events. Designed for high-scale production environments, it handles infrastructure boilerplate, configuration, and error handling so you can focus on business logic.
- Zero Inheritance: Use the
@PubSubListener()decorator on any class. - Discovery Pattern: Automated registration. Just add the decorator and the module finds it.
- Clean Dependency Injection: Injected services (like
ConfigServiceorTypeORM) work exactly as they do in any other Nest service. - Type-Safe Payloads: Automatic JSON parsing of incoming messages.
- Production Ready: Built-in merging for global defaults and listener-specific subscription options (retry policies, ack deadlines).
- Emulator Support: seamless switching between local development and GCP.
npm install nestjs-pubsub-lib @google-cloud/pubsubBy default, this library uses Application Default Credentials (ADC).
If you are hosting outside of Google Cloud, you must provide a Service Account key. You can do this in two ways:
PubsubModule.register({
projectId: 'my-project-id',
keyFilename: '/path/to/service-account.json',
});When using environment variables, ensure you handle the private key newline characters correctly.
PubsubModule.registerAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
projectId: config.get('GCP_PROJECT_ID'),
credentials: {
client_email: config.get('GCP_CLIENT_EMAIL'),
// Important: replace escaped newlines
private_key: config.get<string>('GCP_PRIVATE_KEY').replace(/\\n/g, '\n'),
},
}),
});// src/app.module.ts
import { PubsubModule } from 'nestjs-pubsub-lib';
@Module({
imports: [
PubsubModule.register({
projectId: 'my-project-id',
autoCreateTopics: true, // Auto-create infrastructure (useful for local/dev)
}),
],
})
export class AppModule {}Simply mark any class with @PubSubListener. The library will automatically discover it and start the subscription.
// src/listeners/user-created.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { Message } from '@google-cloud/pubsub';
import { PubSubListener } from 'nestjs-pubsub-lib';
@Injectable()
@PubSubListener({
topicName: 'user.created',
subscriptionName: 'notification-service.user.created',
})
export class UserCreatedListener {
private readonly logger = new Logger(UserCreatedListener.name);
constructor(private readonly emailService: EmailService) {}
/**
* Method called automatically when a message arrives.
* payload is automatically parsed from JSON.
*/
async handle(payload: { userId: string; email: string }, message: Message) {
this.logger.log(`Processing user: ${payload.userId}`);
await this.emailService.sendWelcome(payload.email);
// Ack the message to remove it from the queue
message.ack();
}
}Inject PubsubPublisher into any service.
@Injectable()
export class UserService {
constructor(private readonly publisher: PubsubPublisher) {}
async create(user: UserDto) {
// ... logic ...
await this.publisher.dispatchEvent('user.created', {
userId: user.id,
email: user.email
});
}
}Each @PubSubListener can override global settings like acknowledgment deadlines or retry policies.
@PubSubListener({
topicName: 'video.processing',
subscriptionName: 'transcoder-sub',
subscriptionOptions: {
ackDeadlineSeconds: 600, // 10 minutes for long tasks
retryPolicy: {
minimumBackoff: { seconds: 30 },
maximumBackoff: { seconds: 600 },
},
},
})
export class VideoListener {
async handle(payload: any, message: Message) {
// ... business logic ...
message.ack();
}
}| Option | Type | Default | Description |
|---|---|---|---|
projectId |
string |
Required | Your Google Cloud Project ID. |
emulatorMode |
boolean |
false |
Set to true to connect to a local Pub/Sub emulator. |
port |
number |
undefined |
The port of the emulator (Required if emulatorMode is true). |
autoCreateTopics |
boolean |
false |
If true, the library will create missing topics/subscriptions on startup. Recommended: true for dev, false for prod. |
keyFilename |
string |
undefined |
Full path to your GCP service account JSON file. |
credentials |
object |
undefined |
Object containing client_email and private_key. |
logger |
LoggerService |
Logger |
Custom logger (e.g., Winston, Pino). |
Since listeners are plain NestJS classes, you can unit test them without any Pub/Sub infrastructure:
describe('UserCreatedListener', () => {
it('should send an email', async () => {
const mockEmailService = { sendWelcome: jest.fn() };
const listener = new UserCreatedListener(mockEmailService as any);
const mockMessage = { ack: jest.fn() };
await listener.handle({ userId: '1', email: 'test@test.com' }, mockMessage as any);
expect(mockEmailService.sendWelcome).toHaveBeenCalledWith('test@test.com');
expect(mockMessage.ack).toHaveBeenCalled();
});
});If you need access to the underlying @google-cloud/pubsub client (e.g., for creating snapshots or managing IAM policies):
import { PUBSUB_CLIENT } from 'nestjs-pubsub-lib';
import { PubSub } from '@google-cloud/pubsub';
@Injectable()
export class AdminService {
constructor(@Inject(PUBSUB_CLIENT) private readonly pubSub: PubSub) {}
}MIT © Stubbies