Component(s)
receiver/k8scluster
Is your feature request related to a problem? Please describe.
Centralised polling with a single Collector might not scale well on large clusters.
With deterministic sharding we could support horizontal scaling: each Collector instance handles a disjoint subset of objects.
Describe the solution you'd like
Kube-state-metrics already implements this idea.
The idea is to back the SharedIndexInformers with a cache.ListWatch that applies extra filtering:
import (
    "context"
    "time"

    "github.com/cespare/xxhash/v2"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
)

// sharding identifies which shard out of totalShards this Collector instance owns.
type sharding struct {
    shard       uint64
    totalShards uint64
}

// keep reports whether this instance is responsible for the given object:
// the object's UID is hashed with xxhash and assigned to a shard by modulo.
func (s *sharding) keep(o metav1.Object) bool {
    h := xxhash.New()
    h.Write([]byte(o.GetUID()))
    return (h.Sum64() % s.totalShards) == s.shard
}

// filterList drops list items that do not belong to this shard.
func (s *sharding) filterList(obj runtime.Object) runtime.Object {
    items, err := meta.ExtractList(obj)
    if err != nil {
        // If extraction fails, do not drop the list.
        return obj
    }
    kept := make([]runtime.Object, 0, len(items))
    for _, it := range items {
        a, err := meta.Accessor(it)
        if err != nil {
            kept = append(kept, it)
            continue
        }
        if s.keep(a) {
            kept = append(kept, it)
        }
    }
    _ = meta.SetList(obj, kept)
    return obj
}

// filterWatch wraps a watch so that events for objects outside this shard are dropped.
func (s *sharding) filterWatch(w watch.Interface) watch.Interface {
    return watch.Filter(w, func(in watch.Event) (out watch.Event, keep bool) {
        a, err := meta.Accessor(in.Object)
        if err != nil {
            return in, true
        }
        return in, s.keep(a)
    })
}

// newShardedInformer builds a SharedIndexInformer with a ListWatch that filters objects by the sharding rule.
func newShardedInformer(
    s sharding,
    objType runtime.Object,
    listWithCtx func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error),
    watchWithCtx func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error),
    resyncPeriod time.Duration,
    indexers cache.Indexers,
) cache.SharedIndexInformer {
    lw := &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            obj, err := listWithCtx(context.Background(), options)
            if err != nil {
                return nil, err
            }
            return s.filterList(obj), nil
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            w, err := watchWithCtx(context.Background(), options)
            if err != nil {
                return nil, err
            }
            return s.filterWatch(w), nil
        },
        ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
            obj, err := listWithCtx(ctx, options)
            if err != nil {
                return nil, err
            }
            return s.filterList(obj), nil
        },
        WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
            w, err := watchWithCtx(ctx, options)
            if err != nil {
                return nil, err
            }
            return s.filterWatch(w), nil
        },
    }
    return cache.NewSharedIndexInformer(
        lw,
        objType,
        resyncPeriod,
        indexers,
    )
}

This addition would be a nice-to-have feature for the k8s_cluster receiver.
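For illustration, here is a sketch of a possible call site (not part of the draft POC): it wires a Pod informer through newShardedInformer, assuming a kubernetes.Interface client and that the receiver has already parsed the proposed sharding settings into a sharding value. The newShardedPodInformer helper name is made up for this example.

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// newShardedPodInformer shows how a concrete resource (Pods across all
// namespaces) could be plugged into newShardedInformer defined above.
func newShardedPodInformer(client kubernetes.Interface, s sharding) cache.SharedIndexInformer {
    pods := client.CoreV1().Pods(metav1.NamespaceAll)
    return newShardedInformer(
        s,
        &corev1.Pod{},
        func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
            return pods.List(ctx, options)
        },
        func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
            return pods.Watch(ctx, options)
        },
        0,                // no resync; rely on watch events
        cache.Indexers{}, // no extra indexers needed
    )
}

Each replica built this way ends up with a cache that contains only the Pods whose UID hashes to its own shard.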
Sample config
receivers:
  k8s_cluster:
    collection_interval: 10s
    sharding:
      shard_instance_id: ${env:REPLICA_ID}
      total_shards: 3

where REPLICA_ID is set from the apps.kubernetes.io/pod-index label, which the StatefulSet controller sets on each Pod.
This feature mostly makes sense when the Collector runs as a StatefulSet (as in kube-state-metrics), leveraging the deterministic behavior of that workload type.
A sample values file for the existing Helm Chart:
mode: statefulset
replicaCount: 3
image:
  repository: otelcontribcol-dev
  tag: "latest"
  pullPolicy: IfNotPresent
presets:
  clusterMetrics:
    enabled: true
resources:
  limits:
    cpu: 1
    memory: 1Gi
extraEnvs:
  - name: REPLICA_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
config:
  receivers:
    k8s_cluster:
      collection_interval: 10s
      sharding:
        shard_instance_id: ${env:REPLICA_ID}
        total_shards: 3
  exporters:
    debug:
      verbosity: detailed
  service:
    extensions: [health_check]
    telemetry:
      logs:
        level: INFO
    pipelines:
      metrics:
        receivers: [ k8s_cluster ]
        processors: [ ]
        exporters: [ debug ]

Describe alternatives you've considered
No response
Additional context
A draft POC patch: https://github.com/open-telemetry/opentelemetry-collector-contrib/compare/main...ChrsMark:opentelemetry-collector-contrib:k8scluster_sharding?expand=1