diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 52febfb..b722d3b 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -38,6 +38,10 @@ const ( // deployment record API. Once an artifact is known to be missing, // we suppress further API calls for this duration. unknownArtifactTTL = 1 * time.Hour + + // informerSyncTimeoutDuration is the maximum duration allowed + // for the informers to sync, preventing the controller from hanging indefinitely. + informerSyncTimeoutDuration = 60 * time.Second ) type ttlCache interface { @@ -92,6 +96,9 @@ type Controller struct { // best effort cache to suppress API calls for artifacts that // returned a 404 (no artifact found). Keyed by image digest. unknownArtifacts ttlCache + // informerSyncTimeout is the maximum time allowed for all informers to sync + // and prevents sync from hanging indefinitely. + informerSyncTimeout time.Duration } // New creates a new deployment tracker controller. 
@@ -160,6 +167,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato cfg: cfg, observedDeployments: amcache.NewExpiring(), unknownArtifacts: amcache.NewExpiring(), + informerSyncTimeout: informerSyncTimeoutDuration, } // Add event handlers to the informer @@ -320,7 +328,9 @@ func (c *Controller) Run(ctx context.Context, workers int) error { // Wait for the caches to be synced slog.Info("Waiting for informer caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), + informerSyncCtx, cancel := context.WithTimeout(ctx, c.informerSyncTimeout) + + if !cache.WaitForCacheSync(informerSyncCtx.Done(), c.podInformer.HasSynced, c.deploymentInformer.HasSynced, c.daemonSetInformer.HasSynced, @@ -328,8 +338,13 @@ func (c *Controller) Run(ctx context.Context, workers int) error { c.jobInformer.HasSynced, c.cronJobInformer.HasSynced, ) { - return errors.New("timed out waiting for caches to sync") + cancel() + if ctx.Err() != nil { + return fmt.Errorf("cache sync interrupted: %w", ctx.Err()) + } + return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions") } + cancel() slog.Info("Starting workers", "count", workers, diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index f758a4d..e66a61b 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -2,6 +2,7 @@ package controller import ( "context" + "errors" "fmt" "sync" "testing" @@ -12,7 +13,11 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" amcache "k8s.io/apimachinery/pkg/util/cache" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/util/workqueue" ) // mockPoster records all PostOne calls and returns a configurable error. 
@@ -533,3 +538,51 @@ func TestIsTerminalPhase(t *testing.T) { }) } } + +func TestRun_InformerSyncTimeout(t *testing.T) { + t.Parallel() + fakeClient := fake.NewSimpleClientset() + blocker := make(chan struct{}) + fakeClient.PrependReactor("list", "*", func(_ k8stesting.Action) (bool, runtime.Object, error) { + // Block until the test completes. + <-blocker + return true, nil, errors.New("fail") + }) + defer close(blocker) + + factory := createInformerFactory(fakeClient, "", "") + + ctrl := &Controller{ + clientset: fakeClient, + podInformer: factory.Core().V1().Pods().Informer(), + deploymentInformer: factory.Apps().V1().Deployments().Informer(), + daemonSetInformer: factory.Apps().V1().DaemonSets().Informer(), + statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(), + jobInformer: factory.Batch().V1().Jobs().Informer(), + cronJobInformer: factory.Batch().V1().CronJobs().Informer(), + workqueue: workqueue.NewTypedRateLimitingQueue( + workqueue.DefaultTypedControllerRateLimiter[PodEvent](), + ), + apiClient: &mockPoster{}, + cfg: &Config{}, + observedDeployments: amcache.NewExpiring(), + unknownArtifacts: amcache.NewExpiring(), + informerSyncTimeout: 2 * time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errCh := make(chan error, 1) + go func() { + errCh <- ctrl.Run(ctx, 1) + }() + + select { + case err := <-errCh: + require.Error(t, err) + assert.Contains(t, err.Error(), "timed out waiting for caches to sync") + case <-time.After(5 * time.Second): + t.Fatal("Run did not return within 5 seconds — informer sync timeout was 2 seconds") + } +}