@@ -3,6 +3,7 @@ package bwlimit
33import (
44 "bytes"
55 "io"
6+ "sync"
67 "testing"
78 "testing/synctest"
89 "time"
@@ -59,3 +60,77 @@ func TestTicker_Stop_unblocksLimitedOperation(t *testing.T) {
5960 }
6061 })
6162}
63+
64+ func TestTicker_ConcurrentStopStress (t * testing.T ) {
65+ const (
66+ iterations = 25
67+ limiters = 16
68+ payload = 64 * 1024
69+ )
70+
71+ for iter := 0 ; iter < iterations ; iter ++ {
72+ synctest .Test (t , func (t * testing.T ) {
73+ ticker := NewTicker ()
74+ ls := make ([]* Limiter , limiters )
75+ ioDone := make (chan struct {}, limiters * 2 )
76+
77+ for i := 0 ; i < limiters ; i ++ {
78+ l := ticker .NewLimiter (1000 , 1000 )
79+ ls [i ] = l
80+
81+ go func (l * Limiter ) {
82+ _ , _ = l .Reads .io (bytes .NewReader (make ([]byte , payload )).Read , make ([]byte , payload ))
83+ ioDone <- struct {}{}
84+ }(l )
85+
86+ go func (l * Limiter ) {
87+ _ , _ = l .Writes .io (io .Discard .Write , make ([]byte , payload ))
88+ ioDone <- struct {}{}
89+ }(l )
90+ }
91+
92+ // Ensure operations have entered limited mode.
93+ <- ticker .WaitCh ()
94+
95+ var stopWG sync.WaitGroup
96+ stopWG .Add (limiters + 1 )
97+
98+ for i , l := range ls {
99+ l := l
100+ go func () {
101+ defer stopWG .Done ()
102+ if i % 2 == 0 {
103+ <- ticker .WaitCh ()
104+ }
105+ l .Stop ()
106+ }()
107+ }
108+
109+ go func () {
110+ defer stopWG .Done ()
111+ <- ticker .WaitCh ()
112+ ticker .Stop ()
113+ }()
114+
115+ stopped := make (chan struct {})
116+ go func () {
117+ stopWG .Wait ()
118+ close (stopped )
119+ }()
120+
121+ select {
122+ case <- stopped :
123+ case <- time .After (10 * time .Second ):
124+ t .Fatal ("timeout waiting for concurrent limiter/ticker Stop calls" )
125+ }
126+
127+ for i := 0 ; i < limiters * 2 ; i ++ {
128+ select {
129+ case <- ioDone :
130+ case <- time .After (10 * time .Second ):
131+ t .Fatal ("timeout waiting for active IO goroutine to return" )
132+ }
133+ }
134+ })
135+ }
136+ }
0 commit comments