Skip to content

Commit 69690af

Browse files
committed
switched to an inverted worker pool for better system efficiency
1 parent 700dd08 commit 69690af

2 files changed

Lines changed: 69 additions & 54 deletions

File tree

src/models/worker_semaphore.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package models
2+
3+
type WorkerSemaphore struct {
4+
semaphore chan struct{}
5+
}
6+
7+
func NewWorkerSemaphore(maxWorkers int) *WorkerSemaphore {
8+
sem := &WorkerSemaphore{
9+
semaphore: make(chan struct{}, maxWorkers),
10+
}
11+
for i := 0; i < maxWorkers; i++ {
12+
sem.Release() // fill the semaphore with maxWorkers
13+
}
14+
return sem
15+
}
16+
17+
// Acquire returns the semaphore channel for select statements
18+
// will block until resource is available
19+
func (s *WorkerSemaphore) Acquire() <-chan struct{} {
20+
return s.semaphore
21+
}
22+
23+
func (s *WorkerSemaphore) Release() {
24+
s.semaphore <- struct{}{}
25+
}

src/service/jobs/job_queue.go

Lines changed: 44 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ package jobs
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
dk "github.com/docker/docker/api/types/container"
87
"github.com/docker/docker/pkg/stdcopy"
98
"github.com/makeopensource/leviathan/common"
10-
"github.com/makeopensource/leviathan/models"
9+
. "github.com/makeopensource/leviathan/models"
1110
"github.com/makeopensource/leviathan/service/docker"
1211
"github.com/rs/zerolog/log"
1312
"gorm.io/gorm"
@@ -17,41 +16,31 @@ import (
1716
)
1817

1918
type JobQueue struct {
20-
jobChannel chan *models.Job
21-
db *gorm.DB
22-
dkSrv *docker.DkService
23-
contextMap *models.Map[string, func()]
19+
jobSemaphore *WorkerSemaphore
20+
db *gorm.DB
21+
dkSrv *docker.DkService
22+
contextMap *Map[string, func()]
2423
}
2524

2625
func NewJobQueue(totalJobs uint, db *gorm.DB, dk *docker.DkService) *JobQueue {
2726
queue := &JobQueue{
28-
jobChannel: make(chan *models.Job, totalJobs),
29-
contextMap: &models.Map[string, func()]{},
30-
db: db,
31-
dkSrv: dk,
27+
contextMap: &Map[string, func()]{},
28+
db: db,
29+
dkSrv: dk,
30+
jobSemaphore: NewWorkerSemaphore(int(totalJobs)),
3231
}
3332

34-
queue.spawnWorkers(int(totalJobs))
3533
return queue
3634
}
3735

38-
func (q *JobQueue) spawnWorkers(workerCount int) {
39-
for i := 0; i < workerCount; i++ {
40-
go q.worker()
41-
}
42-
}
43-
44-
func (q *JobQueue) AddJob(mes *models.Job) error {
36+
func (q *JobQueue) AddJob(mes *Job) error {
4537
jog(mes.JobCtx).Info().Msg("sending job to queue")
4638
err := mes.ValidateForQueue()
4739
if err != nil {
4840
return common.ErrLog("job validation failed: "+err.Error(), err, jog(mes.JobCtx).Error())
4941
}
5042

51-
// run in go routine in case queue is full and gets blocked
52-
go func() {
53-
q.jobChannel <- mes
54-
}()
43+
go q.worker(mes)
5544
return nil
5645
}
5746

@@ -75,27 +64,28 @@ func (q *JobQueue) CancelJob(messageId string) {
7564
cancel()
7665
}
7766

78-
func (q *JobQueue) worker() {
79-
for msg := range q.jobChannel {
80-
if msg == nil {
81-
log.Error().Msg("job received was nil, THIS SHOULD NEVER HAPPEN")
82-
continue
83-
}
84-
85-
if errors.Is(msg.JobCtx.Err(), context.Canceled) {
86-
q.setJobAsCancelled(msg)
87-
q.cleanupJob(msg, nil)
88-
jog(msg.JobCtx).Warn().Msgf("job context was canceled before queue could process")
89-
continue
90-
}
67+
func (q *JobQueue) worker(msg *Job) {
68+
if msg == nil {
69+
log.Error().Msg("job received was nil, THIS SHOULD NEVER HAPPEN")
70+
return
71+
}
9172

73+
select {
74+
case <-q.jobSemaphore.Acquire():
75+
defer q.jobSemaphore.Release()
9276
jog(msg.JobCtx).Info().Msgf("worker is now processing job")
9377
q.runJob(msg)
78+
return
79+
case <-msg.JobCtx.Done():
80+
q.setJobAsCancelled(msg)
81+
q.cleanupJob(msg, nil)
82+
jog(msg.JobCtx).Warn().Msgf("job context was canceled before queue could process")
83+
return
9484
}
9585
}
9686

9787
// runJob should ALWAYS BE BLOCKING, as it prevents the worker from moving on to a new job
98-
func (q *JobQueue) runJob(job *models.Job) {
88+
func (q *JobQueue) runJob(job *Job) {
9989
client, contId, err, reason := q.setupJob(job)
10090
defer q.cleanupJob(job, client)
10191
if err != nil {
@@ -155,7 +145,7 @@ func (q *JobQueue) runJob(job *models.Job) {
155145
// setupJob Set up job like king, yes!
156146
// returns nil client if an error occurred while setup,
157147
// make sure to handle null ptr dereference
158-
func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, string) {
148+
func (q *JobQueue) setupJob(msg *Job) (*docker.DkClient, string, error, string) {
159149
q.setJobInSetup(msg)
160150

161151
machine, err := q.dkSrv.ClientManager.GetClientById(msg.MachineId)
@@ -180,8 +170,8 @@ func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, s
180170
}
181171

182172
resources := dk.Resources{
183-
NanoCPUs: msg.LabData.JobLimits.NanoCPU * models.CPUQuota,
184-
Memory: msg.LabData.JobLimits.Memory * models.MB,
173+
NanoCPUs: msg.LabData.JobLimits.NanoCPU * CPUQuota,
174+
Memory: msg.LabData.JobLimits.Memory * MB,
185175
PidsLimit: &msg.LabData.JobLimits.PidsLimit,
186176
}
187177

@@ -206,7 +196,7 @@ func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, s
206196

207197
// cleanupJob clean up job,
208198
// updates job in DB, removes the container and associated tmp job data
209-
func (q *JobQueue) cleanupJob(msg *models.Job, client *docker.DkClient) {
199+
func (q *JobQueue) cleanupJob(msg *Job, client *docker.DkClient) {
210200
jog(msg.JobCtx).Info().Msg("cleaning up job")
211201
q.updateJobVeryNice(msg)
212202

@@ -231,9 +221,9 @@ func (q *JobQueue) cleanupJob(msg *models.Job, client *docker.DkClient) {
231221
// Very nice!
232222
//
233223
// jobResult is the last line expected to be valid json string, returned to the job caller
234-
func (q *JobQueue) greatSuccess(job *models.Job, jobResult string) {
224+
func (q *JobQueue) greatSuccess(job *Job, jobResult string) {
235225
jog(job.JobCtx).Info().Msg("job completed successfully")
236-
job.Status = models.Complete
226+
job.Status = Complete
237227
job.StatusMessage = jobResult
238228
}
239229

@@ -244,46 +234,46 @@ func (q *JobQueue) greatSuccess(job *models.Job, jobResult string) {
244234
// The publicReason will be displayed to the end user, providing a user-friendly message.
245235
//
246236
// The err parameter holds the underlying error, used for debugging purposes.
247-
func (q *JobQueue) bigProblem(job *models.Job, publicReason string, err error) {
237+
func (q *JobQueue) bigProblem(job *Job, publicReason string, err error) {
248238
jog(job.JobCtx).Error().Err(err).Str("reason", publicReason).Msg("job failed")
249-
job.Status = models.Failed
239+
job.Status = Failed
250240
job.StatusMessage = publicReason
251241
if err != nil {
252242
job.Error = err.Error()
253243
}
254244
}
255245

256-
func (q *JobQueue) setJobAsCancelled(job *models.Job) {
246+
func (q *JobQueue) setJobAsCancelled(job *Job) {
257247
jog(job.JobCtx).Info().Msg("job was cancelled")
258-
job.Status = models.Canceled
248+
job.Status = Canceled
259249
job.StatusMessage = "Job was cancelled"
260250
}
261251

262252
// setJobInProgress set job status as models.Running
263253
//
264254
// Job is in progress, success soon!
265-
func (q *JobQueue) setJobInProgress(msg *models.Job) {
266-
msg.Status = models.Running
255+
func (q *JobQueue) setJobInProgress(msg *Job) {
256+
msg.Status = Running
267257
q.updateJobVeryNice(msg)
268258
}
269259

270260
// setJobInSetup set job status as models.Preparing
271261
//
272262
// job is being setup standby
273-
func (q *JobQueue) setJobInSetup(msg *models.Job) {
274-
msg.Status = models.Preparing
263+
func (q *JobQueue) setJobInSetup(msg *Job) {
264+
msg.Status = Preparing
275265
q.updateJobVeryNice(msg)
276266
}
277267

278268
// updateJobVeryNice Database updated, fresh like new wife!
279-
func (q *JobQueue) updateJobVeryNice(msg *models.Job) {
269+
func (q *JobQueue) updateJobVeryNice(msg *Job) {
280270
res := q.db.Save(msg)
281271
if res.Error != nil {
282272
jog(msg.JobCtx).Error().Err(res.Error).Msg("error occurred while saving job to db")
283273
}
284274
}
285275

286-
func writeLogs(client *docker.DkClient, msg *models.Job) (string, error) {
276+
func writeLogs(client *docker.DkClient, msg *Job) (string, error) {
287277
outputFile, err := os.OpenFile(msg.OutputLogFilePath, os.O_RDWR|os.O_CREATE, 0660)
288278
if err != nil {
289279
return "unable to open log file", err
@@ -308,8 +298,8 @@ func writeLogs(client *docker.DkClient, msg *models.Job) (string, error) {
308298
return "", nil
309299
}
310300

311-
func verifyLogs(msg *models.Job) (string, string, error) {
312-
if msg.Status == models.Failed {
301+
func verifyLogs(msg *Job) (string, string, error) {
302+
if msg.Status == Failed {
313303
return "", "Job failed, skipping parsing log file", nil
314304
}
315305

0 commit comments

Comments
 (0)