From 93b36911df4bcba64e10abe28040f9e357ca8eef Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 19 Jun 2026 20:26:51 -0600 Subject: [PATCH] Add `JobStuckHandler` callback Here, add a `JobStuckHandler` callback that's invoked when a producer considers a job to be "stuck", i.e. it passed its timeout, cancellation was attempted, but the job didn't respond to cancellation. The callback includes some basic information about the job that became stuck, along with the total number of stuck jobs. A result will optionally open a new executor slot to replace the one taken up by the stuck job so that a producer that continues to run doesn't get completely starved by stuck jobs. The idea here is that clients can configure themselves to open new slots up to a certain point, but then may want to restart themselves if there are enough jobs stuck that they could become a memory liability. --- client.go | 40 +++++++- client_test.go | 16 +++ example_job_stuck_handler_test.go | 118 ++++++++++++++++++++++ internal/jobexecutor/job_executor.go | 23 +++-- internal/jobexecutor/job_executor_test.go | 10 +- producer.go | 60 +++++++++-- producer_test.go | 99 ++++++++++++++++++ rivertest/worker.go | 4 +- stuck_job.go | 32 ++++++ 9 files changed, 376 insertions(+), 26 deletions(-) create mode 100644 example_job_stuck_handler_test.go create mode 100644 stuck_job.go diff --git a/client.go b/client.go index 3f29e545..6d7014ca 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/riverqueue/river/internal/dblist" @@ -46,11 +47,12 @@ const ( FetchPollIntervalDefault = 1 * time.Second FetchPollIntervalMin = 1 * time.Millisecond - JobTimeoutDefault = 1 * time.Minute - MaxAttemptsDefault = rivercommon.MaxAttemptsDefault - PriorityDefault = rivercommon.PriorityDefault - QueueDefault = rivercommon.QueueDefault - QueueNumWorkersMax = 10_000 + JobStuckThresholdDefault = 10 * time.Second + JobTimeoutDefault = 1 * time.Minute + MaxAttemptsDefault = 
rivercommon.MaxAttemptsDefault + PriorityDefault = rivercommon.PriorityDefault + QueueDefault = rivercommon.QueueDefault + QueueNumWorkersMax = 10_000 ) var ( @@ -192,6 +194,25 @@ type Config struct { // instances of rivertype.JobInsertMiddleware). JobInsertMiddleware []rivertype.JobInsertMiddleware + // JobStuckHandler is invoked when a producer detects that a job exceeded + // its timeout and did not return from context cancellation within the + // allotted JobStuckThreshold (and if it didn't, we usually assume it won't + // return at all). The handler receives minimal information about the stuck + // job and the total number of jobs currently considered stuck across the + // client. + // + // JobStuckHandler lets an implementation indicate that a new worker slot + // should be opened to replace the one now occupied by a stuck job. It can + // also be used (for example) to stop and exit the program if too many jobs + // have been reported stuck. + JobStuckHandler JobStuckHandler + + // JobStuckThreshold is the amount of time after JobTimeout elapses to + // wait before a still-running job is considered stuck. + // + // Defaults to 10 seconds. + JobStuckThreshold time.Duration + // JobTimeout is the maximum amount of time a job is allowed to run before its // context is cancelled. 
A timeout of zero means JobTimeoutDefault will be // used, whereas a value of -1 means the job's context will not be cancelled @@ -471,6 +492,8 @@ func (c *Config) WithDefaults() *Config { ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }), Hooks: c.Hooks, JobInsertMiddleware: c.JobInsertMiddleware, + JobStuckHandler: c.JobStuckHandler, + JobStuckThreshold: cmp.Or(c.JobStuckThreshold, JobStuckThresholdDefault), JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault), Logger: logger, MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault), @@ -521,6 +544,9 @@ func (c *Config) validate() error { if c.JobTimeout < -1 { return errors.New("JobTimeout cannot be negative, except for -1 (infinite)") } + if c.JobStuckThreshold < 0 { + return errors.New("JobStuckThreshold cannot be less than zero") + } if c.MaxAttempts < 0 { return errors.New("MaxAttempts cannot be less than zero") } @@ -670,6 +696,7 @@ type Client[TTx any] struct { queues *QueueBundle services []startstop.Service stopped <-chan struct{} + stuckJobCount atomic.Int32 subscriptionManager *subscriptionManager testSignals clientTestSignals @@ -2260,6 +2287,9 @@ func (c *Client[TTx]) producerAdd(queueName string, queueConfig QueueConfig) (*p FetchPollInterval: cmp.Or(queueConfig.FetchPollInterval, c.config.FetchPollInterval), HookLookupByJob: c.hookLookupByJob, HookLookupGlobal: c.hookLookupGlobal, + JobStuckHandler: c.config.JobStuckHandler, + JobStuckCount: &c.stuckJobCount, + JobStuckThreshold: c.config.JobStuckThreshold, JobTimeout: c.config.JobTimeout, MaxWorkers: queueConfig.MaxWorkers, MiddlewareLookupGlobal: c.middlewareLookupGlobal, diff --git a/client_test.go b/client_test.go index 9352dcbb..d3c666fe 100644 --- a/client_test.go +++ b/client_test.go @@ -8094,6 +8094,8 @@ func Test_NewClient_Defaults(t *testing.T) { require.Nil(t, client.config.ErrorHandler) require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown) require.Equal(t, FetchPollIntervalDefault, 
client.config.FetchPollInterval) + require.Nil(t, client.config.JobStuckHandler) + require.Equal(t, JobStuckThresholdDefault, client.config.JobStuckThreshold) require.Equal(t, JobTimeoutDefault, client.config.JobTimeout) require.Nil(t, client.config.Hooks) require.NotZero(t, client.baseService.Logger) @@ -8117,6 +8119,9 @@ func Test_NewClient_Overrides(t *testing.T) { ) errorHandler := &testErrorHandler{} + jobStuckHandler := JobStuckHandler(func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + return JobStuckHandlerResult{} + }) logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) workers := NewWorkers() @@ -8146,6 +8151,8 @@ func Test_NewClient_Overrides(t *testing.T) { FetchPollInterval: 124 * time.Millisecond, Hooks: []rivertype.Hook{&noOpHook{}}, JobInsertMiddleware: []rivertype.JobInsertMiddleware{&noOpInsertMiddleware{}}, + JobStuckHandler: jobStuckHandler, + JobStuckThreshold: 126 * time.Millisecond, JobTimeout: 125 * time.Millisecond, Logger: logger, MaxAttempts: 5, @@ -8184,6 +8191,8 @@ func Test_NewClient_Overrides(t *testing.T) { require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown) require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval) require.Len(t, client.config.JobInsertMiddleware, 1) + require.NotNil(t, client.config.JobStuckHandler) + require.Equal(t, 126*time.Millisecond, client.config.JobStuckThreshold) require.Equal(t, 125*time.Millisecond, client.config.JobTimeout) require.Equal(t, []rivertype.Hook{&noOpHook{}}, client.config.Hooks) require.Equal(t, logger, client.baseService.Logger) @@ -8366,6 +8375,13 @@ func Test_NewClient_Validations(t *testing.T) { require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts) }, }, + { + name: "JobStuckThreshold cannot be less than zero", + configFunc: func(config *Config) { + config.JobStuckThreshold = -1 + }, + wantErr: errors.New("JobStuckThreshold cannot be less than zero"), + }, { name: "Middleware can be configured independently", 
configFunc: func(config *Config) { diff --git a/example_job_stuck_handler_test.go b/example_job_stuck_handler_test.go new file mode 100644 index 00000000..ab4e71fe --- /dev/null +++ b/example_job_stuck_handler_test.go @@ -0,0 +1,118 @@ +package river_test + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/testutil" +) + +type StuckJobHandlerArgs struct{} + +func (StuckJobHandlerArgs) Kind() string { return "stuck_job_handler" } + +type StuckJobHandlerWorker struct { + river.WorkerDefaults[StuckJobHandlerArgs] + + releaseJobs chan struct{} + started chan struct{} +} + +func (w *StuckJobHandlerWorker) Work(ctx context.Context, job *river.Job[StuckJobHandlerArgs]) error { + w.started <- struct{}{} + + // Ignore ctx.Done() to simulate a job that doesn't respond to cancellation. + <-w.releaseJobs + + return nil +} + +// Example_jobStuckHandler demonstrates how to use JobStuckHandler to stop a +// client when too many jobs are stuck so the process can be restarted. For the +// first couple stuck jobs it uses OpenWorkerSlot to add additional worker slots +// to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart +// it gives up and exits so it can be restarted. 
+func Example_jobStuckHandler() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + var riverClient *river.Client[pgx.Tx] + + const maxStuckJobsBeforeRestart = 2 + + var ( + releaseJobs = make(chan struct{}) + restartRequested = make(chan struct{}) + started = make(chan struct{}, maxStuckJobsBeforeRestart+1) + stopOnce sync.Once + ) + + workers := river.NewWorkers() + river.AddWorker(workers, &StuckJobHandlerWorker{releaseJobs: releaseJobs, started: started}) + + riverClient, err = river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{ + JobStuckHandler: func(ctx context.Context, params river.JobStuckHandlerParams) river.JobStuckHandlerResult { + if params.TotalStuckJobs <= maxStuckJobsBeforeRestart { + fmt.Printf("stuck jobs: %d; opening replacement worker slot\n", params.TotalStuckJobs) + return river.JobStuckHandlerResult{OpenWorkerSlot: true} + } + + stopOnce.Do(func() { + fmt.Printf("too many stuck jobs: %d; shutting down so the process can restart\n", params.TotalStuckJobs) + close(restartRequested) + + shutdownCtx := context.WithoutCancel(ctx) + go func() { + if err := riverClient.Stop(shutdownCtx); err != nil { + panic(err) + } + }() + }) + + return river.JobStuckHandlerResult{} + }, + JobStuckThreshold: time.Millisecond, + JobTimeout: 10 * time.Millisecond, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 1}, + }, + Workers: workers, + })) + if err != nil { + panic(err) + } + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + for range maxStuckJobsBeforeRestart + 1 { + if _, err := riverClient.Insert(ctx, StuckJobHandlerArgs{}, nil); err != nil { + panic(err) + } + } + + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), started, maxStuckJobsBeforeRestart+1) + + <-restartRequested + close(releaseJobs) + <-riverClient.Stopped() + + // Output: + // stuck jobs: 1; opening 
replacement worker slot + // stuck jobs: 2; opening replacement worker slot + // too many stuck jobs: 3; shutting down so the process can restart +} diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index dab44f39..6c5f2eba 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -9,6 +9,7 @@ import ( "log/slog" "runtime" "strings" + "sync/atomic" "time" "github.com/tidwall/gjson" @@ -116,7 +117,7 @@ type JobExecutor struct { MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface ProducerCallbacks struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() } SchedulerInterval time.Duration @@ -125,8 +126,17 @@ type JobExecutor struct { WorkUnit workunit.WorkUnit // Meant to be used from within the job executor only. - start time.Time - stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer + slotClosed atomic.Bool + start time.Time + stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer +} + +// TryCloseSlot marks this executor's producer slot as closed. A closed slot +// means the producer has already stopped counting this executor against its +// active worker capacity, although the executor goroutine may still be running. +// It returns true only the first time the slot is closed. +func (e *JobExecutor) TryCloseSlot() bool { + return e.slotClosed.CompareAndSwap(false, true) } func (e *JobExecutor) Cancel(ctx context.Context) { @@ -262,9 +272,8 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { // their job timeout (plus a small margin) and don't appear to be responding to // context cancellation (unfortunately, quite an easy error to make in Go). // -// Currently we don't do anything if we notice a job is stuck. 
Knowing about -// stuck jobs is just used for informational purposes in the producer in -// generating periodic stats. +// Producers use stuck-job notifications for periodic stats and optional user +// handlers. func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc { // We add a WithoutCancel here so that this inner goroutine becomes // immune to all context cancellations _except_ the one where it's @@ -281,7 +290,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) // context cancelled as we leave JobExecutor.execute case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): - e.ProducerCallbacks.Stuck() + e.ProducerCallbacks.Stuck(ctx, e.JobRow) e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck", slog.Int64("job_id", e.JobRow.ID), diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 1674fa7e..c149dac3 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -195,11 +195,11 @@ func TestJobExecutor_Execute(t *testing.T) { MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: func(jobRow *rivertype.JobRow) {}, - Stuck: func() {}, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) {}, Unstuck: func() {}, }, SchedulerInterval: riverinternaltest.SchedulerShortInterval, @@ -720,7 +720,7 @@ func TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } @@ -761,7 +761,7 @@ func 
TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } @@ -809,7 +809,7 @@ func TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } diff --git a/producer.go b/producer.go index dbd40653..d9aba6ad 100644 --- a/producer.go +++ b/producer.go @@ -84,6 +84,9 @@ type producerConfig struct { HookLookupByJob *hooklookup.JobHookLookup HookLookupGlobal hooklookup.HookLookupInterface + JobStuckHandler JobStuckHandler + JobStuckCount *atomic.Int32 + JobStuckThreshold time.Duration JobTimeout time.Duration MaxWorkers int MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface @@ -127,6 +130,15 @@ func (c *producerConfig) mustValidate() *producerConfig { if c.FetchPollInterval <= 0 { panic("producerConfig.FetchPollInterval must be greater than zero") } + if c.JobStuckCount == nil { + c.JobStuckCount = &atomic.Int32{} + } + if c.JobStuckThreshold == 0 { + c.JobStuckThreshold = JobStuckThresholdDefault + } + if c.JobStuckThreshold < 0 { + panic("producerConfig.JobStuckThreshold must be greater or equal to zero") + } if c.JobTimeout < -1 { panic("producerConfig.JobTimeout must be greater or equal to zero") } @@ -729,12 +741,44 @@ func (p *producer) addActiveJob(id int64, executor *jobexecutor.JobExecutor) { } func (p *producer) removeActiveJob(job *rivertype.JobRow) { + executor := p.activeJobs[job.ID] delete(p.activeJobs, job.ID) - p.numJobsActive.Add(-1) + if executor == nil || 
executor.TryCloseSlot() { + p.numJobsActive.Add(-1) + } p.numJobsRan.Add(1) p.state.JobFinish(job) } +func (p *producer) handleWorkerStuck(ctx context.Context, executor *jobexecutor.JobExecutor, job *rivertype.JobRow) { + p.numJobsStuck.Add(1) + totalStuckJobs := int(p.config.JobStuckCount.Add(1)) + + if p.config.JobStuckHandler == nil { + return + } + + result := p.config.JobStuckHandler(ctx, JobStuckHandlerParams{ + ID: job.ID, + Kind: job.Kind, + Queue: job.Queue, + TotalStuckJobs: totalStuckJobs, + }) + if !result.OpenWorkerSlot || !executor.TryCloseSlot() { + return + } + + p.numJobsActive.Add(-1) + if p.fetchLimiter != nil { + p.fetchLimiter.Call() + } +} + +func (p *producer) handleWorkerUnstuck() { + p.numJobsStuck.Add(-1) + p.config.JobStuckCount.Add(-1) +} + func (p *producer) maybeCancelJob(ctx context.Context, id int64) { executor, ok := p.activeJobs[id] if !ok { @@ -822,7 +866,8 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. // jobCancel will always be called by the executor to prevent leaks. jobCtx, jobCancel := context.WithCancelCause(workCtx) - executor := baseservice.Init(&p.Archetype, &jobexecutor.JobExecutor{ + var executor *jobexecutor.JobExecutor + executor = baseservice.Init(&p.Archetype, &jobexecutor.JobExecutor{ CancelFunc: jobCancel, ClientJobTimeout: p.jobTimeout, ClientRetryPolicy: p.retryPolicy, @@ -835,15 +880,16 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. 
JobRow: job, ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: p.handleWorkerDone, - Stuck: func() { p.numJobsStuck.Add(1) }, - Unstuck: func() { p.numJobsStuck.Add(-1) }, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) { p.handleWorkerStuck(ctx, executor, jobRow) }, + Unstuck: p.handleWorkerUnstuck, }, - SchedulerInterval: p.config.SchedulerInterval, - WorkUnit: workUnit, + SchedulerInterval: p.config.SchedulerInterval, + StuckThresholdOverride: p.config.JobStuckThreshold, + WorkUnit: workUnit, }) p.addActiveJob(job.ID, executor) diff --git a/producer_test.go b/producer_test.go index c03bd766..0fa1681f 100644 --- a/producer_test.go +++ b/producer_test.go @@ -517,6 +517,105 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin require.Zero(t, producer.maxJobsToFetch()) // zero because all slots are occupied }) + t.Run("JobStuckHandler", func(t *testing.T) { + t.Parallel() + + producer, bundle := setup(t) + producer.config.JobTimeout = 10 * time.Millisecond + producer.config.JobStuckThreshold = time.Millisecond + producer.config.MaxWorkers = 2 + producer.jobTimeout = producer.config.JobTimeout + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + + Num int `json:"num"` + } + + releaseJobs := make(chan struct{}) + defer close(releaseJobs) + + handlerParamsCh := make(chan JobStuckHandlerParams, 2) + producer.config.JobStuckHandler = func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + handlerParamsCh <- params + return JobStuckHandlerResult{} + } + + AddWorker(bundle.workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + <-releaseJobs + return nil + })) + + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 1}) + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 2}) + + startProducer(t, ctx, ctx, producer) + + handlerParams := 
riversharedtest.WaitOrTimeoutN(t, handlerParamsCh, 2) + require.ElementsMatch(t, []int{1, 2}, []int{handlerParams[0].TotalStuckJobs, handlerParams[1].TotalStuckJobs}) + for _, params := range handlerParams { + require.NotZero(t, params.ID) + require.Equal(t, (&JobArgs{}).Kind(), params.Kind) + require.Equal(t, producer.config.Queue, params.Queue) + } + }) + + t.Run("JobStuckHandlerOpensExecutorSlot", func(t *testing.T) { + t.Parallel() + + producer, bundle := setup(t) + producer.config.JobTimeout = 20 * time.Millisecond + producer.config.JobStuckThreshold = time.Millisecond + producer.config.MaxWorkers = 1 + producer.jobTimeout = producer.config.JobTimeout + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + + Num int `json:"num"` + } + + handlerParamsCh := make(chan JobStuckHandlerParams, 2) + producer.config.JobStuckHandler = func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + handlerParamsCh <- params + return JobStuckHandlerResult{OpenWorkerSlot: true} + } + + var ( + firstStarted = make(chan struct{}) + releaseJobs = make(chan struct{}) + secondStarted = make(chan struct{}) + ) + defer close(releaseJobs) + + AddWorker(bundle.workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + switch job.Args.Num { + case 1: + close(firstStarted) + case 2: + close(secondStarted) + default: + require.FailNow(t, "unexpected job num", "num=%d", job.Args.Num) + } + + <-releaseJobs + return nil + })) + + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 1}) + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 2}) + + startProducer(t, ctx, ctx, producer) + + riversharedtest.WaitOrTimeout(t, firstStarted) + + handlerParams := riversharedtest.WaitOrTimeout(t, handlerParamsCh) + require.Equal(t, 1, handlerParams.TotalStuckJobs) + + riversharedtest.WaitOrTimeout(t, secondStarted) + require.Equal(t, int32(1), producer.numJobsActive.Load()) + }) + t.Run("StartStopStress", func(t *testing.T) { t.Parallel() diff --git 
a/rivertest/worker.go b/rivertest/worker.go index 8da01ae0..da4f13f5 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -211,11 +211,11 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(append(rivermiddleware.DefaultMiddleware(), w.config.Middleware...)), ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: func(job *rivertype.JobRow) { close(executionDone) }, - Stuck: func() {}, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) {}, Unstuck: func() {}, }, SchedulerInterval: maintenance.JobSchedulerIntervalDefault, diff --git a/stuck_job.go b/stuck_job.go new file mode 100644 index 00000000..cb3cb4af --- /dev/null +++ b/stuck_job.go @@ -0,0 +1,32 @@ +package river + +import "context" + +// JobStuckHandler is invoked when a producer detects that a job exceeded its +// timeout and did not return within the configured stuck-job timeout margin. +type JobStuckHandler func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult + +// JobStuckHandlerParams are parameters passed to JobStuckHandler. +type JobStuckHandlerParams struct { + // ID is the ID of the stuck job. + ID int64 + + // Kind is the kind of the stuck job. + Kind string + + // Queue is the queue where the stuck job is running. + Queue string + + // TotalStuckJobs is the total number of jobs currently considered stuck + // across the client (includes all queues). + TotalStuckJobs int +} + +// JobStuckHandlerResult is the result returned by JobStuckHandler. +type JobStuckHandlerResult struct { + // OpenWorkerSlot instructs River to treat the stuck job as no longer + // occupying a worker slot so another job can begin executing. 
This can be + // dangerous because the stuck job's goroutine is still running, so the queue + // may temporarily have more active job goroutines than MaxWorkers. + OpenWorkerSlot bool +}