Skip to content

Commit 894e4cf

Browse files
Haoning-Sunhaoning.sun
authored andcommitted
Flush the data before updating the report
1 parent 562131b commit 894e4cf

7 files changed

Lines changed: 52 additions & 19 deletions

File tree

cli_flags.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ import (
1717

1818
const (
1919
// Default values for CLI flags
20-
defaultArgSamplesPerSecond = 20
21-
defaultArgReporterInterval = 5.0 * time.Second
22-
defaultArgMonitorInterval = 5.0 * time.Second
23-
defaultClockSyncInterval = 3 * time.Minute
24-
defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax
25-
defaultProbabilisticInterval = 1 * time.Minute
26-
defaultArgSendErrorFrames = false
27-
defaultOffCPUThreshold = 0
28-
defaultEnvVarsValue = ""
20+
defaultArgSamplesPerSecond = 20
21+
defaultArgMaxSamplesPerSecond = 0 // 0 means disabled
22+
defaultArgReporterInterval = 5.0 * time.Second
23+
defaultArgMonitorInterval = 5.0 * time.Second
24+
defaultClockSyncInterval = 3 * time.Minute
25+
defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax
26+
defaultProbabilisticInterval = 1 * time.Minute
27+
defaultArgSendErrorFrames = false
28+
defaultOffCPUThreshold = 0
29+
defaultEnvVarsValue = ""
2930

3031
// This is the X in 2^(n + x) where n is the default hardcoded map size value
3132
defaultArgMapScaleFactor = 0

internal/controller/controller.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,14 @@ func (c *Controller) UpdateSamplingFrequency(newSamplesPerSecond int) error {
168168
return fmt.Errorf("tracer is not initialized")
169169
}
170170

171-
// Update the tracer's sampling frequency
172-
if err := c.tracer.UpdateSamplingFrequency(newSamplesPerSecond); err != nil {
173-
return fmt.Errorf("failed to update tracer sampling frequency: %w", err)
171+
if c.reporter != nil {
172+
if err := c.reporter.UpdateSamplingFrequency(newSamplesPerSecond); err != nil {
173+
log.Warnf("Failed to update reporter sampling frequency: %v", err)
174+
}
174175
}
175176

176-
// Update the reporter's sampling frequency for profile metadata
177-
if c.reporter != nil {
178-
c.reporter.UpdateSamplingFrequency(newSamplesPerSecond)
179-
log.Infof("Updated reporter sampling frequency to %d Hz", newSamplesPerSecond)
177+
if err := c.tracer.UpdateSamplingFrequency(newSamplesPerSecond); err != nil {
178+
return fmt.Errorf("failed to update tracer sampling frequency: %w", err)
180179
}
181180

182181
return nil

reporter/base_reporter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ func (b *baseReporter) Stop() {
4141
b.runLoop.Stop()
4242
}
4343

44-
func (b *baseReporter) UpdateSamplingFrequency(samplesPerSecond int) {
44+
// UpdateSamplingFrequency updates the sampling frequency used in profile generation.
45+
func (b *baseReporter) UpdateSamplingFrequency(samplesPerSecond int) error {
4546
b.pdata.UpdateSamplingFrequency(samplesPerSecond)
47+
return nil
4648
}
4749

4850
func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error {

reporter/collector_reporter.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"
55

66
import (
77
"context"
8+
"fmt"
89

910
log "github.com/sirupsen/logrus"
1011
"go.opentelemetry.io/collector/consumer/xconsumer"
@@ -97,3 +98,16 @@ func (r *CollectorReporter) reportProfile(ctx context.Context) error {
9798

9899
return r.nextConsumer.ConsumeProfiles(ctx, profiles)
99100
}
101+
102+
// Flush immediately reports all currently accumulated trace events.
103+
func (r *CollectorReporter) Flush(ctx context.Context) error {
104+
return r.reportProfile(ctx)
105+
}
106+
107+
// UpdateSamplingFrequency updates the sampling frequency used in profile generation.
108+
func (r *CollectorReporter) UpdateSamplingFrequency(samplesPerSecond int) error {
109+
if err := r.Flush(context.Background()); err != nil {
110+
return fmt.Errorf("failed to flush reporter before updating sampling frequency: %w", err)
111+
}
112+
return r.baseReporter.UpdateSamplingFrequency(samplesPerSecond)
113+
}

reporter/iface.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ type Reporter interface {
2626
Stop()
2727

2828
// UpdateSamplingFrequency updates the sampling frequency used in profile generation.
29-
UpdateSamplingFrequency(samplesPerSecond int)
29+
UpdateSamplingFrequency(samplesPerSecond int) error
30+
31+
// Flush immediately reports all currently accumulated trace events.
32+
Flush(ctx context.Context) error
3033
}
3134

3235
type TraceReporter interface {

reporter/otlp_reporter.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"
66
import (
77
"context"
88
"crypto/tls"
9+
"fmt"
910
"time"
1011

1112
log "github.com/sirupsen/logrus"
@@ -134,6 +135,20 @@ func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error {
134135
return err
135136
}
136137

138+
// Flush immediately reports all currently accumulated trace events.
139+
func (r *OTLPReporter) Flush(ctx context.Context) error {
140+
return r.reportOTLPProfile(ctx)
141+
}
142+
143+
// UpdateSamplingFrequency updates the sampling frequency used in profile generation.
144+
// It first flushes accumulated data to ensure data isolation, then updates the frequency.
145+
func (r *OTLPReporter) UpdateSamplingFrequency(samplesPerSecond int) error {
146+
if err := r.Flush(context.Background()); err != nil {
147+
return fmt.Errorf("failed to flush reporter before updating sampling frequency: %w", err)
148+
}
149+
return r.baseReporter.UpdateSamplingFrequency(samplesPerSecond)
150+
}
151+
137152
// waitGrpcEndpoint waits until the gRPC connection is established.
138153
func waitGrpcEndpoint(ctx context.Context, cfg *Config) (*grpc.ClientConn, error) {
139154
// Sleep with a fixed backoff time added of +/- 20% jitter

tracer/events.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
140140
if t.maxSamplesPerSecond > t.samplesPerSecond {
141141
effectiveMaxSPS = t.maxSamplesPerSecond
142142
}
143-
}
144143
eventReader, err := perf.NewReader(eventsMap,
145144
effectiveMaxSPS*support.Sizeof_Trace)
146145
if err != nil {

0 commit comments

Comments
 (0)