Skip to content

stubbies/nestjs-pubsub-lib

Repository files navigation

NestJS Pub/Sub

A robust, type-safe, and production-ready Google Pub/Sub module for NestJS.

NPM Version Package License

A fully-featured NestJS module for Google Cloud Pub/Sub that provides a declarative, decorator-based way to subscribe to events. Designed for high-scale production environments, it handles infrastructure boilerplate, configuration, and error handling so you can focus on business logic.

✨ Features

  • Zero Inheritance: Use the @PubSubListener() decorator on any class.
  • Discovery Pattern: Automated registration. Just add the decorator and the module finds it.
  • Clean Dependency Injection: Injected services (like ConfigService or TypeORM) work exactly as they do in any other Nest service.
  • Type-Safe Payloads: Automatic JSON parsing of incoming messages.
  • Production Ready: Built-in merging for global defaults and listener-specific subscription options (retry policies, ack deadlines).
  • Emulator Support: seamless switching between local development and GCP.

Installation

npm install nestjs-pubsub-lib @google-cloud/pubsub

Authentication

By default, this library uses Application Default Credentials (ADC).

For Non-GCP Environments (AWS, On-Prem, Local)

If you are hosting outside of Google Cloud, you must provide a Service Account key. You can do this in two ways:

1. Using a Key File Path

PubsubModule.register({
  projectId: 'my-project-id',
  keyFilename: '/path/to/service-account.json',
});

2. Using Direct Credentials (Recommended for Docker/CI)

When using environment variables, ensure you handle the private key newline characters correctly.

PubsubModule.registerAsync({
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({
    projectId: config.get('GCP_PROJECT_ID'),
    credentials: {
      client_email: config.get('GCP_CLIENT_EMAIL'),
      // Important: replace escaped newlines
      private_key: config.get<string>('GCP_PRIVATE_KEY').replace(/\\n/g, '\n'),
    },
  }),
});

Getting Started

1. Register the Module

// src/app.module.ts
import { PubsubModule } from 'nestjs-pubsub-lib';

@Module({
  imports: [
    PubsubModule.register({
      projectId: 'my-project-id',
      autoCreateTopics: true, // Auto-create infrastructure (useful for local/dev)
    }),
  ],
})
export class AppModule {}

2. Create a Listener

Simply mark any class with @PubSubListener. The library will automatically discover it and start the subscription.

// src/listeners/user-created.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { Message } from '@google-cloud/pubsub';
import { PubSubListener } from 'nestjs-pubsub-lib';

@Injectable()
@PubSubListener({
  topicName: 'user.created',
  subscriptionName: 'notification-service.user.created',
})
export class UserCreatedListener {
  private readonly logger = new Logger(UserCreatedListener.name);

  constructor(private readonly emailService: EmailService) {}

  /**
   * Method called automatically when a message arrives.
   * payload is automatically parsed from JSON.
   */
  async handle(payload: { userId: string; email: string }, message: Message) {
    this.logger.log(`Processing user: ${payload.userId}`);
    
    await this.emailService.sendWelcome(payload.email);
    
    // Ack the message to remove it from the queue
    message.ack();
  }
}

3. Publish an Event

Inject PubsubPublisher into any service.

@Injectable()
export class UserService {
  constructor(private readonly publisher: PubsubPublisher) {}

  async create(user: UserDto) {
    // ... logic ...
    await this.publisher.dispatchEvent('user.created', { 
      userId: user.id, 
      email: user.email 
    });
  }
}

Configuration Overrides

Each @PubSubListener can override global settings like acknowledgment deadlines or retry policies.

@PubSubListener({
  topicName: 'video.processing',
  subscriptionName: 'transcoder-sub',
  subscriptionOptions: {
    ackDeadlineSeconds: 600, // 10 minutes for long tasks
    retryPolicy: {
      minimumBackoff: { seconds: 30 },
      maximumBackoff: { seconds: 600 },
    },
  },
})
export class VideoListener {
  async handle(payload: any, message: Message) {
    // ... business logic ...
    message.ack();
  }
}

Configuration Options

Option Type Default Description
projectId string Required Your Google Cloud Project ID.
emulatorMode boolean false Set to true to connect to a local Pub/Sub emulator.
port number undefined The port of the emulator (Required if emulatorMode is true).
autoCreateTopics boolean false If true, the library will create missing topics/subscriptions on startup. Recommended: true for dev, false for prod.
keyFilename string undefined Full path to your GCP service account JSON file.
credentials object undefined Object containing client_email and private_key.
logger LoggerService Logger Custom logger (e.g., Winston, Pino).

Testing

Since listeners are plain NestJS classes, you can unit test them without any Pub/Sub infrastructure:

describe('UserCreatedListener', () => {
  it('should send an email', async () => {
    const mockEmailService = { sendWelcome: jest.fn() };
    const listener = new UserCreatedListener(mockEmailService as any);
    const mockMessage = { ack: jest.fn() };

    await listener.handle({ userId: '1', email: 'test@test.com' }, mockMessage as any);

    expect(mockEmailService.sendWelcome).toHaveBeenCalledWith('test@test.com');
    expect(mockMessage.ack).toHaveBeenCalled();
  });
});

Advanced: Accessing the Raw Client

If you need access to the underlying @google-cloud/pubsub client (e.g., for creating snapshots or managing IAM policies):

import { PUBSUB_CLIENT } from 'nestjs-pubsub-lib';
import { PubSub } from '@google-cloud/pubsub';

@Injectable()
export class AdminService {
  constructor(@Inject(PUBSUB_CLIENT) private readonly pubSub: PubSub) {}
}

License

MIT © Stubbies

About

A robust, type-safe, and production-ready Google Pub/Sub module for NestJS.

Topics

Resources

License

Stars

Watchers

Forks

Contributors