Piggy back archival uploading progress on heartbeat details #1963

Merged
merged 10 commits · Jun 19, 2019
Changes from 6 commits
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

169 changes: 106 additions & 63 deletions service/worker/archiver/activities.go
@@ -39,30 +39,48 @@ import (
"go.uber.org/cadence/activity"
)

type (
uploadProgress struct {
UploadedBlobs []string
IteratorState []byte
BlobPageToken int
HandledLastBlob bool
}

uploadResult struct {
BlobsToDelete []string
ErrorReason string
ErrorDetails string
}
)

const (
uploadHistoryActivityFnName = "uploadHistoryActivity"
deleteBlobActivityFnName = "deleteBlobActivity"
deleteHistoryActivityFnName = "deleteHistoryActivity"
blobstoreTimeout = 30 * time.Second

errGetDomainByID = "could not get domain cache entry"
errConstructKey = "could not construct blob key"
errGetTags = "could not get blob tags"
errUploadBlob = "could not upload blob"
errReadBlob = "could not read blob"
errEmptyBucket = "domain is enabled for archival but bucket is not set"
errConstructBlob = "failed to construct blob"
errDownloadBlob = "could not download existing blob"
errDeleteBlob = "could not delete existing blob"
errInvalidRequest = "archival request is invalid"
errGetDomainByID = "could not get domain cache entry"
errConstructKey = "could not construct blob key"
errGetTags = "could not get blob tags"
errUploadBlob = "could not upload blob"
errReadBlob = "could not read blob"
errEmptyBucket = "domain is enabled for archival but bucket is not set"
errConstructBlob = "failed to construct blob"
errDownloadBlob = "could not download existing blob"
errDeleteBlob = "could not delete existing blob"

errDeleteHistoryV1 = "failed to delete history from events_v1"
errDeleteHistoryV2 = "failed to delete history from events_v2"

errHistoryMutated = "history was mutated during uploading"

errActivityPanic = "cadenceInternal:Panic"
)

var (
uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob, errHistoryMutated}
uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob, errHistoryMutated, errActivityPanic}
deleteBlobActivityNonRetryableErrors = []string{errConstructKey, errGetTags, errUploadBlob, errEmptyBucket, errDeleteBlob}
deleteHistoryActivityNonRetryableErrors = []string{errDeleteHistoryV1, errDeleteHistoryV2}
errContextTimeout = errors.New("activity aborted because context timed out")
@@ -76,69 +94,92 @@ const (
// uploadHistoryActivity is used to upload a workflow execution history to blobstore.
// method will retry all retryable operations until context expires.
// archival will be skipped and no error will be returned if cluster or domain is not configured for archival.
// method will always return either: nil, errContextTimeout or an error from uploadHistoryActivityNonRetryableErrors.
func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err error) {
// an error will be returned if the context times out, the request is invalid, or the domain cache entry cannot be fetched.
// all other errors (and the error details) will be included in the uploadResult.
// the result will also contain a list of blob keys to delete when there's an error.
func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (result uploadResult, err error) {
container := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
progress := uploadProgress{
BlobPageToken: common.FirstBlobPageToken,
}
scope := container.MetricsClient.Scope(metrics.ArchiverUploadHistoryActivityScope, metrics.DomainTag(request.DomainName))
sw := scope.StartTimer(metrics.CadenceLatency)
defer func() {
result, err = getUploadHistoryActivityResponse(progress, err)
sw.Stop()
if err != nil {
if err == errContextTimeout {
scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
} else {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
if err == errContextTimeout {
scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
} else if err != nil || result.ErrorReason != "" {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
}()

logger := tagLoggerWithRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), request)

if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &progress); err != nil {
logger.Info("failed to get previous progress, start from beginning")
// reset to initial state
progress = uploadProgress{
UploadedBlobs: nil,
IteratorState: nil,
HandledLastBlob: false,
BlobPageToken: common.FirstBlobPageToken,
}
} else {
progress.BlobPageToken++
}
}

domainCache := container.DomainCache
clusterMetadata := container.ClusterMetadata
domainCacheEntry, err := getDomainByID(ctx, domainCache, request.DomainID)
if err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
return err
return result, err
}
if clusterMetadata.ArchivalConfig().GetArchivalStatus() != cluster.ArchivalEnabled {
logger.Error(uploadSkipMsg, tag.ArchivalUploadFailReason("cluster is not enabled for archival"))
scope.IncCounter(metrics.ArchiverSkipUploadCount)
return nil
return result, nil
}
if domainCacheEntry.GetConfig().ArchivalStatus != shared.ArchivalStatusEnabled {
logger.Error(uploadSkipMsg, tag.ArchivalUploadFailReason("domain is not enabled for archival"))
scope.IncCounter(metrics.ArchiverSkipUploadCount)
return nil
return result, nil
}
if err := validateArchivalRequest(&request); err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(err.Error()))
return err
return result, err
}

domainName := domainCacheEntry.GetInfo().Name
clusterName := container.ClusterMetadata.GetCurrentClusterName()
historyBlobIterator, err := NewHistoryBlobIterator(request, container, domainName, clusterName, progress.IteratorState)
if err != nil {
logger.Error("failed to decode history blob iterator state, start from beginning", tag.Error(err))
}
historyBlobReader := container.HistoryBlobReader
if historyBlobReader == nil { // will only be set by testing code
historyBlobReader = NewHistoryBlobReader(NewHistoryBlobIterator(request, container, domainName, clusterName))
historyBlobReader = NewHistoryBlobReader(historyBlobIterator)
}
blobstoreClient := container.Blobstore

var handledLastBlob bool
handledLastBlob := progress.HandledLastBlob
var totalUploadSize int64

runBlobIntegrityCheck := shouldRun(container.Config.BlobIntegrityCheckProbability())
runBlobIntegrityCheck := !activity.HasHeartbeatDetails(ctx) && shouldRun(container.Config.BlobIntegrityCheckProbability())
var uploadedHistoryEventHashes []uint64
for pageToken := common.FirstBlobPageToken; !handledLastBlob; pageToken++ {
for pageToken := progress.BlobPageToken; !handledLastBlob; pageToken++ {
key, err := NewHistoryBlobKey(request.DomainID, request.WorkflowID, request.RunID, request.CloseFailoverVersion, pageToken)
if err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("could not construct blob key"))
return cadence.NewCustomError(errConstructKey, err.Error())
return result, cadence.NewCustomError(errConstructKey, err.Error())
}

tags, err := getTags(ctx, blobstoreClient, request.BucketName, key)
if err != nil && err != blobstore.ErrBlobNotExists {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(key.String()), tag.Error(err))
return err
return result, err
}

runConstTest := false
@@ -157,7 +198,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
historyBlob, err := getBlob(ctx, historyBlobReader, pageToken)
if err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
return err
return result, err
}
if runBlobIntegrityCheck {
for _, e := range historyBlob.Body.Events {
@@ -168,7 +209,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
if historyMutated(historyBlob, &request) {
scope.IncCounter(metrics.ArchiverHistoryMutatedCount)
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("history was mutated during archiving"))
return cadence.NewCustomError(errHistoryMutated)
return result, cadence.NewCustomError(errHistoryMutated)
}

if runConstTest {
@@ -180,7 +221,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
blob, reason, err := constructBlob(historyBlob, container.Config.EnableArchivalCompression(domainName))
if err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(reason), tag.ArchivalBlobKey(key.String()))
return cadence.NewCustomError(errConstructBlob, err.Error())
return result, cadence.NewCustomError(errConstructBlob, err.Error())
}
currBlobSize := int64(len(blob.Body))
scope.RecordTimer(metrics.ArchiverBlobSize, time.Duration(currBlobSize))
@@ -209,28 +250,39 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
}
if err := uploadBlob(ctx, blobstoreClient, request.BucketName, key, blob); err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(key.String()), tag.Error(err))
return err
return result, err
}
handledLastBlob = *historyBlob.Header.IsLast

progress.UploadedBlobs = append(progress.UploadedBlobs, key.String())
currIteratorState, err := historyBlobIterator.GetState()
if err == nil {
progress.IteratorState = currIteratorState
progress.BlobPageToken = pageToken

Contributor:

It feels weird to me that BlobPageToken and UploadedBlobs can become out of sync. The way I am reading this code is that progress.BlobPageToken represents the highest blob page you have successfully uploaded so far. But in the case where historyBlobIterator.GetState() returns an error, you get a progress which has a blob in UploadedBlobs with a higher page than progress.BlobPageToken.

Contributor Author:

Updated, but I still feel a little bit weird as I think progress.UploadedBlobs should always reflect what has been uploaded. In the case when GetState() returns an error, the blob is still uploaded successfully, so I think progress should still be updated.

progress.HandledLastBlob = handledLastBlob
} else {
logger.Error("failed to get history blob iterator state", tag.Error(err))
}
activity.RecordHeartbeat(ctx, progress)
}
scope.RecordTimer(metrics.ArchiverTotalUploadSize, time.Duration(totalUploadSize))
indexBlobKey, err := NewHistoryIndexBlobKey(request.DomainID, request.WorkflowID, request.RunID)
if err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("could not construct index blob key"))
return cadence.NewCustomError(errConstructKey, err.Error())
return result, cadence.NewCustomError(errConstructKey, err.Error())
}
existingVersions, err := getTags(ctx, blobstoreClient, request.BucketName, indexBlobKey)
if err != nil && err != blobstore.ErrBlobNotExists {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
return err
return result, err
}
indexBlobWithVersion := addVersion(request.CloseFailoverVersion, existingVersions)
if indexBlobWithVersion == nil {
return nil
return result, nil
}
if err := uploadBlob(ctx, blobstoreClient, request.BucketName, indexBlobKey, indexBlobWithVersion); err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
return err
return result, err
}
if runBlobIntegrityCheck {
scope.IncCounter(metrics.ArchiverRunningBlobIntegrityCheckCount)
@@ -251,7 +303,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
if err != nil {
scope.IncCounter(metrics.ArchiverCouldNotRunBlobIntegrityCheckCount)
logger.Error("failed to access history for blob integrity check", tag.Error(err))
return nil
return result, nil
}
for _, e := range resp.HistoryBlob.Body.Events {
fetchedHistoryEventHashes = append(fetchedHistoryEventHashes, hash(e.String()))
@@ -263,7 +315,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
logger.Error("uploaded history does not match fetched history")
}
}
return nil
return result, nil
}
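
For context on the new contract, here is a minimal workflow-side sketch of how a caller might consume uploadResult and forward BlobsToDelete to deleteBlobActivity. The archiveWorkflowSketch function, its error handling, and the direct ExecuteActivity calls are illustrative assumptions, not code from this change; it assumes go.uber.org/cadence and go.uber.org/cadence/workflow are imported.

// illustrative sketch only; not part of this diff
func archiveWorkflowSketch(ctx workflow.Context, request ArchiveRequest) error {
	var result uploadResult
	// only context timeouts, invalid requests, and domain cache failures surface as activity errors
	if err := workflow.ExecuteActivity(ctx, uploadHistoryActivityFnName, request).Get(ctx, &result); err != nil {
		return err
	}
	if result.ErrorReason != "" {
		// the upload failed part way through; delete exactly the blobs that were uploaded
		if err := workflow.ExecuteActivity(ctx, deleteBlobActivityFnName, request, result.BlobsToDelete).Get(ctx, nil); err != nil {
			return err
		}
		return cadence.NewCustomError(result.ErrorReason, result.ErrorDetails)
	}
	return nil
}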

// deleteHistoryActivity deletes workflow execution history from persistence.
@@ -301,9 +353,7 @@ func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
// deleteBlobActivity deletes uploaded history blobs from blob store.
// method will retry all retryable operations until context expires.
// method will always return either: nil, errContextTimeout, or an error from deleteBlobActivityNonRetryableErrors.
// TODO: after heartbeating during uploadHistoryActivity is implemented, this activity should take
// a list of uploaded blob keys as input.
func deleteBlobActivity(ctx context.Context, request ArchiveRequest) (err error) {
func deleteBlobActivity(ctx context.Context, request ArchiveRequest, blobsToDelete []string) (err error) {
container := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
scope := container.MetricsClient.Scope(metrics.ArchiverDeleteBlobActivityScope, metrics.DomainTag(request.DomainName))
sw := scope.StartTimer(metrics.CadenceLatency)
@@ -354,37 +404,30 @@ func deleteBlobActivity(ctx context.Context, request ArchiveRequest) (err error)
}
}

pageToken := common.FirstBlobPageToken
startIdx := 0
if activity.HasHeartbeatDetails(ctx) {
var prevPageToken int
if err := activity.GetHeartbeatDetails(ctx, &prevPageToken); err == nil {
pageToken = prevPageToken + 1
var prevIdx int
if err := activity.GetHeartbeatDetails(ctx, &prevIdx); err == nil {
startIdx = prevIdx + 1
}
}

startPageToken := pageToken
for {
key, err := NewHistoryBlobKey(request.DomainID, request.WorkflowID, request.RunID, request.CloseFailoverVersion, pageToken)
for idx, keyString := range blobsToDelete[startIdx:] {
key, err := blob.NewKeyFromString(keyString)
if err != nil {
logger.Error("could not construct blob key", tag.Error(err))
return cadence.NewCustomError(errConstructKey, err.Error())
// this should not happen
logger.Error("could not constrcut blob key from string, skip to next one", tag.ArchivalBlobKey(keyString))
continue
}

deleted, err := deleteBlob(ctx, blobstoreClient, request.BucketName, key)
_, err = deleteBlob(ctx, blobstoreClient, request.BucketName, key)
if err != nil {
logger.Error("failed to delete blob", tag.ArchivalBlobKey(key.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
if !deleted && pageToken != startPageToken {
// Blob does not exist. This means we have deleted all uploaded blobs.
// Note we should not break if the first page does not exist as it's possible that a blob has been deleted,
// but the worker restarts before heartbeat is recorded.
break
logger.Error("failed to delete blob, continue to next one", tag.ArchivalBlobKey(key.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
continue
}
activity.RecordHeartbeat(ctx, pageToken)
pageToken++
}

activity.RecordHeartbeat(ctx, idx+startIdx)
}
return nil
}
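
The resume logic in both activities follows the same heartbeat pattern; below is a generic, self-contained sketch of it. The processItemsSketch function and its item loop are assumptions for illustration, using the go.uber.org/cadence/activity calls already imported above.

// generic resume-from-heartbeat sketch; not part of this diff
func processItemsSketch(ctx context.Context, items []string) error {
	startIdx := 0
	if activity.HasHeartbeatDetails(ctx) {
		var prevIdx int
		// details recorded by a previous attempt, if they decode cleanly
		if err := activity.GetHeartbeatDetails(ctx, &prevIdx); err == nil {
			startIdx = prevIdx + 1
		}
	}
	for i := startIdx; i < len(items); i++ {
		// ... process items[i], tolerating per-item failures ...
		activity.RecordHeartbeat(ctx, i) // a retry after this point resumes at i+1
	}
	return nil
}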
