Skip to content

Commit e1086d1

Browse files
[FLUSS-2686] Add COS filesystem support
1 parent b813d7e commit e1086d1

3 files changed

Lines changed: 47 additions & 27 deletions

File tree

fluss-filesystems/fluss-fs-cos/pom.xml

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,17 @@
121121
<artifactId>maven-jar-plugin</artifactId>
122122
<executions>
123123
<execution>
124+
<id>default-jar</id>
124125
<goals>
125126
<goal>jar</goal>
126127
</goals>
127128
</execution>
129+
<execution>
130+
<id>test-jar</id>
131+
<goals>
132+
<goal>test-jar</goal>
133+
</goals>
134+
</execution>
128135
</executions>
129136
<configuration>
130137
<archive>
@@ -237,17 +244,6 @@
237244
</executions>
238245
</plugin>
239246

240-
<plugin>
241-
<groupId>org.apache.maven.plugins</groupId>
242-
<artifactId>maven-jar-plugin</artifactId>
243-
<executions>
244-
<execution>
245-
<goals>
246-
<goal>test-jar</goal>
247-
</goals>
248-
</execution>
249-
</executions>
250-
</plugin>
251247
</plugins>
252248
</build>
253249

fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenReceiver.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.fs.token.ObtainedSecurityToken;
2323
import org.apache.fluss.fs.token.SecurityTokenReceiver;
2424

25+
import com.qcloud.cos.auth.BasicCOSCredentials;
2526
import com.qcloud.cos.auth.BasicSessionCredentials;
2627
import com.qcloud.cos.auth.COSCredentials;
2728
import org.slf4j.Logger;
@@ -65,7 +66,10 @@ protected static void updateHadoopConfig(
6566
// then, set addition info
6667
if (additionInfos == null) {
6768
// if addition info is null, it also means we have not received any token,
68-
throw new RuntimeException("Credentials is not ready.");
69+
throw new RuntimeException(
70+
"COS credentials have not been received yet. "
71+
+ "Ensure onNewTokensObtained() has been called with valid tokens "
72+
+ "before invoking COSSecurityTokenReceiver.updateHadoopConfig().");
6973
} else {
7074
for (Map.Entry<String, String> entry : additionInfos.entrySet()) {
7175
hadoopConfig.set(entry.getKey(), entry.getValue());
@@ -89,14 +93,23 @@ public void onNewTokensObtained(ObtainedSecurityToken token) {
8993
org.apache.fluss.fs.token.Credentials flussCredentials =
9094
CredentialsJsonSerde.fromJson(tokenBytes);
9195

92-
// Create Credential from fluss credentials
93-
credentials =
94-
new BasicSessionCredentials(
95-
flussCredentials.getAccessKeyId(),
96-
flussCredentials.getSecretAccessKey(),
97-
flussCredentials.getSecurityToken());
96+
// Create COSCredentials from fluss credentials, distinguishing between
97+
// static credentials (no session token) and temporary session credentials
98+
if (flussCredentials.getSecurityToken() != null) {
99+
credentials =
100+
new BasicSessionCredentials(
101+
flussCredentials.getAccessKeyId(),
102+
flussCredentials.getSecretAccessKey(),
103+
flussCredentials.getSecurityToken());
104+
} else {
105+
credentials =
106+
new BasicCOSCredentials(
107+
flussCredentials.getAccessKeyId(),
108+
flussCredentials.getSecretAccessKey());
109+
}
98110
additionInfos = token.getAdditionInfos();
99111

112+
// Consistent with S3DelegationTokenReceiver, logging access key at INFO level
100113
LOG.info(
101114
"Session credentials updated successfully with access key: {}.",
102115
credentials.getCOSAccessKeyId());

fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/DynamicTemporaryCOSCredentialsProvider.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import org.slf4j.LoggerFactory;
2727

2828
/**
29-
* Support dynamic session credentials for authenticating with Tencent Cloud COS. It'll get
30-
* credentials from {@link COSSecurityTokenReceiver}. It implements COS native {@link
31-
* COSCredentialsProvider} to work with CosNFileSystem.
29+
* Support dynamic credentials for authenticating with Tencent Cloud COS. It'll get credentials from
30+
* {@link COSSecurityTokenReceiver}. It implements COS native {@link COSCredentialsProvider} to work
31+
* with CosNFileSystem.
32+
*
33+
* <p>This provider supports both static credentials (secretId/secretKey) and temporary session
34+
* credentials (secretId/secretKey/sessionToken).
3235
*/
3336
@Internal
3437
public class DynamicTemporaryCOSCredentialsProvider implements COSCredentialsProvider {
@@ -42,13 +45,21 @@ public class DynamicTemporaryCOSCredentialsProvider implements COSCredentialsPro
4245
public COSCredentials getCredentials() {
4346
COSCredentials credentials = COSSecurityTokenReceiver.getCredentials();
4447
if (credentials == null) {
45-
throw new RuntimeException("Credentials is not ready.");
48+
throw new RuntimeException(
49+
"COS credentials have not been received yet. "
50+
+ "Ensure COSSecurityTokenReceiver has received valid tokens.");
51+
}
52+
if (credentials instanceof BasicSessionCredentials) {
53+
BasicSessionCredentials sessionCredentials = (BasicSessionCredentials) credentials;
54+
LOG.debug("Providing session credentials");
55+
return new BasicSessionCredentials(
56+
sessionCredentials.getCOSAccessKeyId(),
57+
sessionCredentials.getCOSSecretKey(),
58+
sessionCredentials.getSessionToken());
59+
} else {
60+
LOG.debug("Providing non-session COS credentials");
61+
return credentials;
4662
}
47-
LOG.debug("Providing session credentials");
48-
return new BasicSessionCredentials(
49-
credentials.getCOSAccessKeyId(),
50-
credentials.getCOSSecretKey(),
51-
((BasicSessionCredentials) credentials).getSessionToken());
5263
}
5364

5465
@Override

0 commit comments

Comments
 (0)