2222
2323import com .amazonaws .auth .AWSStaticCredentialsProvider ;
2424import com .amazonaws .auth .BasicAWSCredentials ;
25+ import com .amazonaws .client .builder .AwsClientBuilder ;
2526import com .amazonaws .services .securitytoken .AWSSecurityTokenService ;
2627import com .amazonaws .services .securitytoken .AWSSecurityTokenServiceClientBuilder ;
28+ import com .amazonaws .services .securitytoken .model .AssumeRoleRequest ;
29+ import com .amazonaws .services .securitytoken .model .AssumeRoleResult ;
2730import com .amazonaws .services .securitytoken .model .Credentials ;
2831import com .amazonaws .services .securitytoken .model .GetSessionTokenResult ;
2932import org .apache .hadoop .conf .Configuration ;
3033import org .slf4j .Logger ;
3134import org .slf4j .LoggerFactory ;
3235
36+ import javax .annotation .Nullable ;
37+
3338import java .util .Arrays ;
3439import java .util .HashMap ;
3540import java .util .Map ;
41+ import java .util .UUID ;
3642
3743import static org .apache .fluss .utils .Preconditions .checkNotNull ;
3844
@@ -47,10 +53,15 @@ public class S3DelegationTokenProvider {
4753 private static final String REGION_KEY = "fs.s3a.region" ;
4854 private static final String ENDPOINT_KEY = "fs.s3a.endpoint" ;
4955
56+ private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn" ;
57+ private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint" ;
58+
5059 private final String scheme ;
5160 private final String region ;
5261 private final String accessKey ;
5362 private final String secretKey ;
63+ @ Nullable private final String roleArn ;
64+ @ Nullable private final String stsEndpoint ;
5465 private final Map <String , String > additionInfos ;
5566
5667 public S3DelegationTokenProvider (String scheme , Configuration conf ) {
@@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
5970 checkNotNull (region , "Region is not set." );
6071 this .accessKey = conf .get (ACCESS_KEY_ID );
6172 this .secretKey = conf .get (ACCESS_KEY_SECRET );
73+ this .roleArn = conf .get (ROLE_ARN_KEY );
74+ this .stsEndpoint = conf .get (STS_ENDPOINT_KEY );
6275 this .additionInfos = new HashMap <>();
6376 for (String key : Arrays .asList (REGION_KEY , ENDPOINT_KEY )) {
6477 if (conf .get (key ) != null ) {
@@ -68,17 +81,27 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
6881 }
6982
7083 public ObtainedSecurityToken obtainSecurityToken () {
71- LOG .info ("Obtaining session credentials token with access key: {}" , accessKey );
72-
73- AWSSecurityTokenService stsClient =
74- AWSSecurityTokenServiceClientBuilder .standard ()
75- .withRegion (region )
76- .withCredentials (
77- new AWSStaticCredentialsProvider (
78- new BasicAWSCredentials (accessKey , secretKey )))
79- .build ();
80- GetSessionTokenResult sessionTokenResult = stsClient .getSessionToken ();
81- Credentials credentials = sessionTokenResult .getCredentials ();
84+ AWSSecurityTokenService stsClient = buildStsClient ();
85+ Credentials credentials ;
86+
87+ if (roleArn != null ) {
88+ LOG .info (
89+ "Obtaining session credentials via AssumeRole with access key: {}, role: {}" ,
90+ accessKey ,
91+ roleArn );
92+ AssumeRoleRequest request =
93+ new AssumeRoleRequest ()
94+ .withRoleArn (roleArn )
95+ .withRoleSessionName ("fluss-" + UUID .randomUUID ());
96+ AssumeRoleResult result = stsClient .assumeRole (request );
97+ credentials = result .getCredentials ();
98+ } else {
99+ LOG .info (
100+ "Obtaining session credentials via GetSessionToken with access key: {}" ,
101+ accessKey );
102+ GetSessionTokenResult result = stsClient .getSessionToken ();
103+ credentials = result .getCredentials ();
104+ }
82105
83106 LOG .info (
84107 "Session credentials obtained successfully with access key: {} expiration: {}" ,
@@ -89,6 +112,23 @@ public ObtainedSecurityToken obtainSecurityToken() {
89112 scheme , toJson (credentials ), credentials .getExpiration ().getTime (), additionInfos );
90113 }
91114
115+ private AWSSecurityTokenService buildStsClient () {
116+ AWSSecurityTokenServiceClientBuilder builder =
117+ AWSSecurityTokenServiceClientBuilder .standard ()
118+ .withCredentials (
119+ new AWSStaticCredentialsProvider (
120+ new BasicAWSCredentials (accessKey , secretKey )));
121+
122+ if (stsEndpoint != null ) {
123+ builder .withEndpointConfiguration (
124+ new AwsClientBuilder .EndpointConfiguration (stsEndpoint , region ));
125+ } else {
126+ builder .withRegion (region );
127+ }
128+
129+ return builder .build ();
130+ }
131+
92132 private byte [] toJson (Credentials credentials ) {
93133 org .apache .fluss .fs .token .Credentials flussCredentials =
94134 new org .apache .fluss .fs .token .Credentials (
0 commit comments