Skip to content

Commit 1aa436f

Browse files
committed
fix: Kubernetes autodiscover & register cluster
1 parent 8096322 commit 1aa436f

3 files changed

Lines changed: 81 additions & 2 deletions

File tree

internal/agent/agent.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package agent
44
import (
55
"context"
66
"fmt"
7+
"os"
78
"sync"
89
"time"
910

@@ -123,6 +124,16 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
123124
}
124125
}
125126

127+
// Auto-detect Kubernetes: enable collector + sync when running inside a K8s cluster.
128+
// KUBERNETES_SERVICE_HOST is injected by the kubelet into every pod.
129+
if !cfg.Collector.Kubernetes.Enabled && os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
130+
cfg.Collector.Kubernetes.Enabled = true
131+
if !cfg.Collector.Kubernetes.SyncToBackend {
132+
cfg.Collector.Kubernetes.SyncToBackend = true
133+
}
134+
logger.Info("Kubernetes environment auto-detected, enabling K8s collector")
135+
}
136+
126137
// Add Kubernetes collector if enabled
127138
var k8sSync *exporter.KubernetesSync
128139
if cfg.Collector.Kubernetes.Enabled {
@@ -134,11 +145,35 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
134145
} else {
135146
collectors = append(collectors, k8sCollector)
136147
logger.Info("Kubernetes collector enabled",
137-
zap.String("cluster", cfg.Collector.Kubernetes.ClusterName),
148+
zap.String("cluster", k8sCollector.ClusterName()),
149+
zap.String("provider", k8sCollector.ClusterProvider()),
138150
)
139151

152+
// Auto-register cluster with backend if ClusterID is not pre-configured.
153+
if cfg.Collector.Kubernetes.SyncToBackend && cfg.Collector.Kubernetes.ClusterID == "" {
154+
regCtx, regCancel := context.WithTimeout(context.Background(), 30*time.Second)
155+
regResp, regErr := client.AgentRegisterCluster(regCtx, &api.AgentRegisterClusterRequest{
156+
Name: k8sCollector.ClusterName(),
157+
Provider: k8sCollector.ClusterProvider(),
158+
})
159+
regCancel()
160+
if regErr != nil {
161+
logger.Warn("Failed to auto-register Kubernetes cluster, sync disabled",
162+
zap.Error(regErr),
163+
zap.String("cluster", k8sCollector.ClusterName()),
164+
)
165+
} else {
166+
cfg.Collector.Kubernetes.ClusterID = regResp.ID
167+
logger.Info("Kubernetes cluster auto-registered",
168+
zap.String("clusterID", regResp.ID),
169+
zap.String("name", regResp.Name),
170+
zap.Bool("isNew", regResp.IsNew),
171+
)
172+
}
173+
}
174+
140175
// Create K8s state sync exporter when sync_to_backend is enabled
141-
if cfg.Collector.Kubernetes.SyncToBackend {
176+
if cfg.Collector.Kubernetes.SyncToBackend && cfg.Collector.Kubernetes.ClusterID != "" {
142177
syncInterval := cfg.Collector.Kubernetes.SyncInterval
143178
if syncInterval == 0 {
144179
syncInterval = 60 * time.Second

internal/collector/kubernetes/kubernetes.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,16 @@ func (k *KubernetesCollector) LastClusterState() *ClusterState {
306306
return k.lastState
307307
}
308308

309+
// ClusterName returns the cluster name held in the collector's configuration
// (k.cfg.ClusterName). The method does no detection of its own; it only reads
// the field.
// NOTE(review): whether that field holds an auto-detected or an
// operator-supplied value depends on how cfg is populated, which is not
// visible in this diff — confirm before relying on "detected".
310+
func (k *KubernetesCollector) ClusterName() string {
311+
return k.cfg.ClusterName
312+
}
313+
314+
// ClusterProvider returns the cluster provider held in the collector's
// configuration (k.cfg.ClusterProvider). The method does no detection of its
// own; it only reads the field.
// NOTE(review): whether that field holds an auto-detected or an
// operator-supplied value depends on how cfg is populated, which is not
// visible in this diff — confirm before relying on "detected".
315+
func (k *KubernetesCollector) ClusterProvider() string {
316+
return k.cfg.ClusterProvider
317+
}
318+
309319
// SetKubeletFetcher replaces the kubelet stats fetcher (used in tests).
310320
func (k *KubernetesCollector) SetKubeletFetcher(f KubeletStatsFetcher) {
311321
k.kubeletFetcher = f

pkg/api/kubernetes.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,23 @@ type RegisterClusterResponse struct {
2727
ClusterName string `json:"cluster_name"`
2828
}
2929

30+
// AgentRegisterClusterRequest is the payload for agent-driven cluster registration.
31+
// Uses API key auth and is safe to call on every restart (find-or-create semantics).
//
// Name is always serialized; every other field carries omitempty and is left
// out of the JSON body when it holds its zero value. The agent start-up code
// in internal/agent/agent.go fills in only Name and Provider.
32+
type AgentRegisterClusterRequest struct {
33+
// Name is the cluster name; the agent passes the collector's ClusterName().
Name string `json:"name"`
34+
// Provider is the cluster provider; the agent passes the collector's ClusterProvider().
Provider string `json:"provider,omitempty"`
35+
// Version is optional and not set by the agent start-up path in this commit.
Version string `json:"version,omitempty"`
36+
// Region is optional and not set by the agent start-up path in this commit.
Region string `json:"region,omitempty"`
37+
// Labels is optional and not set by the agent start-up path in this commit.
Labels map[string]string `json:"labels,omitempty"`
38+
}
39+
40+
// AgentRegisterClusterResponse is returned by the agent-register endpoint.
//
// NOTE(review): the "isNew" key is camelCase while the neighbouring
// RegisterClusterResponse uses snake_case ("cluster_name") — confirm the key
// casing against what the backend actually emits.
41+
type AgentRegisterClusterResponse struct {
42+
// ID is the cluster identifier; the agent stores it as
// cfg.Collector.Kubernetes.ClusterID, which gates creation of the K8s sync exporter.
ID string `json:"id"`
43+
// Name is the cluster name as reported back by the backend; the agent only logs it.
Name string `json:"name"`
44+
// IsNew is logged by the agent; presumably true when the backend created the
// cluster rather than finding an existing one — TODO confirm.
IsNew bool `json:"isNew"`
45+
}
46+
3047
// SyncKubernetesState sends the full cluster state snapshot to the TFO backend.
3148
func (c *Client) SyncKubernetesState(ctx context.Context, clusterID string, payload interface{}) error {
3249
path := fmt.Sprintf("/monitoring/kubernetes/clusters/%s/sync", clusterID)
@@ -54,6 +71,23 @@ func (c *Client) RegisterCluster(ctx context.Context, req *RegisterClusterReques
5471
return &result, nil
5572
}
5673

74+
// AgentRegisterCluster auto-registers a Kubernetes cluster using API key auth.
75+
// Implements find-or-create — safe to call on every agent restart.
//
// It issues a POST to /monitoring/kubernetes/clusters/agent-register, handing
// req to c.Request as the payload argument, and decodes the reply into an
// AgentRegisterClusterResponse.
//
// An error is returned when:
//   - c.Request itself fails (the cause is wrapped with %w);
//   - resp.IsSuccess() reports false (the message carries the status code only);
//   - the response body cannot be decoded by resp.JSON (the cause is wrapped with %w).
//
// On success the decoded response is returned with a nil error.
//
// NOTE(review): a success response whose ID is empty is returned as-is. The
// caller in internal/agent/agent.go would then log the cluster as
// auto-registered while the sync exporter stays disabled (it requires a
// non-empty ClusterID) — consider whether an empty ID should be an error.
76+
func (c *Client) AgentRegisterCluster(ctx context.Context, req *AgentRegisterClusterRequest) (*AgentRegisterClusterResponse, error) {
77+
resp, err := c.Request(ctx, http.MethodPost, "/monitoring/kubernetes/clusters/agent-register", req)
78+
if err != nil {
79+
return nil, fmt.Errorf("agent register cluster: %w", err)
80+
}
81+
// Transport succeeded but the backend answered with a non-success status.
if !resp.IsSuccess() {
82+
return nil, fmt.Errorf("agent register cluster failed with status %d", resp.StatusCode)
83+
}
84+
var result AgentRegisterClusterResponse
85+
if err := resp.JSON(&result); err != nil {
86+
return nil, fmt.Errorf("parse agent register cluster response: %w", err)
87+
}
88+
return &result, nil
89+
}
90+
5791
// DeregisterCluster removes a Kubernetes cluster from the TFO backend.
5892
func (c *Client) DeregisterCluster(ctx context.Context, clusterName string) error {
5993
path := fmt.Sprintf("/monitoring/kubernetes/clusters/%s", clusterName)

0 commit comments

Comments
 (0)