Skip to content

Commit e32ad34

Browse files
committed
Add dynamic database name configuration
Introduced ability to specify the database name in SQL queries through a new `DbName` field in the `Config` struct. This change allows greater flexibility when executing queries across different databases. Updated job management functions to utilize this new configuration.
1 parent 12fff4a commit e32ad34

3 files changed

Lines changed: 38 additions & 36 deletions

File tree

config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type Config struct {
3131
// DB is the user-provided database connection where the jobs table is stored.
3232
DB *sql.DB
3333

34+
// DbName is name of the database.
35+
DbName string
36+
3437
// RetryCount is how many times we allow a job to fail before ignoring it.
3538
RetryCount uint
3639

database.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,27 @@ import (
1313
// getPendingJob looks for a job with status in (PENDING, FAILED), not locked, retry < cfg.RetryCount, available now.
1414
func getPendingJob(tx *sql.Tx, cfg *Config) (*JobRecord, error) {
1515
query := `
16-
SELECT
17-
id,
18-
operation,
19-
status,
20-
payload,
21-
output,
22-
locked_by,
23-
locked_until,
24-
retry_count,
25-
available_at,
26-
created_at,
27-
updated_at
28-
FROM jobs
29-
WHERE
30-
(status = 'PENDING' OR status = 'FAILED')
31-
AND (locked_until IS NULL OR locked_until < NOW())
32-
AND retry_count < ?
33-
AND available_at <= NOW()
34-
ORDER BY available_at
35-
LIMIT 1
36-
FOR UPDATE
16+
SELECT
17+
id,
18+
operation,
19+
status,
20+
payload,
21+
output,
22+
locked_by,
23+
locked_until,
24+
retry_count,
25+
available_at,
26+
created_at,
27+
updated_at
28+
FROM ` + cfg.DbName + `.jobs
29+
WHERE
30+
(status = 'PENDING' OR status = 'FAILED')
31+
AND (locked_until IS NULL OR locked_until < NOW())
32+
AND retry_count < ?
33+
AND available_at <= NOW()
34+
ORDER BY available_at
35+
LIMIT 1
36+
FOR UPDATE
3737
`
3838
row := tx.QueryRow(query, cfg.RetryCount)
3939
var rec JobRecord
@@ -63,15 +63,14 @@ FOR UPDATE
6363
return &rec, nil
6464
}
6565

66-
func assignJobToWorker(tx *sql.Tx, jobID uint64, workerID string, lockUntil time.Time) error {
67-
stmt := `
68-
UPDATE jobs
69-
SET
70-
status = ?,
71-
locked_by = ?,
72-
locked_until = ?,
73-
updated_at = ?
74-
WHERE id = ?
66+
func assignJobToWorker(cfg *Config, tx *sql.Tx, jobID uint64, workerID string, lockUntil time.Time) error {
67+
stmt := `UPDATE ` + cfg.DbName + `.jobs
68+
SET
69+
status = ?,
70+
locked_by = ?,
71+
locked_until = ?,
72+
updated_at = ?
73+
WHERE id = ?
7574
`
7675
_, err := tx.Exec(stmt,
7776
JobInProgress,
@@ -83,7 +82,7 @@ WHERE id = ?
8382
return err
8483
}
8584

86-
func finishJob(db *sql.DB, jobID uint64, finalStatus JobStatus, output any, incrementRetry bool, availableAt *time.Time, errorOutput error) error {
85+
func finishJob(cfg *Config, jobID uint64, finalStatus JobStatus, output any, incrementRetry bool, availableAt *time.Time, errorOutput error) error {
8786
outputJson, err := json.Marshal(output)
8887
if err != nil {
8988
return err
@@ -130,8 +129,8 @@ func finishJob(db *sql.DB, jobID uint64, finalStatus JobStatus, output any, incr
130129

131130
args = append(args, jobID)
132131

133-
query := fmt.Sprintf("UPDATE jobs SET %s WHERE id = ?", strings.Join(setClauses, ", "))
134-
_, err = db.Exec(query, args...)
132+
query := fmt.Sprintf("UPDATE %s.jobs SET %s WHERE id = ?", cfg.DbName, strings.Join(setClauses, ", "))
133+
_, err = cfg.DB.Exec(query, args...)
135134
return err
136135
}
137136

@@ -141,7 +140,7 @@ func createJob(ctx context.Context, tf *TaskFlow, operation Operation, payload a
141140
return 0, err
142141
}
143142
now := time.Now().Round(time.Microsecond)
144-
query := "INSERT INTO jobs (operation, status, payload, locked_by, locked_until, retry_count, available_at, created_at, updated_at) VALUES (?, ?, ?, NULL, NULL, 0, ?, ?, ?)"
143+
query := fmt.Sprintf("INSERT INTO %s.jobs (operation, status, payload, locked_by, locked_until, retry_count, available_at, created_at, updated_at) VALUES (?, ?, ?, NULL, NULL, 0, ?, ?, ?)", tf.cfg.DbName)
145144
res, err := tf.cfg.DB.ExecContext(ctx, query, operation, JobPending, plq, executeAt, now, now)
146145
if err != nil {
147146
return 0, fmt.Errorf("failed to insert job: %w", err)

worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (w *Worker) fetchAndProcess(ctx context.Context) {
9292
}
9393

9494
lockUntil := time.Now().Add(w.cfg.JobTimeout)
95-
if err := assignJobToWorker(tx, jobRec.ID, w.id, lockUntil); err != nil {
95+
if err := assignJobToWorker(w.cfg, tx, jobRec.ID, w.id, lockUntil); err != nil {
9696
_ = tx.Rollback()
9797
w.cfg.logError(LogEvent{
9898
Message: fmt.Sprintf("Error assigning job %d to worker %s", jobRec.ID, w.id),
@@ -142,7 +142,7 @@ func (w *Worker) fetchAndProcess(ctx context.Context) {
142142
nextAvailableAt = &t
143143
}
144144

145-
if err = finishJob(w.cfg.DB, jobRec.ID, finalStatus, output, incrementRetry, nextAvailableAt, execErr); err != nil {
145+
if err = finishJob(w.cfg, jobRec.ID, finalStatus, output, incrementRetry, nextAvailableAt, execErr); err != nil {
146146
w.cfg.logError(LogEvent{
147147
Message: fmt.Sprintf("Error finishing job %d", jobRec.ID),
148148
WorkerID: w.id,

0 commit comments

Comments
 (0)