diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
index 2439bd76c..fa9edb429 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
@@ -24,7 +24,10 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.time.DateTimeException;
 import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -94,6 +97,26 @@ public class FetcherBolt extends StatusEmitterBolt {
      */
     public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout";
 
+    /**
+     * Maximum delay in seconds the fetcher will wait when a server requests a back-off via the
+     * Retry-After HTTP response header. A value of {@code -1} (the default) means the delay
+     * requested by the server is honored as-is, with no upper bound; {@code 0} caps the delay to
+     * nothing, i.e. the header is effectively ignored.
+     */
+    public static final String MAX_RETRY_AFTER_PARAM_KEY = "fetcher.max.retry.after";
+
+    /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */
+    private static final String RETRY_AFTER_KEY = "retry-after";
+
+    /**
+     * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015
+     * 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code
+     * SimpleDateFormat}, which matters here as it is shared across fetcher threads.
+     */
+    private static final DateTimeFormatter HTTP_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
     /** Key name of the custom crawl delay for a queue that may be present in the metadata. */
     private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";
 
@@ -233,10 +256,19 @@ public int getInProgressSize() {
             return inProgress.get();
         }
 
-        public void finishFetchItem(FetchItem it, boolean asap) {
+        public void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
             if (it != null) {
                 inProgress.decrementAndGet();
                 setNextFetchTime(System.currentTimeMillis(), asap);
+                if (retryAfterMS > 0) {
+                    // the server asked us to back off: push the next fetch time for
+                    // this queue further out if needed, but never bring it earlier.
+                    // The addition saturates so a huge delay cannot wrap around to
+                    // a time in the past and be silently dropped by Math.max
+                    final long now = System.currentTimeMillis();
+                    final long earliest = now + Math.min(retryAfterMS, Long.MAX_VALUE - now);
+                    nextFetchTime.getAndUpdate(prev -> Math.max(prev, earliest));
+                }
             }
         }
 
@@ -345,13 +377,13 @@ public synchronized boolean addFetchItem(URL u, String url, Tuple input) {
             return added;
         }
 
-        public synchronized void finishFetchItem(FetchItem it, boolean asap) {
+        public synchronized void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
             FetchItemQueue fiq = queues.get(it.queueId);
             if (fiq == null) {
                 LOG.warn("Attempting to finish item from unknown queue: {}", it.queueId);
                 return;
             }
-            fiq.finishFetchItem(it, asap);
+            fiq.finishFetchItem(it, asap, retryAfterMS);
         }
 
         public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) {
@@ -483,6 +515,42 @@ public synchronized FetchItem getFetchItem() {
         }
     }
 
+    /**
+     * Parses the value of a Retry-After HTTP response header into a delay expressed in
+     * milliseconds, relative to now. The value is either a number of seconds (e.g. {@code 120})
+     * or an HTTP date (e.g. {@code Wed, 21 Oct 2015 07:28:00 GMT}).
+     *
+     * @param retryAfter raw header value, may be {@code null}
+     * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is
+     *     absent, malformed, in the past, or too large to be expressed in milliseconds
+     */
+    static long parseRetryAfterDelay(String retryAfter) {
+        if (StringUtils.isBlank(retryAfter)) {
+            return -1;
+        }
+        retryAfter = retryAfter.trim();
+        // delay expressed as a number of seconds
+        if (retryAfter.matches("[0-9]+")) {
+            try {
+                // multiplyExact rather than '*': a value that fits in a long can still
+                // overflow once converted to ms and would otherwise wrap silently
+                return Math.multiplyExact(Long.parseLong(retryAfter), 1000L);
+            } catch (NumberFormatException | ArithmeticException e) {
+                // value too large to be expressed as a long number of ms - ignore
+                return -1;
+            }
+        }
+        // delay expressed as an HTTP date
+        try {
+            Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(retryAfter));
+            long delay = date.toEpochMilli() - System.currentTimeMillis();
+            return delay > 0 ? delay : -1;
+        } catch (DateTimeException e) {
+            LOG.debug("Invalid Retry-After header value: {}", retryAfter);
+            return -1;
+        }
+    }
+
     /** This class picks items from queues and fetches the pages. */
     private class FetcherThread extends Thread {
 
@@ -501,6 +569,9 @@ private class FetcherThread extends Thread {
         /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */
         private long fetchTimeout = -1;
 
+        /** Upper bound in ms for honoring the Retry-After header; -1 means no cap. */
+        private long maxRetryAfter = -1;
+
         /**
          * Single-thread executor used to run the protocol call so that it can be interrupted via
          * {@link Future#cancel(boolean)} when the bolt-level timeout fires.
@@ -521,6 +592,8 @@ public FetcherThread(Config conf, int num) {
             this.threadNum = num;
             timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues);
             fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
+            long maxRetryAfterSecs = ConfUtils.getLong(conf, MAX_RETRY_AFTER_PARAM_KEY, -1L);
+            maxRetryAfter = maxRetryAfterSecs < 0 ? -1L : maxRetryAfterSecs * 1000L;
             protocolMetadataPrefix =
                     ConfUtils.getString(
                             conf,
@@ -586,6 +659,9 @@ public void run() {
 
                 boolean asap = false;
 
+                // delay in ms requested by the server via the Retry-After header; -1 if none
+                long retryAfter = -1;
+
                 try {
                     URL url = URLUtil.toURL(fit.url);
                     Protocol protocol = protocolFactory.getProtocol(url);
@@ -783,6 +859,21 @@ public void run() {
                             response.getStatusCode(),
                             timeFetching);
 
+                    // honour a Retry-After response header (e.g. on 429 or 503) by
+                    // delaying the next fetch from this queue accordingly
+                    // https://github.com/apache/stormcrawler/issues/784
+                    retryAfter =
+                            parseRetryAfterDelay(
+                                    response.getMetadata().getFirstValue(RETRY_AFTER_KEY));
+                    if (retryAfter > 0 && maxRetryAfter >= 0 && retryAfter > maxRetryAfter) {
+                        LOG.debug(
+                                "Capping Retry-After for {} from {} to {} ms",
+                                fit.url,
+                                retryAfter,
+                                maxRetryAfter);
+                        retryAfter = maxRetryAfter;
+                    }
+
                     // merges the original MD and the ones returned by the
                     // protocol
                     Metadata mergedMetadata = new Metadata();
@@ -885,7 +976,7 @@ public void run() {
 
                     eventCounter.scope("exception").incrBy(1);
                 } finally {
-                    fetchQueues.finishFetchItem(fit, asap);
+                    fetchQueues.finishFetchItem(fit, asap, retryAfter);
                     activeThreads.decrementAndGet(); // count threads
                     // ack it whatever happens
                     collector.ack(fit.tuple);
@@ -1013,7 +1104,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
     @Override
     public void cleanup() {
         super.cleanup();
-        protocolFactory.cleanup();
+        if (protocolFactory != null) {
+            protocolFactory.cleanup();
+        }
     }
 
     @Override
diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml
index d024ed941..b86527a44 100644
--- a/core/src/main/resources/crawler-default.yaml
+++ b/core/src/main/resources/crawler-default.yaml
@@ -52,6 +52,13 @@ config:
   # use the delay specified in the robots.txt
   fetcher.server.delay.force: false
 
+  # max. delay in seconds honored when a server requests a back-off via the
+  # Retry-After HTTP response header (e.g. on a 429 or 503). The next fetch from
+  # the affected internal queue is delayed accordingly.
+  # (-1) honor the delay requested by the server as-is, with no upper bound
+  # (>=0) cap the honored delay to this many seconds (0 ignores the header)
+  fetcher.max.retry.after: -1
+
   # time bucket to use for the metrics sent by the Fetcher
   fetcher.metrics.time.bucket.secs: 10
 
diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
index 8019bcf34..0bf5b745f 100644
--- a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
+++ b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
@@ -17,12 +17,117 @@
 
 package org.apache.stormcrawler.bolt;
 
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.findAll;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import com.github.tomakehurst.wiremock.verification.LoggedRequest;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
+@WireMockTest
 public class FetcherBoltTest extends AbstractFetcherBoltTest {
 
+    private static final DateTimeFormatter HTTP_DATE =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
     @BeforeEach
     void setUpContext() throws Exception {
         bolt = new FetcherBolt();
     }
+
+    @Test
+    void parsesDelayInSeconds() {
+        Assertions.assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay("120"));
+        // surrounding whitespace is tolerated
+        Assertions.assertEquals(90_000L, FetcherBolt.parseRetryAfterDelay(" 90 "));
+        Assertions.assertEquals(0L, FetcherBolt.parseRetryAfterDelay("0"));
+    }
+
+    @Test
+    void parsesHttpDate() {
+        String future = HTTP_DATE.format(Instant.now().plusSeconds(120));
+        long delay = FetcherBolt.parseRetryAfterDelay(future);
+        // allow some slack for the clock ticking between formatting and parsing
+        Assertions.assertTrue(delay > 100_000L && delay <= 120_000L, "unexpected delay: " + delay);
+    }
+
+    @Test
+    void returnsMinusOneForInvalidOrAbsentValues() {
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(null));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(""));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("not-a-date"));
+        // a date in the past is ignored
+        String past = HTTP_DATE.format(Instant.now().minusSeconds(120));
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(past));
+        // a number of seconds that fits in a long but overflows once converted to ms
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("9223372036854775807"));
+        // a number of seconds that does not even fit in a long
+        Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("99999999999999999999"));
+    }
+
+    @Test
+    void retryAfterDelaysNextFetchFromSameQueue(WireMockRuntimeInfo wmRuntimeInfo) {
+        // first URL of the host asks for a 3s back-off, the second is fine
+        stubFor(
+                get(urlEqualTo("/a"))
+                        .willReturn(aResponse().withStatus(503).withHeader("Retry-After", "3")));
+        stubFor(get(urlEqualTo("/b")).willReturn(aResponse().withStatus(200).withBody("ok")));
+
+        TestOutputCollector output = new TestOutputCollector();
+        Map<String, Object> config = new HashMap<>();
+        config.put("http.agent.name", "this_is_only_a_test");
+        // keep the regular politeness delay low so only the Retry-After can
+        // explain a multi-second gap between the two fetches
+        config.put("fetcher.server.delay", 0.1f);
+        bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
+
+        String base = "http://localhost:" + wmRuntimeInfo.getHttpPort();
+        bolt.execute(tupleForUrl(base + "/a"));
+        bolt.execute(tupleForUrl(base + "/b"));
+
+        // both URLs are fetched and acked
+        await().atMost(30, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() == 2);
+
+        List<LoggedRequest> a = findAll(getRequestedFor(urlEqualTo("/a")));
+        List<LoggedRequest> b = findAll(getRequestedFor(urlEqualTo("/b")));
+        Assertions.assertEquals(1, a.size());
+        Assertions.assertEquals(1, b.size());
+
+        long gap = b.get(0).getLoggedDate().getTime() - a.get(0).getLoggedDate().getTime();
+        // the second fetch must have waited roughly the requested 3 seconds
+        Assertions.assertTrue(
+                gap >= 2_500L,
+                "second fetch happened only " + gap + "ms after the first, Retry-After ignored");
+    }
+
+    private static Tuple tupleForUrl(String url) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn("source");
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getValueByField("metadata")).thenReturn(null);
+        return tuple;
+    }
 }