diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index e1ccf95..788cada 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -111,7 +111,11 @@ func main() { }) s3ClientProvider := s3client.NewClientProvider(ctx, globalConfig.S3Config) - schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig) + // The scheduler gets its own context so workers keep running during + // graceful shutdown while in-flight HTTP handlers drain. We cancel it + // explicitly after server.Shutdown completes. + schedulerCtx, cancelScheduler := context.WithCancel(context.WithoutCancel(ctx)) + schedulerProvider := jobscheduler.NewProvider(schedulerCtx, globalConfig.SchedulerConfig) cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider) @@ -163,6 +167,9 @@ func main() { } gracefulShutdown(ctx, logger, server, &shuttingDown, globalConfig.ShutdownReadinessDelay, globalConfig.ShutdownTimeout) + + cancelScheduler() + drainScheduler(ctx, logger, schedulerProvider) } // gracefulShutdown fails readiness, waits readinessDelay for load balancers @@ -191,6 +198,26 @@ func gracefulShutdown( } } +const schedulerDrainTimeout = 10 * time.Second + +func drainScheduler(ctx context.Context, logger *slog.Logger, provider jobscheduler.Provider) { + scheduler, err := provider() + if err != nil { + return + } + done := make(chan struct{}) + go func() { + scheduler.Wait() + close(done) + }() + select { + case <-done: + logger.InfoContext(ctx, "Scheduler drained cleanly") + case <-time.After(schedulerDrainTimeout): + logger.WarnContext(ctx, "Scheduler drain timed out, exiting with in-flight jobs") + } +} + func newRegistries( scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 228b6c9..75516b4 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -335,6 +335,57 @@ func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) { assert.False(t, executed.Load(), "submissions after shutdown should be dropped") } +// TestJobSchedulerSurvivesParentCancel verifies the shutdown ordering fix: +// when the scheduler is created with context.WithoutCancel, cancelling the +// parent (simulating SIGTERM) does NOT kill workers. Jobs submitted after the +// parent cancel still execute. Only cancelling the scheduler's own context +// stops the workers. +func TestJobSchedulerSurvivesParentCancel(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + parentCtx, cancelParent := context.WithCancel(ctx) + + // Simulate the production fix: scheduler gets context.WithoutCancel so + // it is decoupled from the signal context. + schedulerCtx, cancelScheduler := context.WithCancel(context.WithoutCancel(parentCtx)) + defer cancelScheduler() + + scheduler := newTestScheduler(schedulerCtx, t, jobscheduler.Config{Concurrency: 2}) + + // Submit a job and confirm it runs. + var firstJob atomic.Bool + scheduler.Submit("q1", "before-sigterm", func(_ context.Context) error { + firstJob.Store(true) + return nil + }) + eventually(t, time.Second, firstJob.Load, "job before parent cancel should run") + + // Cancel the parent context (simulates SIGTERM arriving). + cancelParent() + time.Sleep(50 * time.Millisecond) + + // Workers should still be alive — submit another job and verify it runs. + var afterCancel atomic.Bool + scheduler.Submit("q2", "after-sigterm", func(_ context.Context) error { + afterCancel.Store(true) + return nil + }) + eventually(t, time.Second, afterCancel.Load, + "job submitted after parent cancel should still execute") + + // Now cancel the scheduler's own context (simulates post-Shutdown teardown). + cancelScheduler() + time.Sleep(50 * time.Millisecond) + + var postShutdown atomic.Bool + scheduler.Submit("q3", "post-shutdown", func(_ context.Context) error { + postShutdown.Store(true) + return nil + }) + time.Sleep(100 * time.Millisecond) + assert.False(t, postShutdown.Load(), + "job submitted after scheduler cancel should be dropped") +} + func TestJobSchedulerMultipleQueues(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx)