diff --git a/client.go b/client.go index 3f29e545..6d7014ca 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/riverqueue/river/internal/dblist" @@ -46,11 +47,12 @@ const ( FetchPollIntervalDefault = 1 * time.Second FetchPollIntervalMin = 1 * time.Millisecond - JobTimeoutDefault = 1 * time.Minute - MaxAttemptsDefault = rivercommon.MaxAttemptsDefault - PriorityDefault = rivercommon.PriorityDefault - QueueDefault = rivercommon.QueueDefault - QueueNumWorkersMax = 10_000 + JobStuckThresholdDefault = 10 * time.Second + JobTimeoutDefault = 1 * time.Minute + MaxAttemptsDefault = rivercommon.MaxAttemptsDefault + PriorityDefault = rivercommon.PriorityDefault + QueueDefault = rivercommon.QueueDefault + QueueNumWorkersMax = 10_000 ) var ( @@ -192,6 +194,25 @@ type Config struct { // instances of rivertype.JobInsertMiddleware). JobInsertMiddleware []rivertype.JobInsertMiddleware + // JobStuckHandler is invoked when a producer detects that a job exceeded + // its timeout and did not return from context cancellation within the + // allotted JobStuckThreshold (and if it didn't, we usually assume it won't + // return at all). The handler receives minimal information about the stuck + // job and the total number of jobs currently considered stuck across the + // client. + // + // JobStuckHandler lets an implementation indicate that a new worker slot + // should be opened to replace the one now occupied by a stuck job. It can + // also be used (for example) to stop and exit the program if too many jobs + // have been reported stuck. + JobStuckHandler JobStuckHandler + + // JobStuckThreshold is the amount of time after JobTimeout elapses to + // wait before a still-running job is considered stuck. + // + // Defaults to 10 seconds. + JobStuckThreshold time.Duration + // JobTimeout is the maximum amount of time a job is allowed to run before its // context is cancelled. 
A timeout of zero means JobTimeoutDefault will be // used, whereas a value of -1 means the job's context will not be cancelled @@ -471,6 +492,8 @@ func (c *Config) WithDefaults() *Config { ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }), Hooks: c.Hooks, JobInsertMiddleware: c.JobInsertMiddleware, + JobStuckHandler: c.JobStuckHandler, + JobStuckThreshold: cmp.Or(c.JobStuckThreshold, JobStuckThresholdDefault), JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault), Logger: logger, MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault), @@ -521,6 +544,9 @@ func (c *Config) validate() error { if c.JobTimeout < -1 { return errors.New("JobTimeout cannot be negative, except for -1 (infinite)") } + if c.JobStuckThreshold < 0 { + return errors.New("JobStuckThreshold cannot be less than zero") + } if c.MaxAttempts < 0 { return errors.New("MaxAttempts cannot be less than zero") } @@ -670,6 +696,7 @@ type Client[TTx any] struct { queues *QueueBundle services []startstop.Service stopped <-chan struct{} + stuckJobCount atomic.Int32 subscriptionManager *subscriptionManager testSignals clientTestSignals @@ -2260,6 +2287,9 @@ func (c *Client[TTx]) producerAdd(queueName string, queueConfig QueueConfig) (*p FetchPollInterval: cmp.Or(queueConfig.FetchPollInterval, c.config.FetchPollInterval), HookLookupByJob: c.hookLookupByJob, HookLookupGlobal: c.hookLookupGlobal, + JobStuckHandler: c.config.JobStuckHandler, + JobStuckCount: &c.stuckJobCount, + JobStuckThreshold: c.config.JobStuckThreshold, JobTimeout: c.config.JobTimeout, MaxWorkers: queueConfig.MaxWorkers, MiddlewareLookupGlobal: c.middlewareLookupGlobal, diff --git a/client_test.go b/client_test.go index 9352dcbb..d3c666fe 100644 --- a/client_test.go +++ b/client_test.go @@ -8094,6 +8094,8 @@ func Test_NewClient_Defaults(t *testing.T) { require.Nil(t, client.config.ErrorHandler) require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown) require.Equal(t, FetchPollIntervalDefault, 
client.config.FetchPollInterval) + require.Nil(t, client.config.JobStuckHandler) + require.Equal(t, JobStuckThresholdDefault, client.config.JobStuckThreshold) require.Equal(t, JobTimeoutDefault, client.config.JobTimeout) require.Nil(t, client.config.Hooks) require.NotZero(t, client.baseService.Logger) @@ -8117,6 +8119,9 @@ func Test_NewClient_Overrides(t *testing.T) { ) errorHandler := &testErrorHandler{} + jobStuckHandler := JobStuckHandler(func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + return JobStuckHandlerResult{} + }) logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) workers := NewWorkers() @@ -8146,6 +8151,8 @@ func Test_NewClient_Overrides(t *testing.T) { FetchPollInterval: 124 * time.Millisecond, Hooks: []rivertype.Hook{&noOpHook{}}, JobInsertMiddleware: []rivertype.JobInsertMiddleware{&noOpInsertMiddleware{}}, + JobStuckHandler: jobStuckHandler, + JobStuckThreshold: 126 * time.Millisecond, JobTimeout: 125 * time.Millisecond, Logger: logger, MaxAttempts: 5, @@ -8184,6 +8191,8 @@ func Test_NewClient_Overrides(t *testing.T) { require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown) require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval) require.Len(t, client.config.JobInsertMiddleware, 1) + require.NotNil(t, client.config.JobStuckHandler) + require.Equal(t, 126*time.Millisecond, client.config.JobStuckThreshold) require.Equal(t, 125*time.Millisecond, client.config.JobTimeout) require.Equal(t, []rivertype.Hook{&noOpHook{}}, client.config.Hooks) require.Equal(t, logger, client.baseService.Logger) @@ -8366,6 +8375,13 @@ func Test_NewClient_Validations(t *testing.T) { require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts) }, }, + { + name: "JobStuckThreshold cannot be less than zero", + configFunc: func(config *Config) { + config.JobStuckThreshold = -1 + }, + wantErr: errors.New("JobStuckThreshold cannot be less than zero"), + }, { name: "Middleware can be configured independently", 
configFunc: func(config *Config) { diff --git a/example_job_stuck_handler_test.go b/example_job_stuck_handler_test.go new file mode 100644 index 00000000..ab4e71fe --- /dev/null +++ b/example_job_stuck_handler_test.go @@ -0,0 +1,118 @@ +package river_test + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/testutil" +) + +type StuckJobHandlerArgs struct{} + +func (StuckJobHandlerArgs) Kind() string { return "stuck_job_handler" } + +type StuckJobHandlerWorker struct { + river.WorkerDefaults[StuckJobHandlerArgs] + + releaseJobs chan struct{} + started chan struct{} +} + +func (w *StuckJobHandlerWorker) Work(ctx context.Context, job *river.Job[StuckJobHandlerArgs]) error { + w.started <- struct{}{} + + // Ignore ctx.Done() to simulate a job that doesn't respond to cancellation. + <-w.releaseJobs + + return nil +} + +// Example_jobStuckHandler demonstrates how to use JobStuckHandler to stop a +// client when too many jobs are stuck so the process can be restarted. For the +// first couple stuck jobs it uses OpenWorkerSlot to add additional worker slots +// to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart +// it gives up and exits so it can be restarted. 
+func Example_jobStuckHandler() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + var riverClient *river.Client[pgx.Tx] + + const maxStuckJobsBeforeRestart = 2 + + var ( + releaseJobs = make(chan struct{}) + restartRequested = make(chan struct{}) + started = make(chan struct{}, maxStuckJobsBeforeRestart+1) + stopOnce sync.Once + ) + + workers := river.NewWorkers() + river.AddWorker(workers, &StuckJobHandlerWorker{releaseJobs: releaseJobs, started: started}) + + riverClient, err = river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{ + JobStuckHandler: func(ctx context.Context, params river.JobStuckHandlerParams) river.JobStuckHandlerResult { + if params.TotalStuckJobs <= maxStuckJobsBeforeRestart { + fmt.Printf("stuck jobs: %d; opening replacement worker slot\n", params.TotalStuckJobs) + return river.JobStuckHandlerResult{OpenWorkerSlot: true} + } + + stopOnce.Do(func() { + fmt.Printf("too many stuck jobs: %d; shutting down so the process can restart\n", params.TotalStuckJobs) + close(restartRequested) + + shutdownCtx := context.WithoutCancel(ctx) + go func() { + if err := riverClient.Stop(shutdownCtx); err != nil { + panic(err) + } + }() + }) + + return river.JobStuckHandlerResult{} + }, + JobStuckThreshold: time.Millisecond, + JobTimeout: 10 * time.Millisecond, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 1}, + }, + Workers: workers, + })) + if err != nil { + panic(err) + } + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + for range maxStuckJobsBeforeRestart + 1 { + if _, err := riverClient.Insert(ctx, StuckJobHandlerArgs{}, nil); err != nil { + panic(err) + } + } + + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), started, maxStuckJobsBeforeRestart+1) + + <-restartRequested + close(releaseJobs) + <-riverClient.Stopped() + + // Output: + // stuck jobs: 1; opening 
replacement worker slot + // stuck jobs: 2; opening replacement worker slot + // too many stuck jobs: 3; shutting down so the process can restart +} diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index dab44f39..6c5f2eba 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -9,6 +9,7 @@ import ( "log/slog" "runtime" "strings" + "sync/atomic" "time" "github.com/tidwall/gjson" @@ -116,7 +117,7 @@ type JobExecutor struct { MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface ProducerCallbacks struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() } SchedulerInterval time.Duration @@ -125,8 +126,17 @@ type JobExecutor struct { WorkUnit workunit.WorkUnit // Meant to be used from within the job executor only. - start time.Time - stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer + slotClosed atomic.Bool + start time.Time + stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer +} + +// TryCloseSlot marks this executor's producer slot as closed. A closed slot +// means the producer has already stopped counting this executor against its +// active worker capacity, although the executor goroutine may still be running. +// It returns true only the first time the slot is closed. +func (e *JobExecutor) TryCloseSlot() bool { + return e.slotClosed.CompareAndSwap(false, true) } func (e *JobExecutor) Cancel(ctx context.Context) { @@ -262,9 +272,8 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { // their job timeout (plus a small margin) and don't appear to be responding to // context cancellation (unfortunately, quite an easy error to make in Go). // -// Currently we don't do anything if we notice a job is stuck. 
Knowing about -// stuck jobs is just used for informational purposes in the producer in -// generating periodic stats. +// Producers use stuck-job notifications for periodic stats and optional user +// handlers. func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc { // We add a WithoutCancel here so that this inner goroutine becomes // immune to all context cancellations _except_ the one where it's @@ -281,7 +290,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) // context cancelled as we leave JobExecutor.execute case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): - e.ProducerCallbacks.Stuck() + e.ProducerCallbacks.Stuck(ctx, e.JobRow) e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck", slog.Int64("job_id", e.JobRow.ID), diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 1674fa7e..c149dac3 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -195,11 +195,11 @@ func TestJobExecutor_Execute(t *testing.T) { MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: func(jobRow *rivertype.JobRow) {}, - Stuck: func() {}, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) {}, Unstuck: func() {}, }, SchedulerInterval: riverinternaltest.SchedulerShortInterval, @@ -720,7 +720,7 @@ func TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } @@ -761,7 +761,7 @@ func 
TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } @@ -809,7 +809,7 @@ func TestJobExecutor_Execute(t *testing.T) { informProducerStuckReceived = make(chan struct{}) informProducerUnstuckReceived = make(chan struct{}) ) - executor.ProducerCallbacks.Stuck = func() { + executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) { t.Log("Job executor reported stuck") close(informProducerStuckReceived) } diff --git a/producer.go b/producer.go index dbd40653..d9aba6ad 100644 --- a/producer.go +++ b/producer.go @@ -84,6 +84,9 @@ type producerConfig struct { HookLookupByJob *hooklookup.JobHookLookup HookLookupGlobal hooklookup.HookLookupInterface + JobStuckHandler JobStuckHandler + JobStuckCount *atomic.Int32 + JobStuckThreshold time.Duration JobTimeout time.Duration MaxWorkers int MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface @@ -127,6 +130,15 @@ func (c *producerConfig) mustValidate() *producerConfig { if c.FetchPollInterval <= 0 { panic("producerConfig.FetchPollInterval must be greater than zero") } + if c.JobStuckCount == nil { + c.JobStuckCount = &atomic.Int32{} + } + if c.JobStuckThreshold == 0 { + c.JobStuckThreshold = JobStuckThresholdDefault + } + if c.JobStuckThreshold < 0 { + panic("producerConfig.JobStuckThreshold must be greater or equal to zero") + } if c.JobTimeout < -1 { panic("producerConfig.JobTimeout must be greater or equal to zero") } @@ -729,12 +741,44 @@ func (p *producer) addActiveJob(id int64, executor *jobexecutor.JobExecutor) { } func (p *producer) removeActiveJob(job *rivertype.JobRow) { + executor := p.activeJobs[job.ID] delete(p.activeJobs, job.ID) - p.numJobsActive.Add(-1) + if executor == nil || 
executor.TryCloseSlot() { + p.numJobsActive.Add(-1) + } p.numJobsRan.Add(1) p.state.JobFinish(job) } +func (p *producer) handleWorkerStuck(ctx context.Context, executor *jobexecutor.JobExecutor, job *rivertype.JobRow) { + p.numJobsStuck.Add(1) + totalStuckJobs := int(p.config.JobStuckCount.Add(1)) + + if p.config.JobStuckHandler == nil { + return + } + + result := p.config.JobStuckHandler(ctx, JobStuckHandlerParams{ + ID: job.ID, + Kind: job.Kind, + Queue: job.Queue, + TotalStuckJobs: totalStuckJobs, + }) + if !result.OpenWorkerSlot || !executor.TryCloseSlot() { + return + } + + p.numJobsActive.Add(-1) + if p.fetchLimiter != nil { + p.fetchLimiter.Call() + } +} + +func (p *producer) handleWorkerUnstuck() { + p.numJobsStuck.Add(-1) + p.config.JobStuckCount.Add(-1) +} + func (p *producer) maybeCancelJob(ctx context.Context, id int64) { executor, ok := p.activeJobs[id] if !ok { @@ -822,7 +866,8 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. // jobCancel will always be called by the executor to prevent leaks. jobCtx, jobCancel := context.WithCancelCause(workCtx) - executor := baseservice.Init(&p.Archetype, &jobexecutor.JobExecutor{ + var executor *jobexecutor.JobExecutor + executor = baseservice.Init(&p.Archetype, &jobexecutor.JobExecutor{ CancelFunc: jobCancel, ClientJobTimeout: p.jobTimeout, ClientRetryPolicy: p.retryPolicy, @@ -835,15 +880,16 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. 
JobRow: job, ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: p.handleWorkerDone, - Stuck: func() { p.numJobsStuck.Add(1) }, - Unstuck: func() { p.numJobsStuck.Add(-1) }, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) { p.handleWorkerStuck(ctx, executor, jobRow) }, + Unstuck: p.handleWorkerUnstuck, }, - SchedulerInterval: p.config.SchedulerInterval, - WorkUnit: workUnit, + SchedulerInterval: p.config.SchedulerInterval, + StuckThresholdOverride: p.config.JobStuckThreshold, + WorkUnit: workUnit, }) p.addActiveJob(job.ID, executor) diff --git a/producer_test.go b/producer_test.go index c03bd766..0fa1681f 100644 --- a/producer_test.go +++ b/producer_test.go @@ -517,6 +517,105 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin require.Zero(t, producer.maxJobsToFetch()) // zero because all slots are occupied }) + t.Run("JobStuckHandler", func(t *testing.T) { + t.Parallel() + + producer, bundle := setup(t) + producer.config.JobTimeout = 10 * time.Millisecond + producer.config.JobStuckThreshold = time.Millisecond + producer.config.MaxWorkers = 2 + producer.jobTimeout = producer.config.JobTimeout + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + + Num int `json:"num"` + } + + releaseJobs := make(chan struct{}) + defer close(releaseJobs) + + handlerParamsCh := make(chan JobStuckHandlerParams, 2) + producer.config.JobStuckHandler = func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + handlerParamsCh <- params + return JobStuckHandlerResult{} + } + + AddWorker(bundle.workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + <-releaseJobs + return nil + })) + + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 1}) + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 2}) + + startProducer(t, ctx, ctx, producer) + + handlerParams := 
riversharedtest.WaitOrTimeoutN(t, handlerParamsCh, 2) + require.ElementsMatch(t, []int{1, 2}, []int{handlerParams[0].TotalStuckJobs, handlerParams[1].TotalStuckJobs}) + for _, params := range handlerParams { + require.NotZero(t, params.ID) + require.Equal(t, (&JobArgs{}).Kind(), params.Kind) + require.Equal(t, producer.config.Queue, params.Queue) + } + }) + + t.Run("JobStuckHandlerOpensExecutorSlot", func(t *testing.T) { + t.Parallel() + + producer, bundle := setup(t) + producer.config.JobTimeout = 20 * time.Millisecond + producer.config.JobStuckThreshold = time.Millisecond + producer.config.MaxWorkers = 1 + producer.jobTimeout = producer.config.JobTimeout + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + + Num int `json:"num"` + } + + handlerParamsCh := make(chan JobStuckHandlerParams, 2) + producer.config.JobStuckHandler = func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult { + handlerParamsCh <- params + return JobStuckHandlerResult{OpenWorkerSlot: true} + } + + var ( + firstStarted = make(chan struct{}) + releaseJobs = make(chan struct{}) + secondStarted = make(chan struct{}) + ) + defer close(releaseJobs) + + AddWorker(bundle.workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + switch job.Args.Num { + case 1: + close(firstStarted) + case 2: + close(secondStarted) + default: + require.FailNow(t, "unexpected job num", "num=%d", job.Args.Num) + } + + <-releaseJobs + return nil + })) + + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 1}) + mustInsert(ctx, t, producer, bundle, &JobArgs{Num: 2}) + + startProducer(t, ctx, ctx, producer) + + riversharedtest.WaitOrTimeout(t, firstStarted) + + handlerParams := riversharedtest.WaitOrTimeout(t, handlerParamsCh) + require.Equal(t, 1, handlerParams.TotalStuckJobs) + + riversharedtest.WaitOrTimeout(t, secondStarted) + require.Equal(t, int32(1), producer.numJobsActive.Load()) + }) + t.Run("StartStopStress", func(t *testing.T) { t.Parallel() diff --git 
a/rivertest/worker.go b/rivertest/worker.go index 8da01ae0..da4f13f5 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -211,11 +211,11 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(append(rivermiddleware.DefaultMiddleware(), w.config.Middleware...)), ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) - Stuck func() + Stuck func(ctx context.Context, jobRow *rivertype.JobRow) Unstuck func() }{ JobDone: func(job *rivertype.JobRow) { close(executionDone) }, - Stuck: func() {}, + Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) {}, Unstuck: func() {}, }, SchedulerInterval: maintenance.JobSchedulerIntervalDefault, diff --git a/stuck_job.go b/stuck_job.go new file mode 100644 index 00000000..cb3cb4af --- /dev/null +++ b/stuck_job.go @@ -0,0 +1,32 @@ +package river + +import "context" + +// JobStuckHandler is invoked when a producer detects that a job exceeded its +// timeout and did not return within the configured stuck-job timeout margin. +type JobStuckHandler func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult + +// JobStuckHandlerParams are parameters passed to JobStuckHandler. +type JobStuckHandlerParams struct { + // ID is the ID of the stuck job. + ID int64 + + // Kind is the kind of the stuck job. + Kind string + + // Queue is the queue where the stuck job is running. + Queue string + + // TotalStuckJobs is the total number of jobs currently considered stuck + // across the client (includes all queues). + TotalStuckJobs int +} + +// JobStuckHandlerResult is the result returned by JobStuckHandler. +type JobStuckHandlerResult struct { + // OpenWorkerSlot instructs River to treat the stuck job as no longer + // occupying a worker slot so another job can begin executing. 
This can be + // dangerous because the stuck job's goroutine is still running, so the queue + // may exceed MaxWorkers active job goroutines until that job returns (if ever). + OpenWorkerSlot bool +}