Skip to content

Commit 461790c

Browse files
committed
avoid throughput quantized
1 parent 2c26590 commit 461790c

2 files changed

Lines changed: 43 additions & 11 deletions

File tree

operation.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,25 +73,25 @@ func (op *Operation) run(ch chan<- int64) {
7373

7474
if stopCh != nil {
7575
for {
76-
var limitCh chan<- int64
7776
var todo int64
78-
var batch int64
7977
if limit := op.Limit.Load(); limit > 0 {
8078
carry += limit
8179
todo = carry / secparts
8280
carry = carry % secparts
8381
todo += op.avail.Swap(0)
84-
if todo > 0 {
85-
limitCh = ch
86-
batch = min(batchsize, todo)
87-
}
8882
} else {
8983
carry = 0
9084
}
9185
waitCh := op.WaitCh()
9286

9387
partialsecond:
9488
for {
89+
var limitCh chan<- int64
90+
var batch int64
91+
if todo > 0 {
92+
limitCh = ch
93+
batch = min(batchsize, todo)
94+
}
9595
select {
9696
case <-stopCh:
9797
return
@@ -100,15 +100,15 @@ func (op *Operation) run(ch chan<- int64) {
100100
case limitCh <- batch:
101101
todo -= batch
102102
todo += op.avail.Swap(0)
103-
if todo < batch {
104-
<-waitCh
105-
break partialsecond
106-
}
107103
case <-waitCh:
108104
break partialsecond
109105
}
110106
}
111107

108+
// Preserve any budget left in this slice for the next one.
109+
if todo > 0 {
110+
op.avail.Add(todo)
111+
}
112112
count := op.count.Swap(0)
113113
op.Count.Add(count)
114114
counts[seccount] = count

operation_test.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func TestOperation_read_rate_low(t *testing.T) {
164164
t.Log(l.Reads.Limit.Load())
165165
t.Error(elapsed)
166166
}
167-
if rate < 900 || rate > 1000 {
167+
if rate < 900 || rate > 1010 {
168168
t.Error(rate)
169169
}
170170
})
@@ -239,6 +239,38 @@ func TestOperation_read_rate_high(t *testing.T) {
239239
})
240240
}
241241

242+
func TestOperation_read_rate_nonBatchMultiple(t *testing.T) {
243+
synctest.Test(t, func(t *testing.T) {
244+
const limit = 50000
245+
ticker := NewTicker()
246+
defer ticker.Stop()
247+
l := ticker.NewLimiter(limit)
248+
defer l.Stop()
249+
250+
r := bytes.NewReader(make([]byte, limit))
251+
buf := make([]byte, limit)
252+
253+
now := time.Now()
254+
n, err := l.Reads.io(r.Read, buf)
255+
elapsed := time.Since(now)
256+
<-l.WaitCh()
257+
synctest.Wait()
258+
259+
if n != limit {
260+
t.Fatal(n)
261+
}
262+
if err != nil {
263+
t.Fatal(err)
264+
}
265+
if elapsed < time.Millisecond*900 || elapsed > time.Millisecond*1300 {
266+
t.Fatal(elapsed)
267+
}
268+
if rate := l.Reads.Rate.Load(); rate < limit-(limit/10) {
269+
t.Fatalf("rate too low: got %d want at least %d", rate, limit-(limit/10))
270+
}
271+
})
272+
}
273+
242274
func TestOperation_write_rate(t *testing.T) {
243275
synctest.Test(t, func(t *testing.T) {
244276
ticker := NewTicker()

0 commit comments

Comments
 (0)