Implement Bulk Deletes for GCS Repository
* Just like elastic#40322 for AWS
* We already had a bulk delete API but weren't using it from the blob container implementation; now we are using it
  * Made the bulk delete API compliant with our interface, which only suppresses errors about non-existent blobs, by stat-ing failed deletes (I didn't use a bulk stat action here since having to stat should be the exception anyway, and it would make error handling a lot more complex)
* Fixed the bulk delete API to limit its batch size to 100, in line with GCS recommendations (the overall batching-and-stat pattern is sketched below, ahead of the diff)
original-brownbear committed Apr 19, 2019
1 parent 55db0e2 commit bc18a17
Showing 2 changed files with 42 additions and 7 deletions.
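
Before the diff itself, here is a minimal standalone sketch of the pattern the commit applies, written against the plain google-cloud-storage client (`Storage.delete(Iterable<BlobId>)` returns one status per id, and `Storage.get(BlobId)` returns null for a missing blob). The names `BulkDeleteSketch` and `deleteIgnoringMissing` are hypothetical, not the Elasticsearch API, and the loop steps the start index by the batch size rather than precomputing a batch count the way the actual change does; the behavior is the same: delete in batches of at most 100, then stat only the blobs whose delete reported failure so that already-missing blobs are ignored.

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Minimal sketch of the pattern applied in this commit, against the plain
     * google-cloud-storage client. All names here are illustrative.
     */
    public class BulkDeleteSketch {

        /** GCS recommends aggregating at most 100 requests per batch. */
        private static final int MAX_BATCH_SIZE = 100;

        static void deleteIgnoringMissing(Storage client, String bucket, List<String> names) throws IOException {
            final List<BlobId> ids = new ArrayList<>(names.size());
            for (String name : names) {
                ids.add(BlobId.of(bucket, name));
            }
            // Issue the deletes in batches of at most MAX_BATCH_SIZE; a batch delete
            // returns one boolean per id, in order, so the statuses line up with ids.
            final List<Boolean> statuses = new ArrayList<>(ids.size());
            for (int from = 0; from < ids.size(); from += MAX_BATCH_SIZE) {
                final int to = Math.min(from + MAX_BATCH_SIZE, ids.size());
                statuses.addAll(client.delete(ids.subList(from, to)));
            }
            // Stat only the blobs whose delete reported failure: a blob that is gone
            // anyway satisfies the "ignore if not exists" contract, while a blob that
            // is still present is a real failure.
            final List<String> stillPresent = new ArrayList<>();
            for (int i = 0; i < ids.size(); i++) {
                if (statuses.get(i) == false) {
                    final Blob blob = client.get(ids.get(i));
                    if (blob != null) {
                        stillPresent.add(ids.get(i).getName());
                    }
                }
            }
            if (stillPresent.isEmpty() == false) {
                throw new IOException("Failed to delete blobs " + stillPresent + " in bucket [" + bucket + "]");
            }
        }
    }

The commit's version additionally routes every client call through Elasticsearch's SocketAccess.doPrivilegedIOException wrapper and logs each blob that survives deletion before flagging the whole operation as failed, as the diffs below show.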
GoogleCloudStorageBlobContainer.java

@@ -26,7 +26,9 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
 
@@ -78,7 +80,12 @@ public void deleteBlob(String blobName) throws IOException {
         blobStore.deleteBlob(buildKey(blobName));
     }
 
-    protected String buildKey(String blobName) {
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        blobStore.deleteBlobs(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
+    }
+
+    private String buildKey(String blobName) {
         assert blobName != null;
         return path + blobName;
     }
GoogleCloudStorageBlobStore.java

@@ -31,6 +31,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -50,6 +51,7 @@
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +64,12 @@ class GoogleCloudStorageBlobStore implements BlobStore {
 
     private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
 
+    /**
+     * Maximum batch size for aggregating multiple API requests into a single {@link com.google.cloud.storage.StorageBatch}.
+     * @see <a href="https://github.com/googleapis/google-cloud-java/pull/952#issuecomment-213466772">here</a>
+     */
+    private static final int MAX_BATCH_SIZE = 100;
+
     // The recommended maximum size of a blob that should be uploaded in a single
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
@@ -105,7 +113,7 @@ public void close() {
      * @param bucketName name of the bucket
      * @return true iff the bucket exists
      */
-    boolean doesBucketExist(String bucketName) {
+    private boolean doesBucketExist(String bucketName) {
         try {
             final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
             return bucket != null;
@@ -295,7 +303,7 @@ void deleteBlob(String blobName) throws IOException {
      *
      * @param prefix prefix of the blobs to delete
      */
-    void deleteBlobsByPrefix(String prefix) throws IOException {
+    private void deleteBlobsByPrefix(String prefix) throws IOException {
         deleteBlobs(listBlobsByPrefix("", prefix).keySet());
     }
 
@@ -314,13 +322,33 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
             return;
         }
         final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
-        final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
-        assert blobIdsToDelete.size() == deletedStatuses.size();
+        final List<Boolean> deletedStatuses = new ArrayList<>();
+        final int deleteCount = blobIdsToDelete.size();
+        SocketAccess.doPrivilegedIOException(() -> {
+            final int batches = deleteCount / MAX_BATCH_SIZE + (deleteCount % MAX_BATCH_SIZE == 0 ? 0 : 1);
+            for (int i = 0; i < batches - 1; ++i) {
+                deletedStatuses.addAll(client().delete(blobIdsToDelete.subList(i * MAX_BATCH_SIZE, (i + 1) * MAX_BATCH_SIZE)));
+            }
+            deletedStatuses.addAll(client().delete(blobIdsToDelete.subList((batches - 1) * MAX_BATCH_SIZE, deleteCount)));
+            return null;
+        });
+        assert deleteCount == deletedStatuses.size();
         boolean failed = false;
         for (int i = 0; i < blobIdsToDelete.size(); i++) {
             if (deletedStatuses.get(i) == false) {
-                logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
-                failed = true;
+                final BlobId blobId = blobIdsToDelete.get(i);
+                try {
+                    Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
+                    if (blob != null) {
+                        logger.error("Failed to delete blob [{}] in bucket [{}]", blobId.getName(), bucketName);
+                        failed = true;
+                    }
+                } catch (Exception e) {
+                    logger.error(
+                        new ParameterizedMessage(
+                            "Failed to delete and then stat blob [{}] in bucket [{}]", blobId.getName(), bucketName), e);
+                    failed = true;
+                }
             }
         }
         if (failed) {
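
To make the batch arithmetic in the last hunk concrete, here is a tiny self-contained check of the ceiling-division expression used above; the class name and the sample counts are illustrative only.

    public class BatchMathDemo {

        private static final int MAX_BATCH_SIZE = 100;

        public static void main(String[] args) {
            for (int deleteCount : new int[] {1, 99, 100, 101, 250}) {
                // Same ceiling division as the diff: all full batches, plus one partial batch if needed.
                final int batches = deleteCount / MAX_BATCH_SIZE + (deleteCount % MAX_BATCH_SIZE == 0 ? 0 : 1);
                final int lastBatchStart = (batches - 1) * MAX_BATCH_SIZE;
                System.out.println(deleteCount + " blobs -> " + batches + " batches, last batch ["
                    + lastBatchStart + ", " + deleteCount + ")");
            }
        }
    }

For example, 250 blobs yield three batches covering [0, 100), [100, 200) and [200, 250), which is why the final subList must start at (batches - 1) * MAX_BATCH_SIZE for the per-blob statuses to line up with blobIdsToDelete.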
