Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

Expand All @@ -47,10 +53,15 @@ public class S3DelegationTokenProvider {
private static final String REGION_KEY = "fs.s3a.region";
private static final String ENDPOINT_KEY = "fs.s3a.endpoint";

private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";
private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint";

private final String scheme;
private final String region;
private final String accessKey;
private final String secretKey;
@Nullable private final String roleArn;
@Nullable private final String stsEndpoint;
private final Map<String, String> additionInfos;

public S3DelegationTokenProvider(String scheme, Configuration conf) {
Expand All @@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
checkNotNull(region, "Region is not set.");
this.accessKey = conf.get(ACCESS_KEY_ID);
this.secretKey = conf.get(ACCESS_KEY_SECRET);
this.roleArn = conf.get(ROLE_ARN_KEY);
this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
this.additionInfos = new HashMap<>();
for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) {
if (conf.get(key) != null) {
Expand All @@ -68,25 +81,59 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
}

public ObtainedSecurityToken obtainSecurityToken() {
LOG.info("Obtaining session credentials token with access key: {}", accessKey);
AWSSecurityTokenService stsClient = buildStsClient();
try {
Credentials credentials;

if (roleArn != null) {
LOG.info(
"Obtaining session credentials via AssumeRole with access key: {}, role: {}",
accessKey,
roleArn);
AssumeRoleRequest request =
new AssumeRoleRequest()
.withRoleArn(roleArn)
.withRoleSessionName("fluss-" + UUID.randomUUID());
AssumeRoleResult result = stsClient.assumeRole(request);
credentials = result.getCredentials();
} else {
LOG.info(
"Obtaining session credentials via GetSessionToken with access key: {}",
accessKey);
GetSessionTokenResult result = stsClient.getSessionToken();
credentials = result.getCredentials();
}

AWSSecurityTokenService stsClient =
LOG.info(
"Session credentials obtained successfully with access key: {} expiration: {}",
credentials.getAccessKeyId(),
credentials.getExpiration());

return new ObtainedSecurityToken(
scheme,
toJson(credentials),
credentials.getExpiration().getTime(),
additionInfos);
} finally {
stsClient.shutdown();
}
}

private AWSSecurityTokenService buildStsClient() {
AWSSecurityTokenServiceClientBuilder builder =
AWSSecurityTokenServiceClientBuilder.standard()
.withRegion(region)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(accessKey, secretKey)))
.build();
GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
Credentials credentials = sessionTokenResult.getCredentials();

LOG.info(
"Session credentials obtained successfully with access key: {} expiration: {}",
credentials.getAccessKeyId(),
credentials.getExpiration());

return new ObtainedSecurityToken(
scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos);
new BasicAWSCredentials(accessKey, secretKey)));

if (stsEndpoint != null) {
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region));
} else {
builder.withRegion(region);
}

return builder.build();
}

private byte[] toJson(Credentials credentials) {
Expand Down
Loading