Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archival: Delete blob activity #1931

Merged
merged 13 commits into from
Jun 4, 2019
3 changes: 3 additions & 0 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (c *client) Delete(_ context.Context, bucket string, key blob.Key) (bool, e
blobPath := bucketItemPath(c.storeDirectory, bucket, key.String())
deleted, err := deleteFile(blobPath)
if err != nil {
if os.IsNotExist(err) {
return false, blobstore.ErrBlobNotExists
}
return false, ErrDeleteFile
}
return deleted, nil
Expand Down
2 changes: 1 addition & 1 deletion common/blobstore/filestore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *ClientSuite) TestDelete_Success() {
key, err := blob.NewKeyFromString("blob.blob")
s.NoError(err)
deleted, err := client.Delete(context.Background(), defaultBucketName, key)
s.NoError(err)
s.Equal(blobstore.ErrBlobNotExists, err)
s.False(deleted)

b := blob.NewBlob([]byte("body"), map[string]string{})
Expand Down
10 changes: 4 additions & 6 deletions common/blobstore/filestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"bytes"
"encoding/gob"
"errors"
"github.com/uber/cadence/common/blobstore/blob"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"

"github.com/uber/cadence/common/blobstore/blob"
"gopkg.in/yaml.v2"
)

const (
Expand Down Expand Up @@ -92,10 +93,7 @@ func readFile(filepath string) ([]byte, error) {

func deleteFile(filepath string) (bool, error) {
if err := os.Remove(filepath); err != nil {
if !os.IsNotExist(err) {
return false, err
}
return false, nil
return false, err
}
return true, nil
}
Expand Down
9 changes: 5 additions & 4 deletions common/blobstore/filestore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
package filestore

import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/blobstore/blob"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/blobstore/blob"
)

type UtilSuite struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (s *UtilSuite) TestDeleteFile() {
filename := "test-file-name"
fpath := filepath.Join(dir, filename)
deleted, err := deleteFile(fpath)
s.NoError(err)
s.True(os.IsNotExist(err))
s.False(deleted)

err = writeFile(fpath, []byte("file contents"))
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ const (
ArchiverUploadHistoryActivityScope
// ArchiverDeleteHistoryActivityScope is scope used by all metrics emitted by archiver.DeleteHistoryActivity
ArchiverDeleteHistoryActivityScope
// ArchiverDeleteBlobActivityScope is scope used by all metrics emitted by archiver.DeleteBlobActivity
ArchiverDeleteBlobActivityScope
// ArchiverScope is scope used by all metrics emitted by archiver.Archiver
ArchiverScope
// ArchiverPumpScope is scope used by all metrics emitted by archiver.Pump
Expand Down Expand Up @@ -1099,6 +1101,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
IndexProcessorScope: {operation: "IndexProcessor"},
ArchiverUploadHistoryActivityScope: {operation: "ArchiverUploadHistoryActivity"},
ArchiverDeleteHistoryActivityScope: {operation: "ArchiverDeleteHistoryActivity"},
ArchiverDeleteBlobActivityScope: {operation: "ArchiverDeleteBlobActivity"},
ArchiverScope: {operation: "Archiver"},
ArchiverPumpScope: {operation: "ArchiverPump"},
ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
Expand Down Expand Up @@ -1339,9 +1342,12 @@ const (
ArchiverCoroutineStoppedCount
ArchiverHandleRequestLatency
ArchiverUploadWithRetriesLatency
ArchiverDeleteBlobWithRetriesLatency
ArchiverDeleteWithRetriesLatency
ArchiverUploadFailedAllRetriesCount
ArchiverUploadSuccessCount
ArchiverDeleteBlobFailedAllRetriesCount
ArchiverDeleteBlobSuccessCount
ArchiverDeleteLocalFailedAllRetriesCount
ArchiverDeleteLocalSuccessCount
ArchiverDeleteFailedAllRetriesCount
Expand Down Expand Up @@ -1576,9 +1582,12 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ArchiverCoroutineStoppedCount: {metricName: "archiver_coroutine_stopped"},
ArchiverHandleRequestLatency: {metricName: "archiver_handle_request_latency"},
ArchiverUploadWithRetriesLatency: {metricName: "archiver_upload_with_retries_latency"},
ArchiverDeleteBlobWithRetriesLatency: {metricName: "archiver_delete_blob_with_retries_latency"},
ArchiverDeleteWithRetriesLatency: {metricName: "archiver_delete_with_retries_latency"},
ArchiverUploadFailedAllRetriesCount: {metricName: "archiver_upload_failed_all_retries"},
ArchiverUploadSuccessCount: {metricName: "archiver_upload_success"},
ArchiverDeleteBlobFailedAllRetriesCount: {metricName: "archiver_delete_blob_failed_all_retries"},
ArchiverDeleteBlobSuccessCount: {metricName: "archiver_delete_blob_success"},
ArchiverDeleteLocalFailedAllRetriesCount: {metricName: "archiver_delete_local_failed_all_retries"},
ArchiverDeleteLocalSuccessCount: {metricName: "archiver_delete_local_success"},
ArchiverDeleteFailedAllRetriesCount: {metricName: "archiver_delete_failed_all_retries"},
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ func (t *timerQueueProcessorBase) archiveWorkflow(task *persistence.TimerTaskInf
BranchToken: msBuilder.GetCurrentBranch(),
NextEventID: msBuilder.GetNextEventID(),
CloseFailoverVersion: msBuilder.GetLastWriteVersion(),
BucketName: domainCacheEntry.GetConfig().ArchivalBucket,
}

// send signal before deleting mutable state to make sure archival is idempotent
Expand Down
Loading