-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update s3 secure settings #28517
Update s3 secure settings #28517
Changes from 3 commits
26acd44
8389c7e
469828c
5ae961c
92fe976
caf9a97
1602eb3
97f4360
81cfb9c
22168ee
5af6542
076ab66
beca7c7
b623bb8
a52e100
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,66 +28,78 @@ | |
import com.amazonaws.internal.StaticCredentialsProvider; | ||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.AmazonS3Client; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.collect.MapBuilder; | ||
import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||
import org.elasticsearch.common.logging.DeprecationLogger; | ||
import org.elasticsearch.common.settings.SecureString; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
|
||
import java.util.HashMap; | ||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
import static java.util.Collections.emptyMap; | ||
|
||
|
||
class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service { | ||
|
||
// pkg private for tests | ||
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity()); | ||
|
||
private final Map<String, S3ClientSettings> clientsSettings; | ||
private volatile Map<String, AmazonS3Wrapper> clientsCache = Collections.unmodifiableMap(emptyMap()); | ||
private volatile Map<String, S3ClientSettings> clientsSettings = Collections.unmodifiableMap(emptyMap()); | ||
|
||
private final Map<String, AmazonS3Client> clientsCache = new HashMap<>(); | ||
|
||
InternalAwsS3Service(Settings settings, Map<String, S3ClientSettings> clientsSettings) { | ||
InternalAwsS3Service(Settings settings) { | ||
super(settings); | ||
this.clientsSettings = clientsSettings; | ||
updateClientSettings(settings); | ||
} | ||
|
||
@Override | ||
public synchronized AmazonS3 client(Settings repositorySettings) { | ||
String clientName = CLIENT_NAME.get(repositorySettings); | ||
AmazonS3Client client = clientsCache.get(clientName); | ||
if (client != null) { | ||
return client; | ||
public synchronized void updateClientSettings(Settings settings) { | ||
// the clients will shutdown when they will not be used anymore | ||
for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) { | ||
clientWrapper.decRef(); | ||
} | ||
// clear previously cached clients | ||
clientsCache = Collections.unmodifiableMap(emptyMap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Collections.emptyMap() is already immutable |
||
// reload secure settings | ||
clientsSettings = Collections.unmodifiableMap(S3ClientSettings.load(settings)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leave a comment here that we built the clients lazily |
||
assert clientsSettings.containsKey("default") : "always at least have 'default'"; | ||
} | ||
|
||
S3ClientSettings clientSettings = clientsSettings.get(clientName); | ||
@Override | ||
public AmazonS3Wrapper client(String clientName) { | ||
AmazonS3Wrapper clientWrapper = clientsCache.get(clientName); | ||
if ((clientWrapper != null) && clientWrapper.tryIncRef()) { | ||
return clientWrapper; | ||
} | ||
synchronized (this) { | ||
clientWrapper = clientsCache.get(clientName); | ||
if ((clientWrapper != null) && clientWrapper.tryIncRef()) { | ||
return clientWrapper; | ||
} | ||
clientWrapper = new InternalAmazonS3Wrapper(buildClient(clientName)); | ||
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientWrapper).immutableMap(); | ||
clientWrapper.incRef(); | ||
return clientWrapper; | ||
} | ||
} | ||
|
||
// does not require synchronization because it is called inside computeIfAbsent | ||
private AmazonS3 buildClient(String clientName) { | ||
final S3ClientSettings clientSettings = clientsSettings.get(clientName); | ||
if (clientSettings == null) { | ||
throw new IllegalArgumentException("Unknown s3 client name [" + clientName + "]. Existing client configs: " + | ||
Strings.collectionToDelimitedString(clientsSettings.keySet(), ",")); | ||
} | ||
|
||
logger.debug("creating S3 client with client_name [{}], endpoint [{}]", clientName, clientSettings.endpoint); | ||
|
||
AWSCredentialsProvider credentials = buildCredentials(logger, deprecationLogger, clientSettings, repositorySettings); | ||
ClientConfiguration configuration = buildConfiguration(clientSettings); | ||
|
||
client = new AmazonS3Client(credentials, configuration); | ||
|
||
final AWSCredentialsProvider credentials = buildCredentials(clientSettings); | ||
final ClientConfiguration configuration = buildConfiguration(clientSettings); | ||
final AmazonS3Client client = new AmazonS3Client(credentials, configuration); | ||
if (Strings.hasText(clientSettings.endpoint)) { | ||
client.setEndpoint(clientSettings.endpoint); | ||
} | ||
|
||
clientsCache.put(clientName, client); | ||
return client; | ||
} | ||
|
||
// pkg private for tests | ||
static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) { | ||
ClientConfiguration clientConfiguration = new ClientConfiguration(); | ||
ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) { | ||
final ClientConfiguration clientConfiguration = new ClientConfiguration(); | ||
// the response metadata cache is only there for diagnostics purposes, | ||
// but can force objects from every response to the old generation. | ||
clientConfiguration.setResponseMetadataCacheSize(0); | ||
|
@@ -109,27 +121,8 @@ static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) { | |
} | ||
|
||
// pkg private for tests | ||
static AWSCredentialsProvider buildCredentials(Logger logger, DeprecationLogger deprecationLogger, | ||
S3ClientSettings clientSettings, Settings repositorySettings) { | ||
|
||
|
||
BasicAWSCredentials credentials = clientSettings.credentials; | ||
if (S3Repository.ACCESS_KEY_SETTING.exists(repositorySettings)) { | ||
if (S3Repository.SECRET_KEY_SETTING.exists(repositorySettings) == false) { | ||
throw new IllegalArgumentException("Repository setting [" + S3Repository.ACCESS_KEY_SETTING.getKey() + | ||
" must be accompanied by setting [" + S3Repository.SECRET_KEY_SETTING.getKey() + "]"); | ||
} | ||
try (SecureString key = S3Repository.ACCESS_KEY_SETTING.get(repositorySettings); | ||
SecureString secret = S3Repository.SECRET_KEY_SETTING.get(repositorySettings)) { | ||
credentials = new BasicAWSCredentials(key.toString(), secret.toString()); | ||
} | ||
// backcompat for reading keys out of repository settings | ||
deprecationLogger.deprecated("Using s3 access/secret key from repository settings. Instead " + | ||
"store these in named clients and the elasticsearch keystore for secure settings."); | ||
} else if (S3Repository.SECRET_KEY_SETTING.exists(repositorySettings)) { | ||
throw new IllegalArgumentException("Repository setting [" + S3Repository.SECRET_KEY_SETTING.getKey() + | ||
" must be accompanied by setting [" + S3Repository.ACCESS_KEY_SETTING.getKey() + "]"); | ||
} | ||
AWSCredentialsProvider buildCredentials(S3ClientSettings clientSettings) { | ||
final BasicAWSCredentials credentials = clientSettings.credentials; | ||
if (credentials == null) { | ||
logger.debug("Using instance profile credentials"); | ||
return new PrivilegedInstanceProfileCredentialsProvider(); | ||
|
@@ -149,10 +142,9 @@ protected void doStop() throws ElasticsearchException { | |
|
||
@Override | ||
protected void doClose() throws ElasticsearchException { | ||
for (AmazonS3Client client : clientsCache.values()) { | ||
client.shutdown(); | ||
for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) { | ||
clientWrapper.decRef(); | ||
} | ||
|
||
// Ensure that IdleConnectionReaper is shutdown | ||
IdleConnectionReaper.shutdown(); | ||
} | ||
|
@@ -174,4 +166,30 @@ public void refresh() { | |
SocketAccess.doPrivilegedVoid(credentials::refresh); | ||
} | ||
} | ||
|
||
private static class InternalAmazonS3Wrapper extends AbstractRefCounted implements AmazonS3Wrapper { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd call it |
||
|
||
private final AmazonS3 client; | ||
|
||
public InternalAmazonS3Wrapper(AmazonS3 client) { | ||
super("AWS_S3_CLIENT"); | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
decRef(); | ||
} | ||
|
||
@Override | ||
public AmazonS3 client() { | ||
return client; | ||
} | ||
|
||
@Override | ||
protected void closeInternal() { | ||
client.shutdown(); | ||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can u use Releasables#close here instead of a loop.