diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 33caaacddc..74c178a34b 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -22,17 +22,23 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; +import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -47,10 +53,15 @@ public class S3DelegationTokenProvider { private static final String REGION_KEY = "fs.s3a.region"; private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; + private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn"; + private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint"; + private final String scheme; private final String region; private final String accessKey; private final String secretKey; + @Nullable private final String roleArn; + @Nullable private final String stsEndpoint; private final Map additionInfos; public S3DelegationTokenProvider(String scheme, Configuration conf) { @@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { checkNotNull(region, "Region is not set."); this.accessKey = conf.get(ACCESS_KEY_ID); this.secretKey = conf.get(ACCESS_KEY_SECRET); + this.roleArn = conf.get(ROLE_ARN_KEY); + this.stsEndpoint = conf.get(STS_ENDPOINT_KEY); this.additionInfos = new HashMap<>(); for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) { if (conf.get(key) != null) { @@ -68,25 +81,59 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { } public ObtainedSecurityToken obtainSecurityToken() { - LOG.info("Obtaining session credentials token with access key: {}", accessKey); + AWSSecurityTokenService stsClient = buildStsClient(); + try { + Credentials credentials; + + if (roleArn != null) { + LOG.info( + "Obtaining session credentials via AssumeRole with access key: {}, role: {}", + accessKey, + roleArn); + AssumeRoleRequest request = + new AssumeRoleRequest() + .withRoleArn(roleArn) + .withRoleSessionName("fluss-" + UUID.randomUUID()); + AssumeRoleResult result = stsClient.assumeRole(request); + credentials = result.getCredentials(); + } else { + LOG.info( + "Obtaining session credentials via GetSessionToken with access key: {}", + accessKey); + GetSessionTokenResult result = stsClient.getSessionToken(); + credentials = result.getCredentials(); + } - AWSSecurityTokenService stsClient = + LOG.info( + "Session credentials obtained successfully with access key: {} expiration: {}", + credentials.getAccessKeyId(), + credentials.getExpiration()); + + return new ObtainedSecurityToken( + scheme, + toJson(credentials), + credentials.getExpiration().getTime(), + additionInfos); + } finally { + stsClient.shutdown(); + } + } + + private AWSSecurityTokenService buildStsClient() { + AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard() - .withRegion(region) .withCredentials( new AWSStaticCredentialsProvider( - new BasicAWSCredentials(accessKey, secretKey))) - .build(); - GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken(); - Credentials credentials = sessionTokenResult.getCredentials(); - - LOG.info( - "Session credentials obtained successfully with access key: {} expiration: {}", - credentials.getAccessKeyId(), - credentials.getExpiration()); - - return new ObtainedSecurityToken( - scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos); + new BasicAWSCredentials(accessKey, secretKey))); + + if (stsEndpoint != null) { + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region)); + } else { + builder.withRegion(region); + } + + return builder.build(); } private byte[] toJson(Credentials credentials) {