Skip to content

Commit bed0579

Browse files
junmuzymuzammil
authored and committed
Changes for supporting Atomic writes with Hadoop 3.4+
1 parent b1c3463 commit bed0579

3 files changed

Lines changed: 65 additions & 2 deletions

File tree

paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import org.apache.paimon.catalog.CatalogContext;
2222
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.Path;
2324
import org.apache.paimon.options.Options;
2425

2526
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.FSDataOutputStream;
28+
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
29+
import org.apache.hadoop.fs.FileAlreadyExistsException;
2630
import org.apache.hadoop.fs.FileSystem;
2731
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
2832
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
@@ -32,6 +36,7 @@
3236
import java.io.IOException;
3337
import java.io.UncheckedIOException;
3438
import java.net.URI;
39+
import java.nio.charset.StandardCharsets;
3540
import java.util.Map;
3641
import java.util.Objects;
3742
import java.util.concurrent.ConcurrentHashMap;
@@ -66,6 +71,33 @@ public void configure(CatalogContext context) {
6671
this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
6772
}
6873

74+
/**
75+
* Write content atomically using Azure conditional writes.
76+
*
77+
* @param path the target file path
78+
* @param content the content to write
79+
* @return true if write succeeded, false if file already exists
80+
* @throws IOException on I/O errors
81+
*/
82+
@Override
83+
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
84+
org.apache.hadoop.fs.Path hadoopPath = path(path);
85+
FileSystem fs = getFileSystem(hadoopPath);
86+
87+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
88+
89+
FSDataOutputStreamBuilder builder = fs.createFile(hadoopPath);
90+
builder.must("fs.option.create.conditional.overwrite", true);
91+
92+
try (FSDataOutputStream out = builder.create().overwrite(false).build()) {
93+
out.write(contentBytes);
94+
return true;
95+
} catch (FileAlreadyExistsException e) {
96+
LOG.debug("Conditional write failed, file already exists: {}", path);
97+
return false;
98+
}
99+
}
100+
69101
// add additional config entries from the IO config to the Hadoop config
70102
private Options loadHadoopConfigFromContext(CatalogContext context) {
71103
Options hadoopConfig = new Options();

paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public boolean rename(Path src, Path dst) throws IOException {
102102
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
103103
}
104104

105-
private org.apache.hadoop.fs.Path path(Path path) {
105+
protected org.apache.hadoop.fs.Path path(Path path) {
106106
return new org.apache.hadoop.fs.Path(path.toUri());
107107
}
108108

109-
private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
109+
protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
110110
if (fs == null) {
111111
synchronized (this) {
112112
if (fs == null) {

paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.apache.paimon.options.Options;
2626

2727
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.fs.FSDataOutputStream;
29+
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
30+
import org.apache.hadoop.fs.FileAlreadyExistsException;
2831
import org.apache.hadoop.fs.FileSystem;
2932
import org.apache.hadoop.fs.s3a.S3AFileSystem;
3033
import org.slf4j.Logger;
@@ -33,6 +36,7 @@
3336
import java.io.IOException;
3437
import java.io.UncheckedIOException;
3538
import java.net.URI;
39+
import java.nio.charset.StandardCharsets;
3640
import java.util.Map;
3741
import java.util.Objects;
3842
import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +89,33 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
8589
new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
8690
}
8791

92+
/**
93+
* Write content atomically using S3 conditional writes via Hadoop 3.4+ native API.
94+
*
95+
* @param path the target file path
96+
* @param content the content to write
97+
* @return true if write succeeded, false if file already exists
98+
* @throws IOException on I/O errors
99+
*/
100+
@Override
101+
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
102+
org.apache.hadoop.fs.Path hadoopPath = path(path);
103+
S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
104+
105+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
106+
107+
FSDataOutputStreamBuilder builder = fs.createFile(hadoopPath);
108+
builder.must("fs.option.create.conditional.overwrite", true);
109+
110+
try (FSDataOutputStream out = builder.create().overwrite(false).build()) {
111+
out.write(contentBytes);
112+
return true;
113+
} catch (FileAlreadyExistsException e) {
114+
LOG.debug("Conditional write failed, file already exists: {}", path);
115+
return false;
116+
}
117+
}
118+
88119
// add additional config entries from the IO config to the Hadoop config
89120
private Options loadHadoopConfigFromContext(CatalogContext context) {
90121
Options hadoopConfig = new Options();

0 commit comments

Comments
 (0)