-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathaimd.go
More file actions
277 lines (251 loc) · 10.1 KB
/
aimd.go
File metadata and controls
277 lines (251 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
package rate
import (
"fmt"
"math"
"time"
)
// AIMDTokenBucketLimiter wraps a TokenBucketLimiter to implement
// Additive Increase Multiplicative Decrease (AIMD) rate limiting.
// Each bucket carries its own refill rate: IncreaseRate raises it by
// a fixed step (capped at rateMax) and DecreaseRate shrinks its
// distance from rateMin by a constant divisor. The result is a rate
// that grows slowly while operations succeed and drops quickly on
// failures or congestion, similar to the congestion control algorithm
// used in TCP.
//
// All rate fields are expressed in tokens per rateUnit.
type AIMDTokenBucketLimiter struct {
limiter *TokenBucketLimiter // Wrapped limiter; supplies bucket indexing and token accounting
rates atomicSliceFloat64 // Per-bucket rates in tokens per unit, one entry per bucket of limiter
rateMin float64 // Minimum rate (tokens per unit); lower bound applied by DecreaseRate
rateMax float64 // Maximum rate (tokens per unit); upper bound applied by IncreaseRate
rateAI float64 // Additive increase (tokens per unit) added by each IncreaseRate call
rateMD float64 // Multiplicative decrease (divisor applied to rate - rateMin by DecreaseRate)
rateUnit time.Duration // Time unit for rate calculations
}
// Compile-time assertion that *AIMDTokenBucketLimiter implements the
// Limiter interface. The value is assigned to the blank identifier,
// so the declaration exists only to make the build fail if the method
// set ever stops satisfying Limiter.
var _ Limiter = (*AIMDTokenBucketLimiter)(nil)
// NewAIMDTokenBucketLimiter builds an AIMD limiter on top of a freshly
// constructed TokenBucketLimiter. Every bucket starts at rateInit and
// is subsequently adjusted on its own through IncreaseRate and
// DecreaseRate.
//
// Parameters (all rates are expressed in tokens per rateUnit):
//
//   - numBuckets: number of token buckets (automatically rounded up
//     to the nearest power of two if not already a power of two, for
//     efficient hashing)
//   - burstCapacity: max number of tokens that can be consumed at
//     once
//   - rateMin: lowest rate a bucket may reach (positive and finite)
//   - rateMax: highest rate a bucket may reach (positive, finite and
//     not smaller than rateMin)
//   - rateInit: starting rate of every bucket (within
//     [rateMin, rateMax])
//   - rateAdditiveIncrease: amount added to a bucket's rate by each
//     IncreaseRate call (positive and finite, typically small)
//   - rateMultiplicativeDecrease: divisor applied by DecreaseRate to
//     the distance between the current rate and rateMin (finite and
//     at least 1.0; 2.0 halves that distance)
//   - rateUnit: time unit for rate calculations (e.g., time.Second,
//     must be positive)
//
// Validation happens in the following order and the first failure is
// returned:
//
//  1. rateInit and rateUnit are checked by NewTokenBucketLimiter,
//     whose error is returned unchanged: "refillRate must be a
//     positive, finite number", "refillRateUnit must represent a
//     positive duration", or "refillRate per duration is too large".
//  2. rateMin is NaN, infinite, zero or negative: "rateMin must be a
//     positive, finite number".
//  3. rateMax is NaN, infinite, zero or negative: "rateMax must be a
//     positive, finite number".
//  4. rateMin exceeds rateMax: "rateMin must be less than equal to
//     rateMax".
//  5. rateInit lies outside [rateMin, rateMax]: "rateInit must be a
//     positive, finite number between rateMin and rateMax".
//  6. rateAdditiveIncrease is NaN, infinite, zero or negative:
//     "rateAdditiveIncrease must be a positive, finite number".
//  7. rateMultiplicativeDecrease is NaN, infinite or below 1.0:
//     "rateMultiplicativeDecrease must be a finite number greater
//     than or equal to 1.0".
//  8. The product of rateMin, rateMax or rateAdditiveIncrease (tested
//     in that order) with rateUnit in nanoseconds is not
//     representable: "[parameter] per duration is too large".
func NewAIMDTokenBucketLimiter(
	numBuckets uint,
	burstCapacity uint8,
	rateMin float64,
	rateMax float64,
	rateInit float64,
	rateAdditiveIncrease float64,
	rateMultiplicativeDecrease float64,
	rateUnit time.Duration,
) (*AIMDTokenBucketLimiter, error) {
	// The wrapped limiter is built first because it also validates
	// rateInit and rateUnit; its error takes precedence over ours.
	base, err := NewTokenBucketLimiter(numBuckets, burstCapacity, rateInit, rateUnit)
	if err != nil {
		return nil, err
	}

	// finite reports whether v is neither NaN nor an infinity.
	finite := func(v float64) bool {
		return !math.IsNaN(v) && !math.IsInf(v, 0)
	}

	// Cases are evaluated top to bottom, so the order here is the
	// order in which problems are reported.
	switch {
	case !finite(rateMin) || rateMin <= 0:
		return nil, fmt.Errorf("rateMin must be a positive, finite number")
	case !finite(rateMax) || rateMax <= 0:
		return nil, fmt.Errorf("rateMax must be a positive, finite number")
	case rateMin > rateMax:
		return nil, fmt.Errorf("rateMin must be less than equal to rateMax")
	case !finite(rateInit) || rateInit < rateMin || rateInit > rateMax:
		return nil, fmt.Errorf("rateInit must be a positive, finite number between rateMin and rateMax")
	case !finite(rateAdditiveIncrease) || rateAdditiveIncrease <= 0:
		return nil, fmt.Errorf("rateAdditiveIncrease must be a positive, finite number")
	case !finite(rateMultiplicativeDecrease) || rateMultiplicativeDecrease < 1.0:
		return nil, fmt.Errorf("rateMultiplicativeDecrease must be a finite number greater than or equal to 1.0")
	}

	// rateUnit has already been validated by NewTokenBucketLimiter.
	unitNanos := float64(rateUnit.Nanoseconds())

	// Reject any rate whose product with the unit (in nanoseconds)
	// would exceed the largest finite float64. The comparison divides
	// instead of multiplying so that it cannot overflow itself.
	bounded := []struct {
		label string
		rate  float64
	}{
		{"rateMin", rateMin},
		{"rateMax", rateMax},
		{"rateAdditiveIncrease", rateAdditiveIncrease},
	}
	for _, b := range bounded {
		if ceiling := math.MaxFloat64 / b.rate; unitNanos > ceiling {
			return nil, fmt.Errorf("%s per duration is too large", b.label)
		}
	}

	// One rate slot per bucket of the wrapped limiter, all seeded
	// with the initial rate.
	perBucket := newAtomicSliceFloat64(base.buckets.Len())
	for slot := range perBucket.Len() {
		perBucket.Set(slot, rateInit)
	}

	aimd := &AIMDTokenBucketLimiter{
		limiter:  base,
		rates:    perBucket,
		rateMin:  rateMin,
		rateMax:  rateMax,
		rateAI:   rateAdditiveIncrease,
		rateMD:   rateMultiplicativeDecrease,
		rateUnit: rateUnit,
	}
	return aimd, nil
}
// TakeToken tries to consume a single token from the bucket that the
// given ID maps to. It reports true when the token was taken and
// false otherwise. It is shorthand for TakeTokens(id, 1) and, like
// that method, is safe to call concurrently from multiple goroutines.
func (a *AIMDTokenBucketLimiter) TakeToken(id []byte) bool {
	const single uint8 = 1
	return a.TakeTokens(id, single)
}
// TakeTokens tries to consume n tokens from the bucket that the given
// ID maps to, refilling at that bucket's current AIMD rate. It
// reports true when all n tokens were taken and false when the
// operation should be rate limited. The operation is all-or-nothing:
// either every one of the n tokens is taken or none is. This method
// is thread-safe and can be called concurrently from multiple
// goroutines.
func (a *AIMDTokenBucketLimiter) TakeTokens(id []byte, n uint8) bool {
	// Capture the timestamp before anything else so it reflects the
	// moment of the call.
	ts := nowfn()
	slot := a.limiter.index(id)
	// Convert the bucket's current rate (tokens per rateUnit) into
	// the form the wrapped limiter expects.
	perNano := nanoRate(a.rateUnit, a.rates.Get(slot))
	return a.limiter.takeTokenInner(slot, perNano, ts, n)
}
// CheckToken reports whether a single token is currently available
// for the given ID, without consuming it. Use it to find out ahead of
// time whether an operation would be rate limited. It is shorthand
// for CheckTokens(id, 1) and, like that method, is safe to call
// concurrently from multiple goroutines.
func (a *AIMDTokenBucketLimiter) CheckToken(id []byte) bool {
	const single uint8 = 1
	return a.CheckTokens(id, single)
}
// CheckTokens reports whether n tokens are currently available for
// the given ID, without consuming them. The check uses the bucket's
// current AIMD rate. It returns true when all n tokens would be
// available and false otherwise, which makes it suitable for
// finding out ahead of time whether an operation would be rate
// limited. This method is thread-safe and can be called concurrently
// from multiple goroutines.
func (a *AIMDTokenBucketLimiter) CheckTokens(id []byte, n uint8) bool {
	// Capture the timestamp before anything else so it reflects the
	// moment of the call.
	ts := nowfn()
	slot := a.limiter.index(id)
	// Convert the bucket's current rate (tokens per rateUnit) into
	// the form the wrapped limiter expects.
	perNano := nanoRate(a.rateUnit, a.rates.Get(slot))
	return a.limiter.checkInner(slot, perNano, ts, n)
}
// IncreaseRate performs the "additive increase" half of AIMD for the
// bucket that the given ID maps to; call it after a successful
// operation. The bucket's rate grows by rateAI, except that it is set
// to exactly rateMax when fewer than rateAI tokens per unit of
// headroom remain. The update is applied with a compare-and-swap
// retry loop, so concurrent callers never lose each other's changes.
//
// The return value is the bucket's rate (in tokens per rateUnit) as
// it was before this call changed it; when nothing changes, that is
// simply the current rate.
func (a *AIMDTokenBucketLimiter) IncreaseRate(id []byte) float64 {
	slot := a.limiter.index(id)
	for {
		cur := a.rates.Get(slot)

		// Take a full step only when it fits strictly below the
		// ceiling; otherwise land exactly on the ceiling.
		var target float64
		if headroom := a.rateMax - cur; a.rateAI < headroom {
			target = cur + a.rateAI
		} else {
			target = a.rateMax
		}

		// Done when the bucket already sits at the ceiling, when the
		// step does not change the value, or when the swap succeeds.
		// A failed swap means another goroutine changed the rate, so
		// the loop starts over with the fresh value.
		if cur == a.rateMax || cur == target || a.rates.CompareAndSwap(slot, cur, target) {
			return cur
		}
	}
}
// DecreaseRate performs the "multiplicative decrease" half of AIMD
// for the bucket that the given ID maps to; call it when congestion
// or a failure is observed. The new rate is
//
//	rateMin + (currentRate - rateMin) / rateMD
//
// that is, the distance to rateMin is divided by rateMD, which makes
// the absolute reduction smaller the closer the rate already is to
// the minimum. The result is never allowed to fall below rateMin. The
// update is applied with a compare-and-swap retry loop, so concurrent
// callers never lose each other's changes.
//
// The return value is the bucket's rate (in tokens per rateUnit) as
// it was before this call changed it; when nothing changes, that is
// simply the current rate.
func (a *AIMDTokenBucketLimiter) DecreaseRate(id []byte) float64 {
	slot := a.limiter.index(id)
	for {
		cur := a.rates.Get(slot)

		// Shrink the distance to the floor, then clamp to the floor.
		reduced := a.rateMin + (cur-a.rateMin)/a.rateMD
		target := max(a.rateMin, reduced)

		// Done when the bucket already sits at the floor, when the
		// step does not change the value, or when the swap succeeds.
		// A failed swap means another goroutine changed the rate, so
		// the loop starts over with the fresh value.
		if cur == a.rateMin || cur == target || a.rates.CompareAndSwap(slot, cur, target) {
			return cur
		}
	}
}
// Rate reports the current rate of the bucket that the given ID maps
// to, in tokens per rateUnit (for example tokens per second when
// rateUnit is time.Second). The value is read from the per-bucket
// rate slice and may be called concurrently from multiple goroutines.
func (a *AIMDTokenBucketLimiter) Rate(id []byte) float64 {
	return a.rates.Get(a.limiter.index(id))
}