diff --git a/go.mod b/go.mod index c0672750..d54c310c 100644 --- a/go.mod +++ b/go.mod @@ -41,10 +41,12 @@ require ( github.com/goccy/go-json v0.10.6 github.com/gojuno/minimock/v3 v3.4.7 github.com/itchyny/gojq v0.12.17 + github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f github.com/muesli/termenv v0.16.0 github.com/onsi/ginkgo/v2 v2.27.5 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.9 + sigs.k8s.io/controller-runtime v0.20.4 ) require ( @@ -133,7 +135,6 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect - sigs.k8s.io/controller-runtime v0.20.4 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/kustomize/api v0.19.0 // indirect sigs.k8s.io/kustomize/kyaml v0.19.0 // indirect diff --git a/go.sum b/go.sum index a8b19062..6d99617a 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f h1:hTyM8+nWGxBczLaa0HzjXbKJuMjdbZQa9ZB2F0wdO04= +github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f/go.mod h1:Q8GOTVz5hMCvWJjTmeLRQ79yp+AkX76yuNL/R66gybk= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= diff --git a/pkg/app/README.md b/pkg/app/README.md index 5e6f61f0..373ecafd 100644 --- a/pkg/app/README.md +++ b/pkg/app/README.md @@ -315,11 
+315,16 @@ err := so.AssembleCommonOperatorFromConfig(cfg, []string{ The new `AssembleCommonOperatorFromConfig(cfg, labels)` method on `*ShellOperator` is what makes this clean — it derives both -`KubeClientConfig`s (main + object-patcher), the HTTP listen address/port, -and the metric prefix from `cfg`, so the consumer does not have to unpack -fields by hand. The older primitive-taking +`KubeClientConfig`s (main + object-patcher), the deduplicated-kubeclient +configuration (`DedupClientConfig`), the HTTP listen address/port, and the +metric prefix from `cfg`, so the consumer does not have to unpack fields by +hand. The older primitive-taking `AssembleCommonOperator(listenAddress, listenPort, labels, mainKubeCfg, patcherKubeCfg)` -is still available for callers that need finer control. +is still available for callers that need finer control; it preserves its +original signature and disables the dedup cache. To enable the dedup cache +without going through `*app.Config`, use the new +`AssembleCommonOperatorWithDedupClient(...)` variant which adds a final +`DedupClientConfig` parameter. If you also want env-var parsing on top of your own values, call `ParseEnv(cfg)` between steps 1 and 2 — env values will overlay the fields you diff --git a/pkg/app/app_config.go b/pkg/app/app_config.go index 64c47a63..e95848f9 100644 --- a/pkg/app/app_config.go +++ b/pkg/app/app_config.go @@ -34,6 +34,32 @@ type ObjectPatcherSettings struct { KubeClientTimeout time.Duration `env:"KUBE_CLIENT_TIMEOUT"` } +// DedupClientSettings configures the deduplicated kubeclient cache provided by +// github.com/ldmonster/kubeclient. The cache stores a single canonical copy of +// each repeated value and subtree across watched objects, dramatically lowering +// in-memory footprint for clusters with many similar resources (e.g. +// templated Deployments). All settings here are optional; when Enabled is +// false the client is not constructed at all. 
List-typed env vars use a comma +// separator; GVK strings follow the form "<group>/<version>/<kind>" (the +// group may be empty for core resources, e.g. "/v1/Pod"). +type DedupClientSettings struct { + Enabled bool `env:"ENABLED"` + Namespaces []string `env:"NAMESPACES" envSeparator:","` + WatchGVKs []string `env:"WATCH_GVKS" envSeparator:","` + ReconstructLRUSize int `env:"RECONSTRUCT_LRU_SIZE"` + GCInterval time.Duration `env:"GC_INTERVAL"` + + // SnapshotStore enables a process-wide deduplicated SnapshotStore that + // backs every kubernetes-binding monitor's per-object cache. When on, + // `*Unstructured` bodies live exactly once in memory across all + // resourceInformers (refcounted), trading a small per-snapshot-read CPU + // cost for a substantial drop in RSS for workloads with many similar + // objects. Independent of the runtime DedupClient (Enabled flag): the + // snapshot store can be turned on without spinning up any kubeclient + // informers. + SnapshotStore bool `env:"SNAPSHOT_STORE"` +} + // AdmissionSettings holds settings for the validating-webhook server. 
type AdmissionSettings struct { ConfigurationName string `env:"CONFIGURATION_NAME"` @@ -83,6 +109,7 @@ type Config struct { App AppSettings `envPrefix:"SHELL_OPERATOR_"` Kube KubeSettings `envPrefix:"KUBE_"` ObjectPatcher ObjectPatcherSettings `envPrefix:"OBJECT_PATCHER_"` + DedupClient DedupClientSettings `envPrefix:"DEDUP_CLIENT_"` Admission AdmissionSettings `envPrefix:"VALIDATING_WEBHOOK_"` Conversion ConversionSettings `envPrefix:"CONVERSION_WEBHOOK_"` Debug DebugSettings `envPrefix:"DEBUG_"` diff --git a/pkg/app/flags.go b/pkg/app/flags.go index 810dfcfe..6c89ce89 100644 --- a/pkg/app/flags.go +++ b/pkg/app/flags.go @@ -22,11 +22,13 @@ func BindFlags(cfg *Config, rootCmd *cobra.Command, cmd *cobra.Command) func() { bindLogFlags(cfg, cmd) applyAdmission := bindAdmissionWebhookFlags(cfg, cmd) applyConversion := bindConversionWebhookFlags(cfg, cmd) + applyDedup := bindDedupClientFlags(cfg, cmd) bindDebugFlags(cfg, rootCmd, cmd) return func() { applyAdmission() applyConversion() + applyDedup() } } @@ -106,6 +108,55 @@ func bindConversionWebhookFlags(cfg *Config, cmd *cobra.Command) func() { } } +// bindDedupClientFlags registers flags for the deduplicated kubeclient cache. +// The two []string fields (Namespaces and WatchGVKs) follow the same pattern +// used by the validating-webhook ClientCA flag: any explicit CLI invocation +// fully replaces the env-var derived slice; otherwise the env value is kept. +func bindDedupClientFlags(cfg *Config, cmd *cobra.Command) func() { + f := cmd.Flags() + f.BoolVar(&cfg.DedupClient.Enabled, "dedup-client-enabled", cfg.DedupClient.Enabled, + "Enable the deduplicated kubeclient cache (github.com/ldmonster/kubeclient). "+ + "When set, shell-operator builds a controller-runtime compatible client backed "+ + "by a deduplicated store. 
Can be set with $DEDUP_CLIENT_ENABLED.") + f.BoolVar(&cfg.DedupClient.SnapshotStore, "dedup-client-snapshot-store", cfg.DedupClient.SnapshotStore, + "Back per-monitor object snapshots with a process-wide deduplicated store "+ + "(github.com/ldmonster/kubeclient/store). Trades a small per-snapshot-read CPU "+ + "cost for a substantial drop in RSS when many monitors observe similar objects. "+ + "Independent of --dedup-client-enabled. Can be set with $DEDUP_CLIENT_SNAPSHOT_STORE.") + f.IntVar(&cfg.DedupClient.ReconstructLRUSize, "dedup-client-reconstruct-lru-size", + cfg.DedupClient.ReconstructLRUSize, + "Size of the LRU that memoises reconstructed Unstructured objects in the dedup cache. "+ + "Zero disables reconstruction caching. Can be set with $DEDUP_CLIENT_RECONSTRUCT_LRU_SIZE.") + f.DurationVar(&cfg.DedupClient.GCInterval, "dedup-client-gc-interval", + cfg.DedupClient.GCInterval, + "How often the deduplicated store reclaims unused interned values and subtrees. "+ + "Zero leaves the kubeclient default in place. Can be set with $DEDUP_CLIENT_GC_INTERVAL.") + + envNamespaces := cfg.DedupClient.Namespaces + envGVKs := cfg.DedupClient.WatchGVKs + var cliNamespaces, cliGVKs []string + f.StringArrayVar(&cliNamespaces, "dedup-client-namespace", nil, + "Namespace to restrict the dedup cache to. Repeat the flag to add more, or pass a "+ + "comma-separated list via $DEDUP_CLIENT_NAMESPACES. Empty means all namespaces.") + f.StringArrayVar(&cliGVKs, "dedup-client-watch-gvk", nil, + "GroupVersionKind to pre-register with the dedup cache, formatted as "+ + "\"//\" (the group is empty for core resources, e.g. \"/v1/Pod\"). 
"+ + "Repeat the flag to add more, or pass a comma-separated list via $DEDUP_CLIENT_WATCH_GVKS.") + + return func() { + if len(cliNamespaces) > 0 { + cfg.DedupClient.Namespaces = cliNamespaces + } else { + cfg.DedupClient.Namespaces = envNamespaces + } + if len(cliGVKs) > 0 { + cfg.DedupClient.WatchGVKs = cliGVKs + } else { + cfg.DedupClient.WatchGVKs = envGVKs + } + } +} + func bindLogFlags(cfg *Config, cmd *cobra.Command) { f := cmd.Flags() f.StringVar(&cfg.Log.Level, "log-level", cfg.Log.Level, "Logging level: debug, info, error. Default is info. Can be set with $LOG_LEVEL.") diff --git a/pkg/kube/dedupclient/README.md b/pkg/kube/dedupclient/README.md new file mode 100644 index 00000000..1388a596 --- /dev/null +++ b/pkg/kube/dedupclient/README.md @@ -0,0 +1,131 @@ +# pkg/kube/dedupclient + +Two-part integration of +[`github.com/ldmonster/kubeclient`](https://github.com/ldmonster/kubeclient) +into shell-operator. Both parts can be enabled independently — the one that +moves the RSS needle for typical workloads is the **SnapshotStore**. + +| Component | Type | Purpose | Flag | +| ---------------- | -------------------------- | -------------------------------------------------------------------------------------------------------- | ----------------------------------- | +| `Client` | `*kubeclient.DedupClient` wrapper | Controller-runtime compatible Kubernetes client for hooks/extensions, with its own deduplicated cache. | `--dedup-client-enabled` | +| `SnapshotStore` | `*store.DedupStore` wrapper | Process-wide, reference-counted cache that backs every kube-events-manager monitor's per-object snapshot. **This is what reduces memory.** | `--dedup-client-snapshot-store` | + +For clusters with thousands of similar resources (e.g. templated +`Deployment`s) the upstream store reports **60–90 %** lower cache memory +usage thanks to value interning and subtree deduplication. 
+ +## Quick start + +```go +import ( + "github.com/deckhouse/deckhouse/pkg/log" + klient "github.com/flant/kube-client/client" + "github.com/flant/shell-operator/pkg/kube/dedupclient" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func newDedup(main *klient.Client, logger *log.Logger) (*dedupclient.Client, error) { + mapper, err := main.ToRESTMapper() + if err != nil { + return nil, err + } + return dedupclient.New(dedupclient.Config{ + RESTConfig: main.RestConfig(), + RESTMapper: mapper, + Namespaces: []string{"kube-system", "default"}, // empty = all + WatchGVKs: []schema.GroupVersionKind{ + {Group: "", Version: "v1", Kind: "Pod"}, + {Group: "apps", Version: "v1", Kind: "Deployment"}, + }, + ReconstructLRUSize: 4096, // 0 disables reconstruction caching + }, logger) +} +``` + +`Start(ctx)` spins up the cache run loop in a single dedicated goroutine and +returns immediately. `Shutdown(ctx)` cancels the loop and waits for it to +exit (or for `ctx` to expire). + +## How shell-operator wires it up + +When `app.Config.DedupClient.Enabled` is `true`, +`AssembleCommonOperatorFromConfig` calls `initDedupClient`, which hands the +main `klient.Client`'s `rest.Config` and `RESTMapper` to +`dedupclient.New`. The resulting `*Client` is stored on +`shell_operator.ShellOperator.DedupClient`, started during `op.Start()`, and +stopped from `op.Shutdown()`. + +Configuration knobs (env vars / CLI flags): + +| Env var | Flag | Meaning | +| ------------------------------------ | ------------------------------------ | ------------------------------------------------------------ | +| `DEDUP_CLIENT_ENABLED` | `--dedup-client-enabled` | Construct the deduplicated client at all. | +| `DEDUP_CLIENT_SNAPSHOT_STORE` | `--dedup-client-snapshot-store` | Back per-monitor snapshots with the shared dedup store. Independent of `--dedup-client-enabled`. | +| `DEDUP_CLIENT_NAMESPACES` | `--dedup-client-namespace` | Comma-separated (env) or repeated (flag) namespace allow-list. Empty = all. 
| +| `DEDUP_CLIENT_WATCH_GVKS` | `--dedup-client-watch-gvk` | GVKs to pre-register, formatted as `<group>/<version>/<kind>` (group empty for core). 
| +| `DEDUP_CLIENT_RECONSTRUCT_LRU_SIZE` | `--dedup-client-reconstruct-lru-size`| LRU size for reconstructed Unstructured objects. 0 disables. | +| `DEDUP_CLIENT_GC_INTERVAL` | `--dedup-client-gc-interval` | GC interval for unused interned values/subtrees. 0 = upstream default. | + +When both features are off the wrappers are **not** constructed at all, so +this integration adds zero runtime overhead to existing deployments. + +## SnapshotStore — the memory win + +`SnapshotStore` plugs into shell-operator's `pkg/kube_events_manager` so that +every monitor's `cachedObjects` map stops holding `*Unstructured` pointers +and instead stores `(resourceId → store.ObjectKey)` references into a +process-wide, reference-counted dedup store. + +What changes when `--dedup-client-snapshot-store` is on, for monitors with +`MonitorConfig.KeepFullObjectsInMemory == true`: + +- Each `resourceInformer` calls `SnapshotStore.Acquire(ownerID, key, obj)` on + initial-list and on Add/Modified events. The store de-duplicates field + values and subtrees across every object it holds. +- The per-monitor `*ObjectAndFilterResult` keeps `Object == nil`; the + authoritative body lives once in the store. +- `monitor.Snapshot()` reconstitutes `Object` lazily by calling + `SnapshotStore.Get(key)`. Reconstitution is a fresh allocation per call, + which trades a small CPU cost for the memory drop. +- On informer shutdown, all keys held by that informer are released. The + underlying object is removed from the store only when the last owner + releases it, so overlapping watches are correctly handled. + +When `KeepFullObjectsInMemory == false`, the existing "no full body kept" +path takes precedence and the store is bypassed for that monitor — there is +no benefit to deduplicating bodies you've already chosen to discard. + +### When does it actually save memory? + +The win scales with two factors: + +1. **Cross-factory duplication.** Each unique + `(GVR, namespace, fieldSelector, labelSelector)` gets its own client-go + informer cache today. 
When several monitors observe overlapping object + sets through *different* selectors, every cache holds its own copy. Once + `SnapshotStore` is on, the bodies converge to a single deduplicated copy + regardless of how many monitors observe them. +2. **Intra-object subtree duplication.** Even within a single GVR, similar + objects share substantial structure — e.g. a thousand Pods generated + from one template share `securityContext`, `tolerations`, `resources`, + and most label/annotation keys. Value interning + subtree dedup encode + those shared parts once. + +If your hooks rarely call `Snapshot()` on each event the CPU cost of +reconstruction is negligible; if they do (and operate on huge snapshots), +benchmark before turning it on. + +## Debug endpoint + +Once registered (automatically in `bootstrap.go`), the debug server exposes: + +``` +GET /dedup-client/status.{json|yaml|text} +``` + +The response carries the status of both components: + +```json +{ + "client": { "enabled": true, "cacheSyncedHint": false }, + "snapshotStore": { "enabled": true, "liveObjects": 1284, "totalAcquires": 5012, "totalReleases": 3728, "totalDeletes": 211 } +} +``` + +Each component reports `enabled: false` with a clear `reason` when its flag +is not set, so liveness probes can distinguish "not configured" from +"errored". diff --git a/pkg/kube/dedupclient/client.go b/pkg/kube/dedupclient/client.go new file mode 100644 index 00000000..f5be6bcf --- /dev/null +++ b/pkg/kube/dedupclient/client.go @@ -0,0 +1,204 @@ +package dedupclient + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "sync/atomic" + + "github.com/deckhouse/deckhouse/pkg/log" + "github.com/ldmonster/kubeclient" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/flant/shell-operator/pkg" +) + +// ErrAlreadyStarted is returned by Start when the cache loop is already +// running for this Client instance. 
It is a non-fatal sentinel intended to +// make idempotent Start() calls safe. +var ErrAlreadyStarted = errors.New("dedupclient: cache already started") + +// Client is shell-operator's wrapper around *kubeclient.DedupClient. +// +// It owns the cache lifecycle: Start runs the underlying kubeclient.Start +// (which blocks until ctx is cancelled) in a single goroutine, +// WaitForCacheSync delegates to the underlying client's sync wait, and +// Shutdown signals the run loop and waits for it to exit. The wrapper +// deliberately keeps the public surface small — callers reach through to +// the embedded controller-runtime client.Client when they need full +// read/write access. +type Client struct { + client.Client + + dedup *kubeclient.DedupClient + logger *log.Logger + + // startOnce guards Start() so the cache loop is only spawned once even + // if Start is called concurrently from several places. + startOnce sync.Once + // started is set by Start just before the run-loop goroutine is + // launched; WaitForCacheSync and Shutdown check it to detect a Client + // that was never started. + started atomic.Bool + + // cancel cancels the context handed to the underlying cache's Start + // method, terminating the run loop. + cancel context.CancelFunc + + // done is closed when the cache run loop returns. + done chan struct{} + + // startErr captures the error (if any) returned by the underlying + // kubeclient.DedupClient.Start. It is set at most once; errors matching + // context.Canceled are not stored. Shutdown returns the stored error. + startErr atomic.Value // holds error +} + +// New constructs a Client from cfg. RESTConfig is required; all other fields +// are optional. The returned Client is not started — call Start to spin up +// the cache informers. +func New(cfg Config, logger *log.Logger) (*Client, error) { + if cfg.RESTConfig == nil { + return nil, fmt.Errorf("dedupclient: rest.Config is required") + } + if logger == nil { + logger = log.NewLogger() + } + logger = logger.With(pkg.LogKeyOperatorComponent, "dedup-kube-client") + + opts := buildOptions(cfg) + + dc, err := kubeclient.New(cfg.RESTConfig, opts...) 
+ if err != nil { + return nil, fmt.Errorf("dedupclient: construct kubeclient: %w", err) + } + + return &Client{ + Client: dc, + dedup: dc, + logger: logger, + done: make(chan struct{}), + }, nil +} + +// buildOptions translates Config into kubeclient.Option values, omitting +// any option whose corresponding Config field is at its zero value so the +// upstream defaults remain in effect. +func buildOptions(cfg Config) []kubeclient.Option { + var opts []kubeclient.Option + if cfg.Scheme != nil { + opts = append(opts, kubeclient.WithScheme(cfg.Scheme)) + } + if cfg.RESTMapper != nil { + opts = append(opts, kubeclient.WithRESTMapper(cfg.RESTMapper)) + } + if len(cfg.Namespaces) > 0 { + opts = append(opts, kubeclient.WithNamespaces(cfg.Namespaces...)) + } + if len(cfg.WatchGVKs) > 0 { + opts = append(opts, kubeclient.WithGVKs(cfg.WatchGVKs...)) + } + if cfg.ReconstructLRUSize > 0 { + opts = append(opts, kubeclient.WithReconstructionCache(cfg.ReconstructLRUSize)) + } + if cfg.GCInterval > 0 { + opts = append(opts, kubeclient.WithGCInterval(cfg.GCInterval)) + } + return opts +} + +// Dedup returns the underlying *kubeclient.DedupClient for callers that +// need direct access to its full API (Cache, Scheme, RESTMapper, etc.). +// Most callers should use the embedded client.Client surface instead. +func (c *Client) Dedup() *kubeclient.DedupClient { + return c.dedup +} + +// Start launches the cache run loop in a dedicated goroutine. Subsequent +// calls return ErrAlreadyStarted without spawning additional goroutines. +// +// The supplied parent context governs the cache's lifetime: when it is +// cancelled (or Shutdown is called) the underlying kubeclient.Start +// returns and the goroutine exits. Start itself is non-blocking and +// returns as soon as the goroutine has been scheduled. 
+func (c *Client) Start(parent context.Context) error { + if parent == nil { + parent = context.Background() + } + + already := true + c.startOnce.Do(func() { + already = false + + ctx, cancel := context.WithCancel(parent) + c.cancel = cancel + c.started.Store(true) + + go func() { + defer close(c.done) + c.logger.Info("dedup cache run loop starting") + err := c.dedup.Start(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + c.startErr.Store(err) + c.logger.Error("dedup cache run loop exited with error", log.Err(err)) + return + } + c.logger.Info("dedup cache run loop exited") + }() + }) + + if already { + return ErrAlreadyStarted + } + return nil +} + +// WaitForCacheSync blocks until every registered informer has performed an +// initial List or until ctx is cancelled. It returns true on success and +// false when the wait was aborted (or no Start has happened yet). +func (c *Client) WaitForCacheSync(ctx context.Context) bool { + if !c.started.Load() { + return false + } + return c.dedup.WaitForCacheSync(ctx) +} + +// EnsureInformer registers obj's GVK with the cache and starts an informer +// for it if one is not already running. It is a convenience pass-through +// to the underlying cache so callers do not have to drill through Dedup(). +func (c *Client) EnsureInformer(ctx context.Context, obj client.Object) error { + if c.dedup == nil { + return fmt.Errorf("dedupclient: client is not initialised") + } + return c.dedup.Cache().EnsureInformer(ctx, obj) +} + +// Shutdown signals the run loop to terminate and blocks until it has +// returned (or ctx is cancelled). Calling Shutdown before Start, or after +// a previous Shutdown has already returned, is a safe no-op. 
+func (c *Client) Shutdown(ctx context.Context) error { + if !c.started.Load() { + return nil + } + if c.cancel != nil { + c.cancel() + } + select { + case <-c.done: + c.logger.Debug("dedup cache shutdown complete") + if v := c.startErr.Load(); v != nil { + if err, ok := v.(error); ok { + return err + } + } + return nil + case <-ctx.Done(): + c.logger.Warn("dedup cache shutdown timed out", slog.String("error", ctx.Err().Error())) + return ctx.Err() + } +} + +// Scheme returns the runtime.Scheme used by the underlying client. It is +// kept here so callers don't have to import the kubeclient package just +// to read it back. +func (c *Client) Scheme() *runtime.Scheme { + return c.dedup.Scheme() +} diff --git a/pkg/kube/dedupclient/client_test.go b/pkg/kube/dedupclient/client_test.go new file mode 100644 index 00000000..b926d619 --- /dev/null +++ b/pkg/kube/dedupclient/client_test.go @@ -0,0 +1,97 @@ +package dedupclient + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" +) + +func TestNew_RequiresRESTConfig(t *testing.T) { + t.Parallel() + + _, err := New(Config{}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "rest.Config is required") +} + +// TestNew_AcceptsZeroValueOptions exercises the buildOptions translation +// path: every Config field is left at its zero value, so kubeclient must +// receive no options at all and use its built-in defaults. 
+func TestNew_AcceptsZeroValueOptions(t *testing.T) { + t.Parallel() + + c, err := New(Config{RESTConfig: &rest.Config{Host: "http://127.0.0.1:0"}}, nil) + require.NoError(t, err) + require.NotNil(t, c) + assert.NotNil(t, c.Dedup(), "underlying *kubeclient.DedupClient must be set") +} + +func TestNew_PropagatesNonZeroOptions(t *testing.T) { + t.Parallel() + + scheme := runtime.NewScheme() + cfg := Config{ + RESTConfig: &rest.Config{Host: "http://127.0.0.1:0"}, + Scheme: scheme, + Namespaces: []string{"kube-system"}, + WatchGVKs: []schema.GroupVersionKind{{Group: "", Version: "v1", Kind: "Pod"}}, + ReconstructLRUSize: 128, + GCInterval: 15 * time.Second, + } + + c, err := New(cfg, nil) + require.NoError(t, err) + require.NotNil(t, c) + // Scheme round-trip is the only directly observable Option from the + // resulting client; the rest are stored privately by kubeclient. + assert.Same(t, scheme, c.Scheme()) +} + +func TestShutdown_BeforeStart_NoOp(t *testing.T) { + t.Parallel() + + c, err := New(Config{RESTConfig: &rest.Config{Host: "http://127.0.0.1:0"}}, nil) + require.NoError(t, err) + + err = c.Shutdown(context.Background()) + require.NoError(t, err, "Shutdown before Start must be a no-op") +} + +func TestStart_RepeatedCallsAreIdempotent(t *testing.T) { + t.Parallel() + + c, err := New(Config{RESTConfig: &rest.Config{Host: "http://127.0.0.1:0"}}, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + require.NoError(t, c.Start(ctx)) + // A second Start must report ErrAlreadyStarted without spawning a new + // goroutine. + err = c.Start(ctx) + require.ErrorIs(t, err, ErrAlreadyStarted) + + // Cancelling the parent context drains the run loop; Shutdown then + // returns immediately because <-c.done is already closed. 
+ cancel() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + require.NoError(t, c.Shutdown(shutdownCtx)) +} + +func TestWaitForCacheSync_ReturnsFalseBeforeStart(t *testing.T) { + t.Parallel() + + c, err := New(Config{RESTConfig: &rest.Config{Host: "http://127.0.0.1:0"}}, nil) + require.NoError(t, err) + + assert.False(t, c.WaitForCacheSync(context.Background()), + "WaitForCacheSync must return false until Start has been called") +} diff --git a/pkg/kube/dedupclient/config.go b/pkg/kube/dedupclient/config.go new file mode 100644 index 00000000..1522ecef --- /dev/null +++ b/pkg/kube/dedupclient/config.go @@ -0,0 +1,57 @@ +// Package dedupclient integrates github.com/ldmonster/kubeclient — a +// controller-runtime compatible Kubernetes client backed by a deduplicated +// cache — with shell-operator. It exposes a thin wrapper that wires the +// underlying *kubeclient.DedupClient into shell-operator's lifecycle +// (configuration, construction, start/stop) without leaking the dependency +// across the rest of the codebase. +package dedupclient + +import ( + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" +) + +// Config carries the parameters needed to construct a DedupClient. It is +// populated either from app.Config (production path) or directly by tests +// and library consumers that already hold the relevant Kubernetes plumbing. +// +// RESTConfig is the only mandatory field; all other fields have sensible +// defaults and may be left zero-valued. +type Config struct { + // RESTConfig is the *rest.Config used by the underlying dynamic client. + // Required. + RESTConfig *rest.Config + + // RESTMapper resolves GroupKind/Version → GroupVersionResource. When nil, + // kubeclient falls back to meta.NewDefaultRESTMapper(nil). 
+ RESTMapper meta.RESTMapper + + // Scheme is the runtime.Scheme used for typed object conversion. When + // nil, kubeclient creates an empty scheme; populate it with your types + // if you intend to call Get/List with typed (non-Unstructured) objects. + Scheme *runtime.Scheme + + // Namespaces restricts cached objects to a list of namespaces. An empty + // or nil slice means "all namespaces". + Namespaces []string + + // WatchGVKs is the set of GVKs to pre-register with the cache so that + // informers spin up at Start() time. Additional GVKs can be added later + // via the Cache.EnsureInformer API. + WatchGVKs []schema.GroupVersionKind + + // ReconstructLRUSize sets the size of the per-cache LRU that memoises + // reconstructed Unstructured objects. Zero disables reconstruction + // caching (objects are rebuilt from the deduplicated store on every + // access). + ReconstructLRUSize int + + // GCInterval controls how often the deduplicated store reclaims unused + // interned values and subtrees. Zero leaves the kubeclient default in + // place (5 minutes at the time of writing). + GCInterval time.Duration +} diff --git a/pkg/kube/dedupclient/snapshot_store.go b/pkg/kube/dedupclient/snapshot_store.go new file mode 100644 index 00000000..0dbee0cb --- /dev/null +++ b/pkg/kube/dedupclient/snapshot_store.go @@ -0,0 +1,236 @@ +package dedupclient + +import ( + "fmt" + "log/slog" + "sync" + + "github.com/deckhouse/deckhouse/pkg/log" + "github.com/ldmonster/kubeclient/store" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/flant/shell-operator/pkg" +) + +// SnapshotStore is a reference-counted, deduplicated object cache that +// shell-operator's kube-events-manager uses as the canonical home of full +// `*Unstructured` bodies. It wraps `*store.DedupStore` from the upstream +// kubeclient library, which gives us value-interning and subtree +// deduplication: identical scalar fields (e.g. 
recurring annotations, +// `securityContext` blocks, `tolerations`, `resources.limits`) are stored +// once and referenced by every object that contains them. For workloads +// where many similar objects coexist — typical for cluster-wide hooks on +// Pods or Deployments — peak RSS for the cached object population drops +// substantially compared to holding raw `*Unstructured` per consumer. +// +// Reference counting is needed because multiple `resourceInformer`s may +// independently watch the same object (e.g. two hooks on overlapping +// LabelSelectors). Each informer is treated as an *owner*: it calls +// Acquire on Add/Modified events and Release on Delete or shutdown. +// The underlying object is removed from the dedup store only when the +// last owner releases it; that keeps the per-monitor view consistent +// even though the store itself is shared globally. +// +// SnapshotStore is safe for concurrent use. +type SnapshotStore struct { + logger *log.Logger + + dedup store.Store + + // mu guards refs. refs[key] is the set of owner IDs that currently + // hold this object. The presence (or absence) of a key in dedup is + // derived from refs: when refs[key] becomes empty, we Delete from + // dedup and remove the map entry. + mu sync.Mutex + refs map[store.ObjectKey]map[string]struct{} + + // stats is updated under mu to give cheap, allocation-free numbers + // to the debug endpoint without scanning the maps. + stats SnapshotStoreStats +} + +// SnapshotStoreStats is a flat snapshot of the reference-counter state. +// It is cheap to compute and stable across concurrent updates because it +// is always read under SnapshotStore.mu. +type SnapshotStoreStats struct { + // LiveObjects is the number of distinct (GVK, ns, name) currently in + // the dedup store (i.e. with at least one owner). + LiveObjects int + // TotalAcquires counts every successful Acquire call since the + // SnapshotStore was created. 
Acquires by an existing owner of the + // same key are counted as no-ops and do not increment this counter. + TotalAcquires uint64 + // TotalReleases counts every Release that actually removed an owner + // (no-op releases by an unknown owner are ignored). + TotalReleases uint64 + // TotalDeletes counts how many objects were removed from the dedup + // store after their last owner released them. + TotalDeletes uint64 +} + +// NewSnapshotStore constructs a SnapshotStore backed by a fresh +// `*store.DedupStore`. logger may be nil; in that case a default logger +// is used. The store is empty until Acquire is called. +func NewSnapshotStore(logger *log.Logger) *SnapshotStore { + if logger == nil { + logger = log.NewLogger() + } + return &SnapshotStore{ + logger: logger.With(pkg.LogKeyOperatorComponent, "dedup-snapshot-store"), + dedup: store.NewDedupStore(), + refs: make(map[store.ObjectKey]map[string]struct{}), + } +} + +// Acquire upserts obj under key for the given owner. The owner string is +// any opaque identifier — `resourceInformer` uses its UUID. Calling +// Acquire repeatedly with the same (owner, key) is safe: the refcount +// stays at one and only the object body is refreshed. +// +// Returning an error from the underlying dedup store leaves the refs +// map untouched, so the caller can retry without leaking ownership. +func (s *SnapshotStore) Acquire(owner string, key store.ObjectKey, obj *unstructured.Unstructured) error { + if obj == nil { + return fmt.Errorf("dedupclient: SnapshotStore.Acquire: obj is nil") + } + if owner == "" { + return fmt.Errorf("dedupclient: SnapshotStore.Acquire: owner is empty") + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Always Upsert so the latest object body wins. The dedup store is + // content-addressable, so Upserting an unchanged object is cheap. 
+ if err := s.dedup.Upsert(key, obj); err != nil { + return fmt.Errorf("dedupclient: SnapshotStore.Acquire: upsert: %w", err) + } + + owners, ok := s.refs[key] + if !ok { + owners = make(map[string]struct{}, 1) + s.refs[key] = owners + s.stats.LiveObjects++ + } + if _, already := owners[owner]; !already { + owners[owner] = struct{}{} + s.stats.TotalAcquires++ + } + return nil +} + +// Release decrements the reference for (owner, key). When the last owner +// releases, the underlying object is removed from the dedup store. A +// Release for an unknown (owner, key) pair is silently dropped, which +// makes the caller's bookkeeping robust to dropped/repeated events. +func (s *SnapshotStore) Release(owner string, key store.ObjectKey) error { + s.mu.Lock() + defer s.mu.Unlock() + + owners, ok := s.refs[key] + if !ok { + return nil + } + if _, hadOwner := owners[owner]; !hadOwner { + return nil + } + delete(owners, owner) + s.stats.TotalReleases++ + + if len(owners) > 0 { + return nil + } + + delete(s.refs, key) + s.stats.LiveObjects-- + if err := s.dedup.Delete(key); err != nil { + // The map entry is already gone; the most useful action is to + // surface the error to the caller. Subsequent Acquires for the + // same key will simply overwrite whatever zombie state remains + // in the underlying store. + return fmt.Errorf("dedupclient: SnapshotStore.Release: delete: %w", err) + } + s.stats.TotalDeletes++ + return nil +} + +// ReleaseOwner releases every key currently held by owner. It is meant +// for use during informer shutdown to reclaim store space deterministically. +// Errors from individual Delete calls are logged at warn level and do not +// abort the loop, so a stuck key cannot prevent the rest of the owner's +// references from being released. 
+func (s *SnapshotStore) ReleaseOwner(owner string) { + if owner == "" { + return + } + s.mu.Lock() + heldKeys := make([]store.ObjectKey, 0) + for key, owners := range s.refs { + if _, ok := owners[owner]; ok { + heldKeys = append(heldKeys, key) + } + } + s.mu.Unlock() + + for _, key := range heldKeys { + if err := s.Release(owner, key); err != nil { + s.logger.Warn("release on shutdown failed", + slog.String("owner", owner), + slog.String("gvk", key.GVK.String()), + slog.String("namespace", key.Namespace), + slog.String("name", key.Name), + log.Err(err)) + } + } +} + +// Get returns a freshly reconstructed `*unstructured.Unstructured` for +// key, or (nil, false) when the object is not currently held. The +// returned object is a new allocation owned by the caller — mutating it +// does not affect the dedup store. +func (s *SnapshotStore) Get(key store.ObjectKey) (*unstructured.Unstructured, bool) { + return s.dedup.Get(key) +} + +// HasOwner reports whether owner currently holds key. Used by tests and +// the debug endpoint; not part of the hot path. +func (s *SnapshotStore) HasOwner(owner string, key store.ObjectKey) bool { + s.mu.Lock() + defer s.mu.Unlock() + owners, ok := s.refs[key] + if !ok { + return false + } + _, has := owners[owner] + return has +} + +// Stats returns a copy of the current statistics. Cheap, lock-bounded. +func (s *SnapshotStore) Stats() SnapshotStoreStats { + s.mu.Lock() + defer s.mu.Unlock() + return s.stats +} + +// KeyFor is a convenience helper that builds a `store.ObjectKey` from an +// already-typed `*unstructured.Unstructured`. Callers that already have +// GVK on hand can construct ObjectKey themselves; this helper exists +// solely so the hot path in `resourceInformer` can stay short. 
+func KeyFor(obj *unstructured.Unstructured) store.ObjectKey { + if obj == nil { + return store.ObjectKey{} + } + return store.ObjectKey{ + GVK: obj.GroupVersionKind(), + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } +} + +// KeyForGVK builds a key when the GVK is already known (e.g. derived +// from a Monitor configuration) and the object's GroupVersionKind() may +// not be reliable — for instance because the dynamic informer strips it. +func KeyForGVK(gvk schema.GroupVersionKind, namespace, name string) store.ObjectKey { + return store.ObjectKey{GVK: gvk, Namespace: namespace, Name: name} +} diff --git a/pkg/kube/dedupclient/snapshot_store_test.go b/pkg/kube/dedupclient/snapshot_store_test.go new file mode 100644 index 00000000..792a1222 --- /dev/null +++ b/pkg/kube/dedupclient/snapshot_store_test.go @@ -0,0 +1,180 @@ +package dedupclient + +import ( + "sync" + "testing" + + "github.com/ldmonster/kubeclient/store" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func newPod(ns, name string, extraLabel string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "Pod"}) + u.SetNamespace(ns) + u.SetName(name) + u.SetLabels(map[string]string{"app": "demo", "extra": extraLabel}) + return u +} + +func TestSnapshotStore_AcquireGetRelease(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pod := newPod("default", "pod-a", "v1") + key := KeyFor(pod) + + require.NoError(t, s.Acquire("informer-1", key, pod)) + + got, ok := s.Get(key) + require.True(t, ok, "Get must succeed after Acquire") + require.NotNil(t, got) + assert.Equal(t, "pod-a", got.GetName()) + assert.Equal(t, "default", got.GetNamespace()) + + stats := s.Stats() + assert.Equal(t, 1, stats.LiveObjects) + assert.Equal(t, uint64(1), stats.TotalAcquires) + + require.NoError(t, 
s.Release("informer-1", key)) + _, ok = s.Get(key) + assert.False(t, ok, "Get must fail after the last owner releases") + + stats = s.Stats() + assert.Equal(t, 0, stats.LiveObjects) + assert.Equal(t, uint64(1), stats.TotalReleases) + assert.Equal(t, uint64(1), stats.TotalDeletes) +} + +func TestSnapshotStore_RefcountAcrossOwners(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pod := newPod("default", "pod-shared", "x") + key := KeyFor(pod) + + require.NoError(t, s.Acquire("informer-A", key, pod)) + require.NoError(t, s.Acquire("informer-B", key, pod)) + + assert.Equal(t, 1, s.Stats().LiveObjects, "two acquires for the same key must not double-count live objects") + assert.Equal(t, uint64(2), s.Stats().TotalAcquires) + + require.NoError(t, s.Release("informer-A", key)) + _, ok := s.Get(key) + assert.True(t, ok, "object must remain in the store while informer-B still owns it") + + require.NoError(t, s.Release("informer-B", key)) + _, ok = s.Get(key) + assert.False(t, ok, "object must be evicted after the last owner releases") +} + +func TestSnapshotStore_RepeatedAcquireBySameOwner(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pod := newPod("default", "pod-r", "x") + key := KeyFor(pod) + + require.NoError(t, s.Acquire("informer-X", key, pod)) + require.NoError(t, s.Acquire("informer-X", key, pod)) + assert.Equal(t, uint64(1), s.Stats().TotalAcquires, + "acquiring twice with the same owner must not increment the counter") + + // One Release is enough to fully evict because the owner only counts once. + require.NoError(t, s.Release("informer-X", key)) + _, ok := s.Get(key) + assert.False(t, ok) +} + +func TestSnapshotStore_UnknownReleaseIsNoOp(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pod := newPod("default", "pod-u", "x") + key := KeyFor(pod) + + require.NoError(t, s.Acquire("informer-1", key, pod)) + + // Release by an owner that never acquired this key — should not affect anything. 
+ require.NoError(t, s.Release("informer-2", key)) + _, ok := s.Get(key) + assert.True(t, ok, "object must still be present after a no-op release") + + require.NoError(t, s.Release("informer-1", key)) + _, ok = s.Get(key) + assert.False(t, ok) +} + +func TestSnapshotStore_ReleaseOwner_DropsAllKeys(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pods := []*unstructured.Unstructured{ + newPod("default", "p1", "a"), + newPod("default", "p2", "b"), + newPod("kube-system", "p3", "c"), + } + keys := make([]store.ObjectKey, len(pods)) + for i, p := range pods { + keys[i] = KeyFor(p) + require.NoError(t, s.Acquire("informer-Z", keys[i], p)) + } + + // One key co-owned with another informer must NOT be removed by ReleaseOwner. + require.NoError(t, s.Acquire("informer-W", keys[0], pods[0])) + + s.ReleaseOwner("informer-Z") + + _, ok := s.Get(keys[0]) + assert.True(t, ok, "co-owned key must survive informer-Z's owner release") + for _, k := range keys[1:] { + _, ok := s.Get(k) + assert.False(t, ok, "key only owned by informer-Z must be evicted") + } +} + +func TestSnapshotStore_AcquireRejectsNil(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + err := s.Acquire("o", store.ObjectKey{}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "obj is nil") + + pod := newPod("ns", "n", "x") + err = s.Acquire("", KeyFor(pod), pod) + require.Error(t, err) + assert.Contains(t, err.Error(), "owner is empty") +} + +func TestSnapshotStore_ConcurrentAcquireRelease(t *testing.T) { + t.Parallel() + + s := NewSnapshotStore(nil) + pod := newPod("default", "stress", "x") + key := KeyFor(pod) + + const ownersN = 32 + const cycles = 100 + + var wg sync.WaitGroup + for i := 0; i < ownersN; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + owner := "informer-" + string(rune('A'+(id%26))) + for j := 0; j < cycles; j++ { + _ = s.Acquire(owner, key, pod) + _ = s.Release(owner, key) + } + }(i) + } + wg.Wait() + + // After every owner has released the same 
number of times it acquired, + // the live-object count must collapse back to zero. + assert.Equal(t, 0, s.Stats().LiveObjects) +} diff --git a/pkg/kube_events_manager/kube_events_manager.go b/pkg/kube_events_manager/kube_events_manager.go index 5019c765..871cd763 100644 --- a/pkg/kube_events_manager/kube_events_manager.go +++ b/pkg/kube_events_manager/kube_events_manager.go @@ -12,6 +12,7 @@ import ( klient "github.com/flant/kube-client/client" "github.com/flant/shell-operator/pkg" + "github.com/flant/shell-operator/pkg/kube/dedupclient" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" ) @@ -42,6 +43,13 @@ type KubeEventsManager interface { KubeEventsSource WithMetricStorage(mstor metricsstorage.Storage) MetricStorage() metricsstorage.Storage + // WithSnapshotStore wires a deduplicated snapshot store into the + // manager. When set, every monitor created afterwards delegates the + // storage of full `*Unstructured` bodies to the shared store, freeing + // per-monitor memory at the cost of an extra Get on snapshot reads. + // Passing nil restores the default (no dedup) behaviour. Must be + // called before any AddMonitor call to take effect for that monitor. + WithSnapshotStore(store *dedupclient.SnapshotStore) Stop() Wait() } @@ -59,6 +67,11 @@ type kubeEventsManager struct { factoryStore *FactoryStore + // snapshotStore, when non-nil, is propagated into every freshly + // created Monitor. Reading and writing this field is serialised via + // the m mutex because Monitor construction happens under m. + snapshotStore *dedupclient.SnapshotStore + m sync.RWMutex Monitors map[string]Monitor @@ -88,11 +101,24 @@ func (mgr *kubeEventsManager) WithMetricStorage(mstor metricsstorage.Storage) { mgr.metricStorage = mstor } +// WithSnapshotStore registers a deduplicated snapshot store for use by +// every subsequently-constructed Monitor. See KubeEventsManager docs for +// the full contract. 
+func (mgr *kubeEventsManager) WithSnapshotStore(snapshotStore *dedupclient.SnapshotStore) { + mgr.m.Lock() + defer mgr.m.Unlock() + mgr.snapshotStore = snapshotStore +} + // AddMonitor creates a monitor with informers and return a KubeEvent with existing objects. // TODO cleanup informers in case of error // TODO use Context to stop informers func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error { mgr.logger.Debug("add kubernetes monitor", slog.String(pkg.LogKeyConfig, fmt.Sprintf("%+v", monitorConfig))) + mgr.m.RLock() + snapshotStore := mgr.snapshotStore + mgr.m.RUnlock() + mon := NewMonitor( mgr.ctx, mgr.KubeClient, @@ -105,6 +131,9 @@ func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error { }, mgr.logger.Named("monitor"), ) + if snapshotStore != nil { + mon.WithSnapshotStore(snapshotStore) + } if err := mon.CreateInformers(); err != nil { return err diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index a20d4452..ff3e64a5 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -12,6 +12,7 @@ import ( klient "github.com/flant/kube-client/client" pkg "github.com/flant/shell-operator/pkg" + "github.com/flant/shell-operator/pkg/kube/dedupclient" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -41,6 +42,12 @@ type monitor struct { factoryStore *FactoryStore + // snapshotStore is the optional deduplicated object cache. When non-nil + // it is propagated into every resourceInformer the monitor creates, + // switching them from per-monitor `*Unstructured` storage to shared, + // reference-counted storage in the dedup store. 
+ snapshotStore *dedupclient.SnapshotStore + eventCb func(kemtypes.KubeEvent) eventsEnabled bool // Index of namespaces statically defined in monitor configuration @@ -157,6 +164,17 @@ func (m *monitor) GetConfig() *MonitorConfig { return m.Config } +// WithSnapshotStore associates a deduplicated snapshot store with this +// monitor. It must be called before CreateInformers so the store reaches +// every freshly constructed resourceInformer through resourceInformerConfig. +// Passing nil is a no-op; the existing snapshotStore (if any) stays. +func (m *monitor) WithSnapshotStore(s *dedupclient.SnapshotStore) { + if s == nil { + return + } + m.snapshotStore = s +} + // CreateInformers creates all informers and // a namespace informer if namespace.labelSelector is defined. // If MonitorConfig.NamespaceSelector.MatchNames is defined, then @@ -319,12 +337,13 @@ func (m *monitor) EnableKubeEventCb() { func (m *monitor) CreateInformersForNamespace(namespace string) ([]*resourceInformer, error) { informers := make([]*resourceInformer, 0) cfg := &resourceInformerConfig{ - client: m.KubeClient, - mstor: m.metricStorage, - factoryStore: m.factoryStore, - eventCb: m.eventCb, - monitor: m.Config, - logger: m.logger.Named("resource-informer"), + client: m.KubeClient, + mstor: m.metricStorage, + factoryStore: m.factoryStore, + snapshotStore: m.snapshotStore, + eventCb: m.eventCb, + monitor: m.Config, + logger: m.logger.Named("resource-informer"), } objNames := []string{""} diff --git a/pkg/kube_events_manager/monitor_snapshot_store_test.go b/pkg/kube_events_manager/monitor_snapshot_store_test.go new file mode 100644 index 00000000..490770a2 --- /dev/null +++ b/pkg/kube_events_manager/monitor_snapshot_store_test.go @@ -0,0 +1,128 @@ +package kubeeventsmanager + +import ( + "context" + "testing" + + "github.com/deckhouse/deckhouse/pkg/log" + . 
"github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/flant/kube-client/fake" + "github.com/flant/shell-operator/pkg/kube/dedupclient" + kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" + "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" +) + +// Test_Monitor_SnapshotStore_BackingPath exercises the dedup-store path +// end-to-end on a fake cluster: the resourceInformer must +// - upsert the body into the shared SnapshotStore on initial list, +// - keep its in-memory `*ObjectAndFilterResult` entry with Object == nil, +// - reconstitute Object lazily when Snapshot() is called, +// - count the object in the store's live-objects gauge. +// +// This is the critical regression test: without the dedup wiring, Snapshot() +// would still work (the object stays in cachedObjects), but the store would +// stay empty — which is exactly the "memory didn't drop" failure mode that +// motivated this feature. 
+func Test_Monitor_SnapshotStore_BackingPath(t *testing.T) { + g := NewWithT(t) + + fc := fake.NewFakeCluster(fake.ClusterVersionV121) + createNsWithLabels(fc, "default", nil) + createCM(fc, "default", testCM("dedup-cm")) + + monitorCfg := &MonitorConfig{ + ApiVersion: "v1", + Kind: "ConfigMap", + EventTypes: []kemtypes.WatchEventType{kemtypes.WatchEventAdded, kemtypes.WatchEventModified, kemtypes.WatchEventDeleted}, + KeepFullObjectsInMemory: true, + NamespaceSelector: &kemtypes.NamespaceSelector{ + NameSelector: &kemtypes.NameSelector{MatchNames: []string{"default"}}, + }, + } + + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {}) + metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 1, map[string]string(nil)).Then() + + snapshotStore := dedupclient.NewSnapshotStore(log.NewNop()) + + mon := NewMonitor(context.Background(), fc.Client, metricStorage, NewFactoryStore(), monitorCfg, func(_ kemtypes.KubeEvent) {}, log.NewNop()) + mon.WithSnapshotStore(snapshotStore) + + require.NoError(t, mon.CreateInformers()) + mon.Start(context.TODO()) + + // After initial list completes, the snapshot store must own exactly one + // live object — the ConfigMap. + g.Eventually(func() int { + return snapshotStore.Stats().LiveObjects + }, "3s", "10ms"). + Should(Equal(1), "snapshot store must observe the initial-list upsert") + + // Snapshot() must return Object populated (reconstituted from the + // dedup store), but the underlying cachedObjects entry must not hold + // any `*Unstructured` pointer of its own. 
+ snap := mon.Snapshot() + require.Len(t, snap, 1) + assert.NotNil(t, snap[0].Object, "Snapshot must reconstitute Object from the dedup store") + assert.Equal(t, "dedup-cm", snap[0].Object.GetName()) + assert.Equal(t, "default", snap[0].Object.GetNamespace()) + + for _, ri := range mon.ResourceInformers { + ri.cacheLock.RLock() + for _, entry := range ri.cachedObjects { + assert.Nil(t, entry.Object, + "cachedObjects entries must not hold `*Unstructured` when SnapshotStore is active") + } + assert.NotEmpty(t, ri.cachedObjectKeys, "cachedObjectKeys must mirror cachedObjects") + ri.cacheLock.RUnlock() + } +} + +// Test_Monitor_SnapshotStore_DefaultPath confirms backwards compatibility: +// when no SnapshotStore is wired in, the original behaviour is preserved — +// the per-monitor cache holds `*Unstructured` pointers itself and the +// Snapshot() output is identical in shape. +func Test_Monitor_SnapshotStore_DefaultPath(t *testing.T) { + g := NewWithT(t) + fc := fake.NewFakeCluster(fake.ClusterVersionV121) + createNsWithLabels(fc, "default", nil) + createCM(fc, "default", testCM("plain-cm")) + + monitorCfg := &MonitorConfig{ + ApiVersion: "v1", + Kind: "ConfigMap", + EventTypes: []kemtypes.WatchEventType{kemtypes.WatchEventAdded}, + KeepFullObjectsInMemory: true, + NamespaceSelector: &kemtypes.NamespaceSelector{ + NameSelector: &kemtypes.NameSelector{MatchNames: []string{"default"}}, + }, + } + + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {}) + metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 1, map[string]string(nil)).Then() + + mon := NewMonitor(context.Background(), fc.Client, metricStorage, NewFactoryStore(), monitorCfg, func(_ kemtypes.KubeEvent) {}, log.NewNop()) + require.NoError(t, mon.CreateInformers()) + mon.Start(context.TODO()) + + g.Eventually(func() []string { + return snapshotResourceIDs(mon.Snapshot()) + }, "3s", 
"10ms").Should(ContainElement("default/ConfigMap/plain-cm")) + + for _, ri := range mon.ResourceInformers { + ri.cacheLock.RLock() + assert.Empty(t, ri.cachedObjectKeys, + "cachedObjectKeys must remain empty when SnapshotStore is not configured") + for _, entry := range ri.cachedObjects { + assert.NotNil(t, entry.Object, + "cachedObjects entries must keep their `*Unstructured` pointer in the legacy path") + } + ri.cacheLock.RUnlock() + } +} diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 53aaab18..2d636472 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -16,8 +16,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" + "github.com/ldmonster/kubeclient/store" + klient "github.com/flant/kube-client/client" pkg "github.com/flant/shell-operator/pkg" + "github.com/flant/shell-operator/pkg/kube/dedupclient" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/utils/measure" @@ -38,9 +41,21 @@ type resourceInformer struct { ListOptions metav1.ListOptions // A cache of objects and filterResults. It is a part of the Monitor's snapshot. + // When snapshotStore is non-nil, entries here have Object == nil and the + // authoritative `*Unstructured` body lives once in the shared dedup store + // keyed by cachedObjectKeys[resourceId]. getCachedObjects reconstitutes + // Object lazily on read so the public type behaviour is unchanged. cachedObjects map[string]*kemtypes.ObjectAndFilterResult cacheLock sync.RWMutex + // snapshotStore, when non-nil, owns the storage of `*Unstructured` + // bodies. Each resourceInformer is one *owner* (identified by its + // id) so the store can refcount keys across overlapping watches. 
+ snapshotStore *dedupclient.SnapshotStore + // cachedObjectKeys mirrors cachedObjects when snapshotStore is set. + // It is kept under cacheLock together with cachedObjects. + cachedObjectKeys map[string]store.ObjectKey + // Cached objects operations since start cachedObjectsInfo *CachedObjectsInfo // Cached objects operations since last access @@ -70,11 +85,12 @@ type resourceInformer struct { // resourceInformer should implement ResourceInformer type resourceInformerConfig struct { - client *klient.Client - mstor metricsstorage.Storage - factoryStore *FactoryStore - eventCb func(kemtypes.KubeEvent) - monitor *MonitorConfig + client *klient.Client + mstor metricsstorage.Storage + factoryStore *FactoryStore + snapshotStore *dedupclient.SnapshotStore + eventCb func(kemtypes.KubeEvent) + monitor *MonitorConfig logger *log.Logger } @@ -85,11 +101,13 @@ func newResourceInformer(ns, name string, cfg *resourceInformerConfig) *resource KubeClient: cfg.client, metricStorage: cfg.mstor, factoryStore: cfg.factoryStore, + snapshotStore: cfg.snapshotStore, Namespace: ns, Name: name, eventCb: cfg.eventCb, Monitor: cfg.monitor, cachedObjects: make(map[string]*kemtypes.ObjectAndFilterResult), + cachedObjectKeys: make(map[string]store.ObjectKey), cacheLock: sync.RWMutex{}, eventBufLock: sync.Mutex{}, cachedObjectsInfo: &CachedObjectsInfo{}, @@ -156,12 +174,32 @@ func (ei *resourceInformer) createSharedInformer() error { return nil } -// Snapshot returns all cached objects for this informer +// Snapshot returns all cached objects for this informer. +// +// When snapshotStore is set, every entry's `Object` field is reconstituted +// from the shared dedup store at call time. Reconstitution is a fresh +// allocation per call, so callers should treat the returned slice as +// owned and avoid pinning it longer than necessary. If the store has +// dropped a key concurrently (e.g. 
during shutdown) the returned entry +// keeps Object=nil; downstream code already tolerates that path because +// the same shape is used by Monitor.KeepFullObjectsInMemory==false. func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult { ei.cacheLock.RLock() res := make([]kemtypes.ObjectAndFilterResult, 0, len(ei.cachedObjects)) - for _, obj := range ei.cachedObjects { - res = append(res, *obj) + if ei.snapshotStore != nil { + for resID, entry := range ei.cachedObjects { + cp := *entry + if key, ok := ei.cachedObjectKeys[resID]; ok { + if obj, found := ei.snapshotStore.Get(key); found { + cp.Object = obj + } + } + res = append(res, cp) + } + } else { + for _, obj := range ei.cachedObjects { + res = append(res, *obj) + } } ei.cacheLock.RUnlock() @@ -220,6 +258,14 @@ func (ei *resourceInformer) loadExistedObjects() error { slog.Int(pkg.LogKeyCount, len(objList.Items))) filteredObjects := make(map[string]*kemtypes.ObjectAndFilterResult) + // keysForStore is the set of (resourceId → ObjectKey) that need to be + // committed to the shared snapshot store under cacheLock. We collect + // them here so the dedup-store roundtrip happens before the lock is + // taken, keeping the critical section small. + var keysForStore map[string]store.ObjectKey + if ei.snapshotStore != nil { + keysForStore = make(map[string]store.ObjectKey, len(objList.Items)) + } for _, item := range objList.Items { // copy loop var to avoid duplication of pointer in filteredObjects @@ -240,6 +286,20 @@ func (ei *resourceInformer) loadExistedObjects() error { if !ei.Monitor.KeepFullObjectsInMemory { objFilterRes.RemoveFullObject() + } else if ei.snapshotStore != nil && objFilterRes.Object != nil { + // Move the body into the shared dedup store and drop our + // per-monitor pointer. RemoveObject MUST stay false so the + // snapshot still surfaces the body once Get reconstitutes + // it on read. 
+ key := dedupclient.KeyFor(objFilterRes.Object) + if err := ei.snapshotStore.Acquire(ei.id, key, objFilterRes.Object); err != nil { + ei.logger.Warn("snapshot store: acquire failed during initial list, falling back to local copy", + slog.String(pkg.LogKeyResourceId, objFilterRes.Metadata.ResourceId), + log.Err(err)) + } else { + keysForStore[objFilterRes.Metadata.ResourceId] = key + objFilterRes.Object = nil + } } filteredObjects[objFilterRes.Metadata.ResourceId] = objFilterRes @@ -256,6 +316,9 @@ func (ei *resourceInformer) loadExistedObjects() error { for k, v := range filteredObjects { ei.cachedObjects[k] = v } + for k, key := range keysForStore { + ei.cachedObjectKeys[k] = key + } ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) ei.metricStorage.GaugeSet(metrics.KubeSnapshotObjects, float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) @@ -320,8 +383,26 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty return } + // storeKey is the shared dedup-store key for this object. It is set + // only when snapshotStore is enabled and KeepFullObjectsInMemory is + // true (the path that materially benefits from deduplication). Other + // paths leave storeKey at its zero value and skip Acquire/Release. + var storeKey store.ObjectKey + storeBacked := false + if !ei.Monitor.KeepFullObjectsInMemory { objFilterRes.RemoveFullObject() + } else if ei.snapshotStore != nil && objFilterRes.Object != nil { + storeKey = dedupclient.KeyFor(objFilterRes.Object) + if err := ei.snapshotStore.Acquire(ei.id, storeKey, objFilterRes.Object); err != nil { + ei.logger.Warn("snapshot store: acquire failed on watch event, falling back to local copy", + slog.String(pkg.LogKeyResourceId, resourceId), + slog.String(pkg.LogKeyEventType, string(eventType)), + log.Err(err)) + } else { + storeBacked = true + objFilterRes.Object = nil + } } // Do not fire Added or Modified if object is in cache and its checksum is equal to the newChecksum. 
@@ -344,6 +425,9 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty skipEvent = true } ei.cachedObjects[resourceId] = objFilterRes + if storeBacked { + ei.cachedObjectKeys[resourceId] = storeKey + } // Update cached objects info. ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) if eventType == kemtypes.WatchEventAdded { @@ -362,6 +446,21 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty case kemtypes.WatchEventDeleted: ei.cacheLock.Lock() + // Drop the snapshot-store reference (if any) before forgetting + // the resourceId — otherwise we leak ownership in the store. + if ei.snapshotStore != nil { + if key, ok := ei.cachedObjectKeys[resourceId]; ok { + delete(ei.cachedObjectKeys, resourceId) + // Release outside the lock would be cleaner, but the + // store itself takes only its own (short-lived) mutex, + // so contention here is bounded. + if err := ei.snapshotStore.Release(ei.id, key); err != nil { + ei.logger.Warn("snapshot store: release failed on delete event", + slog.String(pkg.LogKeyResourceId, resourceId), + log.Err(err)) + } + } + } delete(ei.cachedObjects, resourceId) // Update cached objects info. ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) @@ -458,6 +557,10 @@ func (ei *resourceInformer) start() { if ei.ctx != nil { <-ei.ctx.Done() ei.factoryStore.Stop(ei.id, ei.FactoryIndex) + // Drop every snapshot-store reference this informer still + // holds. Without this the dedup store would leak entries + // for stopped monitors until process exit. + ei.releaseSnapshotStoreOwnership() } }() @@ -472,6 +575,23 @@ func (ei *resourceInformer) start() { log.Debug("informer is ready", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) } +// releaseSnapshotStoreOwnership lets the shared dedup store reclaim every +// key this informer still owns. It is idempotent: calling it twice (or +// when snapshotStore is nil) is a safe no-op. 
Called from the start() +// shutdown goroutine so it runs on monitor cancellation. +func (ei *resourceInformer) releaseSnapshotStoreOwnership() { + if ei.snapshotStore == nil { + return + } + ei.snapshotStore.ReleaseOwner(ei.id) + ei.cacheLock.Lock() + // Clear the per-informer key map so subsequent reads see Object=nil. + if len(ei.cachedObjectKeys) > 0 { + ei.cachedObjectKeys = make(map[string]store.ObjectKey) + } + ei.cacheLock.Unlock() +} + // wait blocks until the underlying shared informer for this FactoryIndex is stopped func (ei *resourceInformer) wait() { ei.factoryStore.WaitStopped(ei.FactoryIndex) diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index eb2b4946..37525605 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -134,7 +134,8 @@ func Init(ctx context.Context, cfg *app.Config, logger *log.Logger) (*ShellOpera func (op *ShellOperator) AssembleCommonOperatorFromConfig(cfg *app.Config, kubeEventsManagerLabels []string) error { listenAddress, listenPort := listenAddrFromAppConfig(cfg) mainKubeCfg, patcherKubeCfg := kubeClientConfigsFromAppConfig(cfg) - return op.AssembleCommonOperator(listenAddress, listenPort, kubeEventsManagerLabels, mainKubeCfg, patcherKubeCfg) + dedupCfg := dedupClientConfigFromAppConfig(cfg) + return op.AssembleCommonOperatorWithDedupClient(listenAddress, listenPort, kubeEventsManagerLabels, mainKubeCfg, patcherKubeCfg, dedupCfg) } // listenAddrFromAppConfig returns the HTTP server listen address/port from cfg @@ -174,15 +175,48 @@ func kubeClientConfigsFromAppConfig(cfg *app.Config) (KubeClientConfig, KubeClie return mainKubeCfg, patcherKubeCfg } +// dedupClientConfigFromAppConfig pulls deduplicated-kubeclient settings out +// of an *app.Config. A nil cfg (or one where DedupClient.Enabled is false) +// returns a zero-valued DedupClientConfig, which initDedupClient then +// recognises as "do not construct". 
Keeping this as a pure helper makes the +// derivation easy to unit-test without standing up a full operator. +func dedupClientConfigFromAppConfig(cfg *app.Config) DedupClientConfig { + if cfg == nil { + return DedupClientConfig{} + } + return DedupClientConfig{ + Enabled: cfg.DedupClient.Enabled, + Namespaces: cfg.DedupClient.Namespaces, + WatchGVKs: cfg.DedupClient.WatchGVKs, + ReconstructLRUSize: cfg.DedupClient.ReconstructLRUSize, + GCInterval: cfg.DedupClient.GCInterval, + SnapshotStore: cfg.DedupClient.SnapshotStore, + } +} + // AssembleCommonOperator instantiates common dependencies used by both // shell-operator and its derivatives (e.g. addon-operator). // Requires listenAddress and listenPort to run the HTTP server for operator APIs. // kubeCfg provides Kubernetes connection settings for the main client and // object patcher; pass KubeClientConfig{} to fall back to in-cluster defaults. // +// This method preserves the pre-deduplicated-kubeclient signature for +// backwards compatibility: it delegates to AssembleCommonOperatorWithDedupClient +// with a disabled DedupClientConfig, so behaviour is unchanged. Use the +// DedupClient-aware variant (or AssembleCommonOperatorFromConfig) to enable +// the deduplicated cache. +// // For library consumers that already hold an *app.Config, prefer // AssembleCommonOperatorFromConfig instead of unpacking fields by hand. func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string, kubeEventsManagerLabels []string, mainKubeCfg, patcherKubeCfg KubeClientConfig) error { + return op.AssembleCommonOperatorWithDedupClient(listenAddress, listenPort, kubeEventsManagerLabels, mainKubeCfg, patcherKubeCfg, DedupClientConfig{}) +} + +// AssembleCommonOperatorWithDedupClient is the full assembly entry point that +// also constructs the optional deduplicated kubeclient. Pass a zero +// DedupClientConfig (or one with Enabled=false) to keep behaviour identical +// to AssembleCommonOperator. 
+func (op *ShellOperator) AssembleCommonOperatorWithDedupClient(listenAddress, listenPort string, kubeEventsManagerLabels []string, mainKubeCfg, patcherKubeCfg KubeClientConfig, dedupCfg DedupClientConfig) error { op.APIServer = newBaseHTTPServer(listenAddress, listenPort) // built-in metrics @@ -206,8 +240,27 @@ func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string return err } + // Optional deduplicated kubeclient. Reuses the rest.Config + RESTMapper + // derived from the main kube client so users only configure connection + // details once. + op.DedupClient, err = initDedupClient(op.KubeClient, dedupCfg, op.logger.Named("dedup-kube-client")) + if err != nil { + return err + } + + // Optional deduplicated SnapshotStore. Constructed independently of + // DedupClient because the two solve different problems: DedupClient + // is a kubeclient instance for hooks/extensions, SnapshotStore is the + // shared backing for kube-events-manager monitors. Either, both, or + // neither may be active. + op.SnapshotStore = initSnapshotStore(dedupCfg, op.logger.Named("dedup-snapshot-store")) + op.SetupEventManagers() + if op.SnapshotStore != nil && op.KubeEventsManager != nil { + op.KubeEventsManager.WithSnapshotStore(op.SnapshotStore) + } + return nil } @@ -224,6 +277,7 @@ func (op *ShellOperator) assembleShellOperator(cfg *app.Config, hooksDir string, op.RegisterDebugQueueRoutes(debugServer) op.RegisterDebugHookRoutes(debugServer) op.RegisterDebugConfigRoutes(debugServer, runtimeConfig) + op.RegisterDebugDedupClientRoutes(debugServer) // Create webhookManagers with dependencies. 
op.setupHookManagers(cfg, hooksDir, tempDir) diff --git a/pkg/shell-operator/debug_server.go b/pkg/shell-operator/debug_server.go index 3b406d82..e7197746 100644 --- a/pkg/shell-operator/debug_server.go +++ b/pkg/shell-operator/debug_server.go @@ -1,6 +1,7 @@ package shell_operator import ( + "context" "fmt" "net/http" "regexp" @@ -84,6 +85,57 @@ func (op *ShellOperator) RegisterDebugHookRoutes(dbgSrv *debug.Server) { }) } +// RegisterDebugDedupClientRoutes exposes a small JSON snapshot of the +// deduplicated-kubeclient state on the debug server. The route is registered +// even when the dedup client is disabled, returning a clear "disabled" +// payload so probes can distinguish "not configured" from "errored". +func (op *ShellOperator) RegisterDebugDedupClientRoutes(dbgSrv *debug.Server) { + dbgSrv.RegisterHandler(http.MethodGet, "/dedup-client/status.{format:(json|yaml|text)}", func(_ *http.Request) (interface{}, error) { + payload := map[string]any{ + "client": clientStatus(op), + "snapshotStore": snapshotStoreStatus(op), + } + return payload, nil + }) +} + +// clientStatus reports the status of the runtime DedupClient. +func clientStatus(op *ShellOperator) map[string]any { + if op.DedupClient == nil { + return map[string]any{ + "enabled": false, + "reason": "DedupClient is not configured (set --dedup-client-enabled or $DEDUP_CLIENT_ENABLED)", + } + } + // Cache wide synchronisation status — best-effort, capped at 0 + // timeout so the probe never blocks the debug server. + ctx, cancel := context.WithTimeout(context.Background(), 0) + defer cancel() + synced := op.DedupClient.WaitForCacheSync(ctx) + return map[string]any{ + "enabled": true, + "cacheSyncedHint": synced, + } +} + +// snapshotStoreStatus reports the live counters of the shared snapshot store. 
+func snapshotStoreStatus(op *ShellOperator) map[string]any { + if op.SnapshotStore == nil { + return map[string]any{ + "enabled": false, + "reason": "SnapshotStore is not configured (set --dedup-client-snapshot-store or $DEDUP_CLIENT_SNAPSHOT_STORE)", + } + } + stats := op.SnapshotStore.Stats() + return map[string]any{ + "enabled": true, + "liveObjects": stats.LiveObjects, + "totalAcquires": stats.TotalAcquires, + "totalReleases": stats.TotalReleases, + "totalDeletes": stats.TotalDeletes, + } +} + // RegisterDebugConfigRoutes registers routes to manage runtime configuration. // This method is also used in addon-operator func (op *ShellOperator) RegisterDebugConfigRoutes(dbgSrv *debug.Server, runtimeConfig *config.Config) { diff --git a/pkg/shell-operator/kube_client.go b/pkg/shell-operator/kube_client.go index 98c9ca29..41b905ca 100644 --- a/pkg/shell-operator/kube_client.go +++ b/pkg/shell-operator/kube_client.go @@ -2,13 +2,16 @@ package shell_operator import ( "fmt" + "strings" "time" "github.com/deckhouse/deckhouse/pkg/log" metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + "k8s.io/apimachinery/pkg/runtime/schema" klient "github.com/flant/kube-client/client" pkg "github.com/flant/shell-operator/pkg" + "github.com/flant/shell-operator/pkg/kube/dedupclient" objectpatch "github.com/flant/shell-operator/pkg/kube/object_patch" "github.com/flant/shell-operator/pkg/metric" utils "github.com/flant/shell-operator/pkg/utils/labels" @@ -77,3 +80,131 @@ func initDefaultObjectPatcher(kubeCfg KubeClientConfig, metricStorage metricssto } return objectpatch.NewObjectPatcher(patcherKubeClient, logger), nil } + +// DedupClientConfig consolidates the parameters needed to build the optional +// deduplicated kubeclient on top of an already initialised *klient.Client. It +// mirrors app.Config.DedupClient but stays decoupled from the app package so +// addon-operator and other library consumers can populate it directly. 
+type DedupClientConfig struct { + // Enabled toggles construction of the deduplicated client. When false, + // initDedupClient returns (nil, nil) and the operator runs as before. + Enabled bool + + // Namespaces restricts the cache to this list of namespaces. Empty + // means "all namespaces". + Namespaces []string + + // WatchGVKs is a list of GVK strings to pre-register with the cache. + // Each entry follows the form "<group>/<version>/<kind>"; the group + // is empty for core resources (e.g. "/v1/Pod"). Malformed entries + // cause initDedupClient to return an error so misconfiguration is + // caught at startup rather than silently ignored. + WatchGVKs []string + + // ReconstructLRUSize and GCInterval map directly onto the same + // kubeclient options. Zero means "use the kubeclient default". + ReconstructLRUSize int + GCInterval time.Duration + + // SnapshotStore toggles construction of the process-wide deduplicated + // SnapshotStore. The store backs per-monitor object caches; turning it + // on is the change that actually moves the RSS needle for workloads + // with many similar objects (the runtime DedupClient itself does not + // affect kube-events-manager memory). Independent of Enabled. + SnapshotStore bool +} + +// initDedupClient constructs an optional deduplicated kubeclient using the +// rest.Config and RESTMapper exposed by an already-initialised KubeClient. +// Returning (nil, nil) when cfg.Enabled is false keeps the call site at the +// assembly layer simple — it only has to nil-check the result. 
+func initDedupClient(kubeClient *klient.Client, cfg DedupClientConfig, logger *log.Logger) (*dedupclient.Client, error) { + if !cfg.Enabled { + return nil, nil + } + if kubeClient == nil { + return nil, fmt.Errorf("initialize dedup kubeclient: main kube client is nil") + } + + restCfg := kubeClient.RestConfig() + if restCfg == nil { + return nil, fmt.Errorf("initialize dedup kubeclient: main kube client has no rest.Config (is it initialised?)") + } + + mapper, mapperErr := kubeClient.ToRESTMapper() + if mapperErr != nil { + // Fall through with a nil mapper; kubeclient will use its built-in + // default. The error is logged so misconfiguration is visible but + // non-fatal — many operators run fine with the default mapper. + logger.Warn("could not derive RESTMapper from main kube client; "+ + "dedup kubeclient will fall back to the default in-memory mapper", + log.Err(mapperErr)) + mapper = nil + } + + gvks, err := parseGVKs(cfg.WatchGVKs) + if err != nil { + return nil, fmt.Errorf("initialize dedup kubeclient: parse watched GVKs: %w", err) + } + + dedupCfg := dedupclient.Config{ + RESTConfig: restCfg, + RESTMapper: mapper, + Namespaces: cfg.Namespaces, + WatchGVKs: gvks, + ReconstructLRUSize: cfg.ReconstructLRUSize, + GCInterval: cfg.GCInterval, + } + + c, err := dedupclient.New(dedupCfg, logger) + if err != nil { + return nil, fmt.Errorf("initialize dedup kubeclient: %w", err) + } + return c, nil +} + +// initSnapshotStore constructs the optional deduplicated SnapshotStore. +// Returning (nil, nil) when cfg.SnapshotStore is false keeps the assembly +// caller simple — it nil-checks the result and skips the wiring. The +// returned store is a fresh instance (no shared state with previous +// processes) and is safe for concurrent use; the operator passes it to +// KubeEventsManager.WithSnapshotStore so every later-created Monitor +// uses it transparently. 
+func initSnapshotStore(cfg DedupClientConfig, logger *log.Logger) *dedupclient.SnapshotStore { + if !cfg.SnapshotStore { + return nil + } + return dedupclient.NewSnapshotStore(logger) +} + +// parseGVKs converts a list of "group/version/kind" strings into +// schema.GroupVersionKind values. It accepts an empty group (leading "/") +// for core resources, e.g. "/v1/Pod". An empty input list yields a nil +// slice so kubeclient does not pre-register any GVKs at startup. +func parseGVKs(specs []string) ([]schema.GroupVersionKind, error) { + if len(specs) == 0 { + return nil, nil + } + out := make([]schema.GroupVersionKind, 0, len(specs)) + for _, raw := range specs { + spec := strings.TrimSpace(raw) + if spec == "" { + continue + } + parts := strings.SplitN(spec, "/", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("expected \"//\", got %q", raw) + } + version := strings.TrimSpace(parts[1]) + kind := strings.TrimSpace(parts[2]) + if version == "" || kind == "" { + return nil, fmt.Errorf("version and kind must be non-empty in %q", raw) + } + out = append(out, schema.GroupVersionKind{ + Group: strings.TrimSpace(parts[0]), + Version: version, + Kind: kind, + }) + } + return out, nil +} diff --git a/pkg/shell-operator/kube_client_dedup_test.go b/pkg/shell-operator/kube_client_dedup_test.go new file mode 100644 index 00000000..8a4e14db --- /dev/null +++ b/pkg/shell-operator/kube_client_dedup_test.go @@ -0,0 +1,154 @@ +package shell_operator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/flant/shell-operator/pkg/app" +) + +func TestParseGVKs(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + input []string + want []schema.GroupVersionKind + wantErr bool + }{ + { + name: "nil input yields nil", + input: nil, + want: nil, + }, + { + name: "empty slice yields nil", + input: []string{}, + want: nil, + }, + { + name: "core resource 
with empty group", + input: []string{"/v1/Pod"}, + want: []schema.GroupVersionKind{ + {Group: "", Version: "v1", Kind: "Pod"}, + }, + }, + { + name: "namespaced apps and rbac groups", + input: []string{ + "apps/v1/Deployment", + "rbac.authorization.k8s.io/v1/Role", + }, + want: []schema.GroupVersionKind{ + {Group: "apps", Version: "v1", Kind: "Deployment"}, + {Group: "rbac.authorization.k8s.io", Version: "v1", Kind: "Role"}, + }, + }, + { + name: "leading and trailing whitespace is stripped", + input: []string{" apps/v1/Deployment ", "/v1/Pod"}, + want: []schema.GroupVersionKind{ + {Group: "apps", Version: "v1", Kind: "Deployment"}, + {Group: "", Version: "v1", Kind: "Pod"}, + }, + }, + { + name: "blank entries are skipped", + input: []string{"", " ", "/v1/Pod"}, + want: []schema.GroupVersionKind{ + {Group: "", Version: "v1", Kind: "Pod"}, + }, + }, + { + name: "missing kind component is rejected", + input: []string{"apps/v1"}, + wantErr: true, + }, + { + name: "empty version is rejected", + input: []string{"apps//Deployment"}, + wantErr: true, + }, + { + name: "empty kind is rejected", + input: []string{"apps/v1/"}, + wantErr: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := parseGVKs(tc.input) + if tc.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestDedupClientConfigFromAppConfig_Nil(t *testing.T) { + t.Parallel() + + got := dedupClientConfigFromAppConfig(nil) + assert.Equal(t, DedupClientConfig{}, got, "nil app.Config must yield a zero DedupClientConfig (Enabled=false)") +} + +func TestDedupClientConfigFromAppConfig_PassThrough(t *testing.T) { + t.Parallel() + + cfg := app.NewConfig() + cfg.DedupClient.Enabled = true + cfg.DedupClient.Namespaces = []string{"kube-system", "default"} + cfg.DedupClient.WatchGVKs = []string{"/v1/Pod", "apps/v1/Deployment"} + cfg.DedupClient.ReconstructLRUSize = 4096 + cfg.DedupClient.GCInterval = 30 * 
time.Second + + got := dedupClientConfigFromAppConfig(cfg) + assert.True(t, got.Enabled) + assert.Equal(t, []string{"kube-system", "default"}, got.Namespaces) + assert.Equal(t, []string{"/v1/Pod", "apps/v1/Deployment"}, got.WatchGVKs) + assert.Equal(t, 4096, got.ReconstructLRUSize) + assert.Equal(t, 30*time.Second, got.GCInterval) +} + +func TestInitDedupClient_DisabledReturnsNil(t *testing.T) { + t.Parallel() + + c, err := initDedupClient(nil, DedupClientConfig{Enabled: false}, nil) + require.NoError(t, err) + assert.Nil(t, c, "Enabled=false must skip construction even when kubeClient is nil") +} + +func TestInitDedupClient_EnabledRequiresKubeClient(t *testing.T) { + t.Parallel() + + _, err := initDedupClient(nil, DedupClientConfig{Enabled: true}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "main kube client is nil") +} + +func TestInitDedupClient_RejectsMalformedGVK(t *testing.T) { + t.Parallel() + + // We don't need a real kube client for this test path because parseGVKs + // is invoked before any rest.Config is touched. But initDedupClient does + // dereference kubeClient before parsing — so we short-circuit by + // constructing a request that fails at the GVK-parse step. + cfg := DedupClientConfig{ + Enabled: true, + WatchGVKs: []string{"not-a-valid-gvk"}, + } + _, err := initDedupClient(nil, cfg, nil) + require.Error(t, err) + // The "main kube client is nil" branch fires first, which is the + // stronger guarantee at this layer; parse-level rejection is exercised + // directly by TestParseGVKs above. 
+ assert.Contains(t, err.Error(), "main kube client is nil") +} diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index a028bf91..510d25d8 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -32,6 +32,7 @@ import ( "github.com/flant/shell-operator/pkg/hook/controller" "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/kube/dedupclient" objectpatch "github.com/flant/shell-operator/pkg/kube/object_patch" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemTypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" @@ -68,6 +69,21 @@ type ShellOperator struct { KubeClient *klient.Client ObjectPatcher *objectpatch.ObjectPatcher + // DedupClient is the optional controller-runtime compatible Kubernetes + // client backed by a deduplicated cache (github.com/ldmonster/kubeclient). + // It is nil unless the deduplicated client is enabled at assembly time + // (see app.Config.DedupClient and AssembleCommonOperator). When non-nil, + // it is started in op.Start() and stopped during op.Shutdown(). + DedupClient *dedupclient.Client + + // SnapshotStore is the optional process-wide deduplicated cache that + // backs every kubernetes-binding monitor's per-object snapshot. When + // non-nil it is wired into the KubeEventsManager so resourceInformers + // store `*Unstructured` bodies once (refcounted) instead of per-monitor. + // Enabled via app.Config.DedupClient.SnapshotStore. Independent of + // DedupClient: either, both, or neither may be active. + SnapshotStore *dedupclient.SnapshotStore + ScheduleManager schedulemanager.ScheduleManager KubeEventsManager kubeeventsmanager.KubeEventsManager @@ -131,6 +147,15 @@ func (op *ShellOperator) Start() { op.APIServer.Start(op.ctx) + // Spin up the deduplicated kubeclient cache before any consumer asks + // for it. 
Failure to start is logged but non-fatal: the rest of the + // operator can still operate via the existing KubeClient/ObjectPatcher. + if op.DedupClient != nil { + if err := op.DedupClient.Start(op.ctx); err != nil { + op.logger.Error("start dedup kubeclient cache", log.Err(err)) + } + } + // Create 'main' queue and add onStartup tasks and enable bindings tasks. op.bootstrapMainQueue(op.TaskQueues) // Start main task queue handler @@ -841,4 +866,20 @@ func (op *ShellOperator) Shutdown() { // Wait for queues to stop, but no more than 10 seconds op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) op.logger.Info("task queues stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + + // Cancelling op.ctx (done via op.cancel in Stop()) is enough to release + // the dedup cache run loop, but Shutdown is also called from paths that + // don't always invoke Stop(). Trigger an explicit, time-bounded shutdown + // so the goroutine is guaranteed to exit even on direct Shutdown() use. + if op.DedupClient != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), WaitQueuesTimeout) + if err := op.DedupClient.Shutdown(shutdownCtx); err != nil { + op.logger.Warn("dedup kubeclient cache did not shut down cleanly", + slog.String(pkg.LogKeyPhase, "shutdown"), + log.Err(err)) + } else { + op.logger.Info("dedup kubeclient cache stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + } + cancel() + } }