diff --git a/core/pom.xml b/core/pom.xml
index 12ad99859..6d12c4277 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -127,6 +127,11 @@ under the License.
${tika.version}
+
+ org.apache.commons
+ commons-compress
+
+
org.mockito
mockito-core
diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java
index 6615ea517..8e2a9edd0 100644
--- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java
+++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java
@@ -17,9 +17,11 @@
package org.apache.stormcrawler.spout;
+import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -30,8 +32,11 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Queue;
+import java.util.zip.GZIPInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -46,27 +51,24 @@
import org.slf4j.LoggerFactory;
/**
- * Reads the lines from a UTF-8 file and use them as a spout. Load the entire content into memory.
- * Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the default
- * stream unless withDiscoveredStatus is set to true.
+ * Reads the lines from a UTF-8 file and use them as a spout. The spout reads files in chunks of
+ * 10,000 lines, keeping memory usage very low even for extremely large files with millions of seed
+ * URLs. Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the
+ * default stream unless withDiscoveredStatus is set to true.
*/
public class FileSpout extends BaseRichSpout {
public static final int BATCH_SIZE = 10000;
public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class);
-
- protected SpoutOutputCollector collector;
-
private final Queue inputFiles;
- private BufferedReader currentBuffer;
-
+ protected SpoutOutputCollector collector;
protected Scheme scheme = new StringTabScheme();
-
protected LinkedList buffer = new LinkedList<>();
protected boolean active;
- private boolean withDiscoveredStatus = false;
protected int totalTasks;
protected int taskIndex;
+ private BufferedReader currentBuffer;
+ private boolean withDiscoveredStatus = false;
/**
* @param dir containing the seed files
@@ -138,11 +140,20 @@ protected void populateBuffer() throws IOException {
return;
}
Path inputPath = Paths.get(file);
- currentBuffer =
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(inputPath.toFile()),
- StandardCharsets.UTF_8));
+ InputStream is = new BufferedInputStream(new FileInputStream(inputPath.toFile()));
+ try {
+ String fileLower = file.toLowerCase(Locale.ROOT);
+ if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) {
+ is = new GZIPInputStream(is);
+ } else if (fileLower.endsWith(".bz2")) {
+ is = new BZip2CompressorInputStream(is, true);
+ }
+ currentBuffer =
+ new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ is.close();
+ throw e;
+ }
}
String line = null;
@@ -228,7 +239,16 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
- public void close() {}
+ public void close() {
+ if (currentBuffer != null) {
+ try {
+ currentBuffer.close();
+ } catch (IOException e) {
+ LOG.error("Exception thrown when closing current buffer", e);
+ }
+ currentBuffer = null;
+ }
+ }
@Override
public void activate() {
diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java
index 8d6b03db3..e0447b281 100644
--- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java
+++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java
@@ -22,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
@@ -101,6 +103,62 @@ void testSeedFileWithCustomData() throws URISyntaxException {
assertEquals(Status.DISCOVERED, tuple.get(2));
}
+ @Test
+ void testGzipSeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception {
+ Path gzipFile = tempDir.resolve("seed-list.txt.gz");
+ try (java.io.OutputStream os = Files.newOutputStream(gzipFile);
+ java.util.zip.GZIPOutputStream gzos = new java.util.zip.GZIPOutputStream(os)) {
+ gzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8));
+ gzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8));
+ }
+
+ final FileSpout spout = new FileSpout(gzipFile.toAbsolutePath().toString());
+ final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock();
+ spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock);
+ spout.activate();
+
+ // 1st tuple
+ spout.nextTuple();
+ List