From e913611bc8dc87748309a121776982a9cb195b50 Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Fri, 15 May 2026 16:14:53 -0400 Subject: [PATCH] fix(cachewd): decouple scheduler context from signal context The graceful shutdown PR (#307) passes the signal-notified context to the scheduler, so SIGTERM immediately kills all workers even though server.Shutdown keeps HTTP handlers alive for up to 150s. In-flight jobs (snapshots, repacks, fetches) get context-cancelled mid-execution. Give the scheduler its own context via context.WithoutCancel (same pattern already used for the HTTP server BaseContext) and cancel it after server.Shutdown completes. This lets workers finish current jobs during the drain window. The scheduler drain is bounded to 10s so long-running jobs don't block process exit past the pod's terminationGracePeriodSeconds. --- cmd/cachewd/main.go | 29 ++++++++++++++++- internal/jobscheduler/jobs_test.go | 51 ++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index e1ccf95..788cada 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -111,7 +111,11 @@ func main() { }) s3ClientProvider := s3client.NewClientProvider(ctx, globalConfig.S3Config) - schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig) + // The scheduler gets its own context so workers keep running during + // graceful shutdown while in-flight HTTP handlers drain. We cancel it + // explicitly after server.Shutdown completes. + schedulerCtx, cancelScheduler := context.WithCancel(context.WithoutCancel(ctx)) + schedulerProvider := jobscheduler.NewProvider(schedulerCtx, globalConfig.SchedulerConfig) cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider) @@ -163,6 +167,9 @@ func main() { } gracefulShutdown(ctx, logger, server, &shuttingDown, globalConfig.ShutdownReadinessDelay, globalConfig.ShutdownTimeout) + + cancelScheduler() + drainScheduler(ctx, logger, schedulerProvider) } // gracefulShutdown fails readiness, waits readinessDelay for load balancers @@ -191,6 +198,26 @@ func gracefulShutdown( } } +const schedulerDrainTimeout = 10 * time.Second + +func drainScheduler(ctx context.Context, logger *slog.Logger, provider jobscheduler.Provider) { + scheduler, err := provider() + if err != nil { + return + } + done := make(chan struct{}) + go func() { + scheduler.Wait() + close(done) + }() + select { + case <-done: + logger.InfoContext(ctx, "Scheduler drained cleanly") + case <-time.After(schedulerDrainTimeout): + logger.WarnContext(ctx, "Scheduler drain timed out, exiting with in-flight jobs") + } +} + func newRegistries( scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 228b6c9..75516b4 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -335,6 +335,57 @@ func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) { assert.False(t, executed.Load(), "submissions after shutdown should be dropped") } +// TestJobSchedulerSurvivesParentCancel verifies the shutdown ordering fix: +// when the scheduler is created with context.WithoutCancel, cancelling the +// parent (simulating SIGTERM) does NOT kill workers. Jobs submitted after the +// parent cancel still execute. Only cancelling the scheduler's own context +// stops the workers. +func TestJobSchedulerSurvivesParentCancel(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + parentCtx, cancelParent := context.WithCancel(ctx) + + // Simulate the production fix: scheduler gets context.WithoutCancel so + // it is decoupled from the signal context. + schedulerCtx, cancelScheduler := context.WithCancel(context.WithoutCancel(parentCtx)) + defer cancelScheduler() + + scheduler := newTestScheduler(schedulerCtx, t, jobscheduler.Config{Concurrency: 2}) + + // Submit a job and confirm it runs. + var firstJob atomic.Bool + scheduler.Submit("q1", "before-sigterm", func(_ context.Context) error { + firstJob.Store(true) + return nil + }) + eventually(t, time.Second, firstJob.Load, "job before parent cancel should run") + + // Cancel the parent context (simulates SIGTERM arriving). + cancelParent() + time.Sleep(50 * time.Millisecond) + + // Workers should still be alive — submit another job and verify it runs. + var afterCancel atomic.Bool + scheduler.Submit("q2", "after-sigterm", func(_ context.Context) error { + afterCancel.Store(true) + return nil + }) + eventually(t, time.Second, afterCancel.Load, + "job submitted after parent cancel should still execute") + + // Now cancel the scheduler's own context (simulates post-Shutdown teardown). + cancelScheduler() + time.Sleep(50 * time.Millisecond) + + var postShutdown atomic.Bool + scheduler.Submit("q3", "post-shutdown", func(_ context.Context) error { + postShutdown.Store(true) + return nil + }) + time.Sleep(100 * time.Millisecond) + assert.False(t, postShutdown.Load(), + "job submitted after scheduler cancel should be dropped") +} + func TestJobSchedulerMultipleQueues(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx)