diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AmazonS3Reference.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AmazonS3Reference.java
new file mode 100644
index 0000000000000..4646f116e4d04
--- /dev/null
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AmazonS3Reference.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories.s3;
+
+import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+import org.elasticsearch.common.lease.Releasable;
+
+/**
+ * Handles the shutdown of the internal {@link AmazonS3Client} by reference
+ * counting.
+ */
+public class AmazonS3Reference extends AbstractRefCounted implements Releasable {
+
+    private final AmazonS3 client;
+
+    AmazonS3Reference(AmazonS3 client) {
+        super("AWS_S3_CLIENT");
+        this.client = client;
+    }
+
+    /**
+     * Call when the client is no longer needed.
+     */
+    @Override
+    public void close() {
+        decRef();
+    }
+
+    /**
+     * Returns the underlying {@link AmazonS3} client. All calls are permitted
+     * except shutdown, which is invoked when the reference count reaches zero.
+     */
+    public AmazonS3 client() {
+        return client;
+    }
+
+    @Override
+    protected void closeInternal() {
+        client.shutdown();
+    }
+
+}
\ No newline at end of file
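For orientation, the consumption pattern this class enables looks roughly as follows — a minimal sketch, not part of the change itself; `s3Service` and the bucket name are hypothetical placeholders:

```java
// Callers pin the client for the duration of a try-with-resources block.
// close() only decrements the count; the wrapped AmazonS3Client is shut
// down when the last holder releases it.
try (AmazonS3Reference clientReference = s3Service.client("default")) {
    // safe: the reference count stays > 0 for the lifetime of this block
    clientReference.client().doesBucketExist("my-bucket");
}
// if the settings were reloaded concurrently, this release (not a cache
// eviction) is what finally shuts the old client down
```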
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AwsS3Service.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AwsS3Service.java
index ed8c9f322aa60..5ddb93cdbe1b2 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AwsS3Service.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/AwsS3Service.java
@@ -19,23 +19,16 @@

 package org.elasticsearch.repositories.s3;

-import com.amazonaws.services.s3.AmazonS3;
 import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.RefCounted;

 interface AwsS3Service extends LifecycleComponent {

     /**
      * Creates an {@code AmazonS3} client from the given repository metadata and node settings.
      */
-    AmazonS3Wrapper client(String clientName);
+    AmazonS3Reference client(String clientName);

     void updateClientSettings(Settings settings);

-    static interface AmazonS3Wrapper extends Releasable, RefCounted {
-        AmazonS3 client();
-    }
-
 }
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/InternalAwsS3Service.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/InternalAwsS3Service.java
index 034f7e58e1647..f76f6900f5350 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/InternalAwsS3Service.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/InternalAwsS3Service.java
@@ -33,7 +33,6 @@
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import java.util.Collections;
 import java.util.Map;
 import static java.util.Collections.emptyMap;
@@ -41,46 +40,51 @@ class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {

-    private volatile Map<String, AmazonS3Wrapper> clientsCache = Collections.unmodifiableMap(emptyMap());
-    private volatile Map<String, S3ClientSettings> clientsSettings = Collections.unmodifiableMap(emptyMap());
+    private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
+    private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();

     InternalAwsS3Service(Settings settings) {
         super(settings);
         updateClientSettings(settings);
     }

+    /**
+     * Reloads the settings for the AmazonS3 client. New clients will be built
+     * using these. Old clients remain usable until released; on release they
+     * are destroyed instead of being returned to the registry.
+     */
     @Override
     public synchronized void updateClientSettings(Settings settings) {
-        // the clients will shutdown when they will not be used anymore
-        for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) {
-            clientWrapper.decRef();
-        }
-        // clear previously cached clients
-        clientsCache = Collections.unmodifiableMap(emptyMap());
+        // shut down all unused clients; the others shut down on their respective release
+        doClose();
         // reload secure settings
         clientsSettings = Collections.unmodifiableMap(S3ClientSettings.load(settings));
         assert clientsSettings.containsKey("default") : "always at least have 'default'";
+        // clients are built lazily by #client(String)
     }

+    /**
+     * Attempts to retrieve a client by name from the registry. If the client does
+     * not exist it will be created.
+     */
     @Override
-    public AmazonS3Wrapper client(String clientName) {
-        AmazonS3Wrapper clientWrapper = clientsCache.get(clientName);
-        if ((clientWrapper != null) && clientWrapper.tryIncRef()) {
-            return clientWrapper;
+    public AmazonS3Reference client(String clientName) {
+        AmazonS3Reference clientReference = clientsCache.get(clientName);
+        if ((clientReference != null) && clientReference.tryIncRef()) {
+            return clientReference;
         }
         synchronized (this) {
-            clientWrapper = clientsCache.get(clientName);
-            if ((clientWrapper != null) && clientWrapper.tryIncRef()) {
-                return clientWrapper;
+            clientReference = clientsCache.get(clientName);
+            if ((clientReference != null) && clientReference.tryIncRef()) {
+                return clientReference;
             }
-            clientWrapper = new InternalAmazonS3Wrapper(buildClient(clientName));
-            clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientWrapper).immutableMap();
-            clientWrapper.incRef();
-            return clientWrapper;
+            clientReference = new AmazonS3Reference(buildClient(clientName));
+            clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientReference).immutableMap();
+            clientReference.incRef();
+            return clientReference;
         }
     }

-    // does not require synchronization because it is called inside computeIfAbsent
     private AmazonS3 buildClient(String clientName) {
         final S3ClientSettings clientSettings = clientsSettings.get(clientName);
         if (clientSettings == null) {
@@ -141,11 +145,15 @@ protected void doStop() throws ElasticsearchException {
     }

     @Override
-    protected void doClose() throws ElasticsearchException {
-        for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) {
-            clientWrapper.decRef();
+    protected synchronized void doClose() throws ElasticsearchException {
+        // the clients will shut down once they are no longer in use
+        for (final AmazonS3Reference clientReference : clientsCache.values()) {
+            clientReference.decRef();
         }
-        // Ensure that IdleConnectionReaper is shutdown
+        // clear previously cached clients
+        clientsCache = emptyMap();
+        // shut down the IdleConnectionReaper background thread;
+        // it will be restarted on any new client usage
         IdleConnectionReaper.shutdown();
     }
@@ -167,29 +175,4 @@ public void refresh() {
         }
     }

-    private static class InternalAmazonS3Wrapper extends AbstractRefCounted implements AmazonS3Wrapper {
-
-        private final AmazonS3 client;
-
-        public InternalAmazonS3Wrapper(AmazonS3 client) {
-            super("AWS_S3_CLIENT");
-            this.client = client;
-        }
-
-        @Override
-        public void close() {
-            decRef();
-        }
-
-        @Override
-        public AmazonS3 client() {
-            return client;
-        }
-
-        @Override
-        protected void closeInternal() {
-            client.shutdown();
-        }
-
-    }
 }
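The lookup above uses `tryIncRef()` rather than `incRef()` because a concurrent `updateClientSettings(Settings)` or `doClose()` may already have released the cache's own reference, dropping the count to zero; in that case the caller must build a fresh client instead of resurrecting a shut-down one. A minimal sketch of the reference-count life cycle, assuming the usual `AbstractRefCounted` semantics:

```java
final AmazonS3Reference ref = new AmazonS3Reference(client); // count == 1, held by the cache
assert ref.tryIncRef();          // count == 2: a caller pinned the client
ref.close();                     // caller done: count == 1
ref.decRef();                    // cache released it: count == 0 -> closeInternal() -> client.shutdown()
assert ref.tryIncRef() == false; // late callers fail and must build a new client
```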
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index 3d9c399f6eff6..7a4ca6dc329c5 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -72,8 +72,8 @@ class S3BlobContainer extends AbstractBlobContainer {

     @Override
     public boolean blobExists(String blobName) {
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
-            return SocketAccess.doPrivileged(() -> clientWrapper.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            return SocketAccess.doPrivileged(() -> clientReference.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
         } catch (Exception e) {
             throw new BlobStoreException("Failed to check if blob [" + blobName +"] exists", e);
         }
@@ -81,8 +81,8 @@ public boolean blobExists(String blobName) {

     @Override
     public InputStream readBlob(String blobName) throws IOException {
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
-            final S3Object s3Object = SocketAccess.doPrivileged(() -> clientWrapper.client().getObject(blobStore.bucket(),
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(blobStore.bucket(),
                 buildKey(blobName)));
             return s3Object.getObjectContent();
         } catch (AmazonClientException e) {
@@ -114,8 +114,8 @@ public void deleteBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }

-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
-            SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().deleteObject(blobStore.bucket(), buildKey(blobName)));
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObject(blobStore.bucket(), buildKey(blobName)));
         } catch (AmazonClientException e) {
             throw new IOException("Exception when deleting blob [" + blobName + "]", e);
         }
@@ -124,19 +124,19 @@ public void deleteBlob(String blobName) throws IOException {
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
         final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
             ObjectListing prevListing = null;
             while (true) {
                 ObjectListing list;
                 if (prevListing != null) {
                     final ObjectListing finalPrevListing = prevListing;
-                    list = SocketAccess.doPrivileged(() -> clientWrapper.client().listNextBatchOfObjects(finalPrevListing));
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
                 } else {
                     if (blobNamePrefix != null) {
-                        list = SocketAccess.doPrivileged(() -> clientWrapper.client().listObjects(blobStore.bucket(),
+                        list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(),
                             buildKey(blobNamePrefix)));
                     } else {
-                        list = SocketAccess.doPrivileged(() -> clientWrapper.client().listObjects(blobStore.bucket(), keyPath));
+                        list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), keyPath));
                     }
                 }
                 for (S3ObjectSummary summary : list.getObjectSummaries()) {
@@ -166,10 +166,10 @@ public void move(String sourceBlobName, String targetBlobName) throws IOExceptio
             request.setNewObjectMetadata(objectMetadata);
         }

-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
             SocketAccess.doPrivilegedVoid(() -> {
-                clientWrapper.client().copyObject(request);
-                clientWrapper.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
+                clientReference.client().copyObject(request);
+                clientReference.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
             });
         } catch (AmazonS3Exception e) {
             throw new IOException(e);
@@ -210,9 +210,9 @@ void executeSingleUpload(final S3BlobStore blobStore,
         putRequest.setStorageClass(blobStore.getStorageClass());
         putRequest.setCannedAcl(blobStore.getCannedACL());

-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
             SocketAccess.doPrivilegedVoid(() -> {
-                clientWrapper.client().putObject(putRequest);
+                clientReference.client().putObject(putRequest);
             });
         } catch (AmazonClientException e) {
             throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
@@ -259,9 +259,9 @@ void executeMultipartUpload(final S3BlobStore blobStore,
             md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
             initRequest.setObjectMetadata(md);
         }
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {

-            uploadId.set(SocketAccess.doPrivileged(() -> clientWrapper.client().initiateMultipartUpload(initRequest).getUploadId()));
+            uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId()));
             if (Strings.isEmpty(uploadId.get())) {
                 throw new IOException("Failed to initialize multipart upload " + blobName);
             }
@@ -286,7 +286,7 @@ void executeMultipartUpload(final S3BlobStore blobStore,
                 }
                 bytesCount += uploadRequest.getPartSize();

-                final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientWrapper.client().uploadPart(uploadRequest));
+                final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
                 parts.add(uploadResponse.getPartETag());
             }

@@ -296,7 +296,7 @@ void executeMultipartUpload(final S3BlobStore blobStore,
             }

             CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId.get(), parts);
-            SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().completeMultipartUpload(complRequest));
+            SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
             success = true;

         } catch (AmazonClientException e) {
@@ -304,8 +304,8 @@ void executeMultipartUpload(final S3BlobStore blobStore,
         } finally {
             if (success == false && Strings.hasLength(uploadId.get())) {
                 final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
-                try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
-                    SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().abortMultipartUpload(abortRequest));
+                try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+                    SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
                 }
             }
         }
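Note that every S3 call above acquires and releases its own reference; even the abort in the `finally` block opens a fresh one rather than reusing the reference held by the failed upload. The pattern this rules out is caching the bare client across operations, sketched here as an anti-example (`blobStore` and `putRequest` are hypothetical):

```java
final AmazonS3 bare;
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
    bare = clientReference.client();
} // reference released: a settings reload may shut this client down at any time
bare.putObject(putRequest); // unsafe: may run against an already-closed client
```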
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
index 511bff30504b7..395eb83659772 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -69,9 +69,9 @@ class S3BlobStore extends AbstractComponent implements BlobStore {
         // Also, if invalid security credentials are used to execute this method, the
         // client is not able to distinguish between bucket permission errors and
         // invalid credential errors, and this method could return an incorrect result.
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = clientWrapper()) {
+        try (AmazonS3Reference clientReference = clientReference()) {
             SocketAccess.doPrivilegedVoid(() -> {
-                if (clientWrapper.client().doesBucketExist(bucket) == false) {
+                if (clientReference.client().doesBucketExist(bucket) == false) {
                     throw new IllegalArgumentException("The bucket [" + bucket + "] does not exist. Please create it before "
                             + " creating an s3 snapshot repository backed by it.");
                 }
@@ -84,7 +84,7 @@ public String toString() {
         return bucket;
     }

-    public AwsS3Service.AmazonS3Wrapper clientWrapper() {
+    public AmazonS3Reference clientReference() {
         return service.client(clientName);
     }

@@ -107,30 +107,30 @@ public BlobContainer blobContainer(BlobPath path) {

     @Override
     public void delete(BlobPath path) {
-        try (AwsS3Service.AmazonS3Wrapper clientWrapper = clientWrapper()) {
+        try (AmazonS3Reference clientReference = clientReference()) {
             ObjectListing prevListing = null;
             // From
             // http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
             // we can do at most 1K objects per delete
             // We don't know the bucket name until first object listing
             DeleteObjectsRequest multiObjectDeleteRequest = null;
-            ArrayList<KeyVersion> keys = new ArrayList<>();
+            final ArrayList<KeyVersion> keys = new ArrayList<>();
             while (true) {
                 ObjectListing list;
                 if (prevListing != null) {
                     final ObjectListing finalPrevListing = prevListing;
-                    list = SocketAccess.doPrivileged(() -> clientWrapper.client().listNextBatchOfObjects(finalPrevListing));
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
                 } else {
-                    list = SocketAccess.doPrivileged(() -> clientWrapper.client().listObjects(bucket, path.buildAsString()));
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(bucket, path.buildAsString()));
                     multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
                 }
-                for (S3ObjectSummary summary : list.getObjectSummaries()) {
+                for (final S3ObjectSummary summary : list.getObjectSummaries()) {
                     keys.add(new KeyVersion(summary.getKey()));
                     // Every 500 objects batch the delete request
                     if (keys.size() > 500) {
                         multiObjectDeleteRequest.setKeys(keys);
                         final DeleteObjectsRequest finalMultiObjectDeleteRequest = multiObjectDeleteRequest;
-                        SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().deleteObjects(finalMultiObjectDeleteRequest));
+                        SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(finalMultiObjectDeleteRequest));
                         multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
                         keys.clear();
                     }
@@ -144,7 +144,7 @@ public void delete(BlobPath path) {
             if (!keys.isEmpty()) {
                 multiObjectDeleteRequest.setKeys(keys);
                 final DeleteObjectsRequest finalMultiObjectDeleteRequest = multiObjectDeleteRequest;
-                SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().deleteObjects(finalMultiObjectDeleteRequest));
+                SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(finalMultiObjectDeleteRequest));
             }
         }
     }
@@ -161,18 +161,18 @@ public CannedAccessControlList getCannedACL() {

     public StorageClass getStorageClass() { return storageClass; }

     public static StorageClass initStorageClass(String storageClass) {
-        if (storageClass == null || storageClass.equals("")) {
+        if ((storageClass == null) || storageClass.equals("")) {
             return StorageClass.Standard;
         }

         try {
-            StorageClass _storageClass = StorageClass.fromValue(storageClass.toUpperCase(Locale.ENGLISH));
+            final StorageClass _storageClass = StorageClass.fromValue(storageClass.toUpperCase(Locale.ENGLISH));
             if (_storageClass.equals(StorageClass.Glacier)) {
                 throw new BlobStoreException("Glacier storage class is not supported");
             }

             return _storageClass;
-        } catch (IllegalArgumentException illegalArgumentException) {
+        } catch (final IllegalArgumentException illegalArgumentException) {
             throw new BlobStoreException("`" + storageClass + "` is not a valid S3 Storage Class.");
         }
     }
@@ -181,11 +181,11 @@ public static StorageClass initStorageClass(String storageClass) {
      * Constructs canned acl from string
      */
     public static CannedAccessControlList initCannedACL(String cannedACL) {
-        if (cannedACL == null || cannedACL.equals("")) {
+        if ((cannedACL == null) || cannedACL.equals("")) {
             return CannedAccessControlList.Private;
         }

-        for (CannedAccessControlList cur : CannedAccessControlList.values()) {
+        for (final CannedAccessControlList cur : CannedAccessControlList.values()) {
             if (cur.toString().equalsIgnoreCase(cannedACL)) {
                 return cur;
             }
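Taken together, the reload semantics these changes aim for can be sketched as follows; `nodeSettings`, `updatedSettings`, and the bucket name are placeholders, and this is an illustration rather than a test from this change:

```java
final InternalAwsS3Service service = new InternalAwsS3Service(nodeSettings);
try (AmazonS3Reference oldRef = service.client("default")) {
    service.updateClientSettings(updatedSettings);   // empties the cache, releasing its references
    oldRef.client().doesBucketExist("bucket");       // still safe: we hold a reference
    try (AmazonS3Reference newRef = service.client("default")) {
        assert newRef.client() != oldRef.client();   // lazily rebuilt from the new settings
    }
} // last release: only now is the pre-reload client shut down
```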