Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
// deployment record API. Once an artifact is known to be missing,
// we suppress further API calls for this duration.
unknownArtifactTTL = 1 * time.Hour

// informerSyncTimeoutDuration is the maximum duration of time allowed
// for the informers to sync to prevent the controller from hanging indefinitely.
informerSyncTimeoutDuration = 60 * time.Second
)

type ttlCache interface {
Expand Down Expand Up @@ -92,6 +96,9 @@ type Controller struct {
// best effort cache to suppress API calls for artifacts that
// returned a 404 (no artifact found). Keyed by image digest.
unknownArtifacts ttlCache
// informerSyncTimeout is the maximum time allowed for all informers to sync
// and prevents sync from hanging indefinitely.
informerSyncTimeout time.Duration
}

// New creates a new deployment tracker controller.
Expand Down Expand Up @@ -160,6 +167,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
cfg: cfg,
observedDeployments: amcache.NewExpiring(),
unknownArtifacts: amcache.NewExpiring(),
informerSyncTimeout: informerSyncTimeoutDuration,
}

// Add event handlers to the informer
Expand Down Expand Up @@ -320,16 +328,23 @@ func (c *Controller) Run(ctx context.Context, workers int) error {

// Wait for the caches to be synced
slog.Info("Waiting for informer caches to sync")
if !cache.WaitForCacheSync(ctx.Done(),
informerSyncCtx, cancel := context.WithTimeout(ctx, c.informerSyncTimeout)

if !cache.WaitForCacheSync(informerSyncCtx.Done(),
c.podInformer.HasSynced,
c.deploymentInformer.HasSynced,
c.daemonSetInformer.HasSynced,
c.statefulSetInformer.HasSynced,
c.jobInformer.HasSynced,
c.cronJobInformer.HasSynced,
) {
return errors.New("timed out waiting for caches to sync")
cancel()
if ctx.Err() != nil {
return fmt.Errorf("cache sync interrupted: %w", ctx.Err())
}
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
}
cancel()

slog.Info("Starting workers",
"count", workers,
Expand Down
53 changes: 53 additions & 0 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand All @@ -12,7 +13,11 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
amcache "k8s.io/apimachinery/pkg/util/cache"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"
)

// mockPoster records all PostOne calls and returns a configurable error.
Expand Down Expand Up @@ -533,3 +538,51 @@ func TestIsTerminalPhase(t *testing.T) {
})
}
}

// TestRun_InformerSyncTimeout checks that Controller.Run returns a
// "timed out waiting for caches to sync" error, instead of blocking forever,
// when list requests against the API never complete within the configured
// informerSyncTimeout.
func TestRun_InformerSyncTimeout(t *testing.T) {
	t.Parallel()

	const (
		// syncTimeout is the informer sync budget handed to the controller.
		syncTimeout = 2 * time.Second
		// runDeadline is how long the test waits for Run to return; it is
		// deliberately larger than syncTimeout.
		runDeadline = 5 * time.Second
	)

	clientset := fake.NewSimpleClientset()

	// Every "list" request issued through the fake clientset parks on this
	// channel, so none of them returns while the test body is still running.
	release := make(chan struct{})
	clientset.PrependReactor("list", "*", func(_ k8stesting.Action) (bool, runtime.Object, error) {
		<-release
		return true, nil, errors.New("fail")
	})
	// Unblock any parked reactor calls once the test is done.
	defer close(release)

	informers := createInformerFactory(clientset, "", "")
	coreV1 := informers.Core().V1()
	appsV1 := informers.Apps().V1()
	batchV1 := informers.Batch().V1()

	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[PodEvent](),
	)

	// The controller is assembled by hand (rather than via New) so the test
	// can set a short informerSyncTimeout.
	c := &Controller{
		clientset:           clientset,
		podInformer:         coreV1.Pods().Informer(),
		deploymentInformer:  appsV1.Deployments().Informer(),
		daemonSetInformer:   appsV1.DaemonSets().Informer(),
		statefulSetInformer: appsV1.StatefulSets().Informer(),
		jobInformer:         batchV1.Jobs().Informer(),
		cronJobInformer:     batchV1.CronJobs().Informer(),
		workqueue:           queue,
		apiClient:           &mockPoster{},
		cfg:                 &Config{},
		observedDeployments: amcache.NewExpiring(),
		unknownArtifacts:    amcache.NewExpiring(),
		informerSyncTimeout: syncTimeout,
	}

	runCtx, stop := context.WithCancel(context.Background())
	defer stop()

	// Buffered so the goroutine can always deliver its result and exit, even
	// if the test has already failed on the deadline below.
	result := make(chan error, 1)
	go func() {
		result <- c.Run(runCtx, 1)
	}()

	var runErr error
	select {
	case runErr = <-result:
	case <-time.After(runDeadline):
		t.Fatal("Run did not return within 5 seconds — informer sync timeout was 2 seconds")
	}

	require.Error(t, runErr)
	assert.Contains(t, runErr.Error(), "timed out waiting for caches to sync")
}
Loading