Add sharding feature for k8s_cluster receiver #45311

@ChrsMark

Description

Component(s)

receiver/k8scluster

Is your feature request related to a problem? Please describe.

Centralised polling with a single Collector might not scale well on large clusters.

With deterministic sharding we could support horizontal scaling: each Collector instance handles a disjoint subset of objects.
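
As a rough sketch of what "deterministic" means here (illustrative only; the shardFor helper below is not from the proposal): each object's UID is hashed and reduced modulo the shard count, so every object maps to exactly one replica and the mapping stays stable across restarts.

package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// shardFor is a hypothetical helper that deterministically maps an object UID
// to one of totalShards replicas.
func shardFor(uid string, totalShards uint64) uint64 {
	return xxhash.Sum64String(uid) % totalShards
}

func main() {
	uids := []string{
		"0b1a7c1e-aaaa-4f7e-9b1a-111111111111",
		"4d2f9c3a-bbbb-4f7e-9b1a-222222222222",
		"9e8d7c6b-cccc-4f7e-9b1a-333333333333",
	}
	for _, uid := range uids {
		// With 3 replicas, each UID lands on exactly one shard; the other
		// two replicas drop the object in their informers' filters.
		fmt.Printf("%s -> shard %d\n", uid, shardFor(uid, 3))
	}
}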

Describe the solution you'd like

Kube-state-metrics already implements this idea.

The idea is to back the receiver's SharedIndexInformers with a cache.ListWatch that applies extra filtering, so each replica only keeps the objects that hash to its shard:

import (
	"context"
	"time"

	"github.com/cespare/xxhash/v2" // assuming the xxhash/v2 module, as used by kube-state-metrics
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

// sharding identifies this replica's shard and the total number of shards.
type sharding struct {
	shard       uint64
	totalShards uint64
}

// keep reports whether this replica owns the object: the object's UID is
// hashed and assigned to exactly one shard via modulo.
func (s *sharding) keep(o metav1.Object) bool {
	h := xxhash.New()
	h.Write([]byte(o.GetUID()))
	return (h.Sum64() % s.totalShards) == s.shard
}

// newShardedInformer builds a SharedIndexInformer with a ListWatch that filters objects by the sharding rule.
func newShardedInformer(
	s sharding,
	objType runtime.Object,
	listWithCtx func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error),
	watchWithCtx func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error),
	resyncPeriod time.Duration,
	indexers cache.Indexers,
) cache.SharedIndexInformer {
	lw := &cache.ListWatch{
		// ListFunc is the non-context fallback; the sharding filter for lists
		// is applied in ListWithContextFunc below, which recent client-go
		// prefers when it is set.
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return listWithCtx(context.Background(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			w, err := watchWithCtx(context.Background(), options)
			if err != nil {
				return nil, err
			}
			return watch.Filter(w, func(in watch.Event) (out watch.Event, keep bool) {
				a, err := meta.Accessor(in.Object)
				if err != nil {
					return in, true
				}
				return in, s.keep(a)
			}), nil
		},
		// The context-aware list filters the returned items down to this shard.
		ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
			obj, err := listWithCtx(ctx, options)
			if err != nil {
				return nil, err
			}
			items, err := meta.ExtractList(obj)
			if err != nil {
				// If extraction fails, do not drop the list.
				return obj, nil
			}
			kept := make([]runtime.Object, 0, len(items))
			for _, it := range items {
				a, err := meta.Accessor(it)
				if err != nil {
					kept = append(kept, it)
					continue
				}
				if s.keep(a) {
					kept = append(kept, it)
				}
			}
			_ = meta.SetList(obj, kept)
			return obj, nil
		},
		// The context-aware watch drops events for objects outside this shard.
		WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
			w, err := watchWithCtx(ctx, options)
			if err != nil {
				return nil, err
			}
			return watch.Filter(w, func(in watch.Event) (out watch.Event, keep bool) {
				a, err := meta.Accessor(in.Object)
				if err != nil {
					return in, true
				}
				return in, s.keep(a)
			}), nil
		},
	}
	return cache.NewSharedIndexInformer(
		lw,
		objType,
		resyncPeriod,
		indexers,
	)
}
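
For illustration, a hypothetical usage sketch (the clientset wiring and the newShardedPodInformer name are assumptions, not part of the POC) showing how such an informer could be built for Pods:

// Hypothetical wiring example; additionally assumes
// corev1 "k8s.io/api/core/v1" and "k8s.io/client-go/kubernetes" are imported.
func newShardedPodInformer(client kubernetes.Interface, s sharding) cache.SharedIndexInformer {
	return newShardedInformer(
		s,
		&corev1.Pod{},
		func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, options)
		},
		func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Pods(metav1.NamespaceAll).Watch(ctx, options)
		},
		10*time.Minute, // resync period chosen arbitrarily for the example
		cache.Indexers{},
	)
}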

This addition would be a nice-to-have feature for the k8s_cluster receiver.

Sample config

receivers:
  k8s_cluster:
    collection_interval: 10s
    sharding:
      shard_instance_id: ${env:REPLICA_ID}
      total_shards: 3

where REPLICA_ID is set from the apps.kubernetes.io/pod-index label that the StatefulSet controller adds to each Pod.

This feature mostly makes sense when the Collector runs as a StatefulSet (as in kube-state-metrics), leveraging the deterministic, stable pod identities that the StatefulSet workload type provides.

A sample values file for the existing Helm Chart:

mode: statefulset
replicaCount: 3


image:
  repository: otelcontribcol-dev
  tag: "latest"
  pullPolicy: IfNotPresent

presets:
  clusterMetrics:
    enabled: true

resources:
  limits:
    cpu: 1
    memory: 1Gi

extraEnvs:
  - name: REPLICA_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.labels['apps.kubernetes.io/pod-index']


config:
  receivers:
    k8s_cluster:
      collection_interval: 10s
      sharding:
        shard_instance_id: ${env:REPLICA_ID}
        total_shards: 3
  exporters:
    debug:
      verbosity: detailed

  service:
    extensions: [health_check]
    telemetry:
      logs:
        level: INFO
    pipelines:
      metrics:
        receivers: [ k8s_cluster ]
        processors: [ ]
        exporters: [ debug ]

Describe alternatives you've considered

No response

Additional context

A draft POC patch: https://github.com/open-telemetry/opentelemetry-collector-contrib/compare/main...ChrsMark:opentelemetry-collector-contrib:k8scluster_sharding?expand=1

