Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/java/org/apache/nutch/crawl/CrawlDbFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metrics.NutchMetrics;
Expand Down Expand Up @@ -50,6 +51,11 @@ public class CrawlDbFilter extends

private String scope;

// Cached counter references for performance
private Counter goneRecordsRemovedCounter;
private Counter orphanRecordsRemovedCounter;
private Counter urlsFilteredCounter;

private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -68,6 +74,21 @@ public void setup(Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB);
normalizers = new URLNormalizers(conf, scope);
}

// Initialize cached counter references
initCounters(context);
}

/**
 * Resolve the three CrawlDb filter counters once at setup time so the
 * hot map() path can increment cached references instead of performing
 * a counter lookup per record.
 */
private void initCounters(Context ctx) {
  final String group = NutchMetrics.GROUP_CRAWLDB_FILTER;
  goneRecordsRemovedCounter =
      ctx.getCounter(group, NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL);
  orphanRecordsRemovedCounter =
      ctx.getCounter(group, NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL);
  urlsFilteredCounter =
      ctx.getCounter(group, NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL);
}

private Text newKey = new Text();
Expand All @@ -81,15 +102,13 @@ public void map(Text key, CrawlDatum value,
// https://issues.apache.org/jira/browse/NUTCH-1101 check status first,
// cheaper than normalizing or filtering
if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) {
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL).increment(1);
goneRecordsRemovedCounter.increment(1);
return;
}
// Whether to remove orphaned pages
// https://issues.apache.org/jira/browse/NUTCH-1932
if (purgeOrphans && CrawlDatum.STATUS_DB_ORPHAN == value.getStatus()) {
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL).increment(1);
orphanRecordsRemovedCounter.increment(1);
return;
}
if (url != null && urlNormalizers) {
Expand All @@ -109,8 +128,7 @@ public void map(Text key, CrawlDatum value,
}
}
if (url == null) {
context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL).increment(1);
urlsFilteredCounter.increment(1);
} else {
// URL has passed filters
newKey.set(url); // collect it
Expand Down
21 changes: 17 additions & 4 deletions src/java/org/apache/nutch/crawl/CrawlDbReducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -52,6 +55,9 @@ public class CrawlDbReducer extends
private FetchSchedule schedule;
private ErrorTracker errorTracker;

// Cached counter references for status-based metrics
private Map<Byte, Counter> statusCounters = new HashMap<>();

@Override
public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration conf = context.getConfiguration();
Expand All @@ -66,6 +72,15 @@ public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
errorTracker = new ErrorTracker(NutchMetrics.GROUP_CRAWLDB, context);
}

/**
 * Look up the dynamic per-status counter for {@code status}, caching the
 * reference in {@code statusCounters} so repeated reduce() calls avoid
 * the counter-group lookup on the hot path.
 */
private Counter getStatusCounter(byte status, Context context) {
  Counter counter = statusCounters.get(status);
  if (counter == null) {
    // First time we see this status: resolve and remember the counter.
    counter = context.getCounter(NutchMetrics.GROUP_CRAWLDB,
        CrawlDatum.getStatusName(status));
    statusCounters.put(status, counter);
  }
  return counter;
}

@Override
public void reduce(Text key, Iterable<CrawlDatum> values,
Context context) throws IOException, InterruptedException {
Expand Down Expand Up @@ -170,8 +185,7 @@ public void reduce(Text key, Iterable<CrawlDatum> values,
}
context.write(key, old);
// Dynamic counter based on status name
context.getCounter(NutchMetrics.GROUP_CRAWLDB,
CrawlDatum.getStatusName(old.getStatus())).increment(1);
getStatusCounter(old.getStatus(), context).increment(1);
} else {
LOG.warn("Missing fetch and old value, signature={}",
StringUtil.toHexString(signature));
Expand Down Expand Up @@ -329,8 +343,7 @@ public void reduce(Text key, Iterable<CrawlDatum> values,
result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
context.write(key, result);
// Dynamic counter based on status name
context.getCounter(NutchMetrics.GROUP_CRAWLDB,
CrawlDatum.getStatusName(result.getStatus())).increment(1);
getStatusCounter(result.getStatus(), context).increment(1);
}

}
Expand Down
17 changes: 15 additions & 2 deletions src/java/org/apache/nutch/crawl/DeduplicationJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,33 @@ public static class DedupReducer<K extends Writable>

protected String[] compareOrder;

// Cached counter reference for performance
private Counter documentsMarkedDuplicateCounter;

@Override
public void setup(
Reducer<K, CrawlDatum, Text, CrawlDatum>.Context context) {
Configuration conf = context.getConfiguration();
compareOrder = conf.get(DEDUPLICATION_COMPARE_ORDER).split(",");

// Initialize cached counter reference
initCounters(context);
}

/**
 * Resolve the duplicate-documents counter once at setup time so the
 * per-record write path does not repeat the counter lookup.
 */
private void initCounters(Context ctx) {
  documentsMarkedDuplicateCounter = ctx.getCounter(NutchMetrics.GROUP_DEDUP,
      NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL);
}

/**
 * Emit {@code datum} as a duplicate: bump the duplicates counter, flag
 * the record's status, pull the original URL back out of the metadata
 * (stored under {@code urlKey}) and write the record keyed by that URL.
 *
 * @param datum record to mark and emit; mutated in place
 * @param context reducer context the record is written to
 */
protected void writeOutAsDuplicate(CrawlDatum datum,
    Context context)
    throws IOException, InterruptedException {
  // Counter increment is independent of the datum mutation below.
  documentsMarkedDuplicateCounter.increment(1);
  datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
  Text url = (Text) datum.getMetaData().remove(urlKey);
  context.write(url, datum);
}

Expand Down
80 changes: 62 additions & 18 deletions src/java/org/apache/nutch/crawl/Generator.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ public static class SelectorMapper
private JexlScript expr = null;
private ErrorTracker errorTracker;

// Cached counter references for performance
private Counter urlFiltersRejectedCounter;
private Counter scheduleRejectedCounter;
private Counter waitForUpdateCounter;
private Counter exprRejectedCounter;
private Counter statusRejectedCounter;
private Counter scoreTooLowCounter;
private Counter intervalRejectedCounter;
private Counter hostsAffectedPerHostOverflowCounter;
private Counter urlsSkippedPerHostOverflowCounter;

@Override
public void setup(
Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>.Context context)
Expand All @@ -219,6 +230,32 @@ public void setup(
expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
// Initialize error tracker with cached counters
errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
// Initialize cached counter references
initCounters(context);
}

/**
 * Resolve every generator counter once at setup time; afterwards the
 * hot map() path only increments the cached references instead of
 * doing a group/name lookup per record.
 */
private void initCounters(Context ctx) {
  final String group = NutchMetrics.GROUP_GENERATOR;
  urlFiltersRejectedCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL);
  scheduleRejectedCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL);
  waitForUpdateCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL);
  exprRejectedCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL);
  statusRejectedCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL);
  scoreTooLowCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL);
  intervalRejectedCounter =
      ctx.getCounter(group, NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL);
  // NOTE(review): the two per-host-overflow counters below appear to be
  // incremented only by SelectorReducer, not by this mapper's map() —
  // confirm whether the mapper really needs these cached references.
  hostsAffectedPerHostOverflowCounter = ctx.getCounter(group,
      NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL);
  urlsSkippedPerHostOverflowCounter = ctx.getCounter(group,
      NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL);
}

@Override
Expand All @@ -230,8 +267,7 @@ public void map(Text key, CrawlDatum value, Context context)
// URLFilters
try {
if (filters.filter(url.toString()) == null) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL).increment(1);
urlFiltersRejectedCounter.increment(1);
return;
}
} catch (URLFilterException e) {
Expand All @@ -245,8 +281,7 @@ public void map(Text key, CrawlDatum value, Context context)
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url,
crawlDatum.getFetchTime(), curTime);
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL).increment(1);
scheduleRejectedCounter.increment(1);
return;
}

Expand All @@ -255,8 +290,7 @@ public void map(Text key, CrawlDatum value, Context context)
if (oldGenTime != null) { // awaiting fetch & update
if (oldGenTime.get() + genDelay > curTime) { // still wait for
// update
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL).increment(1);
waitForUpdateCounter.increment(1);
return;
}
}
Expand All @@ -271,31 +305,27 @@ public void map(Text key, CrawlDatum value, Context context)
// check expr
if (expr != null) {
if (!crawlDatum.execute(expr, key.toString())) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL).increment(1);
exprRejectedCounter.increment(1);
return;
}
}

if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL).increment(1);
statusRejectedCounter.increment(1);
return;
}

// consider only entries with a score superior to the threshold
if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL).increment(1);
scoreTooLowCounter.increment(1);
return;
}

// consider only entries with a retry (or fetch) interval lower than
// threshold
if (intervalThreshold != -1
&& crawlDatum.getFetchInterval() > intervalThreshold) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL).increment(1);
intervalRejectedCounter.increment(1);
return;
}

Expand Down Expand Up @@ -332,6 +362,10 @@ public static class SelectorReducer extends
private Map<String, HostDatum> hostDatumCache = new HashMap<>();
private ErrorTracker errorTracker;

// Cached counter references for performance
private Counter hostsAffectedPerHostOverflowCounter;
private Counter urlsSkippedPerHostOverflowCounter;

public void readHostDb() throws IOException {
if (conf.get(GENERATOR_HOSTDB) == null) {
return;
Expand Down Expand Up @@ -426,10 +460,22 @@ public void setup(Context context) throws IOException {
}
// Initialize error tracker with cached counters
errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
// Initialize cached counter references
initReducerCounters(context);

readHostDb();
}

/**
 * Resolve the two per-host overflow counters once at setup time so the
 * reduce() hot path increments cached references only.
 */
private void initReducerCounters(Context ctx) {
  final String group = NutchMetrics.GROUP_GENERATOR;
  hostsAffectedPerHostOverflowCounter = ctx.getCounter(group,
      NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL);
  urlsSkippedPerHostOverflowCounter = ctx.getCounter(group,
      NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL);
}

@Override
public void cleanup(Context context)
throws IOException, InterruptedException {
Expand Down Expand Up @@ -555,15 +601,13 @@ public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
hostCount[1] = 1;
} else {
if (hostCount[1] == (maxCount+1)) {
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL).increment(1);
hostsAffectedPerHostOverflowCounter.increment(1);
LOG.info(
"Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
hostordomain, maxCount, maxNumSegments);
}
// skip this entry
context.getCounter(NutchMetrics.GROUP_GENERATOR,
NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL).increment(1);
urlsSkippedPerHostOverflowCounter.increment(1);
continue;
}
}
Expand Down
Loading