From 2105724668f2c06f03ab157c207b60086d1c61be Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Mon, 15 Jun 2026 18:23:37 +0100 Subject: [PATCH 1/3] Add support for gzip and bz2 files + fix javadoc and prevent possible leak Signed-off-by: Julien Nioche --- core/pom.xml | 5 ++ .../apache/stormcrawler/spout/FileSpout.java | 60 +++++++++++-------- .../stormcrawler/spout/FileSpoutTest.java | 58 ++++++++++++++++++ 3 files changed, 98 insertions(+), 25 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 12ad99859..6d12c4277 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -127,6 +127,11 @@ under the License. ${tika.version} + + org.apache.commons + commons-compress + + org.mockito mockito-core diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 6615ea517..cdf22d8ea 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -17,21 +17,16 @@ package org.apache.stormcrawler.spout; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; +import java.util.zip.GZIPInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.lang3.StringUtils; import org.apache.storm.spout.Scheme; import org.apache.storm.spout.SpoutOutputCollector; @@ -46,27 +41,24 @@ import org.slf4j.LoggerFactory; /** - * Reads the lines from a UTF-8 file and use them as a spout. Load the entire content into memory. - * Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the default - * stream unless withDiscoveredStatus is set to true. + * Reads the lines from a UTF-8 file and use them as a spout. The spout reads files in chunks of + * 10,000 lines, keeping memory usage very low even for extremely large files with millions of seed + * URLs. Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the + * default stream unless withDiscoveredStatus is set to true. */ public class FileSpout extends BaseRichSpout { public static final int BATCH_SIZE = 10000; public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class); - - protected SpoutOutputCollector collector; - private final Queue inputFiles; - private BufferedReader currentBuffer; - + protected SpoutOutputCollector collector; protected Scheme scheme = new StringTabScheme(); - protected LinkedList buffer = new LinkedList<>(); protected boolean active; - private boolean withDiscoveredStatus = false; protected int totalTasks; protected int taskIndex; + private BufferedReader currentBuffer; + private boolean withDiscoveredStatus = false; /** * @param dir containing the seed files @@ -138,11 +130,20 @@ protected void populateBuffer() throws IOException { return; } Path inputPath = Paths.get(file); - currentBuffer = - new BufferedReader( - new InputStreamReader( - new FileInputStream(inputPath.toFile()), - StandardCharsets.UTF_8)); + InputStream is = new FileInputStream(inputPath.toFile()); + try { + String fileLower = file.toLowerCase(); + if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) { + is = new GZIPInputStream(is); + } else if (fileLower.endsWith(".bz2")) { + is = new BZip2CompressorInputStream(is); + } + currentBuffer = + new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); + } catch (IOException e) { + is.close(); + throw e; + } } String line = null; @@ -228,7 +229,16 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override - public void close() {} + public void close() { + if (currentBuffer != null) { + try { + currentBuffer.close(); + } catch (IOException e) { + LOG.error("Exception thrown when closing current buffer", e); + } + currentBuffer = null; + } + } @Override public void activate() { diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java index 8d6b03db3..e0447b281 100644 --- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java +++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java @@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; @@ -101,6 +103,62 @@ void testSeedFileWithCustomData() throws URISyntaxException { assertEquals(Status.DISCOVERED, tuple.get(2)); } + @Test + void testGzipSeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + Path gzipFile = tempDir.resolve("seed-list.txt.gz"); + try (java.io.OutputStream os = Files.newOutputStream(gzipFile); + java.util.zip.GZIPOutputStream gzos = new java.util.zip.GZIPOutputStream(os)) { + gzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + gzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8)); + } + + final FileSpout spout = new FileSpout(gzipFile.toAbsolutePath().toString()); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + + // 1st tuple + spout.nextTuple(); + List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + + // 2nd tuple + spout.nextTuple(); + tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://github.com", tuple.get(0)); + } + + @Test + void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + Path bzip2File = tempDir.resolve("seed-list.txt.bz2"); + try (java.io.OutputStream os = Files.newOutputStream(bzip2File); + org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream bzos = + new org.apache.commons.compress.compressors.bzip2 + .BZip2CompressorOutputStream(os)) { + bzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + bzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8)); + } + + final FileSpout spout = new FileSpout(bzip2File.toAbsolutePath().toString()); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + + // 1st tuple + spout.nextTuple(); + List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + + // 2nd tuple + spout.nextTuple(); + tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://github.com", tuple.get(0)); + } + private Path getPath(String resource) throws URISyntaxException { return Path.of( Objects.requireNonNull( From 1358eac6e49e17f2e8279f5999e46d01abccb5c9 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Tue, 16 Jun 2026 09:53:28 +0100 Subject: [PATCH 2/3] comply to checkstyle Signed-off-by: Julien Nioche --- .../org/apache/stormcrawler/spout/FileSpout.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index cdf22d8ea..2bcf4fb46 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -17,14 +17,23 @@ package org.apache.stormcrawler.spout; -import java.io.*; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Queue; import java.util.zip.GZIPInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.lang3.StringUtils; @@ -132,7 +141,7 @@ protected void populateBuffer() throws IOException { Path inputPath = Paths.get(file); InputStream is = new FileInputStream(inputPath.toFile()); try { - String fileLower = file.toLowerCase(); + String fileLower = file.toLowerCase(Locale.ROOT); if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) { is = new GZIPInputStream(is); } else if (fileLower.endsWith(".bz2")) { From e5a54e17c7d848697f55bb9a58daa6c3b6e25152 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Tue, 16 Jun 2026 11:43:42 +0100 Subject: [PATCH 3/3] buffered input + handle concats for bz2 Signed-off-by: Julien Nioche --- .../main/java/org/apache/stormcrawler/spout/FileSpout.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 2bcf4fb46..8e2a9edd0 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -17,6 +17,7 @@ package org.apache.stormcrawler.spout; +import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; @@ -139,13 +140,13 @@ protected void populateBuffer() throws IOException { return; } Path inputPath = Paths.get(file); - InputStream is = new FileInputStream(inputPath.toFile()); + InputStream is = new BufferedInputStream(new FileInputStream(inputPath.toFile())); try { String fileLower = file.toLowerCase(Locale.ROOT); if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) { is = new GZIPInputStream(is); } else if (fileLower.endsWith(".bz2")) { - is = new BZip2CompressorInputStream(is); + is = new BZip2CompressorInputStream(is, true); } currentBuffer = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));