Skip to content

Commit

Permalink
Add Bulk Delete Api to BlobStore
Browse files Browse the repository at this point in the history
* Adds Bulk delete API to blob container
* Implement bulk delete API for S3
* Adjust S3Fixture to accept both path styles for bulk deletes since the S3 SDK uses both during our ITs
* Closes elastic#40250
  • Loading branch information
original-brownbear committed Apr 1, 2019
1 parent 5fb53af commit 86e2648
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -118,6 +119,40 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void deleteBlobs(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
if (partition.size() == 1000 ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
clientReference.client().deleteObjects(
deleteRequest);
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
}
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();

Expand All @@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);

boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
Expand All @@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
});
};
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);

// non-authorized requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -96,6 +97,7 @@ public interface BlobContainer {
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
Expand All @@ -107,6 +109,20 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes the blobs with giving names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
*
* @param blobNames The names of the blob to delete.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob exists but could not be deleted.
*/
default void deleteBlobs(List<String> blobNames) throws IOException {
for (String blobName : blobNames) {
deleteBlob(blobName);
}
}

/**
* Deletes a blob with giving name, ignoring if the blob does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -470,22 +469,15 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
for (final IndexId indexId : indicesToCleanUp) {
try {
indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
} catch (DirectoryNotEmptyException dnee) {
// if the directory isn't empty for some reason, it will fail to clean up;
// we'll ignore that and accept that cleanup didn't fully succeed.
// since we are using UUIDs for path names, this won't be an issue for
// snapshotting indices of the same name
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
indicesBlobContainer.deleteBlobs(indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder.", metadata.name(), indexId), ioe);
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices [{}] are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
}
}
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
Expand Down Expand Up @@ -1022,16 +1014,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobs(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs [{}] during finalization",
snapshotId, shardId, blobNames), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
Expand All @@ -1040,16 +1030,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
}

// Delete old index files
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobs(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs [{}] during finalization",
snapshotId, shardId, indexBlobs), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
Expand Down

0 comments on commit 86e2648

Please sign in to comment.