-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker_pool.go
More file actions
78 lines (65 loc) · 1.25 KB
/
worker_pool.go
File metadata and controls
78 lines (65 loc) · 1.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"context"
"net"
"sync"
)
// Job represents a task to be processed by the worker pool.
type Job struct {
Conn net.Conn
Handler func(net.Conn)
}
// WorkerPool defines the interface for a task distribution system.
type WorkerPool interface {
Submit(job Job) bool
}
// channelWorkerPool implements WorkerPool using Go channels.
type channelWorkerPool struct {
jobQueue chan Job
ctx context.Context
wg *sync.WaitGroup
}
// NewWorkerPool creates and starts a new worker pool.
func NewWorkerPool(ctx context.Context, maxWorkers, queueSize int, wg *sync.WaitGroup) WorkerPool {
if maxWorkers <= 0 {
maxWorkers = 1
}
if queueSize <= 0 {
queueSize = 1
}
if wg == nil {
wg = &sync.WaitGroup{}
}
wp := &channelWorkerPool{
jobQueue: make(chan Job, queueSize),
ctx: ctx,
wg: wg,
}
for range maxWorkers {
wg.Add(1)
go wp.worker()
}
return wp
}
func (wp *channelWorkerPool) worker() {
defer wp.wg.Done()
for {
select {
case <-wp.ctx.Done():
return
case job, ok := <-wp.jobQueue:
if !ok {
return
}
job.Handler(job.Conn)
}
}
}
func (wp *channelWorkerPool) Submit(job Job) bool {
select {
case <-wp.ctx.Done():
return false
case wp.jobQueue <- job:
return true
}
}