Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -94,6 +97,26 @@ public class FetcherBolt extends StatusEmitterBolt {
*/
public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout";

/**
* Maximum delay in seconds the fetcher will honor when a server requests a back-off via the <a
* href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After">Retry-After</a>
* HTTP response header. Any negative value ({@code -1} is the default) means the delay requested
* by the server is honored as-is, with no upper bound. A positive value caps the honored delay
* to that many seconds. A value of {@code 0} caps the delay to zero, i.e. the header is
* effectively ignored.
*/
public static final String MAX_RETRY_AFTER_PARAM_KEY = "fetcher.max.retry.after";

/** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */
private static final String RETRY_AFTER_KEY = "retry-after";

/**
* Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015
* 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code
* SimpleDateFormat}, which matters here as it is shared across fetcher threads.
*/
private static final DateTimeFormatter HTTP_DATE_FORMATTER =
DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
.withZone(ZoneOffset.UTC);

/** Key name of the custom crawl delay for a queue that may be present in the metadata. */
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";

Expand Down Expand Up @@ -233,10 +256,16 @@ public int getInProgressSize() {
return inProgress.get();
}

public void finishFetchItem(FetchItem it, boolean asap) {
/**
 * Marks an item of this queue as done and schedules the next fetch.
 *
 * <p>Does nothing when {@code it} is {@code null}. Otherwise the in-progress counter is
 * decremented and the next fetch time is set from the current time via {@code
 * setNextFetchTime}. When {@code retryAfterMS} is positive, the next fetch time is additionally
 * raised to at least "now + retryAfterMS"; it is never moved earlier.
 *
 * @param it the item which has just been processed; ignored if {@code null}
 * @param asap passed through to {@code setNextFetchTime}
 * @param retryAfterMS back-off requested by the server in milliseconds; values {@code <= 0}
 *     leave the regular scheduling untouched
 */
public void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
    if (it == null) {
        return;
    }
    inProgress.decrementAndGet();
    setNextFetchTime(System.currentTimeMillis(), asap);
    if (retryAfterMS <= 0) {
        // no back-off requested: the regular politeness delay applies
        return;
    }
    // the server asked for a back-off: only ever postpone, never advance
    final long notBefore = System.currentTimeMillis() + retryAfterMS;
    nextFetchTime.getAndUpdate(current -> current < notBefore ? notBefore : current);
}

Expand Down Expand Up @@ -345,13 +374,13 @@ public synchronized boolean addFetchItem(URL u, String url, Tuple input) {
return added;
}

public synchronized void finishFetchItem(FetchItem it, boolean asap) {
/**
* Marks {@code it} as finished on the queue it belongs to.
*
* <p>The queue is looked up by {@code it.queueId}. If no such queue is known, a warning is
* logged and the call returns without further effect; otherwise the call is delegated to the
* queue's own {@code finishFetchItem}.
*
* @param it the item that has been processed. NOTE(review): {@code it.queueId} is dereferenced
*     before any null check, so a {@code null} item would fail here - confirm callers never
*     pass {@code null}
* @param asap forwarded unchanged to the queue
* @param retryAfterMS forwarded unchanged to the queue by the three-argument call
*/
public synchronized void finishFetchItem(FetchItem it, boolean asap, long retryAfterMS) {
FetchItemQueue fiq = queues.get(it.queueId);
if (fiq == null) {
LOG.warn("Attempting to finish item from unknown queue: {}", it.queueId);
return;
}
// NOTE(review): both a two-argument and a three-argument delegation appear below; this looks
// like the removed and the added line of a diff rendered without +/- markers - confirm that
// only the three-argument call is meant to remain
fiq.finishFetchItem(it, asap);
fiq.finishFetchItem(it, asap, retryAfterMS);
}

public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) {
Expand Down Expand Up @@ -483,6 +512,41 @@ public synchronized FetchItem getFetchItem() {
}
}

/**
* Parses the value of a <a
* href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After">Retry-After</a>
* HTTP response header into a delay expressed in milliseconds, relative to now. The value is
* either a number of seconds (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed, 21 Oct 2015
* 07:28:00 GMT}).
*
* @return the delay in milliseconds, or {@code -1} if the header is absent, malformed or in the
* past.
*/
/**
 * Converts the value of a Retry-After HTTP response header into a delay in milliseconds,
 * relative to the current time.
 *
 * <p>Two forms are understood: a non-negative number of seconds (e.g. {@code 120}) and an HTTP
 * date (e.g. {@code Wed, 21 Oct 2015 07:28:00 GMT}). Leading and trailing whitespace is
 * ignored.
 *
 * @param retryAfter the raw header value; may be {@code null}
 * @return the delay in milliseconds ({@code 0} for a literal delay of zero seconds), or {@code
 *     -1} if the value is absent, malformed, too large to be expressed in milliseconds as a
 *     {@code long}, or a date which is not in the future
 */
static long parseRetryAfterDelay(String retryAfter) {
    if (StringUtils.isBlank(retryAfter)) {
        return -1;
    }
    final String value = retryAfter.trim();
    // delay expressed as a number of seconds
    if (isAsciiDigits(value)) {
        try {
            // multiplyExact: a value can fit in a long as seconds and still overflow once
            // converted to milliseconds; a plain '*' would silently wrap around and hand the
            // caller a bogus (possibly negative) delay
            return Math.multiplyExact(Long.parseLong(value), 1000L);
        } catch (NumberFormatException | ArithmeticException e) {
            // too large for a long, either as seconds or as milliseconds - ignore
            LOG.debug("Invalid Retry-After header value: {}", value);
            return -1;
        }
    }
    // delay expressed as an HTTP date
    try {
        Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(value));
        long delay = date.toEpochMilli() - System.currentTimeMillis();
        return delay > 0 ? delay : -1;
    } catch (DateTimeException | ArithmeticException e) {
        // DateTimeException: not a valid HTTP date; ArithmeticException: the instant is too
        // far away to be expressed in epoch milliseconds
        LOG.debug("Invalid Retry-After header value: {}", value);
        return -1;
    }
}

/** Returns whether {@code value} is non-empty and made up solely of the digits 0 to 9. */
private static boolean isAsciiDigits(String value) {
    if (value.isEmpty()) {
        return false;
    }
    for (int i = 0; i < value.length(); i++) {
        final char c = value.charAt(i);
        if (c < '0' || c > '9') {
            return false;
        }
    }
    return true;
}

/** This class picks items from queues and fetches the pages. */
private class FetcherThread extends Thread {

Expand All @@ -501,6 +565,9 @@ private class FetcherThread extends Thread {
/** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */
private long fetchTimeout = -1;

/** Upper bound in ms for honoring the Retry-After header; -1 means no cap. */
private long maxRetryAfter = -1;

/**
* Single-thread executor used to run the protocol call so that it can be interrupted via
* {@link Future#cancel(boolean)} when the bolt-level timeout fires.
Expand All @@ -521,6 +588,8 @@ public FetcherThread(Config conf, int num) {
this.threadNum = num;
timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues);
fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
long maxRetryAfterSecs = ConfUtils.getLong(conf, MAX_RETRY_AFTER_PARAM_KEY, -1L);
maxRetryAfter = maxRetryAfterSecs < 0 ? -1L : maxRetryAfterSecs * 1000L;
protocolMetadataPrefix =
ConfUtils.getString(
conf,
Expand Down Expand Up @@ -586,6 +655,9 @@ public void run() {

boolean asap = false;

// delay in ms requested by the server via the Retry-After header; -1 if none
long retryAfter = -1;

try {
URL url = URLUtil.toURL(fit.url);
Protocol protocol = protocolFactory.getProtocol(url);
Expand Down Expand Up @@ -783,6 +855,21 @@ public void run() {
response.getStatusCode(),
timeFetching);

// honour a Retry-After response header (e.g. on 429 or 503) by
// delaying the next fetch from this queue accordingly
// https://github.com/apache/stormcrawler/issues/784
retryAfter =
parseRetryAfterDelay(
response.getMetadata().getFirstValue(RETRY_AFTER_KEY));
if (retryAfter > 0 && maxRetryAfter >= 0 && retryAfter > maxRetryAfter) {
LOG.debug(
"Capping Retry-After for {} from {} to {} ms",
fit.url,
retryAfter,
maxRetryAfter);
retryAfter = maxRetryAfter;
}

// merges the original MD and the ones returned by the
// protocol
Metadata mergedMetadata = new Metadata();
Expand Down Expand Up @@ -885,7 +972,7 @@ public void run() {

eventCounter.scope("exception").incrBy(1);
} finally {
fetchQueues.finishFetchItem(fit, asap);
fetchQueues.finishFetchItem(fit, asap, retryAfter);
activeThreads.decrementAndGet(); // count threads
// ack it whatever happens
collector.ack(fit.tuple);
Expand Down Expand Up @@ -1013,7 +1100,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
/**
* Releases the resources held by this bolt: calls {@code super.cleanup()} and then the cleanup
* of the protocol factory.
*/
@Override
public void cleanup() {
super.cleanup();
// NOTE(review): the unguarded call below is followed by a null-guarded copy of the same call;
// this looks like the removed and the added line of a diff rendered without +/- markers -
// confirm that only the guarded call is meant to remain
protocolFactory.cleanup();
// presumably protocolFactory can still be null if the bolt was never prepared - TODO confirm
if (protocolFactory != null) {
protocolFactory.cleanup();
}
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/resources/crawler-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ config:
# use the delay specified in the robots.txt
fetcher.server.delay.force: false

# max. delay in seconds honored when a server requests a back-off via the
# Retry-After HTTP response header (e.g. on a 429 or 503). The next fetch from
# the affected internal queue is delayed accordingly.
# (<0) honor the delay requested by the server as-is, with no upper bound
# (0) cap the delay to zero, i.e. effectively ignore the Retry-After header
# (>0) cap the honored delay to this many seconds
fetcher.max.retry.after: -1

# time bucket to use for the metrics sent by the Fetcher
fetcher.metrics.time.bucket.secs: 10

Expand Down
101 changes: 101 additions & 0 deletions core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,113 @@

package org.apache.stormcrawler.bolt;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.findAll;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import com.github.tomakehurst.wiremock.verification.LoggedRequest;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.stormcrawler.TestOutputCollector;
import org.apache.stormcrawler.TestUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@WireMockTest
public class FetcherBoltTest extends AbstractFetcherBoltTest {

private static final DateTimeFormatter HTTP_DATE =
DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
.withZone(ZoneOffset.UTC);

@BeforeEach
void setUpContext() throws Exception {
bolt = new FetcherBolt();
}

@Test
void parsesDelayInSeconds() {
Assertions.assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay("120"));
// surrounding whitespace is tolerated
Assertions.assertEquals(90_000L, FetcherBolt.parseRetryAfterDelay(" 90 "));
Assertions.assertEquals(0L, FetcherBolt.parseRetryAfterDelay("0"));
}

@Test
void parsesHttpDate() {
String future = HTTP_DATE.format(Instant.now().plusSeconds(120));
long delay = FetcherBolt.parseRetryAfterDelay(future);
// allow some slack for the clock ticking between formatting and parsing
Assertions.assertTrue(delay > 100_000L && delay <= 120_000L, "unexpected delay: " + delay);
}

@Test
void returnsMinusOneForInvalidOrAbsentValues() {
Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(null));
Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(""));
Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("not-a-date"));
// a date in the past is ignored
String past = HTTP_DATE.format(Instant.now().minusSeconds(120));
Assertions.assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(past));
}

@Test
void retryAfterDelaysNextFetchFromSameQueue(WireMockRuntimeInfo wmRuntimeInfo) {
// first URL of the host asks for a 3s back-off, the second is fine
stubFor(
get(urlEqualTo("/a"))
.willReturn(aResponse().withStatus(503).withHeader("Retry-After", "3")));
stubFor(get(urlEqualTo("/b")).willReturn(aResponse().withStatus(200).withBody("ok")));

TestOutputCollector output = new TestOutputCollector();
Map<String, Object> config = new HashMap<>();
config.put("http.agent.name", "this_is_only_a_test");
// keep the regular politeness delay low so only the Retry-After can
// explain a multi-second gap between the two fetches
config.put("fetcher.server.delay", 0.1f);
bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output));

String base = "http://localhost:" + wmRuntimeInfo.getHttpPort();
bolt.execute(tupleForUrl(base + "/a"));
bolt.execute(tupleForUrl(base + "/b"));

// both URLs are fetched and acked
await().atMost(30, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() == 2);

List<LoggedRequest> a = findAll(getRequestedFor(urlEqualTo("/a")));
List<LoggedRequest> b = findAll(getRequestedFor(urlEqualTo("/b")));
Assertions.assertEquals(1, a.size());
Assertions.assertEquals(1, b.size());

long gap = b.get(0).getLoggedDate().getTime() - a.get(0).getLoggedDate().getTime();
// the second fetch must have waited roughly the requested 3 seconds
Assertions.assertTrue(
gap >= 2_500L,
"second fetch happened only " + gap + "ms after the first, Retry-After ignored");
}

private static Tuple tupleForUrl(String url) {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn("source");
when(tuple.getStringByField("url")).thenReturn(url);
when(tuple.getValueByField("metadata")).thenReturn(null);
return tuple;
}
}