Skip to content

Commit fee6364

Browse files
committed
CheckRequestAggregator: remove unused, leaking, out queue.
Check requests are already sent when doing a request. Even if it would work on App Engine (no background threads allowed), there is no point in sending check requests again at some later point. Check requests are still cached (and expired).
1 parent 86e0cbd commit fee6364

5 files changed

Lines changed: 59 additions & 294 deletions

File tree

endpoints-control/src/main/java/com/google/api/control/Client.java

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/*
22
* Copyright 2016 Google Inc. All Rights Reserved.
3+
* Copyright 2023 Uwe Trottmann
34
*
45
* Licensed under the Apache License, Version 2.0 (the "License");
56
* you may not use this file except in compliance with the License.
@@ -91,7 +92,7 @@ public Client(String serviceName, CheckAggregationOptions checkOptions,
9192
ServiceControl transport, ThreadFactory threads,
9293
SchedulerFactory schedulers, int statsLogFrequency, @Nullable Ticker ticker) {
9394
ticker = ticker == null ? Ticker.systemTicker() : ticker;
94-
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, null, ticker);
95+
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, ticker);
9596
this.reportAggregator = new ReportRequestAggregator(serviceName, reportOptions, null, ticker);
9697
this.quotaAggregator = new QuotaRequestAggregator(serviceName, quotaOptions, ticker);
9798
this.serviceName = serviceName;
@@ -134,6 +135,7 @@ public void run() {
134135
scheduleFlushes();
135136
}
136137
});
138+
// Note: this is not supported on App Engine Standard.
137139
schedulerThread.start();
138140
} catch (RuntimeException e) {
139141
log.atInfo().log(BACKGROUND_THREAD_ERROR);
@@ -305,7 +307,6 @@ private synchronized void initializeFlushing() {
305307
this.scheduler = schedulers.create(ticker);
306308
this.scheduler.setStatistics(statistics);
307309
log.atInfo().log("scheduling the initial check, report, and quota");
308-
flushAndScheduleChecks();
309310
flushAndScheduleReports();
310311
flushAndScheduleQuota();
311312
}
@@ -323,51 +324,6 @@ private synchronized boolean resetIfStopped() {
323324
return true;
324325
}
325326

326-
private void flushAndScheduleChecks() {
327-
if (resetIfStopped()) {
328-
log.atFine().log("did not schedule check flush: client is stopped");
329-
return;
330-
}
331-
int interval = checkAggregator.getFlushIntervalMillis();
332-
if (interval < 0) {
333-
log.atFine().log("did not schedule check flush: caching is disabled");
334-
return; // cache is disabled, so no flushing it
335-
}
336-
337-
if (isRunningSchedulerDirectly()) {
338-
log.atFine().log("did not schedule check flush: no scheduler thread is running");
339-
return;
340-
}
341-
342-
log.atFine().log("flushing the check aggregator");
343-
Stopwatch w = Stopwatch.createUnstarted(ticker);
344-
for (CheckRequest req : checkAggregator.flush()) {
345-
try {
346-
statistics.recachedChecks.incrementAndGet();
347-
w.reset().start();
348-
CheckResponse resp = transport.services().check(serviceName, req).execute();
349-
statistics.totalCheckTransportTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
350-
w.reset().start();
351-
checkAggregator.addResponse(req, resp);
352-
statistics.totalCheckCacheUpdateTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
353-
} catch (IOException e) {
354-
log.atSevere().withCause(e).log("direct send of a check request %s failed", req);
355-
}
356-
}
357-
// copy scheduler into a local variable to avoid data races beween this method and stop()
358-
Scheduler currentScheduler = scheduler;
359-
if (resetIfStopped()) {
360-
log.atFine().log("did not schedule succeeding check flush: client is stopped");
361-
return;
362-
}
363-
currentScheduler.enter(new Runnable() {
364-
@Override
365-
public void run() {
366-
flushAndScheduleChecks(); // Do this again after the interval
367-
}
368-
}, interval, 0 /* high priority */);
369-
}
370-
371327
private void flushAndScheduleReports() {
372328
if (resetIfStopped()) {
373329
log.atFine().log("did not schedule report flush: client is stopped");

endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java

Lines changed: 13 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/*
22
* Copyright 2016 Google Inc. All Rights Reserved.
3+
* Copyright 2023 Uwe Trottmann
34
*
45
* Licensed under the Apache License, Version 2.0 (the "License");
56
* you may not use this file except in compliance with the License.
@@ -20,10 +21,7 @@
2021
import com.google.common.base.Ticker;
2122
import com.google.common.cache.Cache;
2223
import com.google.common.cache.CacheBuilder;
23-
import com.google.common.cache.RemovalListener;
24-
import com.google.common.cache.RemovalNotification;
2524

26-
import java.util.concurrent.ConcurrentLinkedDeque;
2725
import java.util.concurrent.TimeUnit;
2826

2927
import javax.annotation.Nullable;
@@ -42,13 +40,7 @@ public class CheckAggregationOptions {
4240
*/
4341
public static final int DEFAULT_RESPONSE_EXPIRATION_MILLIS = 4000;
4442

45-
/**
46-
* The default flush cache entry interval.
47-
*/
48-
public static final int DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS = 2000;
49-
5043
private final int numEntries;
51-
private final int flushCacheEntryIntervalMillis;
5244
private final int expirationMillis;
5345

5446
/**
@@ -58,21 +50,13 @@ public class CheckAggregationOptions {
5850
* is the maximum number of cache entries that can be kept in the
5951
* aggregation cache. The cache is disabled if this value is
6052
* negative.
61-
* @param flushCacheEntryIntervalMillis
62-
* the maximum interval before an aggregated check request is
63-
* flushed to the server. The cache entry is deleted after the
64-
* flush
6553
* @param expirationMillis
6654
* is the maximum interval in milliseconds before a cached check
67-
* response is invalidated. This value should be greater than
68-
* {@code flushCacheEntryIntervalMillis}. If not, it is ignored,
69-
* and a value of {@code flushCacheEntryIntervalMillis} is used
70-
* instead.
55+
* response is invalidated.
7156
*/
72-
public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis, int expirationMillis) {
57+
public CheckAggregationOptions(int numEntries, int expirationMillis) {
7358
this.numEntries = numEntries;
74-
this.flushCacheEntryIntervalMillis = flushCacheEntryIntervalMillis;
75-
this.expirationMillis = Math.max(expirationMillis, flushCacheEntryIntervalMillis + 1);
59+
this.expirationMillis = expirationMillis;
7660
}
7761

7862
/**
@@ -81,7 +65,7 @@ public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis
8165
* Creates an instance initialized with the default values.
8266
*/
8367
public CheckAggregationOptions() {
84-
this(DEFAULT_NUM_ENTRIES, DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
68+
this(DEFAULT_NUM_ENTRIES, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
8569
}
8670

8771
/**
@@ -92,18 +76,9 @@ public int getNumEntries() {
9276
return numEntries;
9377
}
9478

95-
/**
96-
* @return the maximum interval before aggregated report requests are
97-
* flushed to the server
98-
*/
99-
public int getFlushCacheEntryIntervalMillis() {
100-
return flushCacheEntryIntervalMillis;
101-
}
102-
10379
/**
10480
* @return the maximum interval before a cached check response should be
105-
* deleted. This value will not be greater than
106-
* {@link #getFlushCacheEntryIntervalMillis()}
81+
* deleted.
10782
*/
10883
public int getExpirationMillis() {
10984
return expirationMillis;
@@ -115,45 +90,29 @@ public int getExpirationMillis() {
11590
* @param <T>
11691
* the type of the instance being cached
11792
*
118-
* @param out
119-
* a concurrent {@code Deque} to which previously cached items
120-
* are added as they expire
12193
* @return a {@link Cache} corresponding to this instance's values or
12294
* {@code null} unless {@link #numEntries} is positive.
12395
*/
12496
@Nullable
125-
public <T> Cache<String, T> createCache(ConcurrentLinkedDeque<T> out) {
126-
return createCache(out, Ticker.systemTicker());
97+
public <T> Cache<String, T> createCache() {
98+
return createCache(Ticker.systemTicker());
12799
}
128100

129101
/**
130102
* Creates a {@link Cache} configured by this instance.
131103
*
132-
* @param <T>
133-
* the type of the value stored in the Cache
134-
* @param out
135-
* a concurrent {@code Deque} to which the cached values are
136-
* added as they are removed from the cache
137-
* @param ticker
138-
* the time source used to determine expiration
104+
* @param <T> the type of the value stored in the Cache
105+
* @param ticker the time source used to determine expiration
139106
* @return a {@link Cache} corresponding to this instance's values or
140-
* {@code null} unless {@code #numEntries} is positive.
107+
* {@code null} unless {@code #numEntries} is positive.
141108
*/
142109
@Nullable
143-
public <T> Cache<String, T> createCache(final ConcurrentLinkedDeque<T> out, Ticker ticker) {
144-
Preconditions.checkNotNull(out, "The out deque cannot be null");
110+
public <T> Cache<String, T> createCache(Ticker ticker) {
145111
Preconditions.checkNotNull(ticker, "The ticker cannot be null");
146112
if (numEntries <= 0) {
147113
return null;
148114
}
149-
final RemovalListener<String, T> listener = new RemovalListener<String, T>() {
150-
@Override
151-
public void onRemoval(RemovalNotification<String, T> notification) {
152-
out.addFirst(notification.getValue());
153-
}
154-
};
155-
CacheBuilder<String, T> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker)
156-
.removalListener(listener);
115+
CacheBuilder<Object, Object> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker);
157116
if (expirationMillis >= 0) {
158117
b.expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS);
159118
}

0 commit comments

Comments
 (0)