diff --git a/core/pom.xml b/core/pom.xml index 12ad99859..6d12c4277 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -127,6 +127,11 @@ under the License. ${tika.version} + + org.apache.commons + commons-compress + + org.mockito mockito-core diff --git a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java index 6615ea517..8e2a9edd0 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java @@ -17,9 +17,11 @@ package org.apache.stormcrawler.spout; +import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -30,8 +32,11 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Queue; +import java.util.zip.GZIPInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.lang3.StringUtils; import org.apache.storm.spout.Scheme; import org.apache.storm.spout.SpoutOutputCollector; @@ -46,27 +51,24 @@ import org.slf4j.LoggerFactory; /** - * Reads the lines from a UTF-8 file and use them as a spout. Load the entire content into memory. - * Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the default - * stream unless withDiscoveredStatus is set to true. + * Reads the lines from a UTF-8 file and use them as a spout. The spout reads files in chunks of + * 10,000 lines, keeping memory usage very low even for extremely large files with millions of seed + * URLs. Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the + * default stream unless withDiscoveredStatus is set to true. */ public class FileSpout extends BaseRichSpout { public static final int BATCH_SIZE = 10000; public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class); - - protected SpoutOutputCollector collector; - private final Queue inputFiles; - private BufferedReader currentBuffer; - + protected SpoutOutputCollector collector; protected Scheme scheme = new StringTabScheme(); - protected LinkedList buffer = new LinkedList<>(); protected boolean active; - private boolean withDiscoveredStatus = false; protected int totalTasks; protected int taskIndex; + private BufferedReader currentBuffer; + private boolean withDiscoveredStatus = false; /** * @param dir containing the seed files @@ -138,11 +140,20 @@ protected void populateBuffer() throws IOException { return; } Path inputPath = Paths.get(file); - currentBuffer = - new BufferedReader( - new InputStreamReader( - new FileInputStream(inputPath.toFile()), - StandardCharsets.UTF_8)); + InputStream is = new BufferedInputStream(new FileInputStream(inputPath.toFile())); + try { + String fileLower = file.toLowerCase(Locale.ROOT); + if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) { + is = new GZIPInputStream(is); + } else if (fileLower.endsWith(".bz2")) { + is = new BZip2CompressorInputStream(is, true); + } + currentBuffer = + new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); + } catch (IOException e) { + is.close(); + throw e; + } } String line = null; @@ -228,7 +239,16 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override - public void close() {} + public void close() { + if (currentBuffer != null) { + try { + currentBuffer.close(); + } catch (IOException e) { + LOG.error("Exception thrown when closing current buffer", e); + } + currentBuffer = null; + } + } @Override public void activate() { diff --git a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java index 8d6b03db3..e0447b281 100644 --- a/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java +++ b/core/src/test/java/org/apache/stormcrawler/spout/FileSpoutTest.java @@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; @@ -101,6 +103,62 @@ void testSeedFileWithCustomData() throws URISyntaxException { assertEquals(Status.DISCOVERED, tuple.get(2)); } + @Test + void testGzipSeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + Path gzipFile = tempDir.resolve("seed-list.txt.gz"); + try (java.io.OutputStream os = Files.newOutputStream(gzipFile); + java.util.zip.GZIPOutputStream gzos = new java.util.zip.GZIPOutputStream(os)) { + gzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + gzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8)); + } + + final FileSpout spout = new FileSpout(gzipFile.toAbsolutePath().toString()); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + + // 1st tuple + spout.nextTuple(); + List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + + // 2nd tuple + spout.nextTuple(); + tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://github.com", tuple.get(0)); + } + + @Test + void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception { + Path bzip2File = tempDir.resolve("seed-list.txt.bz2"); + try (java.io.OutputStream os = Files.newOutputStream(bzip2File); + org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream bzos = + new org.apache.commons.compress.compressors.bzip2 + .BZip2CompressorOutputStream(os)) { + bzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8)); + bzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8)); + } + + final FileSpout spout = new FileSpout(bzip2File.toAbsolutePath().toString()); + final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock(); + spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock); + spout.activate(); + + // 1st tuple + spout.nextTuple(); + List tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://stormcrawler.apache.org", tuple.get(0)); + + // 2nd tuple + spout.nextTuple(); + tuple = collectorMock.getTuple(); + assertNotNull(tuple); + assertEquals("https://github.com", tuple.get(0)); + } + private Path getPath(String resource) throws URISyntaxException { return Path.of( Objects.requireNonNull(