1+ /*
2+ * ====================================================================
3+ * Licensed to the Apache Software Foundation (ASF) under one
4+ * or more contributor license agreements. See the NOTICE file
5+ * distributed with this work for additional information
6+ * regarding copyright ownership. The ASF licenses this file
7+ * to you under the Apache License, Version 2.0 (the
8+ * "License"); you may not use this file except in compliance
9+ * with the License. You may obtain a copy of the License at
10+ *
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ *
13+ * Unless required by applicable law or agreed to in writing,
14+ * software distributed under the License is distributed on an
15+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+ * KIND, either express or implied. See the License for the
17+ * specific language governing permissions and limitations
18+ * under the License.
19+ * ====================================================================
20+ *
21+ * This software consists of voluntary contributions made by many
22+ * individuals on behalf of the Apache Software Foundation. For more
23+ * information on the Apache Software Foundation, please see
24+ * <http://www.apache.org/>.
25+ *
26+ */
27+ package org .apache .hc .core5 .pool ;
28+
29+ import java .util .ArrayList ;
30+ import java .util .List ;
31+ import java .util .SplittableRandom ;
32+ import java .util .concurrent .Future ;
33+ import java .util .concurrent .TimeUnit ;
34+ import java .util .stream .Stream ;
35+
36+ import org .apache .hc .core5 .io .CloseMode ;
37+ import org .apache .hc .core5 .util .TimeValue ;
38+ import org .apache .hc .core5 .util .Timeout ;
39+ import org .junit .jupiter .api .Assertions ;
40+ import org .junit .jupiter .params .ParameterizedTest ;
41+ import org .junit .jupiter .params .provider .Arguments ;
42+ import org .junit .jupiter .params .provider .MethodSource ;
43+
44+ final class TestConnPoolDeterministicFuzzer {
45+
46+ static Stream <Arguments > params () {
47+ final long [] seeds = new long []{1L , 2L , 3L , 4L };
48+ final List <Arguments > out = new ArrayList <>();
49+ for (final PoolConcurrencyPolicy policy : PoolConcurrencyPolicy .values ()) {
50+ for (final long seed : seeds ) {
51+ out .add (Arguments .of (policy , seed ));
52+ }
53+ }
54+ return out .stream ();
55+ }
56+
57+ @ ParameterizedTest
58+ @ MethodSource ("params" )
59+ void fuzzSingleThreaded (final PoolConcurrencyPolicy policy , final long seed ) throws Exception {
60+ final TestingClock clock = new TestingClock (0L );
61+
62+ final int defaultMaxPerRoute = 2 ;
63+ final int maxTotal = 4 ;
64+
65+ final ManagedConnPool <String , PoolTestSupport .DummyConn > pool =
66+ PoolTestSupport .createPool (policy , defaultMaxPerRoute , maxTotal , clock );
67+
68+ final Timeout requestTimeout = Timeout .of (0L , TimeUnit .MILLISECONDS );
69+
70+ final SplittableRandom rnd = new SplittableRandom (seed );
71+
72+ final List <PoolEntry <String , PoolTestSupport .DummyConn >> leased = new ArrayList <>();
73+ Future <PoolEntry <String , PoolTestSupport .DummyConn >> pending = null ;
74+
75+ try {
76+ final String [] routes = new String []{"r1" , "r2" , "r3" };
77+ final Object [] states = new Object []{null , "s1" };
78+
79+ final int steps = 10_000 ;
80+
81+ Assertions .assertTrue (pool .getMaxTotal () >= 0 );
82+ Assertions .assertTrue (pool .getDefaultMaxPerRoute () >= 0 );
83+ for (final String route : routes ) {
84+ Assertions .assertTrue (pool .getMaxPerRoute (route ) >= 0 );
85+ }
86+
87+ for (int i = 0 ; i < steps ; i ++) {
88+
89+ pending = drainPending (pending , leased );
90+
91+ final int op = rnd .nextInt (100 );
92+
93+ if (op < 45 ) {
94+ // LEASE
95+ final String route = routes [rnd .nextInt (routes .length )];
96+ final Object state = states [rnd .nextInt (states .length )];
97+
98+ if (pending == null ) {
99+ final Future <PoolEntry <String , PoolTestSupport .DummyConn >> f =
100+ pool .lease (route , state , requestTimeout , null );
101+ if (f .isDone ()) {
102+ final PoolEntry <String , PoolTestSupport .DummyConn > entry = getDone (f );
103+ if (entry != null ) {
104+ ensureConnection (entry );
105+ leased .add (entry );
106+ }
107+ } else {
108+ pending = f ;
109+ }
110+ } else {
111+ if (rnd .nextInt (8 ) == 0 ) {
112+ pending .cancel (true );
113+ pending = null ;
114+ validatePendingRequests (pool );
115+ }
116+ }
117+
118+ } else if (op < 75 ) {
119+ if (!leased .isEmpty ()) {
120+ final int idx = rnd .nextInt (leased .size ());
121+ final PoolEntry <String , PoolTestSupport .DummyConn > entry = leased .remove (idx );
122+ final boolean reusable = rnd .nextBoolean ();
123+ pool .release (entry , reusable );
124+ pending = drainPending (pending , leased );
125+ }
126+
127+ } else if (op < 85 ) {
128+ clock .advanceMillis (1L + rnd .nextInt (20 ));
129+
130+ } else if (op < 92 ) {
131+ pool .closeIdle (TimeValue .ofMilliseconds (1 ));
132+
133+ } else if (op < 97 ) {
134+ pool .closeExpired ();
135+
136+ } else {
137+ if (pending != null ) {
138+ pending .cancel (true );
139+ pending = null ;
140+ validatePendingRequests (pool );
141+ }
142+ }
143+
144+ // keep stats + invariants cost under control
145+ if ((i & 31 ) == 0 ) {
146+ validatePendingRequests (pool );
147+ pending = drainPending (pending , leased );
148+ assertCoreInvariants (policy , pool , leased , pending , routes );
149+ }
150+ }
151+
152+ // Cleanup
153+ if (pending != null ) {
154+ pending .cancel (true );
155+ pending = null ;
156+ }
157+ validatePendingRequests (pool );
158+
159+ while (!leased .isEmpty ()) {
160+ final PoolEntry <String , PoolTestSupport .DummyConn > entry = leased .remove (leased .size () - 1 );
161+ pool .release (entry , true );
162+ }
163+
164+ validatePendingRequests (pool );
165+ pending = drainPending (pending , leased );
166+ assertCoreInvariants (policy , pool , leased , pending , routes );
167+
168+ } finally {
169+ pool .close (CloseMode .IMMEDIATE );
170+ }
171+ }
172+
173+ private static void ensureConnection (final PoolEntry <String , PoolTestSupport .DummyConn > entry ) {
174+ if (!entry .hasConnection ()) {
175+ entry .assignConnection (new PoolTestSupport .DummyConn ());
176+ }
177+ }
178+
179+ private static PoolEntry <String , PoolTestSupport .DummyConn > getDone (
180+ final Future <PoolEntry <String , PoolTestSupport .DummyConn >> f ) throws Exception {
181+
182+ if (f .isCancelled ()) {
183+ return null ;
184+ }
185+ return f .get ();
186+ }
187+
188+ private static Future <PoolEntry <String , PoolTestSupport .DummyConn >> drainPending (
189+ final Future <PoolEntry <String , PoolTestSupport .DummyConn >> pending ,
190+ final List <PoolEntry <String , PoolTestSupport .DummyConn >> leased ) throws Exception {
191+
192+ if (pending != null && pending .isDone ()) {
193+ final PoolEntry <String , PoolTestSupport .DummyConn > entry = getDone (pending );
194+ if (entry != null ) {
195+ ensureConnection (entry );
196+ leased .add (entry );
197+ }
198+ return null ;
199+ }
200+ return pending ;
201+ }
202+
203+ private static void validatePendingRequests (final ManagedConnPool <String , PoolTestSupport .DummyConn > pool ) {
204+ if (pool instanceof StrictConnPool ) {
205+ ((StrictConnPool <?, ?>) pool ).validatePendingRequests ();
206+ } else if (pool instanceof LaxConnPool ) {
207+ ((LaxConnPool <?, ?>) pool ).validatePendingRequests ();
208+ }
209+ }
210+
211+ private static void assertCoreInvariants (
212+ final PoolConcurrencyPolicy policy ,
213+ final ManagedConnPool <String , PoolTestSupport .DummyConn > pool ,
214+ final List <PoolEntry <String , PoolTestSupport .DummyConn >> leased ,
215+ final Future <PoolEntry <String , PoolTestSupport .DummyConn >> pending ,
216+ final String [] routes ) {
217+
218+ final PoolStats totals = pool .getTotalStats ();
219+
220+ Assertions .assertTrue (pool .getMaxTotal () >= 0 );
221+ Assertions .assertTrue (pool .getDefaultMaxPerRoute () >= 0 );
222+
223+ Assertions .assertTrue (totals .getAvailable () >= 0 );
224+ Assertions .assertTrue (totals .getLeased () >= 0 );
225+ Assertions .assertTrue (totals .getPending () >= 0 );
226+ Assertions .assertTrue (totals .getMax () >= 0 );
227+
228+ final long allocated = (long ) totals .getAvailable () + (long ) totals .getLeased ();
229+ Assertions .assertTrue (allocated <= (long ) totals .getMax (), "allocated > max" );
230+
231+ if (policy != PoolConcurrencyPolicy .LAX ) {
232+ Assertions .assertTrue (totals .getLeased () <= pool .getMaxTotal (), "leased > max total" );
233+ }
234+
235+ Assertions .assertEquals (leased .size (), totals .getLeased (), "leased count mismatch" );
236+
237+ final int expectedPending = pending != null && !pending .isDone () && !pending .isCancelled () ? 1 : 0 ;
238+ Assertions .assertEquals (expectedPending , totals .getPending (), "pending count mismatch" );
239+
240+ if (policy != PoolConcurrencyPolicy .LAX ) {
241+ for (final String route : routes ) {
242+ final PoolStats routeStats = pool .getStats (route );
243+ Assertions .assertTrue (routeStats .getAvailable () >= 0 );
244+ Assertions .assertTrue (routeStats .getLeased () >= 0 );
245+ Assertions .assertTrue (routeStats .getPending () >= 0 );
246+ Assertions .assertTrue (routeStats .getMax () >= 0 );
247+ final long routeAllocated = (long ) routeStats .getAvailable () + (long ) routeStats .getLeased ();
248+ Assertions .assertTrue (routeAllocated <= (long ) routeStats .getMax (), "route allocated > max" );
249+ }
250+ }
251+ }
252+
253+ }
0 commit comments