diff --git a/client_test.go b/client_test.go index 6148b584..00d4d821 100644 --- a/client_test.go +++ b/client_test.go @@ -987,6 +987,46 @@ func Test_Client_Common(t *testing.T) { }) }) + t.Run("CancelRunningJobPollOnly", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + + client, err := NewClient(NewDriverPollOnly(bundle.dbPool), config) + require.NoError(t, err) + + jobStartedChan := make(chan int64) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + jobStartedChan <- job.ID + <-ctx.Done() + return ctx.Err() + })) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + insertRes, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + + startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan) + require.Equal(t, insertRes.Job.ID, startedJobID) + + updatedJob, err := client.JobCancel(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRunning, updatedJob.State) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCancelled, event.Kind) + require.Equal(t, rivertype.JobStateCancelled, event.Job.State) + require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second) + }) + t.Run("CancelScheduledJob", func(t *testing.T) { t.Parallel() diff --git a/producer.go b/producer.go index c753814c..96cf3f7a 100644 --- a/producer.go +++ b/producer.go @@ -528,8 +528,11 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { case msg := <-p.queueControlCh: switch msg.Action { case controlActionCancel: - // Separate this case to make linter happy: - p.Logger.DebugContext(workCtx, p.Name+": Unhandled queue control action", "action", msg.Action) + select { + case p.cancelCh <- msg.JobID: + default: + p.Logger.WarnContext(workCtx, p.Name+": Job cancel notification dropped due to full buffer", slog.Int64("job_id", msg.JobID)) + } case controlActionMetadataChanged: p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue)) p.testSignals.MetadataChanged.Signal(struct{}{})