diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
index 2439bd76c..fa9edb429 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
@@ -24,7 +24,10 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.time.DateTimeException;
import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -94,6 +97,26 @@ public class FetcherBolt extends StatusEmitterBolt {
*/
public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout";
+    /**
+     * Maximum delay, in seconds, that the fetcher honors when a server requests a back-off via the
+     * Retry-After HTTP response header. A value of {@code -1} (the default) means the delay
+     * requested by the server is honored as-is, with no upper bound. A value of {@code 0} caps the
+     * delay to nothing, so the header is effectively ignored.
+     */
+    public static final String MAX_RETRY_AFTER_PARAM_KEY = "fetcher.max.retry.after";
+
+    /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */
+    private static final String RETRY_AFTER_KEY = "retry-after";
+
+    /**
+     * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015
+     * 07:28:00 GMT}. Only this fixed layout (with a literal {@code GMT} suffix, read as UTC) is
+     * parsed; any other date layout is rejected. {@link DateTimeFormatter} is immutable and
+     * thread-safe, unlike {@code SimpleDateFormat}, which matters here as it is shared across
+     * fetcher threads.
+     */
+    private static final DateTimeFormatter HTTP_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
/** Key name of the custom crawl delay for a queue that may be present in the metadata. */
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";
@@ -233,10 +256,16 @@ public int getInProgressSize() {
return inProgress.get();
}
- public void finishFetchItem(FetchItem it, boolean asap) {
+        /**
+         * Marks an item of this queue as finished: decrements the in-progress counter, reschedules
+         * the queue through {@code setNextFetchTime} and, when a back-off was requested, makes
+         * sure the next fetch does not happen before that back-off has elapsed.
+         *
+         * @param it the item that was being fetched; nothing happens when it is {@code null}
+         * @param asap passed through to {@code setNextFetchTime}
+         * @param retryAfterMS back-off requested by the server, in milliseconds; values lower
+         *     than or equal to zero are ignored
+         */
+        public void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
             if (it != null) {
                 inProgress.decrementAndGet();
                 setNextFetchTime(System.currentTimeMillis(), asap);
+                if (retryAfterMS > 0) {
+                    // the server asked us to back off: push the next fetch time for
+                    // this queue further out if needed, but never bring it earlier.
+                    // The sum saturates at Long.MAX_VALUE: a plain addition could wrap
+                    // to a negative timestamp, which Math.max would then silently drop.
+                    final long now = System.currentTimeMillis();
+                    final long earliest = now + Math.min(retryAfterMS, Long.MAX_VALUE - now);
+                    nextFetchTime.getAndUpdate(prev -> Math.max(prev, earliest));
+                }
             }
         }
@@ -345,13 +374,13 @@ public synchronized boolean addFetchItem(URL u, String url, Tuple input) {
return added;
}
- public synchronized void finishFetchItem(FetchItem it, boolean asap) {
+ public synchronized void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
FetchItemQueue fiq = queues.get(it.queueId);
if (fiq == null) {
LOG.warn("Attempting to finish item from unknown queue: {}", it.queueId);
return;
}
- fiq.finishFetchItem(it, asap);
+ fiq.finishFetchItem(it, asap, retryAfterMS);
}
public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) {
@@ -483,6 +512,41 @@ public synchronized FetchItem getFetchItem() {
}
}
+    /**
+     * Parses the value of a Retry-After HTTP response header into a delay expressed in
+     * milliseconds, relative to now. The value is either a number of seconds (e.g. {@code 120})
+     * or an HTTP date (e.g. {@code Wed, 21 Oct 2015 07:28:00 GMT}).
+     *
+     * @param retryAfter raw header value; may be {@code null} or blank
+     * @return the delay in milliseconds ({@code 0} for a value of zero seconds), or {@code -1} if
+     *     the header is absent, malformed, too large to be expressed in milliseconds as a {@code
+     *     long}, or a date that is not in the future.
+     */
+    static long parseRetryAfterDelay(String retryAfter) {
+        if (StringUtils.isBlank(retryAfter)) {
+            return -1;
+        }
+        final String value = retryAfter.trim();
+        // delay expressed as a number of seconds
+        if (value.matches("[0-9]+")) {
+            try {
+                // multiplyExact: a value that fits in a long as seconds can still overflow
+                // once converted to milliseconds, which would yield a bogus (possibly
+                // negative) delay instead of a clean rejection
+                return Math.multiplyExact(Long.parseLong(value), 1000L);
+            } catch (NumberFormatException | ArithmeticException e) {
+                // value too large to fit in a long, as seconds or as milliseconds - ignore
+                return -1;
+            }
+        }
+        // delay expressed as an HTTP date
+        try {
+            final Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(value));
+            final long delay = date.toEpochMilli() - System.currentTimeMillis();
+            // a date in the past (or right now) carries no back-off
+            return delay > 0 ? delay : -1;
+        } catch (DateTimeException e) {
+            LOG.debug("Invalid Retry-After header value: {}", value);
+            return -1;
+        }
+    }
+
/** This class picks items from queues and fetches the pages. */
private class FetcherThread extends Thread {
@@ -501,6 +565,9 @@ private class FetcherThread extends Thread {
/** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */
private long fetchTimeout = -1;
+ /** Upper bound in ms for honoring the Retry-After header; -1 means no cap. */
+ private long maxRetryAfter = -1;
+
/**
* Single-thread executor used to run the protocol call so that it can be interrupted via
* {@link Future#cancel(boolean)} when the bolt-level timeout fires.
@@ -521,6 +588,8 @@ public FetcherThread(Config conf, int num) {
this.threadNum = num;
timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues);
fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
+ long maxRetryAfterSecs = ConfUtils.getLong(conf, MAX_RETRY_AFTER_PARAM_KEY, -1L);
+ maxRetryAfter = maxRetryAfterSecs < 0 ? -1L : maxRetryAfterSecs * 1000L;
protocolMetadataPrefix =
ConfUtils.getString(
conf,
@@ -586,6 +655,9 @@ public void run() {
boolean asap = false;
+ // delay in ms requested by the server via the Retry-After header; -1 if none
+ long retryAfter = -1;
+
try {
URL url = URLUtil.toURL(fit.url);
Protocol protocol = protocolFactory.getProtocol(url);
@@ -783,6 +855,21 @@ public void run() {
response.getStatusCode(),
timeFetching);
+ // honour a Retry-After response header (e.g. on 429 or 503) by
+ // delaying the next fetch from this queue accordingly
+ // https://github.com/apache/stormcrawler/issues/784
+ retryAfter =
+ parseRetryAfterDelay(
+ response.getMetadata().getFirstValue(RETRY_AFTER_KEY));
+ if (retryAfter > 0 && maxRetryAfter >= 0 && retryAfter > maxRetryAfter) {
+ LOG.debug(
+ "Capping Retry-After for {} from {} to {} ms",
+ fit.url,
+ retryAfter,
+ maxRetryAfter);
+ retryAfter = maxRetryAfter;
+ }
+
// merges the original MD and the ones returned by the
// protocol
Metadata mergedMetadata = new Metadata();
@@ -885,7 +972,7 @@ public void run() {
eventCounter.scope("exception").incrBy(1);
} finally {
- fetchQueues.finishFetchItem(fit, asap);
+ fetchQueues.finishFetchItem(fit, asap, retryAfter);
activeThreads.decrementAndGet(); // count threads
// ack it whatever happens
collector.ack(fit.tuple);
@@ -1013,7 +1100,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
@Override
public void cleanup() {
super.cleanup();
- protocolFactory.cleanup();
+ if (protocolFactory != null) {
+ protocolFactory.cleanup();
+ }
}
@Override
diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml
index d024ed941..b86527a44 100644
--- a/core/src/main/resources/crawler-default.yaml
+++ b/core/src/main/resources/crawler-default.yaml
@@ -52,6 +52,13 @@ config:
# use the delay specified in the robots.txt
fetcher.server.delay.force: false
+ # max. delay in seconds honored when a server requests a back-off via the
+ # Retry-After HTTP response header (e.g. on a 429 or 503). The next fetch from
+ # the affected internal queue is delayed accordingly.
+ # (-1) honor the delay requested by the server as-is, with no upper bound
+  # (>=0) cap the honored delay to this many seconds; 0 effectively ignores the header
+ fetcher.max.retry.after: -1
+
# time bucket to use for the metrics sent by the Fetcher
fetcher.metrics.time.bucket.secs: 10
diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
index 8019bcf34..0bf5b745f 100644
--- a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
+++ b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
@@ -17,12 +17,113 @@
package org.apache.stormcrawler.bolt;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.findAll;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import com.github.tomakehurst.wiremock.verification.LoggedRequest;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+@WireMockTest
public class FetcherBoltTest extends AbstractFetcherBoltTest {
+ private static final DateTimeFormatter HTTP_DATE =
+ DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+ .withZone(ZoneOffset.UTC);
+
@BeforeEach
void setUpContext() throws Exception {
bolt = new FetcherBolt();
}
+
+    @Test
+    void parsesDelayInSeconds() {
+        // a bare number is a delay in seconds and comes back in milliseconds
+        final long plain = FetcherBolt.parseRetryAfterDelay("120");
+        Assertions.assertEquals(120_000L, plain);
+        // whitespace around the number does not matter
+        final long padded = FetcherBolt.parseRetryAfterDelay(" 90 ");
+        Assertions.assertEquals(90_000L, padded);
+        // zero seconds is a valid value: reported as 0, not as -1
+        final long zero = FetcherBolt.parseRetryAfterDelay("0");
+        Assertions.assertEquals(0L, zero);
+    }
+
+    @Test
+    void parsesHttpDate() {
+        final Instant twoMinutesAhead = Instant.now().plusSeconds(120);
+        final long delay = FetcherBolt.parseRetryAfterDelay(HTTP_DATE.format(twoMinutesAhead));
+        // the formatted date only has a one-second resolution and the clock keeps ticking
+        // between formatting and parsing, hence a range rather than an exact value
+        final boolean withinRange = delay > 100_000L && delay <= 120_000L;
+        Assertions.assertTrue(withinRange, "unexpected delay: " + delay);
+    }
+
+    @Test
+    void returnsMinusOneForInvalidOrAbsentValues() {
+        // absent or blank header
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(null));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(""));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("   "));
+        // neither a number of seconds nor an HTTP date
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("not-a-date"));
+        // a signed number is not a valid delay
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("-5"));
+        // only digits, but too large to fit in a long
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("99999999999999999999"));
+        // a date in the past is ignored
+        String past = HTTP_DATE.format(Instant.now().minusSeconds(120));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(past));
+    }
+
+    @Test
+    void retryAfterDelaysNextFetchFromSameQueue(WireMockRuntimeInfo wmRuntimeInfo) {
+        // first URL of the host asks for a 3s back-off, the second is fine
+        stubFor(
+                get(urlEqualTo("/a"))
+                        .willReturn(aResponse().withStatus(503).withHeader("Retry-After", "3")));
+        stubFor(get(urlEqualTo("/b")).willReturn(aResponse().withStatus(200).withBody("ok")));
+
+        TestOutputCollector output = new TestOutputCollector();
+        // typed rather than raw, so that handing it to prepare() is not an unchecked conversion
+        Map<String, Object> config = new HashMap<>();
+        config.put("http.agent.name", "this_is_only_a_test");
+        // keep the regular politeness delay low so only the Retry-After can
+        // explain a multi-second gap between the two fetches
+        config.put("fetcher.server.delay", 0.1f);
+        bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
+
+        String base = "http://localhost:" + wmRuntimeInfo.getHttpPort();
+        bolt.execute(tupleForUrl(base + "/a"));
+        bolt.execute(tupleForUrl(base + "/b"));
+
+        // both URLs are fetched and acked
+        await().atMost(30, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() == 2);
+
+        // element type is required: getLoggedDate() is not reachable through a raw List
+        List<LoggedRequest> a = findAll(getRequestedFor(urlEqualTo("/a")));
+        List<LoggedRequest> b = findAll(getRequestedFor(urlEqualTo("/b")));
+        Assertions.assertEquals(1, a.size());
+        Assertions.assertEquals(1, b.size());
+
+        long gap = b.get(0).getLoggedDate().getTime() - a.get(0).getLoggedDate().getTime();
+        // the second fetch must have waited roughly the requested 3 seconds
+        Assertions.assertTrue(
+                gap >= 2_500L,
+                "second fetch happened only " + gap + "ms after the first, Retry-After ignored");
+    }
+
+    // Mocked input tuple for the given URL: comes from component "source", carries no metadata.
+    private static Tuple tupleForUrl(String url) {
+        final Tuple mocked = mock(Tuple.class);
+        when(mocked.getValueByField("metadata")).thenReturn(null);
+        when(mocked.getStringByField("url")).thenReturn(url);
+        when(mocked.getSourceComponent()).thenReturn("source");
+        return mocked;
+    }
}