From 33439c273a54349d7bd3ce8d8bd3cdfe7d974e89 Mon Sep 17 00:00:00 2001 From: Ata Safapour Date: Sun, 10 May 2026 03:07:50 +0330 Subject: [PATCH] fix: forward controlActionCancel to cancelCh in poll-mode fetchAndRunLoop When using drivers that don't support LISTEN/NOTIFY (e.g. riverdatabasesql), job cancel events are routed in-process via queueControlCh. The controlActionCancel case was missing from fetchAndRunLoop's queueControlCh handler, causing cancel events to be silently dropped and ctx.Done() to never fire inside a running Work() call. Forward the job ID to cancelCh so the existing maybeCancelJob call handles it, matching the behaviour of the LISTEN/NOTIFY path in handleControlNotification. Adds a test that verifies ctx.Done() fires in a running job after JobCancel is called when using a poll-only driver (SupportsListener() == false). --- client_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ producer.go | 7 +++++-- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index 6148b584..00d4d821 100644 --- a/client_test.go +++ b/client_test.go @@ -987,6 +987,46 @@ func Test_Client_Common(t *testing.T) { }) }) + t.Run("CancelRunningJobPollOnly", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + + client, err := NewClient(NewDriverPollOnly(bundle.dbPool), config) + require.NoError(t, err) + + jobStartedChan := make(chan int64) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + jobStartedChan <- job.ID + <-ctx.Done() + return ctx.Err() + })) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + insertRes, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + + startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan) + require.Equal(t, insertRes.Job.ID, startedJobID) + + updatedJob, err := client.JobCancel(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRunning, updatedJob.State) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCancelled, event.Kind) + require.Equal(t, rivertype.JobStateCancelled, event.Job.State) + require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second) + }) + t.Run("CancelScheduledJob", func(t *testing.T) { t.Parallel() diff --git a/producer.go b/producer.go index c753814c..96cf3f7a 100644 --- a/producer.go +++ b/producer.go @@ -528,8 +528,11 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { case msg := <-p.queueControlCh: switch msg.Action { case controlActionCancel: - // Separate this case to make linter happy: - p.Logger.DebugContext(workCtx, p.Name+": Unhandled queue control action", "action", msg.Action) + select { + case p.cancelCh <- msg.JobID: + default: + p.Logger.WarnContext(workCtx, p.Name+": Job cancel notification dropped due to full buffer", slog.Int64("job_id", msg.JobID)) + } case controlActionMetadataChanged: p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue)) p.testSignals.MetadataChanged.Signal(struct{}{})