-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsampling_config.go
More file actions
222 lines (192 loc) · 6.62 KB
/
sampling_config.go
File metadata and controls
222 lines (192 loc) · 6.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package mtlog
import (
"sync/atomic"
"time"
"github.com/willibrandon/mtlog/core"
"github.com/willibrandon/mtlog/internal/filters"
)
// SamplingConfigBuilder provides a fluent interface for building complex sampling configurations.
//
// Each builder method appends one filter and returns the same builder so that
// calls can be chained. Build, CombineAND or CombineOR then turn the
// accumulated filters into an Option.
type SamplingConfigBuilder struct {
// filters holds the sampling filters in the order the builder methods were called.
filters []core.LogEventFilter
}
// Sampling returns a fresh SamplingConfigBuilder with an empty filter list.
// It is the entry point of the fluent sampling API.
func Sampling() *SamplingConfigBuilder {
	builder := new(SamplingConfigBuilder)
	// Start from an empty, non-nil slice; the builder methods append to it.
	builder.filters = []core.LogEventFilter{}
	return builder
}
// validateBackoffFactor ensures the backoff factor is valid (> 1.0).
//
// It returns factor unchanged when it is strictly greater than 1.0 and
// filters.DefaultBackoffFactor otherwise. NaN is treated as invalid and also
// yields the default. +Inf satisfies the "> 1.0" rule and is passed through.
func validateBackoffFactor(factor float64) float64 {
	// Deliberately a positive test: every ordered comparison with NaN is
	// false, so "factor <= 1.0" would let NaN slip through as a valid factor.
	if factor > 1.0 {
		return factor
	}
	return filters.DefaultBackoffFactor
}
// Every adds a counter-based filter that samples every Nth message.
//
// The filter is created by filters.NewCounterSamplingFilter(n) and appended
// to the builder. The builder itself is returned to allow chaining.
func (s *SamplingConfigBuilder) Every(n uint64) *SamplingConfigBuilder {
	counter := filters.NewCounterSamplingFilter(n)
	s.filters = append(s.filters, counter)
	return s
}
// Rate adds a filter that samples a percentage of messages (0.0 to 1.0).
//
// rate is forwarded unchanged to filters.NewRateSamplingFilter; no range
// check happens in this method. The builder is returned to allow chaining.
func (s *SamplingConfigBuilder) Rate(rate float32) *SamplingConfigBuilder {
	sampler := filters.NewRateSamplingFilter(rate)
	s.filters = append(s.filters, sampler)
	return s
}
// Duration adds a time-based filter that samples at most once per duration d.
//
// The filter is created by filters.NewDurationSamplingFilter(d) and appended
// to the builder. The builder itself is returned to allow chaining.
func (s *SamplingConfigBuilder) Duration(d time.Duration) *SamplingConfigBuilder {
	throttle := filters.NewDurationSamplingFilter(d)
	s.filters = append(s.filters, throttle)
	return s
}
// First adds a filter that logs only the first N occurrences.
//
// The filter is created by filters.NewFirstNSamplingFilter(n) and appended
// to the builder. The builder itself is returned to allow chaining.
func (s *SamplingConfigBuilder) First(n uint64) *SamplingConfigBuilder {
	firstN := filters.NewFirstNSamplingFilter(n)
	s.filters = append(s.filters, firstN)
	return s
}
// Group adds a filter that samples within the named group.
//
// The filter is created by filters.NewGroupSamplingFilter with the given name
// and n, and is backed by the package-level globalSamplingGroupManager.
// The builder itself is returned to allow chaining.
func (s *SamplingConfigBuilder) Group(name string, n uint64) *SamplingConfigBuilder {
	grouped := filters.NewGroupSamplingFilter(name, n, globalSamplingGroupManager)
	s.filters = append(s.filters, grouped)
	return s
}
// When adds a filter that samples conditionally based on a predicate.
//
// predicate and n are forwarded unchanged to
// filters.NewConditionalSamplingFilter. The builder itself is returned to
// allow chaining.
func (s *SamplingConfigBuilder) When(predicate func() bool, n uint64) *SamplingConfigBuilder {
	conditional := filters.NewConditionalSamplingFilter(predicate, n)
	s.filters = append(s.filters, conditional)
	return s
}
// Backoff adds a filter that samples with exponential backoff.
//
// factor is first normalized by validateBackoffFactor. The filter is then
// created by filters.NewBackoffSamplingFilter for the given key, backed by the
// package-level globalBackoffState. The builder is returned to allow chaining.
func (s *SamplingConfigBuilder) Backoff(key string, factor float64) *SamplingConfigBuilder {
	effective := validateBackoffFactor(factor)
	backoff := filters.NewBackoffSamplingFilter(key, effective, globalBackoffState)
	s.filters = append(s.filters, backoff)
	return s
}
// Build returns an Option that applies all the configured sampling filters in a pipeline.
//
// The filters are snapshotted when Build is called: builder methods invoked
// afterwards do not change an Option that has already been returned. When the
// Option is applied, the snapshot is appended to the configuration's filter
// list in the order the filters were added to the builder.
//
// Pipeline Mode (Build):
// Each filter processes the output of the previous filter sequentially.
//
//	Event → Every(2) → Rate(0.5) → First(10) → Output
//	         ↓ 50%      ↓ 25%       ↓ 10 max
//
// In this example:
//   - Every(2): Passes 50% of events (every 2nd message)
//   - Rate(0.5): Processes only the events that passed Every(2), passes 50% of those (25% total)
//   - First(10): Processes only events that passed both previous filters, passes first 10
func (s *SamplingConfigBuilder) Build() Option {
	// Copy instead of aliasing s.filters: the Option may be applied long after
	// Build returns and must not observe later mutations of the builder.
	snapshot := append([]core.LogEventFilter(nil), s.filters...)
	return func(c *config) {
		// Add all sampling filters to the configuration.
		c.filters = append(c.filters, snapshot...)
	}
}
// AsOption is a convenience alias for Build: it returns exactly the Option
// that Build returns for the current builder.
func (s *SamplingConfigBuilder) AsOption() Option {
	opt := s.Build()
	return opt
}
// CombineAND creates a composite filter that requires all conditions to pass.
//
// Composite AND Mode:
// Every filter is offered the same event. ALL must approve for the event to pass.
//
//	Event → [Every(2)]   ⎤
//	      → [Rate(0.5)]  ⎬ → AND → Output (only if ALL approve)
//	      → [First(10)]  ⎦
//
// In this example, an event passes only if:
//   - Every(2) approves (even-numbered events), AND
//   - Rate(0.5) approves (50% random chance), AND
//   - First(10) approves (within first 10 evaluations)
//
// Filters are consulted in the order they were added and evaluation stops at
// the first rejection, so later filters do not see events rejected earlier.
// The builder's filters are wrapped in a single composite filter registered
// through WithFilter.
func (s *SamplingConfigBuilder) CombineAND() Option {
	composite := &compositeSamplingFilter{
		mode:    compositeAND,
		filters: s.filters,
	}
	return WithFilter(composite)
}
// CombineOR creates a composite filter that passes if any condition passes.
//
// Composite OR Mode:
// Every filter is offered the same event. ANY can approve for the event to pass.
//
//	Event → [Every(2)]   ⎤
//	      → [Rate(0.5)]  ⎬ → OR → Output (if ANY approve)
//	      → [First(10)]  ⎦
//
// In this example, an event passes if:
//   - Every(2) approves (even-numbered events), OR
//   - Rate(0.5) approves (50% random chance), OR
//   - First(10) approves (within first 10 evaluations)
//
// Filters are consulted in the order they were added and evaluation stops at
// the first approval, so later filters do not see events approved earlier.
// The builder's filters are wrapped in a single composite filter registered
// through WithFilter.
func (s *SamplingConfigBuilder) CombineOR() Option {
	composite := &compositeSamplingFilter{
		mode:    compositeOR,
		filters: s.filters,
	}
	return WithFilter(composite)
}
// compositeSamplingFilter combines multiple filters with AND/OR logic.
// It is the filter produced by CombineAND and CombineOR.
type compositeSamplingFilter struct {
// filters are the member filters, consulted in slice order; nil entries are skipped by IsEnabled.
filters []core.LogEventFilter
// mode selects whether all (compositeAND) or any (compositeOR) of the filters must approve.
mode compositeMode
}
// compositeMode selects how compositeSamplingFilter combines its filters' verdicts.
type compositeMode int
const (
// compositeAND requires every non-nil filter to approve an event.
// It is the zero value of compositeMode.
compositeAND compositeMode = iota
// compositeOR approves an event as soon as any non-nil filter approves it.
compositeOR
)
// IsEnabled implements core.LogEventFilter.
//
// It reports whether event passes the composite:
//   - with no filters configured, or with an unrecognized mode, the event passes;
//   - in compositeAND mode the event passes unless some non-nil filter rejects it;
//   - in compositeOR mode the event passes only if some non-nil filter approves it
//     (so a list containing only nil filters rejects everything).
//
// Filters are consulted in slice order, nil entries are skipped, and
// evaluation stops at the first decisive verdict.
func (c *compositeSamplingFilter) IsEnabled(event *core.LogEvent) bool {
	if len(c.filters) == 0 {
		return true
	}
	if c.mode != compositeAND && c.mode != compositeOR {
		// Unknown mode: let the event through without consulting any filter.
		return true
	}

	// The verdict that settles the outcome immediately: an approval in OR
	// mode, a rejection in AND mode.
	decisive := c.mode == compositeOR
	for _, member := range c.filters {
		if member == nil {
			continue
		}
		if member.IsEnabled(event) == decisive {
			return decisive
		}
	}
	// No filter produced the decisive verdict: AND passes, OR rejects.
	return !decisive
}
// WithSamplingPolicy creates an Option from a custom SamplingPolicy.
//
// The policy is wrapped in a samplingPolicyFilter adapter, which is then
// registered through WithFilter.
func WithSamplingPolicy(policy core.SamplingPolicy) Option {
	adapter := &samplingPolicyFilter{policy: policy}
	return WithFilter(adapter)
}
// samplingPolicyFilter adapts a SamplingPolicy to a LogEventFilter.
// It is the filter created by WithSamplingPolicy.
type samplingPolicyFilter struct {
// policy is the wrapped policy whose ShouldSample method decides each event.
policy core.SamplingPolicy
}
// IsEnabled implements core.LogEventFilter by delegating the decision to the
// wrapped policy's ShouldSample method.
//
// A nil policy is treated as "no sampling" and lets every event through,
// rather than panicking on every log call. This mirrors how
// compositeSamplingFilter skips nil filters.
func (f *samplingPolicyFilter) IsEnabled(event *core.LogEvent) bool {
	if f.policy == nil {
		return true
	}
	return f.policy.ShouldSample(event)
}
// samplingDebugEnabled controls whether sampling decisions are logged.
// It is written by EnableSamplingDebug and DisableSamplingDebug and read by
// IsSamplingDebugEnabled.
var samplingDebugEnabled atomic.Bool
// EnableSamplingDebug enables logging of sampling decisions to selflog for debugging.
// This helps understand why certain events are being sampled or skipped.
// The debug output includes the template, the sampling decision, and the filter that made it.
//
// The flag is set both in this package and, via
// filters.SetSamplingDebugEnabled, in the filters package.
func EnableSamplingDebug() {
	const enabled = true
	samplingDebugEnabled.Store(enabled)
	// Keep the filters package in step with the package-level flag.
	filters.SetSamplingDebugEnabled(enabled)
}
// DisableSamplingDebug disables logging of sampling decisions.
//
// The flag is cleared both in this package and, via
// filters.SetSamplingDebugEnabled, in the filters package.
func DisableSamplingDebug() {
	const enabled = false
	samplingDebugEnabled.Store(enabled)
	// Keep the filters package in step with the package-level flag.
	filters.SetSamplingDebugEnabled(enabled)
}
// IsSamplingDebugEnabled returns whether sampling debug logging is enabled,
// i.e. the current value of the package-level flag set by
// EnableSamplingDebug and DisableSamplingDebug.
func IsSamplingDebugEnabled() bool {
	enabled := samplingDebugEnabled.Load()
	return enabled
}