Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ under the License.
<version>${tika.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
52 changes: 36 additions & 16 deletions core/src/main/java/org/apache/stormcrawler/spout/FileSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.stormcrawler.spout;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -30,8 +32,11 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.zip.GZIPInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
Expand All @@ -46,27 +51,24 @@
import org.slf4j.LoggerFactory;

/**
* Reads the lines from a UTF-8 file and use them as a spout. Load the entire content into memory.
* Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the default
* stream unless withDiscoveredStatus is set to true.
* Reads the lines from a UTF-8 file and uses them as a spout. The spout reads files in chunks of
* 10,000 lines, keeping memory usage very low even for extremely large files with millions of seed
* URLs. Uses StringTabScheme to parse the lines into URLs and Metadata, generates tuples on the
* default stream unless withDiscoveredStatus is set to true.
*/
public class FileSpout extends BaseRichSpout {

public static final int BATCH_SIZE = 10000;
public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class);

protected SpoutOutputCollector collector;

private final Queue<String> inputFiles;
private BufferedReader currentBuffer;

protected SpoutOutputCollector collector;
protected Scheme scheme = new StringTabScheme();

protected LinkedList<byte[]> buffer = new LinkedList<>();
protected boolean active;
private boolean withDiscoveredStatus = false;
protected int totalTasks;
protected int taskIndex;
private BufferedReader currentBuffer;
private boolean withDiscoveredStatus = false;

/**
* @param dir containing the seed files
Expand Down Expand Up @@ -138,11 +140,20 @@ protected void populateBuffer() throws IOException {
return;
}
Path inputPath = Paths.get(file);
currentBuffer =
new BufferedReader(
new InputStreamReader(
new FileInputStream(inputPath.toFile()),
StandardCharsets.UTF_8));
InputStream is = new BufferedInputStream(new FileInputStream(inputPath.toFile()));
try {
String fileLower = file.toLowerCase(Locale.ROOT);
if (fileLower.endsWith(".gz") || fileLower.endsWith(".gzip")) {
is = new GZIPInputStream(is);
} else if (fileLower.endsWith(".bz2")) {
is = new BZip2CompressorInputStream(is, true);
}
currentBuffer =
new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
} catch (IOException e) {
is.close();
throw e;
}
}

String line = null;
Expand Down Expand Up @@ -228,7 +239,16 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

@Override
public void close() {}
public void close() {
    // Nothing to release when no seed file is currently open.
    final BufferedReader openReader = currentBuffer;
    if (openReader == null) {
        return;
    }
    try {
        openReader.close();
    } catch (IOException e) {
        // Closing is best effort: report the failure and carry on.
        LOG.error("Exception thrown when closing current buffer", e);
    }
    // Forget the reader once the close attempt has completed.
    currentBuffer = null;
}

@Override
public void activate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,6 +103,62 @@ void testSeedFileWithCustomData() throws URISyntaxException {
assertEquals(Status.DISCOVERED, tuple.get(2));
}

/**
 * Writes a two-line gzip-compressed seed file (".gz" extension) into a temporary directory and
 * checks that the spout emits both URLs, in file order, as the first element of each tuple.
 *
 * @param tempDir temporary directory injected by JUnit, removed after the test
 * @throws Exception if the seed file cannot be written or the spout fails
 */
@Test
void testGzipSeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception {
    Path gzipFile = tempDir.resolve("seed-list.txt.gz");
    try (java.io.OutputStream os = Files.newOutputStream(gzipFile);
            java.util.zip.GZIPOutputStream gzos = new java.util.zip.GZIPOutputStream(os)) {
        gzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8));
        gzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8));
    }

    final FileSpout spout = new FileSpout(gzipFile.toAbsolutePath().toString());
    final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock();
    try {
        spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock);
        spout.activate();

        // 1st tuple
        spout.nextTuple();
        List<Object> tuple = collectorMock.getTuple();
        assertNotNull(tuple);
        assertEquals("https://stormcrawler.apache.org", tuple.get(0));

        // 2nd tuple
        spout.nextTuple();
        tuple = collectorMock.getTuple();
        assertNotNull(tuple);
        assertEquals("https://github.com", tuple.get(0));
    } finally {
        // Release any reader the spout may still hold on the seed file, even when an assertion
        // fails, so that the temporary directory can be deleted afterwards.
        spout.close();
    }
}

/**
 * Writes a two-line bzip2-compressed seed file (".bz2" extension) into a temporary directory and
 * checks that the spout emits both URLs, in file order, as the first element of each tuple.
 *
 * @param tempDir temporary directory injected by JUnit, removed after the test
 * @throws Exception if the seed file cannot be written or the spout fails
 */
@Test
void testBzip2SeedFile(@org.junit.jupiter.api.io.TempDir Path tempDir) throws Exception {
    Path bzip2File = tempDir.resolve("seed-list.txt.bz2");
    try (java.io.OutputStream os = Files.newOutputStream(bzip2File);
            org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream bzos =
                    new org.apache.commons.compress.compressors.bzip2
                            .BZip2CompressorOutputStream(os)) {
        bzos.write("https://stormcrawler.apache.org\n".getBytes(StandardCharsets.UTF_8));
        bzos.write("https://github.com\n".getBytes(StandardCharsets.UTF_8));
    }

    final FileSpout spout = new FileSpout(bzip2File.toAbsolutePath().toString());
    final FileSpoutOutputCollectorMock collectorMock = new FileSpoutOutputCollectorMock();
    try {
        spout.open(Map.of(), new FileSpoutTopologyContextMock(), collectorMock);
        spout.activate();

        // 1st tuple
        spout.nextTuple();
        List<Object> tuple = collectorMock.getTuple();
        assertNotNull(tuple);
        assertEquals("https://stormcrawler.apache.org", tuple.get(0));

        // 2nd tuple
        spout.nextTuple();
        tuple = collectorMock.getTuple();
        assertNotNull(tuple);
        assertEquals("https://github.com", tuple.get(0));
    } finally {
        // Release any reader the spout may still hold on the seed file, even when an assertion
        // fails, so that the temporary directory can be deleted afterwards.
        spout.close();
    }
}

private Path getPath(String resource) throws URISyntaxException {
return Path.of(
Objects.requireNonNull(
Expand Down