Type: Distributed Job/Task Queue System with Pub/Sub Notifications
Purpose: Asynchronous job processing with reliability and monitoring
Architecture Pattern: Producer-Consumer with Work Queues
┌─────────────────────────────────────────────────────────────────────────┐
│ JOB QUEUE SYSTEM ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌───────────────────────────┐ ┌─────────────┐│
│ │ PRODUCER │─────▶│ REDIS STORE │◀────│ CONSUMERS ││
│ │ (Express API)│ │ │ │ (Workers) ││
│ └─────────────┘ │ ┌─────────────────────┐ │ └─────────────┘│
│ │ │ job_queue (List) │ │ │ │
│ ┌─────────────┐ │ │ ┌──┐ ┌──┐ ┌──┐ ┌──┐ │ │ │ │
│ │ MONITOR │◀─────│ │ │J1│ │J2│ │J3│ │J4│ │ │ ▼ │
│ │ (CLI) │ │ │ └──┘ └──┘ └──┘ └──┘ │ │ ┌─────────────┐ │
│ └─────────────┘ │ │ LPUSH BRPOP │ │ │ JOB PROCESS │ │
│ │ └─────────────────────┘ │ │ (Business │ │
│ │ │ │ Logic) │ │
│ │ ┌─────────────────────┐ │ └─────────────┘ │
│ │ │ Processing Queue │ │ │ │
│ │ └─────────────────────┘ │ │ │
│ │ │ ▼ │
│ │ ┌─────────────────────┐ │ ┌─────────────┐ │
│ │ │ Completed Queue │ │ │ SUCCESS/ │ │
│ │ └─────────────────────┘ │ │ FAILURE │ │
│ │ │ └─────────────┘ │
│ │ ┌─────────────────────┐ │ │ │
│ │ │ Failed Queue │ │ │ │
│ │ └─────────────────────┘ │ │ │
│ │ │ ▼ │
│ │ ┌─────────────────────┐ │ ┌─────────────┐ │
│ │ │ job_status (Pub/Sub)│◀────│ STATUS UPDATE│ │
│ │ └─────────────────────┘ │ └─────────────┘ │
│ └───────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
├── Lists (Core Queue Mechanism)
│ ├── job_queue (FIFO queue)
│ ├── job_queue:processing (Active jobs)
│ ├── job_queue:completed (Successful jobs)
│ └── job_queue:failed (Failed jobs)
│
├── Pub/Sub Channels
│ └── job_status (Real-time notifications)
│
└── Operations
├── LPUSH (Left Push) - Add to queue
├── BRPOP (Blocking Right Pop) - Consume jobs
├── LLEN - Get queue length
├── LRANGE - List jobs
├── LREM - Remove specific job
└── DEL - Clear queue
┌─────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ QUEUED │────▶│ PROCESSING │────▶│ SUCCESS │────▶│ COMPLETED │
└─────────┘ └──────────────┘ └──────────────┘ └──────────────┘
│ │ │
│ │ │
│ ▼ ▼
│ ┌──────────────┐ ┌──────────────┐
└──────────▶│ RETRY │ │ FAILED │
└──────────────┘ └──────────────┘
JobQueue
├── Properties
│ ├── redis (Producer connection)
│ └── pubSubRedis (Notification connection)
│
├── Methods
│ ├── addJob(jobData)
│ │ ├── Generate unique job ID
│ │ ├── Create job object with metadata
│ │ ├── LPUSH to job_queue
│ │ └── Publish 'queued' event
│ │
│ ├── getStats()
│ │ └── Get all queue lengths (LLEN)
│ │
│ ├── getJobs(queueName)
│ │ └── Retrieve jobs (LRANGE)
│ │
│ └── clearQueue(queueName)
│ └── Delete queue (DEL)
└── Job Structure
{
id: "job_1700000000000_abc123",
data: {task: "process_payment", amount: 100},
createdAt: "2024-01-01T10:00:00.000Z",
status: "queued",
attempts: 0
}
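The addJob steps listed above can be sketched roughly as follows. This is a minimal illustration rather than the project's actual code: it assumes ioredis-style clients (promise-returning `lpush` and `publish`) passed in as `redis` and `pubSubRedis`, the free functions `createJob` and `addJob` are hypothetical names, and the ID scheme is only inferred from the sample job.

```javascript
// Sketch only: `redis` and `pubSubRedis` are assumed to be ioredis-style
// clients exposing promise-returning lpush() and publish().
const QUEUE_KEY = 'job_queue';
const STATUS_CHANNEL = 'job_status';

// Build a job object shaped like the sample job structure.
function createJob(data, now = Date.now()) {
  // Random base-36 suffix, padded to 6 characters (assumed ID scheme).
  const suffix = Math.random().toString(36).slice(2, 8).padEnd(6, '0');
  return {
    id: `job_${now}_${suffix}`,
    data,
    createdAt: new Date(now).toISOString(),
    status: 'queued',
    attempts: 0,
  };
}

async function addJob(redis, pubSubRedis, data) {
  const job = createJob(data);
  // Redis lists hold strings, so the job is serialized to JSON.
  await redis.lpush(QUEUE_KEY, JSON.stringify(job));
  // Notify subscribers that a new job is waiting.
  await pubSubRedis.publish(
    STATUS_CHANNEL,
    JSON.stringify({
      jobId: job.id,
      status: 'queued',
      timestamp: new Date().toISOString(),
    })
  );
  return job;
}
```

Passing the clients in as arguments keeps the sketch independent of how connections are created, which also makes it easy to exercise with a stub.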
Worker
├── Properties
│ ├── workerId (Unique identifier)
│ ├── isRunning (Control flag)
│ ├── redis (Worker connection)
│ └── pubSubRedis (Notification connection)
│
├── Methods
│ ├── processJob(job)
│ │ ├── Simulate processing (1-4 seconds)
│ │ ├── 10% random failure rate
│ │ └── Return processed result
│ │
│ └── start()
│ ├── while(isRunning)
│ │ ├── BRPOP(job_queue, 5s) [Blocking call]
│ │ ├── If job received:
│ │ │ ├── Move to processing queue
│ │ │ ├── Publish 'processing' event
│ │ │ ├── Execute processJob()
│ │ │ │ ├── Success: Move to completed
│ │ │ │ └── Failure: Move to failed
│ │ │ └── Publish status event
│ │ └── If timeout: Continue loop
│ └── Graceful shutdown handler
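One iteration of the worker loop outlined above might look like the sketch below. It assumes ioredis-style clients, where `brpop(key, timeoutSeconds)` resolves to `[key, value]` or `null` on timeout; the function names `workOnce` and `start` and the `state` object are hypothetical, and the original worker may structure this differently.

```javascript
// Sketch only: `redis` and `pubSubRedis` are assumed ioredis-style clients.
async function workOnce(redis, pubSubRedis, workerId, processJob) {
  const popped = await redis.brpop('job_queue', 5);
  if (!popped) return null; // timed out on an empty queue; caller loops again

  const raw = popped[1];
  const job = JSON.parse(raw);
  const publish = (status, extra = {}) =>
    pubSubRedis.publish('job_status', JSON.stringify({
      jobId: job.id,
      status,
      workerId,
      timestamp: new Date().toISOString(),
      ...extra,
    }));

  // Track the in-flight job so monitors can see it.
  await redis.lpush('job_queue:processing', raw);
  await publish('processing');

  let finalStatus = 'completed';
  let extra = {};
  try {
    job.result = await processJob(job);
  } catch (err) {
    finalStatus = 'failed';
    extra = { error: err.message };
    job.error = err.message;
  }
  job.status = finalStatus;
  job.attempts += 1;

  // Remove the exact payload from processing, then record the outcome.
  await redis.lrem('job_queue:processing', 1, raw);
  await redis.lpush(`job_queue:${finalStatus}`, JSON.stringify(job));
  await publish(finalStatus, extra);
  return finalStatus;
}

// Loop until `state.isRunning` is cleared, e.g. by a SIGTERM handler.
async function start(redis, pubSubRedis, workerId, processJob, state) {
  while (state.isRunning) {
    await workOnce(redis, pubSubRedis, workerId, processJob);
  }
}
```

Because BRPOP returns after at most 5 seconds, a shutdown handler that sets `state.isRunning = false` takes effect within one timeout interval or after the current job finishes.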
API Endpoints
├── POST /jobs
│ └── Add new job to queue
│
├── GET /jobs/stats
│ └── Get queue statistics
│
├── GET /jobs/:queueType
│ └── List jobs (queued|processing|completed|failed)
│
└── DELETE /jobs/:queueType
└── Clear specific queue
┌─────────────┐ 1. POST /jobs ┌─────────────┐ 2. LPUSH ┌─────────────┐
│ Client │────────────────────▶│ Express │───────────────▶│ Redis │
│ │ │ API │ │ │
│ │◀────────────────────│ │ │ │
│ │ 6. Job Created │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
│ 3. Create Job Object │
│ │
│ 4. Publish 'queued' event │
└───────────────────────────────┘
│ │
┌─────────────┐ 5. Event ┌─────────────┐
│ Pub/Sub │◀──────────────│ Redis │
│ Subscriber │ │ (Pub/Sub) │
└─────────────┘ └─────────────┘
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker │ │ Redis │ │ Business │
│ │ │ │ │ Logic │
├─────────────┤ ├─────────────┤ ├─────────────┤
│ 1. BRPOP() │───────────────────▶│ Get Job │ │ │
│ (Blocks) │ │ from List │ │ │
│ │◀───────────────────│ │ │ │
│ │ 2. Job Data │ │ │ │
│ │ │ │ │ │
│ 3. LREM+LPUSH│───────────────────▶│ Move to │ │ │
│ │ │ Processing │ │ │
│ │ │ │ │ │
│ 4. Publish │───────────────────▶│Update Status│ │ │
│ 'processing'│ │ │ │ │
│ │ │ │ │ │
│ 5. Execute │───────────────────────────────────────────────────────▶│ Process │
│ processJob()│ │ │ │ │
│ │◀───────────────────────────────────────────────────────│ │
│ │ 6. Result/Error │ │ │ │
│ │ │ │ │ │
│ 7. Move to │───────────────────▶│ Final Queue │ │ │
│ final queue │ │(Completed/ │ │ │
│ │ │ Failed) │ │ │
│ │ │ │ │ │
│ 8. Publish │───────────────────▶│Update Status│ │ │
│ final status│ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
Connection Pool
├── Producer Connection (redisProducer)
│ └── Used for: LPUSH, LRANGE, LLEN, DEL
│
├── Pub/Sub Connection (redisPubSub)
│ └── Used for: PUBLISH notifications
│
├── Worker Connections (per worker)
│ └── Used for: BRPOP, LREM, LPUSH
│
└── Subscriber Connection
└── Used for: SUBSCRIBE to job_status
FIFO Implementation:
Producers: LPUSH (add to left) → [new, ..., old]
Consumers: BRPOP (take from right) ←
Result: First-In-First-Out
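The ordering can be checked without Redis by modelling the list as an array whose index 0 is the left end. This is only an in-memory stand-in to illustrate the LPUSH/BRPOP pairing; the helper names `lpush` and `rpop` are local to the sketch and are not Redis client calls.

```javascript
// In-memory stand-in for a Redis list: index 0 is the left end.
const list = [];
const lpush = (value) => list.unshift(value); // LPUSH adds at the left
const rpop = () => list.pop();                // RPOP/BRPOP take from the right

lpush('J1');
lpush('J2');
lpush('J3');
const snapshot = [...list]; // left to right: newest first

const consumed = [rpop(), rpop(), rpop()];
console.log(snapshot); // [ 'J3', 'J2', 'J1' ]
console.log(consumed); // [ 'J1', 'J2', 'J3' ]
```

Jobs leave in the order they were added, which is the First-In-First-Out behaviour described above.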
State Management:
queued → processing → {completed | failed}
Each state has a dedicated Redis list
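The state flow and its list mapping can be written down as two small lookup tables. This is a sketch for illustration: the names `TRANSITIONS`, `LIST_FOR_STATE`, and `canTransition` are hypothetical, while the list keys are the ones named earlier in this document.

```javascript
// Allowed transitions: queued -> processing -> {completed | failed}.
const TRANSITIONS = {
  queued: ['processing'],
  processing: ['completed', 'failed'],
  completed: [],
  failed: [],
};

// Each state maps to its dedicated Redis list key.
const LIST_FOR_STATE = {
  queued: 'job_queue',
  processing: 'job_queue:processing',
  completed: 'job_queue:completed',
  failed: 'job_queue:failed',
};

// True only when moving a job from `from` to `to` is a legal step.
function canTransition(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}

console.log(canTransition('queued', 'processing')); // true
console.log(canTransition('queued', 'completed'));  // false
```

A guard like this makes it harder for a worker to move a job straight from queued to a terminal list by mistake.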
Retry Mechanism:
Job.attempts counter
No automatic retry in this example (but pattern is established)
Dead Letter Queue:
job_queue:failed acts as DLQ
Failed jobs preserved for inspection
1. Atomic Operations
└── BRPOP is atomic: each job is delivered to exactly one worker (the follow-up LPUSH to the processing list is a separate command)
2. Job Persistence
└── Queued jobs survive worker and API process crashes (stored in Redis, subject to its persistence settings)
3. Worker Isolation
└── Each worker has independent connection
4. Graceful Shutdown
└── SIGTERM handler stops workers cleanly
5. Connection Management
└── Separate connections for different purposes
BRPOP (Blocking) vs RPOP (Non-blocking)
┌────────────────────────┬────────────────────────┐
│ BRPOP │ RPOP │
├────────────────────────┼────────────────────────┤
│ Waits for job │ Returns null if empty │
│ No busy-waiting │ Requires polling │
│ Better CPU usage │ Higher latency │
│ Built-in timeout │ Manual sleep needed │
└────────────────────────┴────────────────────────┘
Queue Separation Benefits:
┌──────────────────┬─────────────────────────────────────────┐
│ Queue │ Purpose │
├──────────────────┼─────────────────────────────────────────┤
│ job_queue │ Pending jobs (BRPOP source) │
│ processing │ Currently executing jobs │
│ completed │ Success history (audit trail) │
│ failed │ Error analysis and manual retry │
└──────────────────┴─────────────────────────────────────────┘
Dual Communication Channels:
┌────────────────────────┬─────────────────────────────────┐
│ Lists │ Pub/Sub │
├────────────────────────┼─────────────────────────────────┤
│ Job data transfer │ Status notifications │
│ Durable storage │ Real-time updates │
│ Order preservation │ Event broadcasting │
│ Worker coordination │ Monitoring/alerting │
└────────────────────────┴─────────────────────────────────┘
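The Pub/Sub side of this split can be sketched as a small monitor. It assumes an ioredis-style subscriber client (`subscribe` plus a `message` event carrying channel and payload); the function name `watchJobStatus` is hypothetical. Note that Redis Pub/Sub does not store messages, so a monitor that is disconnected misses the events published in the meantime, whereas the lists keep their contents.

```javascript
// Sketch only: `subscriber` is assumed to be an ioredis-style client that is
// dedicated to Pub/Sub, since a subscribed connection cannot run list commands.
async function watchJobStatus(subscriber, onEvent) {
  subscriber.on('message', (channel, message) => {
    if (channel !== 'job_status') return; // ignore other channels
    onEvent(JSON.parse(message));
  });
  await subscriber.subscribe('job_status');
}

// Example handler: print one line per lifecycle event.
function logEvent(event) {
  const worker = event.workerId === undefined ? '-' : event.workerId;
  console.log(`[${event.timestamp}] ${event.jobId} ${event.status} worker=${worker}`);
}
```

A CLI monitor would call `watchJobStatus(subscriberClient, logEvent)` once at startup and keep the process alive.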
// Example stats output
{
"queued": 5, // Jobs waiting for processing
"processing": 2, // Currently being processed
"completed": 10, // Successfully processed
"failed": 1 // Failed jobs
}
// Real-time job lifecycle events
{
"jobId": "job_1700000000000_abc123",
"status": "processing", // queued|processing|completed|failed
"workerId": 2, // Which worker is processing
"timestamp": "2024-01-01T10:00:01.000Z",
"error": "Random processing error" // Only for failed
}
Add More Workers:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ │ │ │ │ │
│ BRPOP() │ │ BRPOP() │ │ BRPOP() │
│ │ │ │ │ │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└──────────────────┼──────────────────┘
│
┌─────────────┐
│ Redis │
│ job_queue │
│ (Shared) │
└─────────────┘
// Possible extension: Priority queues
const HIGH_PRIORITY_QUEUE = 'job_queue:high';
const NORMAL_PRIORITY_QUEUE = 'job_queue:normal';
// Workers check high priority first: BRPOP scans keys in the order given
const result = await this.redis.brpop(
  HIGH_PRIORITY_QUEUE,
  NORMAL_PRIORITY_QUEUE,
  5
);
Current Limitations:
1. No job timeout mechanism
2. No retry policy (immediate failure)
3. No job prioritization
4. No job scheduling (delayed execution)
5. No job deduplication
6. Memory growth (completed/failed queues grow indefinitely)
Suggested Improvements:
1. Add job TTL (expire completed/failed jobs)
2. Implement retry with exponential backoff
3. Add dead letter queue after N retries
4. Implement job result persistence (RDBMS)
5. Add rate limiting per worker
6. Implement job dependencies/chaining
7. Add job progress tracking
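As one illustration of the retry and dead-letter items above, the decision logic can be kept as pure functions over the existing `attempts` counter. This is a sketch under assumed policy values: `MAX_ATTEMPTS`, `BASE_DELAY_MS`, `MAX_DELAY_MS`, `retryDelayMs`, and `onFailure` are all hypothetical, and actually delaying the re-queue needs a scheduling mechanism that is not shown here.

```javascript
// Hypothetical policy values; tune them for the workload.
const MAX_ATTEMPTS = 3;
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Delay before the next attempt: base * 2^(attempts - 1), capped at the max.
function retryDelayMs(attempts) {
  return Math.min(BASE_DELAY_MS * 2 ** (attempts - 1), MAX_DELAY_MS);
}

// Decide what to do with a job whose latest attempt just failed.
// `job.attempts` counts attempts made so far, including the failed one.
function onFailure(job) {
  if (job.attempts >= MAX_ATTEMPTS) {
    return { action: 'dead-letter', list: 'job_queue:failed' };
  }
  return {
    action: 'retry',
    list: 'job_queue',
    delayMs: retryDelayMs(job.attempts),
  };
}

console.log(onFailure({ attempts: 1 })); // retry after 1000 ms
console.log(onFailure({ attempts: 3 })); // dead-letter
```

With these values a job is retried after 1 s and 2 s, then moved to `job_queue:failed` on the third failure.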
Operation Complexity Description
───────────── ─────────── ─────────────────────────────
LPUSH O(1) Add job to queue
BRPOP O(1) Get job from queue (blocking)
LLEN O(1) Get queue length
LRANGE O(S+N) Get range of jobs
LREM O(N) Remove specific job
PUBLISH O(N+M) Broadcast to N clients, M patterns
Assuming:
- Redis: 50k ops/sec
- Job processing: 2s average
- 3 workers
Max throughput:
- Queue ops: ~25k jobs/sec (theoretical, assuming 2 Redis ops per job)
- Processing: ~90 jobs/min (1.5 jobs/sec) - bottleneck
Bottleneck: Business logic, not Redis
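The arithmetic behind these figures can be reproduced with a short calculation. The function name `estimateThroughput` and the `opsPerJob: 2` figure (one LPUSH plus one BRPOP) are assumptions made for this sketch; the status moves and PUBLISH calls add further operations per job, which would lower the queue-side ceiling.

```javascript
// Back-of-the-envelope throughput estimate from the stated assumptions.
function estimateThroughput({ redisOpsPerSec, opsPerJob, workers, avgJobSeconds }) {
  const queueJobsPerSec = redisOpsPerSec / opsPerJob;
  const processingJobsPerSec = workers / avgJobSeconds;
  return {
    queueJobsPerSec,
    processingJobsPerSec,
    processingJobsPerMin: processingJobsPerSec * 60,
    bottleneck: processingJobsPerSec < queueJobsPerSec ? 'processing' : 'redis',
  };
}

const estimate = estimateThroughput({
  redisOpsPerSec: 50000,
  opsPerJob: 2, // assumed: one LPUSH plus one BRPOP
  workers: 3,
  avgJobSeconds: 2,
});
console.log(estimate);
// queueJobsPerSec: 25000, processingJobsPerSec: 1.5,
// processingJobsPerMin: 90, bottleneck: 'processing'
```

Since processing throughput scales linearly with the worker count in this model, adding workers is the first lever to pull.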
This architecture provides a solid foundation for asynchronous job processing: it scales horizontally by adding workers, exposes real-time status through Pub/Sub, and keeps queued jobs in Redis across process restarts. The separation of concerns between producers, consumers, and monitors makes it suitable for distributed systems.