diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 4c012ebd024..d1f97c75ef8 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1333,6 +1333,7 @@ const ( IndexProcessorCorruptedData ArchiverNonRetryableErrorCount ArchiverSkipUploadCount + ArchiverHistoryMutatedCount ArchiverRunningDeterministicConstructionCheckCount ArchiverDeterministicConstructionCheckFailedCount ArchiverCouldNotRunDeterministicConstructionCheckCount @@ -1573,6 +1574,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ IndexProcessorCorruptedData: {metricName: "index_processor_corrupted_data"}, ArchiverNonRetryableErrorCount: {metricName: "archiver_non_retryable_error"}, ArchiverSkipUploadCount: {metricName: "archiver_skip_upload"}, + ArchiverHistoryMutatedCount: {metricName: "archiver_history_mutated"}, ArchiverRunningDeterministicConstructionCheckCount: {metricName: "archiver_running_deterministic_construction_check"}, ArchiverDeterministicConstructionCheckFailedCount: {metricName: "archiver_deterministic_construction_check_failed"}, ArchiverCouldNotRunDeterministicConstructionCheckCount: {metricName: "archiver_could_not_run_deterministic_construction_check"}, diff --git a/service/worker/archiver/activities.go b/service/worker/archiver/activities.go index 5cf4a297c74..2de9f57f1f6 100644 --- a/service/worker/archiver/activities.go +++ b/service/worker/archiver/activities.go @@ -57,10 +57,12 @@ const ( errDeleteHistoryV1 = "failed to delete history from events_v1" errDeleteHistoryV2 = "failed to delete history from events_v2" + + errHistoryMutated = "history was mutated during uploading" ) var ( - uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob} + uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob, errHistoryMutated} deleteBlobActivityNonRetryableErrors = []string{errConstructKey, errGetTags, errUploadBlob, errEmptyBucket, errDeleteBlob} deleteHistoryActivityNonRetryableErrors = []string{errDeleteHistoryV1, errDeleteHistoryV2} errContextTimeout = errors.New("activity aborted because context timed out") @@ -153,6 +155,13 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err)) return err } + + if historyMutated(historyBlob, &request) { + scope.IncCounter(metrics.ArchiverHistoryMutatedCount) + logger.Error(uploadErrorMsg, tag.UploadFailReason("history was mutated during archiving")) + return cadence.NewCustomError(errHistoryMutated) + } + if runConstTest { // some tags are specific to the cluster and time a blob was uploaded from/when // this only updates those specific tags, all other parts of the blob are left unchanged diff --git a/service/worker/archiver/activities_test.go b/service/worker/archiver/activities_test.go index 79e3ed85f02..f26bc0e3386 100644 --- a/service/worker/archiver/activities_test.go +++ b/service/worker/archiver/activities_test.go @@ -624,7 +624,9 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_BlobDoesNotAlreadyEx mockHistoryBlobReader := &HistoryBlobReaderMock{} mockHistoryBlobReader.On("GetBlob", mock.Anything).Return(&HistoryBlob{ Header: &HistoryBlobHeader{ - IsLast: common.BoolPtr(true), + LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion), + LastEventID: common.Int64Ptr(testNextEventID - 1), + IsLast: common.BoolPtr(true), }, }, nil) container := &BootstrapContainer{ @@ -670,7 +672,9 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_ConcurrentUploads() mockHistoryBlobReader := &HistoryBlobReaderMock{} mockHistoryBlobReader.On("GetBlob", common.FirstBlobPageToken+1).Return(&HistoryBlob{ Header: &HistoryBlobHeader{ - IsLast: common.BoolPtr(true), + LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion), + LastEventID: common.Int64Ptr(testNextEventID - 1), + IsLast: common.BoolPtr(true), }, }, nil) container := &BootstrapContainer{ @@ -700,6 +704,49 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_ConcurrentUploads() s.NoError(err) } +func (s *activitiesSuite) TestUploadHistoryActivity_Fail_HistoryMutated() { + s.metricsClient.On("Scope", metrics.ArchiverUploadHistoryActivityScope, []metrics.Tag{metrics.DomainTag(testDomainName)}).Return(s.metricsScope).Once() + s.metricsScope.On("IncCounter", metrics.ArchiverNonRetryableErrorCount).Once() + s.metricsScope.On("IncCounter", metrics.ArchiverHistoryMutatedCount).Once() + firstKey, _ := NewHistoryBlobKey(testDomainID, testWorkflowID, testRunID, testCloseFailoverVersion, common.FirstBlobPageToken) + domainCache, mockClusterMetadata := s.archivalConfig(true, testArchivalBucket, true) + mockBlobstore := &mocks.BlobstoreClient{} + mockBlobstore.On("GetTags", mock.Anything, mock.Anything, firstKey).Return(nil, blobstore.ErrBlobNotExists).Once() + mockHistoryBlobReader := &HistoryBlobReaderMock{} + // Return a history blob with a larger failover version + mockHistoryBlobReader.On("GetBlob", common.FirstBlobPageToken).Return(&HistoryBlob{ + Header: &HistoryBlobHeader{ + LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion + 1), + IsLast: common.BoolPtr(true), + }, + }, nil) + container := &BootstrapContainer{ + Logger: s.logger, + MetricsClient: s.metricsClient, + DomainCache: domainCache, + ClusterMetadata: mockClusterMetadata, + Blobstore: mockBlobstore, + HistoryBlobReader: mockHistoryBlobReader, + Config: getConfig(false), + } + env := s.NewTestActivityEnvironment() + env.SetWorkerOptions(worker.Options{ + BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container), + }) + request := ArchiveRequest{ + DomainID: testDomainID, + DomainName: testDomainName, + WorkflowID: testWorkflowID, + RunID: testRunID, + BranchToken: testBranchToken, + NextEventID: testNextEventID, + CloseFailoverVersion: testCloseFailoverVersion, + BucketName: testArchivalBucket, + } + _, err := env.ExecuteActivity(uploadHistoryActivity, request) + s.Equal(errHistoryMutated, err.Error()) +} + func (s *activitiesSuite) TestDeleteBlobActivity_Fail_ConstructBlobKeyError() { s.metricsClient.On("Scope", metrics.ArchiverDeleteBlobActivityScope, []metrics.Tag{metrics.DomainTag(testDomainName)}).Return(s.metricsScope).Once() s.metricsScope.On("IncCounter", metrics.ArchiverNonRetryableErrorCount).Once() diff --git a/service/worker/archiver/util.go b/service/worker/archiver/util.go index 4f35c25eefa..8c5b5cc4bcb 100644 --- a/service/worker/archiver/util.go +++ b/service/worker/archiver/util.go @@ -28,6 +28,7 @@ import ( "time" "github.com/dgryski/go-farm" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "go.uber.org/cadence" @@ -96,6 +97,20 @@ func shouldRun(probability float64) bool { return rand.Intn(int(1.0/probability)) == 0 } +func historyMutated(historyBlob *HistoryBlob, request *ArchiveRequest) bool { + lastFailoverVersion := common.Int64Default(historyBlob.Header.LastFailoverVersion) + if lastFailoverVersion > request.CloseFailoverVersion { + return true + } + + if !common.BoolDefault(historyBlob.Header.IsLast) { + return false + } + + lastEventID := common.Int64Default(historyBlob.Header.LastEventID) + return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID +} + func validateArchivalRequest(request *ArchiveRequest) error { if len(request.BucketName) == 0 { // this should not be able to occur, if domain enables archival bucket should always be set diff --git a/service/worker/archiver/util_test.go b/service/worker/archiver/util_test.go index 52f3ad46333..c328d07c129 100644 --- a/service/worker/archiver/util_test.go +++ b/service/worker/archiver/util_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber/cadence/common" "go.uber.org/cadence" ) @@ -89,6 +90,69 @@ func (s *UtilSuite) TestHashesEqual() { } } +func (s *UtilSuite) TestHistoryMutated() { + testCases := []struct { + historyBlob *HistoryBlob + request *ArchiveRequest + isMutated bool + }{ + { + historyBlob: &HistoryBlob{ + Header: &HistoryBlobHeader{ + LastFailoverVersion: common.Int64Ptr(15), + }, + }, + request: &ArchiveRequest{ + CloseFailoverVersion: 3, + }, + isMutated: true, + }, + { + historyBlob: &HistoryBlob{ + Header: &HistoryBlobHeader{ + LastFailoverVersion: common.Int64Ptr(10), + LastEventID: common.Int64Ptr(50), + IsLast: common.BoolPtr(true), + }, + }, + request: &ArchiveRequest{ + CloseFailoverVersion: 10, + NextEventID: 34, + }, + isMutated: true, + }, + { + historyBlob: &HistoryBlob{ + Header: &HistoryBlobHeader{ + LastFailoverVersion: common.Int64Ptr(9), + IsLast: common.BoolPtr(true), + }, + }, + request: &ArchiveRequest{ + CloseFailoverVersion: 10, + }, + isMutated: true, + }, + { + historyBlob: &HistoryBlob{ + Header: &HistoryBlobHeader{ + LastFailoverVersion: common.Int64Ptr(10), + LastEventID: common.Int64Ptr(33), + IsLast: common.BoolPtr(true), + }, + }, + request: &ArchiveRequest{ + CloseFailoverVersion: 10, + NextEventID: 34, + }, + isMutated: false, + }, + } + for _, tc := range testCases { + s.Equal(tc.isMutated, historyMutated(tc.historyBlob, tc.request)) + } +} + func (s *UtilSuite) TestValidateRequest() { testCases := []struct { request *ArchiveRequest