-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Bulk Deletes for GCS Repository #41368
Changes from 5 commits
bc18a17
b971752
1ae7460
ae58d9d
a029e7b
4199c2a
77e5875
d5a05fa
729e8b9
1d8d9c0
2838d3d
a0e616c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.elasticsearch.repositories.gcs; | ||
|
||
import com.google.cloud.BatchResult; | ||
import com.google.cloud.ReadChannel; | ||
import com.google.cloud.WriteChannel; | ||
import com.google.cloud.storage.Blob; | ||
|
@@ -27,10 +28,10 @@ | |
import com.google.cloud.storage.Bucket; | ||
import com.google.cloud.storage.Storage; | ||
import com.google.cloud.storage.Storage.BlobListOption; | ||
import com.google.cloud.storage.StorageBatch; | ||
import com.google.cloud.storage.StorageException; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import com.google.common.collect.Lists; | ||
import org.elasticsearch.common.SuppressForbidden; | ||
import org.elasticsearch.common.blobstore.BlobContainer; | ||
import org.elasticsearch.common.blobstore.BlobMetaData; | ||
|
@@ -51,6 +52,7 @@ | |
import java.nio.file.FileAlreadyExistsException; | ||
import java.nio.file.NoSuchFileException; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
@@ -59,8 +61,6 @@ | |
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; | ||
|
||
class GoogleCloudStorageBlobStore implements BlobStore { | ||
|
||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class); | ||
|
||
// The recommended maximum size of a blob that should be uploaded in a single | ||
// request. Larger files should be uploaded over multiple requests (this is | ||
|
@@ -105,7 +105,7 @@ public void close() { | |
* @param bucketName name of the bucket | ||
* @return true iff the bucket exists | ||
*/ | ||
boolean doesBucketExist(String bucketName) { | ||
private boolean doesBucketExist(String bucketName) { | ||
try { | ||
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName)); | ||
return bucket != null; | ||
|
@@ -295,7 +295,7 @@ void deleteBlob(String blobName) throws IOException { | |
* | ||
* @param prefix prefix of the blobs to delete | ||
*/ | ||
void deleteBlobsByPrefix(String prefix) throws IOException { | ||
private void deleteBlobsByPrefix(String prefix) throws IOException { | ||
deleteBlobs(listBlobsByPrefix("", prefix).keySet()); | ||
} | ||
|
||
|
@@ -314,17 +314,39 @@ void deleteBlobs(Collection<String> blobNames) throws IOException { | |
return; | ||
} | ||
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList()); | ||
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete)); | ||
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> { | ||
final StorageBatch batch = client().batch(); | ||
final List<Boolean> results = Lists.newArrayList(); | ||
for (BlobId blob : blobIdsToDelete) { | ||
batch.delete(blob).notify( | ||
new BatchResult.Callback<>() { | ||
@Override | ||
public void success(Boolean result) { | ||
results.add(result); | ||
} | ||
|
||
@Override | ||
public void error(StorageException exception) { | ||
if (exception.getCode() == 404) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's use the constant here: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure :) |
||
results.add(Boolean.TRUE); | ||
} else { | ||
results.add(Boolean.FALSE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to have this consistent with the S3 implementation, which currently collects the exceptions and add them as suppressed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
} | ||
}); | ||
} | ||
batch.submit(); | ||
return Collections.unmodifiableList(results); | ||
}); | ||
assert blobIdsToDelete.size() == deletedStatuses.size(); | ||
boolean failed = false; | ||
for (int i = 0; i < blobIdsToDelete.size(); i++) { | ||
if (deletedStatuses.get(i) == false) { | ||
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName); | ||
failed = true; | ||
} | ||
} | ||
if (failed) { | ||
throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs"); | ||
throw new IOException("Failed to delete the following blobs " + blobIdsToDelete); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
package org.elasticsearch.repositories.gcs; | ||
|
||
import com.google.api.gax.paging.Page; | ||
import com.google.cloud.BatchResult; | ||
import com.google.cloud.Policy; | ||
import com.google.cloud.ReadChannel; | ||
import com.google.cloud.RestorableState; | ||
|
@@ -34,6 +35,7 @@ | |
import com.google.cloud.storage.ServiceAccount; | ||
import com.google.cloud.storage.Storage; | ||
import com.google.cloud.storage.StorageBatch; | ||
import com.google.cloud.storage.StorageBatchResult; | ||
import com.google.cloud.storage.StorageException; | ||
import com.google.cloud.storage.StorageOptions; | ||
import com.google.cloud.storage.StorageRpcOptionUtils; | ||
|
@@ -57,6 +59,12 @@ | |
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Matchers.anyVararg; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
/** | ||
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs | ||
* in a given concurrent map. | ||
|
@@ -356,8 +364,21 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { | |
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public StorageBatch batch() { | ||
return null; | ||
final StorageBatch batch = mock(StorageBatch.class); | ||
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class); | ||
doAnswer(answer -> { | ||
BatchResult.Callback<Boolean, Exception> callback = (BatchResult.Callback<Boolean, Exception>) answer.getArguments()[0]; | ||
callback.success(true); | ||
return null; | ||
}).when(result).notify(any(BatchResult.Callback.class)); | ||
when(batch.delete(any(BlobId.class), anyVararg())).thenAnswer(invocation -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we somehow check that no other method is called on this mock? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Jup done :) |
||
final BlobId blobId = (BlobId) invocation.getArguments()[0]; | ||
delete(blobId); | ||
return result; | ||
}); | ||
return batch; | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the method on blobstore should now also be renamed to
deleteBlobsIgnoringIfNotExists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done