Redis Job Queue System - Deep Dive

📋 System Overview

Type: Distributed Job/Task Queue System with Pub/Sub Notifications
Purpose: Asynchronous job processing with reliability and monitoring
Architecture Pattern: Producer-Consumer with Work Queues

┌─────────────────────────────────────────────────────────────────────────┐
│                         JOB QUEUE SYSTEM ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐      ┌───────────────────────────┐     ┌─────────────┐│
│  │  PRODUCER   │─────▶│       REDIS STORE         │◀────│  CONSUMERS  ││
│  │ (Express API)│     │                           │     │  (Workers)  ││
│  └─────────────┘      │  ┌─────────────────────┐  │     └─────────────┘│
│                       │  │   job_queue (List)  │  │           │        │
│  ┌─────────────┐      │  │ ┌──┐ ┌──┐ ┌──┐ ┌──┐ │  │           │        │
│  │   MONITOR   │◀─────│  │ │J1│ │J2│ │J3│ │J4│ │  │           ▼        │
│  │   (CLI)     │      │  │ └──┘ └──┘ └──┘ └──┘ │  │    ┌─────────────┐ │
│  └─────────────┘      │  │   LPUSH    BRPOP     │  │    │ JOB PROCESS │ │
│                       │  └─────────────────────┘  │    │  (Business   │ │
│                       │                           │    │   Logic)     │ │
│                       │  ┌─────────────────────┐  │    └─────────────┘ │
│                       │  │ Processing Queue    │  │           │        │
│                       │  └─────────────────────┘  │           │        │
│                       │                           │           ▼        │
│                       │  ┌─────────────────────┐  │    ┌─────────────┐ │
│                       │  │  Completed Queue    │  │    │  SUCCESS/   │ │
│                       │  └─────────────────────┘  │    │   FAILURE   │ │
│                       │                           │    └─────────────┘ │
│                       │  ┌─────────────────────┐  │           │        │
│                       │  │   Failed Queue      │  │           │        │
│                       │  └─────────────────────┘  │           │        │
│                       │                           │           ▼        │
│                       │  ┌─────────────────────┐  │    ┌─────────────┐ │
│                       │  │  job_status (Pub/Sub)│◀────│ STATUS UPDATE│ │
│                       │  └─────────────────────┘  │    └─────────────┘ │
│                       └───────────────────────────┘                    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

🔑 Core Concepts

1. Redis Data Structures Used

├── Lists (Core Queue Mechanism)
│   ├── job_queue (FIFO queue)
│   ├── job_queue:processing (Active jobs)
│   ├── job_queue:completed (Successful jobs)
│   └── job_queue:failed (Failed jobs)
│
├── Pub/Sub Channels
│   └── job_status (Real-time notifications)
│
└── Operations
    ├── LPUSH (Left Push) - Add to queue
    ├── BRPOP (Blocking Right Pop) - Consume jobs
    ├── LLEN - Get queue length
    ├── LRANGE - List jobs
    ├── LREM - Remove specific job
    └── DEL - Clear queue

2. Queue Flow Pattern

┌─────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  QUEUED │────▶│  PROCESSING  │────▶│   SUCCESS    │────▶│  COMPLETED   │
└─────────┘     └──────────────┘     └──────────────┘     └──────────────┘
     │                │                     │
     │                │                     │
     │                ▼                     ▼
     │          ┌──────────────┐     ┌──────────────┐
     └──────────▶│    RETRY     │     │    FAILED    │
                 └──────────────┘     └──────────────┘

🏗️ System Components

1. JobQueue Class (Producer Side)

JobQueue
├── Properties
│   ├── redis (Producer connection)
│   └── pubSubRedis (Notification connection)
│
├── Methods
│   ├── addJob(jobData)
│   │   ├── Generate unique job ID
│   │   ├── Create job object with metadata
│   │   ├── LPUSH to job_queue
│   │   └── Publish 'queued' event
│   │
│   ├── getStats()
│   │   └── Get all queue lengths (LLEN)
│   │
│   ├── getJobs(queueName)
│   │   └── Retrieve jobs (LRANGE)
│   │
│   └── clearQueue(queueName)
│       └── Delete queue (DEL)
└── Job Structure
    {
      id: "job_1700000000000_abc123",
      data: {task: "process_payment", amount: 100},
      createdAt: "2024-01-01T10:00:00.000Z",
      status: "queued",
      attempts: 0
    }
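
The `addJob` steps above can be sketched roughly as follows. This is an illustration, not the project's actual class: the free functions `createJob` and `addJob` are hypothetical, the ID format is inferred from the example job, and the client objects are assumed to expose ioredis-style `lpush(key, value)` and `publish(channel, message)` methods that return promises. The `fake` object at the end is an in-memory stand-in so the snippet runs without a Redis server.

```javascript
// Sketch only: names and ID format are assumptions based on the outline above.
const QUEUE_KEY = 'job_queue';
const STATUS_CHANNEL = 'job_status';

// Build a job object with the metadata fields shown in the Job Structure.
function createJob(data, now = Date.now()) {
  const suffix = Math.random().toString(36).slice(2, 8);
  return {
    id: `job_${now}_${suffix}`,
    data,
    createdAt: new Date(now).toISOString(),
    status: 'queued',
    attempts: 0,
  };
}

// `redis` and `pubSubRedis` are assumed to be ioredis-style clients.
async function addJob(redis, pubSubRedis, data) {
  const job = createJob(data);
  await redis.lpush(QUEUE_KEY, JSON.stringify(job)); // enqueue at the left
  await pubSubRedis.publish(
    STATUS_CHANNEL,
    JSON.stringify({ jobId: job.id, status: 'queued', timestamp: new Date().toISOString() })
  );
  return job;
}

// Usage with an in-memory stand-in (not a real Redis client).
const fake = {
  list: [],
  async lpush(key, value) { this.list.unshift(value); return this.list.length; },
  async publish(channel, message) { return 0; },
};
addJob(fake, fake, { task: 'process_payment', amount: 100 })
  .then((job) => console.log(job.status, fake.list.length)); // queued 1
```

Publishing after the `LPUSH` means a subscriber never sees a `queued` event for a job that is not yet in the list.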

2. Worker Class (Consumer Side)

Worker
├── Properties
│   ├── workerId (Unique identifier)
│   ├── isRunning (Control flag)
│   ├── redis (Worker connection)
│   └── pubSubRedis (Notification connection)
│
├── Methods
│   ├── processJob(job)
│   │   ├── Simulate processing (1-4 seconds)
│   │   ├── 10% random failure rate
│   │   └── Return processed result
│   │
│   └── start()
│       ├── while(isRunning)
│       │   ├── BRPOP(job_queue, 5s) [Blocking call]
│       │   ├── If job received:
│       │   │   ├── Move to processing queue
│       │   │   ├── Publish 'processing' event
│       │   │   ├── Execute processJob()
│       │   │   │   ├── Success: Move to completed
│       │   │   │   └── Failure: Move to failed
│       │   │   └── Publish status event
│       │   └── If timeout: Continue loop
│       └── Graceful shutdown handler
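
One iteration of the `start()` loop might look like the sketch below. It assumes ioredis-style clients, where `brpop(key, timeout)` resolves to `[key, value]` or to `null` on timeout, and `lrem(key, count, value)` removes matching entries. The function name `runOnce`, the `result` and `error` fields, and the increment of `attempts` are illustrative choices, not taken from the original worker.

```javascript
const QUEUE = 'job_queue';
const PROCESSING = 'job_queue:processing';
const COMPLETED = 'job_queue:completed';
const FAILED = 'job_queue:failed';

// Runs one loop iteration. Resolves to 'completed', 'failed', or null on timeout.
async function runOnce(redis, pubSubRedis, workerId, processJob) {
  const popped = await redis.brpop(QUEUE, 5); // blocks for up to 5 seconds
  if (!popped) return null;                   // timeout: caller just loops again

  const raw = popped[1];
  const job = JSON.parse(raw);
  const publish = (status, extra = {}) =>
    pubSubRedis.publish('job_status', JSON.stringify({
      jobId: job.id, status, workerId, timestamp: new Date().toISOString(), ...extra,
    }));

  await redis.lpush(PROCESSING, raw); // mark the job as in flight
  await publish('processing');

  let status = 'completed';
  let extra = {};
  try {
    job.result = await processJob(job);
  } catch (err) {
    status = 'failed';
    job.error = err.message;
    extra = { error: err.message };
  }
  job.status = status;
  job.attempts = (job.attempts || 0) + 1; // assumption: count every execution

  await redis.lrem(PROCESSING, 1, raw);   // leave the processing list
  await redis.lpush(status === 'completed' ? COMPLETED : FAILED, JSON.stringify(job));
  await publish(status, extra);
  return status;
}
```

A caller would wrap this in `while (this.isRunning) { await runOnce(...); }`. Note that the job is briefly in no list between `BRPOP` and the `LPUSH` to the processing list, so a crash at that point would lose it.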

3. Express API Layer

API Endpoints
├── POST /jobs
│   └── Add new job to queue
│
├── GET /jobs/stats
│   └── Get queue statistics
│
├── GET /jobs/:queueType
│   └── List jobs (queued|processing|completed|failed)
│
└── DELETE /jobs/:queueType
    └── Clear specific queue
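
Because `:queueType` comes straight from the URL, the handlers presumably need to map it onto one of the four list keys and reject anything else. A minimal sketch of that mapping is shown below. The helper `resolveQueueKey`, the factory `listJobsHandler`, and the assumption that `getJobs` accepts a Redis key are all hypothetical; the handler is written as a plain `(req, res)` function so it can be mounted with `app.get('/jobs/:queueType', ...)` in Express.

```javascript
// Allowed values of :queueType and the Redis list each one refers to.
const QUEUE_KEYS = {
  queued: 'job_queue',
  processing: 'job_queue:processing',
  completed: 'job_queue:completed',
  failed: 'job_queue:failed',
};

// Returns the Redis key for a queue type, or null for unknown input
// (including inherited property names such as "constructor").
function resolveQueueKey(queueType) {
  return Object.prototype.hasOwnProperty.call(QUEUE_KEYS, queueType)
    ? QUEUE_KEYS[queueType]
    : null;
}

// Express-style handler factory; `jobQueue.getJobs(key)` is assumed to wrap LRANGE.
function listJobsHandler(jobQueue) {
  return async (req, res) => {
    const key = resolveQueueKey(req.params.queueType);
    if (!key) return res.status(400).json({ error: 'Unknown queue type' });
    return res.json(await jobQueue.getJobs(key));
  };
}

console.log(resolveQueueKey('failed'));  // job_queue:failed
console.log(resolveQueueKey('unknown')); // null
```

The same lookup would guard `DELETE /jobs/:queueType`, so a request cannot `DEL` an arbitrary key.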

🔄 Execution Flow

1. Job Submission Flow

┌─────────────┐   1. POST /jobs     ┌─────────────┐   2. LPUSH     ┌─────────────┐
│   Client    │────────────────────▶│   Express   │───────────────▶│    Redis    │
│             │                     │     API     │                │             │
│             │◀────────────────────│             │                │             │
│             │   6. Job Created    │             │                │             │
└─────────────┘                     └─────────────┘                └─────────────┘
                                          │                               │
                                          │ 3. Create Job Object          │
                                          │                               │
                                          │ 4. Publish 'queued' event     │
                                          └───────────────────────────────┘
                                          │                               │
                                   ┌─────────────┐   5. Event    ┌─────────────┐
                                   │  Pub/Sub    │◀──────────────│    Redis    │
                                   │ Subscriber  │               │  (Pub/Sub)  │
                                   └─────────────┘               └─────────────┘

2. Job Processing Flow

┌─────────────┐                    ┌─────────────┐                    ┌─────────────┐
│   Worker    │                    │    Redis    │                    │  Business   │
│             │                    │             │                    │   Logic     │
├─────────────┤                    ├─────────────┤                    ├─────────────┤
│ 1. BRPOP()  │───────────────────▶│   Get Job   │                    │             │
│ (Blocks)    │                    │  from List  │                    │             │
│             │◀───────────────────│             │                    │             │
│             │  2. Job Data       │             │                    │             │
│             │                    │             │                    │             │
│ 3. LREM+LPUSH│───────────────────▶│ Move to    │                    │             │
│             │                    │ Processing  │                    │             │
│             │                    │             │                    │             │
│ 4. Publish  │───────────────────▶│Update Status│                    │             │
│  'processing'│                    │             │                    │             │
│             │                    │             │                    │             │
│ 5. Execute  │───────────────────────────────────────────────────────▶│   Process   │
│  processJob()│                    │             │                    │             │
│             │◀───────────────────────────────────────────────────────│             │
│             │   6. Result/Error   │             │                    │             │
│             │                    │             │                    │             │
│ 7. Move to  │───────────────────▶│ Final Queue │                    │             │
│ final queue │                    │(Completed/  │                    │             │
│             │                    │   Failed)   │                    │             │
│             │                    │             │                    │             │
│ 8. Publish  │───────────────────▶│Update Status│                    │             │
│ final status│                    │             │                    │             │
└─────────────┘                    └─────────────┘                    └─────────────┘

⚙️ Architecture Details

1. Redis Connection Strategy

Connection Pool
├── Producer Connection (redisProducer)
│   └── Used for: LPUSH, LRANGE, LLEN, DEL
│
├── Pub/Sub Connection (redisPubSub)
│   └── Used for: PUBLISH notifications
│
├── Worker Connections (per worker)
│   └── Used for: BRPOP, LREM, LPUSH
│
└── Subscriber Connection
    └── Used for: SUBSCRIBE to job_status

2. Queue Design Patterns

FIFO Implementation:
   Producers: LPUSH (add to left)  → [new, ..., old]
   Consumers: BRPOP (take from right) ←
   Result: First-In-First-Out

State Management:
   queued → processing → {completed | failed}
   Each state has dedicated Redis list

Retry Mechanism:
   Job.attempts counter
   No automatic retry in this example; the attempts counter is the hook for adding one

Dead Letter Queue:
   job_queue:failed acts as DLQ
   Failed jobs preserved for inspection
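
The FIFO claim above can be checked with a small in-memory model of a Redis list. This is only a simulation with a JavaScript array, where index 0 plays the role of the left end; the helper names `lpush` and `rpop` mirror the Redis commands but do not talk to a server.

```javascript
// In-memory model of a Redis list: index 0 is the left end, the last index is the right end.
const list = [];
const lpush = (value) => list.unshift(value); // LPUSH adds at the left
const rpop = () => list.pop();                // RPOP / BRPOP removes from the right

lpush('J1');
lpush('J2');
lpush('J3');
console.log(list);                   // [ 'J3', 'J2', 'J1' ]
console.log(rpop(), rpop(), rpop()); // J1 J2 J3
```

Jobs come out in the order they went in, which is why the producer pushes on one end and the consumers pop from the other.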

3. Fault Tolerance Features

1. Atomic Operations
   └── BRPOP pops atomically (a job is never handed to two workers);
       the follow-up LPUSH to the processing list is a separate step

2. Job Persistence
   └── Queued jobs survive API and worker crashes (held in Redis, not in process memory)

3. Worker Isolation
   └── Each worker has independent connection

4. Graceful Shutdown
   └── SIGTERM handler stops workers cleanly

5. Connection Management
   └── Separate connections for different purposes
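
The graceful shutdown item can be sketched as a signal handler that only flips the worker's `isRunning` flag. The function name `installShutdownHandlers` is hypothetical, handling `SIGINT` as well as `SIGTERM` is an addition for local Ctrl+C use, and the `proc` parameter exists only so the handler can be exercised without sending real signals.

```javascript
// `worker` is assumed to be an object with the isRunning flag described earlier.
function installShutdownHandlers(worker, proc = process) {
  const stop = () => {
    // The loop condition is checked between iterations, so the worker
    // finishes its current job (or BRPOP timeout) before exiting.
    worker.isRunning = false;
  };
  proc.once('SIGTERM', stop);
  proc.once('SIGINT', stop);
  return stop;
}

// Typical use, before starting the loop:
//   installShutdownHandlers(worker);
//   await worker.start();
```

With a 5 second `BRPOP` timeout, an idle worker stops within roughly 5 seconds of the signal; a busy worker stops once the in-flight job has been moved to its final queue.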

🎯 Key Design Decisions

1. Why BRPOP instead of RPOP?

BRPOP (Blocking) vs RPOP (Non-blocking)
┌────────────────────────┬────────────────────────┐
│       BRPOP            │        RPOP            │
├────────────────────────┼────────────────────────┤
│ Waits for job          │ Returns null if empty  │
│ No busy-waiting        │ Requires polling       │
│ Better CPU usage       │ Higher latency         │
│ Built-in timeout       │ Manual sleep needed    │
└────────────────────────┴────────────────────────┘

2. Why Multiple Queues?

Queue Separation Benefits:
┌──────────────────┬─────────────────────────────────────────┐
│     Queue        │              Purpose                    │
├──────────────────┼─────────────────────────────────────────┤
│ job_queue        │ Pending jobs (BRPOP source)             │
│ processing       │ Currently executing jobs                │
│ completed        │ Success history (audit trail)           │
│ failed           │ Error analysis and manual retry         │
└──────────────────┴─────────────────────────────────────────┘

3. Why Pub/Sub with Lists?

Dual Communication Channels:
┌────────────────────────┬─────────────────────────────────┐
│        Lists           │           Pub/Sub               │
├────────────────────────┼─────────────────────────────────┤
│ Job data transfer      │ Status notifications            │
│ Durable storage        │ Real-time updates               │
│ Order preservation     │ Event broadcasting              │
│ Worker coordination    │ Monitoring/alerting             │
└────────────────────────┴─────────────────────────────────┘

📊 Monitoring and Management

1. Queue Statistics

// Example stats output
{
  "queued": 5,      // Jobs waiting for processing
  "processing": 2,  // Currently being processed
  "completed": 10,  // Successfully processed
  "failed": 1       // Failed jobs
}
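
A stats object of this shape could be produced by issuing one `LLEN` per list, roughly as sketched here. The function is a hypothetical stand-in for `getStats()`, and `redis` is assumed to be an ioredis-style client whose `llen(key)` returns a promise for the list length.

```javascript
// Stat name -> Redis list key, following the queue names used in this document.
const STAT_QUEUES = {
  queued: 'job_queue',
  processing: 'job_queue:processing',
  completed: 'job_queue:completed',
  failed: 'job_queue:failed',
};

// Reads all four list lengths concurrently and returns them keyed by stat name.
async function getStats(redis) {
  const names = Object.keys(STAT_QUEUES);
  const lengths = await Promise.all(names.map((name) => redis.llen(STAT_QUEUES[name])));
  return Object.fromEntries(names.map((name, i) => [name, lengths[i]]));
}
```

The four reads are independent commands, so the numbers are a near-simultaneous snapshot rather than a transactionally consistent one; a job that moves between lists mid-read may be counted twice or not at all.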

2. Event Stream

// Real-time job lifecycle events
{
  "jobId": "job_1700000000000_abc123",
  "status": "processing",  // queued|processing|completed|failed
  "workerId": 2,           // Which worker is processing
  "timestamp": "2024-01-01T10:00:01.000Z",
  "error": "Random processing error"  // Only for failed
}
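
On the consuming side, a monitor might turn each message into a single log line. The formatter below is a hypothetical example of that, using only the fields shown in the event above; the commented wiring assumes an ioredis-style subscriber, where `subscribe(channel)` registers the channel and `'message'` events deliver `(channel, message)`.

```javascript
// Turns a raw job_status message (a JSON string) into a one-line summary.
function formatStatusEvent(message) {
  const event = JSON.parse(message);
  const worker = event.workerId !== undefined ? ` worker=${event.workerId}` : '';
  // The error field is only meaningful for failed jobs.
  const error = event.status === 'failed' && event.error ? ` error="${event.error}"` : '';
  return `[${event.timestamp}] ${event.jobId} ${event.status}${worker}${error}`;
}

const sample = JSON.stringify({
  jobId: 'job_1700000000000_abc123',
  status: 'processing',
  workerId: 2,
  timestamp: '2024-01-01T10:00:01.000Z',
});
console.log(formatStatusEvent(sample));
// [2024-01-01T10:00:01.000Z] job_1700000000000_abc123 processing worker=2

// Wiring with an ioredis-style subscriber connection (assumed, not run here):
//   subscriber.subscribe('job_status');
//   subscriber.on('message', (channel, message) => console.log(formatStatusEvent(message)));
```

Pub/Sub delivery is fire-and-forget, so a monitor that connects late sees only events published after it subscribed; the lists remain the source of truth for job state.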

🔄 Scaling Patterns

1. Horizontal Scaling

Add More Workers:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Worker 1  │    │   Worker 2  │    │   Worker 3  │
│             │    │             │    │             │
│  BRPOP()    │    │  BRPOP()    │    │  BRPOP()    │
│             │    │             │    │             │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          │
                   ┌─────────────┐
                   │   Redis     │
                   │  job_queue  │
                   │ (Shared)    │
                   └─────────────┘

2. Priority Queues (Extension)

// Possible extension: Priority queues
const HIGH_PRIORITY_QUEUE = 'job_queue:high';
const NORMAL_PRIORITY_QUEUE = 'job_queue:normal';

// BRPOP checks keys in the order given, so high-priority jobs are served first
const result = await this.redis.brpop(
  HIGH_PRIORITY_QUEUE,
  NORMAL_PRIORITY_QUEUE,
  5
);

⚠️ Limitations and Considerations

1. Current Limitations

1. No job timeout mechanism
2. No retry policy (immediate failure)
3. No job prioritization
4. No job scheduling (delayed execution)
5. No job deduplication
6. Memory growth (completed/failed queues grow indefinitely)

2. Production Enhancements Needed

1. Add job TTL (expire completed/failed jobs)
2. Implement retry with exponential backoff
3. Add dead letter queue after N retries
4. Implement job result persistence (RDBMS)
5. Add rate limiting per worker
6. Implement job dependencies/chaining
7. Add job progress tracking
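
Items 2 and 3 could build on the existing `attempts` counter. The sketch below shows one possible policy, not something the current code does: the delay doubles with each attempt up to a cap, and a job that has used up its attempts is routed to `job_queue:failed`. The function names, the 1 second base, the 60 second cap, and the limit of 3 attempts are all illustrative assumptions.

```javascript
// Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ... capped at maxMs.
function retryDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Decide what to do with a job whose processing just failed.
function planRetry(job, maxAttempts = 3) {
  const attempts = job.attempts + 1;
  if (attempts >= maxAttempts) {
    return {
      action: 'dead-letter',
      queue: 'job_queue:failed',
      job: { ...job, attempts, status: 'failed' },
    };
  }
  return {
    action: 'retry',
    queue: 'job_queue',
    delayMs: retryDelayMs(attempts),
    job: { ...job, attempts, status: 'queued' },
  };
}

console.log(retryDelayMs(1), retryDelayMs(2), retryDelayMs(3)); // 1000 2000 4000
console.log(planRetry({ id: 'j1', attempts: 0 }).action);       // retry
console.log(planRetry({ id: 'j1', attempts: 2 }).action);       // dead-letter
```

Honoring `delayMs` needs somewhere to park the job until it is due, which a plain list does not offer; that is the delayed-execution gap already listed under the current limitations.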

📈 Performance Characteristics

1. Redis Operations Complexity

Operation      Complexity   Description
─────────────  ───────────  ─────────────────────────────
LPUSH          O(1)         Add job to queue
BRPOP          O(1)         Get job from queue (blocking)
LLEN           O(1)         Get queue length
LRANGE         O(S+N)       Get range of jobs
LREM           O(N)         Remove specific job
PUBLISH        O(N+M)       Broadcast to N clients, M patterns

2. Throughput Estimates

Assuming:
- Redis: 50k ops/sec
- Job processing: 2s average
- 3 workers

Max throughput:
- Queue ops: ~25k jobs/sec (theoretical, counting one LPUSH and one BRPOP per job)
- Processing: ~90 jobs/min (1.5 jobs/sec) - bottleneck

Bottleneck: Business logic, not Redis
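
The arithmetic behind these estimates is written out below. The function is a hypothetical helper, and `opsPerJob: 2` is an assumption (one `LPUSH` plus one `BRPOP`) that reproduces the ~25k figure; counting the list moves and `PUBLISH` calls per job would lower the queue-side number without changing the conclusion.

```javascript
// Rough capacity model; every input is an assumption taken from the list above.
function estimateThroughput({ redisOpsPerSec, opsPerJob, workers, avgJobSeconds }) {
  const queueJobsPerSec = redisOpsPerSec / opsPerJob;  // what Redis could move
  const processingJobsPerSec = workers / avgJobSeconds; // what the workers can finish
  return {
    queueJobsPerSec,
    processingJobsPerSec,
    processingJobsPerMin: processingJobsPerSec * 60,
    bottleneck: processingJobsPerSec < queueJobsPerSec ? 'processing' : 'redis',
  };
}

const estimate = estimateThroughput({
  redisOpsPerSec: 50000,
  opsPerJob: 2, // one LPUSH + one BRPOP
  workers: 3,
  avgJobSeconds: 2,
});
console.log(estimate.queueJobsPerSec);      // 25000
console.log(estimate.processingJobsPerMin); // 90
console.log(estimate.bottleneck);           // processing
```

Under these assumptions, processing capacity grows linearly with the number of workers, and it would take thousands of workers before Redis became the limiting factor.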

This architecture is a solid foundation for asynchronous job processing: it scales horizontally by adding workers, exposes real-time status through Pub/Sub, and keeps jobs in Redis rather than in process memory, so queued work survives API and worker restarts. The separation of concerns between producers, consumers, and monitors makes it suitable for distributed systems once the limitations listed above are addressed.