Skip to content

Commit 19c0695

Browse files
committed
AVRO-3594: FsInput to use openFile() API
Boost performance reading from object stores in hadoop by using the openFile builder API and passing in the file length as an option (can save a HEAD) and asks for adaptive IO (sequential going to random if the client starts seeking)
1 parent b201f5d commit 19c0695

1 file changed

Lines changed: 11 additions & 1 deletion

File tree

  • lang/java/mapred/src/main/java/org/apache/avro/mapred

lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import org.apache.avro.file.SeekableInput;
3030

31+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
32+
3133
/** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */
3234
public class FsInput implements Closeable, SeekableInput {
3335
private final FSDataInputStream stream;
@@ -41,7 +43,15 @@ public FsInput(Path path, Configuration conf) throws IOException {
4143
/** Construct given a path and a {@code FileSystem}. */
4244
public FsInput(Path path, FileSystem fileSystem) throws IOException {
4345
this.len = fileSystem.getFileStatus(path).getLen();
44-
this.stream = fileSystem.open(path);
46+
// use the hadoop 3.3.0 openFile API and specify length
47+
// and read policy. object stores can use these to
48+
// optimize read performance.
49+
// the read policy "adaptive" means "start sequential but
50+
// go to random IO after backwards seeks"
51+
// Filesystems which don't recognize the options will ignore them
52+
53+
this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive")
54+
.opt("fs.option.openfile.length", Long.toString(len)).build());
4555
}
4656

4757
@Override

0 commit comments

Comments
 (0)