From 415f666a3f384acd88c24b18ee84a8039b6241d7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 30 Apr 2019 12:38:22 +0200 Subject: [PATCH] Implement Bulk Deletes for GCS Repository (#41368) * Implement Bulk Deletes for GCS Repository * Just like #40322 for AWS * We already had a bulk delete API but weren't using it from the blob container implementation; now we are using it * Made the bulk delete API also compliant with our interface that only suppresses errors about non-existent blobs by stat-ing failed deletes (I didn't use any bulk stat action here since having to stat here should be the exception anyway and it would make error handling a lot more complex) * Fixed bulk delete API to limit its batch size to 100 in line with GCS recommendations --- .../gcs/GoogleCloudStorageBlobContainer.java | 9 +++- .../gcs/GoogleCloudStorageBlobStore.java | 53 ++++++++++++------- .../repositories/gcs/MockStorage.java | 27 +++++++++- 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 8ad9b453a9092..fb81a5c90039f 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { @@ -78,7 +80,12 @@ public void deleteBlob(String blobName) throws IOException { blobStore.deleteBlob(buildKey(blobName)); } - protected String buildKey(String blobName) { + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + 
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList())); + } + + private String buildKey(String blobName) { assert blobName != null; return path + blobName; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 84184660159a4..d873a5cd29074 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import com.google.cloud.BatchResult; import com.google.cloud.ReadChannel; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; @@ -27,10 +28,9 @@ import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -50,17 +50,18 @@ import java.nio.channels.WritableByteChannel; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; class GoogleCloudStorageBlobStore implements BlobStore { - - private static final 
Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class); // The recommended maximum size of a blob that should be uploaded in a single // request. Larger files should be uploaded over multiple requests (this is @@ -105,7 +106,7 @@ public void close() { * @param bucketName name of the bucket * @return true iff the bucket exists */ - boolean doesBucketExist(String bucketName) { + private boolean doesBucketExist(String bucketName) { try { final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName)); return bucket != null; @@ -295,8 +296,8 @@ void deleteBlob(String blobName) throws IOException { * * @param prefix prefix of the blobs to delete */ - void deleteBlobsByPrefix(String prefix) throws IOException { - deleteBlobs(listBlobsByPrefix("", prefix).keySet()); + private void deleteBlobsByPrefix(String prefix) throws IOException { + deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet()); } /** @@ -304,7 +305,7 @@ void deleteBlobsByPrefix(String prefix) throws IOException { * * @param blobNames names of the blobs to delete */ - void deleteBlobs(Collection blobNames) throws IOException { + void deleteBlobsIgnoringIfNotExists(Collection blobNames) throws IOException { if (blobNames.isEmpty()) { return; } @@ -314,17 +315,33 @@ void deleteBlobs(Collection blobNames) throws IOException { return; } final List blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList()); - final List deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete)); - assert blobIdsToDelete.size() == deletedStatuses.size(); - boolean failed = false; - for (int i = 0; i < blobIdsToDelete.size(); i++) { - if (deletedStatuses.get(i) == false) { - logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName); - failed = true; + final List failedBlobs = Collections.synchronizedList(new ArrayList<>()); + final 
StorageException e = SocketAccess.doPrivilegedIOException(() -> { + final AtomicReference ioe = new AtomicReference<>(); + final StorageBatch batch = client().batch(); + for (BlobId blob : blobIdsToDelete) { + batch.delete(blob).notify( + new BatchResult.Callback<>() { + @Override + public void success(Boolean result) { + } + + @Override + public void error(StorageException exception) { + if (exception.getCode() != HTTP_NOT_FOUND) { + failedBlobs.add(blob); + if (ioe.compareAndSet(null, exception) == false) { + ioe.get().addSuppressed(exception); + } + } + } + }); } - } - if (failed) { - throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs"); + batch.submit(); + return ioe.get(); + }); + if (e != null) { + throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e); } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 97c7e2ab76bd2..eddf2a9f78082 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.gcs; import com.google.api.gax.paging.Page; +import com.google.cloud.BatchResult; import com.google.cloud.Policy; import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; @@ -34,11 +35,13 @@ import com.google.cloud.storage.ServiceAccount; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRpcOptionUtils; import com.google.cloud.storage.StorageTestUtils; import org.elasticsearch.core.internal.io.IOUtils; +import 
org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -57,6 +60,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyVararg; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + /** * {@link MockStorage} mocks a {@link Storage} client by storing all the blobs * in a given concurrent map. @@ -356,8 +364,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { } @Override + @SuppressWarnings("unchecked") public StorageBatch batch() { - return null; + final Answer throwOnMissingMock = invocationOnMock -> { + throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']'); + }; + final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock); + StorageBatchResult result = mock(StorageBatchResult.class, throwOnMissingMock); + doAnswer(answer -> { + BatchResult.Callback callback = (BatchResult.Callback) answer.getArguments()[0]; + callback.success(true); + return null; + }).when(result).notify(any(BatchResult.Callback.class)); + doAnswer(invocation -> { + final BlobId blobId = (BlobId) invocation.getArguments()[0]; + delete(blobId); + return result; + }).when(batch).delete(any(BlobId.class), anyVararg()); + doAnswer(invocation -> null).when(batch).submit(); + return batch; } @Override