Skip to content

Commit cb0d776

Browse files
mmetcCopilot
andcommitted
apiserver/apic: migrate metrics lifecycle to context
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 83e94b1 commit cb0d776

4 files changed

Lines changed: 70 additions & 36 deletions

File tree

pkg/apiserver/apic.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ type apic struct {
6363
mu sync.Mutex
6464
pushTomb tomb.Tomb
6565
pullTomb tomb.Tomb
66-
metricsTomb tomb.Tomb
66+
metricsCancel context.CancelFunc
67+
metricsDone chan struct{}
6768
startup bool
6869
consoleConfig *csconfig.ConsoleConfig
6970
isPulling chan bool
@@ -195,7 +196,6 @@ func NewAPIC(ctx context.Context, config *csconfig.OnlineApiClientCfg, dbClient
195196
startup: true,
196197
pullTomb: tomb.Tomb{},
197198
pushTomb: tomb.Tomb{},
198-
metricsTomb: tomb.Tomb{},
199199
consoleConfig: consoleConfig,
200200
pullInterval: pullIntervalDefault,
201201
pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
@@ -295,7 +295,7 @@ func (a *apic) Push(ctx context.Context) error {
295295
select {
296296
case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
297297
a.pullTomb.Kill(nil)
298-
a.metricsTomb.Kill(nil)
298+
a.StopMetrics()
299299
log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
300300

301301
if len(cache) == 0 {
@@ -1073,18 +1073,70 @@ func (a *apic) Pull(ctx context.Context) error {
10731073
continue
10741074
}
10751075
case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
1076-
a.metricsTomb.Kill(nil)
1076+
a.StopMetrics()
10771077
a.pushTomb.Kill(nil)
10781078

10791079
return nil
10801080
}
10811081
}
10821082
}
10831083

1084+
func (a *apic) StartMetrics(ctx context.Context, sendUsageMetrics bool) {
1085+
a.mu.Lock()
1086+
if a.metricsCancel != nil {
1087+
a.mu.Unlock()
1088+
return
1089+
}
1090+
1091+
metricsCtx, cancel := context.WithCancel(ctx)
1092+
done := make(chan struct{})
1093+
a.metricsCancel = cancel
1094+
a.metricsDone = done
1095+
a.mu.Unlock()
1096+
1097+
go func() {
1098+
var wg sync.WaitGroup
1099+
1100+
wg.Add(1)
1101+
go func() {
1102+
defer wg.Done()
1103+
a.SendMetrics(metricsCtx, make(chan bool))
1104+
}()
1105+
1106+
if sendUsageMetrics {
1107+
wg.Add(1)
1108+
go func() {
1109+
defer wg.Done()
1110+
a.SendUsageMetrics(metricsCtx)
1111+
}()
1112+
}
1113+
1114+
wg.Wait()
1115+
close(done)
1116+
}()
1117+
}
1118+
1119+
func (a *apic) StopMetrics() {
1120+
a.mu.Lock()
1121+
cancel := a.metricsCancel
1122+
done := a.metricsDone
1123+
a.metricsCancel = nil
1124+
a.metricsDone = nil
1125+
a.mu.Unlock()
1126+
1127+
if cancel != nil {
1128+
cancel()
1129+
}
1130+
1131+
if done != nil {
1132+
<-done
1133+
}
1134+
}
1135+
10841136
func (a *apic) Shutdown() {
10851137
a.pushTomb.Kill(nil)
10861138
a.pullTomb.Kill(nil)
1087-
a.metricsTomb.Kill(nil)
1139+
a.StopMetrics()
10881140
}
10891141

10901142
func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {

pkg/apiserver/apic_metrics.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -291,13 +291,16 @@ func (a *apic) SendMetrics(ctx context.Context, stop chan bool) {
291291

292292
checkTicker := time.NewTicker(checkInt)
293293
metTicker := time.NewTicker(nextMetInt())
294+
defer checkTicker.Stop()
295+
defer metTicker.Stop()
294296

295297
for {
296298
select {
297299
case <-stop:
298-
checkTicker.Stop()
299-
metTicker.Stop()
300-
300+
return
301+
case <-ctx.Done():
302+
a.pullTomb.Kill(nil)
303+
a.pushTomb.Kill(nil)
301304
return
302305
case <-checkTicker.C:
303306
oldIDs := machineIDs
@@ -326,13 +329,6 @@ func (a *apic) SendMetrics(ctx context.Context, stop chan bool) {
326329
}
327330

328331
metTicker.Reset(nextMetInt())
329-
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
330-
checkTicker.Stop()
331-
metTicker.Stop()
332-
a.pullTomb.Kill(nil)
333-
a.pushTomb.Kill(nil)
334-
335-
return
336332
}
337333
}
338334
}
@@ -345,8 +341,7 @@ func (a *apic) SendUsageMetrics(ctx context.Context) {
345341

346342
for {
347343
select {
348-
case <-a.metricsTomb.Dying():
349-
// The normal metrics routine also kills push/pull tombs, does that make sense ?
344+
case <-ctx.Done():
350345
ticker.Stop()
351346
return
352347
case <-ticker.C:

pkg/apiserver/apic_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,11 @@ func getAPIC(t *testing.T, ctx context.Context) *apic {
5757
return &apic{
5858
AlertsAddChan: make(chan []*models.Alert),
5959
// DecisionDeleteChan: make(chan []*models.Decision),
60-
dbClient: dbClient,
61-
mu: sync.Mutex{},
62-
startup: true,
63-
pullTomb: tomb.Tomb{},
64-
pushTomb: tomb.Tomb{},
65-
metricsTomb: tomb.Tomb{},
60+
dbClient: dbClient,
61+
mu: sync.Mutex{},
62+
startup: true,
63+
pullTomb: tomb.Tomb{},
64+
pushTomb: tomb.Tomb{},
6665
consoleConfig: &csconfig.ConsoleConfig{
6766
ShareManualDecisions: ptr.Of(false),
6867
ShareTaintedScenarios: ptr.Of(false),

pkg/apiserver/apiserver.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -302,19 +302,7 @@ func (s *APIServer) initAPIC(ctx context.Context) {
302302
}
303303
}
304304

305-
s.apic.metricsTomb.Go(func() error {
306-
defer trace.ReportPanic()
307-
s.apic.SendMetrics(ctx, make(chan bool))
308-
return nil
309-
})
310-
311-
if !s.cfg.DisableUsageMetricsExport {
312-
s.apic.metricsTomb.Go(func() error {
313-
defer trace.ReportPanic()
314-
s.apic.SendUsageMetrics(ctx)
315-
return nil
316-
})
317-
}
305+
s.apic.StartMetrics(ctx, !s.cfg.DisableUsageMetricsExport)
318306
}
319307

320308
func (s *APIServer) Run(ctx context.Context, apiReady chan bool) error {

0 commit comments

Comments
 (0)