…go into infinite loop + rev tree repair tool) (#2869)

* Fixes #2847 Getting doc history can go into infinite loop: Backport for 1.4.1.2 (#2858)
* Fixes #2847 Getting doc history can go into infinite loop: Backport for 1.3.1.2 (#2856)
* Repro case for #2847 - Getting doc history can go into infinite loop
* Add revTree.Validate() method
* Invoke rawDoc.History.RenderGraphvizDot(), change timeout checking
* Add new test
* Cherry pick commit d8feb1d from feature/issue_2847_cycles, which is based on the master branch
* Fixes issue #2847 by switching the dangling parent check in pruneRevisions to happen after branches are deleted (#2847 (comment))
* Comments on test
* Fixes issue #2847 by fixing marshalJSON to better handle dangling parents during marshal (#2847 (comment))
* Replace getHistory with getValidatedHistory; TestRevsHistoryInfiniteLoop now passes
* Rename getValidatedHistory -> getHistory
* Gofmt
* Remove parent rev from history string
* Remove unneeded fmt.Sprintf()
* Run sg-accel tests
* Try pointing to specific commit to fix build failure
* Use git url instead of ssh for sg-accel repo
* Revert "Use git url instead of ssh for sg-accel repo" (reverts commit 1c5e061)
* Revert "Try pointing to specific commit to fix build failure" (reverts commit dd3f9d9)
* Revert "Run sg-accel tests" (reverts commit d5cc940)
* Remove rawDoc.History.Validate(); does not help the test catch any issues
* Remove commented log
* Remove unneeded import (conflicts: db/revtree_test.go)
* Fix compile errors (conflicts: db/crud.go, db/revtree_test.go)
* Fixes #2857: Revision tree repair tool (#2866)
* TestRepairRevsHistoryWithCycles()
* Add Repair() — unit test passes now
* Add repair bucket (in progress)
* Test passes
* Fix dry run
* TestRepairBucketRevTreeCycles() passes
* Add _repair endpoint
* Fix invocation of InitFrom
* Refactor RepairBucket() to return repaired docs to enable more test assertions
* WriteRepairedDocsToDisk by default to make diagnosis of the repair tool easier
* Run gofmt + goimports
* PR feedback; remove redundant repair_job
* DocTransformer takes raw bytes instead of the marshalled document
* Return RepairBucketResult with doc and repair job
* Update _repair endpoint to marshal result to response
* Add WriteRepairedDocsToBucket(), fix super nasty dev bug along the way
* Fix bug in WriteRepairedDocsToBucket()
* Change to 24 hours and fix up _sync: doc id
* gofmt
* Return DryRun and BackupOrDryRunDocId in results
* Repair() -> RepairCycles()
* More test docs
* Handle bucket.Update err
* Add TestRepairBucketDryRun()
* Use bucket.GetRaw()
* Gofmt + goimports
* TestRepairBucket had wrong number of docs in assertion; saw an error on drone suggesting interference between walrus buckets with the same name
* Fix compile error
* Fix null return value when no repairs done
* Change from Warn -> Crud
* Disable _repair endpoint
* Fix test compile errors
* Fixes #2892 repair tool efficiency (#2893): initial first pass at iterating over the view with paging
* Add documentation
* More comments
* Comment regarding min pageSizeViewResult
* Pull ViewQueryPageSize out to a constant; enhance tests to add more docs to exercise the view iteration
* Address PR feedback
* Update comment
* Change constant to DefaultViewQueryPageSize
* Run gofmt (conflicts: base/constants.go, db/repair_bucket_test.go)
* Adds support for custom TTL of repaired docs and ViewQueryPageSize defaults (#2902)
* Correctly apply _repair parameter value for params.RepairedFileTTL (#2909)
* Fixes #2919 Revtree repair tool gets stuck if node points to itself as parent (#2920)
* PR feedback
1 parent: c416bf6 · commit: 29887fb
Showing 8 changed files with 6,733 additions and 3 deletions. Only one changed file is rendered below.
@@ -0,0 +1,359 @@
package db

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/couchbase/go-couchbase"
    "github.com/couchbase/sync_gateway/base"
)

// Enum for the different repair jobs (eg, repairing rev tree cycles)
type RepairJobType string

const kDefaultRepairedFileTTL = 60 * 60 * 24 * time.Second // 24 hours

const (
    RepairRevTreeCycles = RepairJobType("RepairRevTreeCycles")
)

// Params suitable for external (eg, HTTP) invocations to describe a RepairBucket operation
type RepairBucketParams struct {
    DryRun            bool              `json:"dry_run"`
    ViewQueryPageSize *int              `json:"view_query_page_size"`
    RepairedFileTTL   *int              `json:"repair_output_ttl_seconds"`
    RepairJobs        []RepairJobParams `json:"repair_jobs"`
}

// Params suitable for external (eg, HTTP) invocations to describe a specific RepairJob operation
type RepairJobParams struct {
    RepairJobType   RepairJobType          `json:"type"`
    RepairJobParams map[string]interface{} `json:"params"`
}

// Record details about the result of a bucket repair that was made on a doc
type RepairBucketResult struct {
    DryRun              bool            `json:"dry_run"`
    BackupOrDryRunDocId string          `json:"backup_or_dryrun_doc_id"`
    DocId               string          `json:"id"`
    RepairJobTypes      []RepairJobType `json:"repair_job_type"`
}

// Given a Couchbase Bucket doc, transform the doc in some way to produce a new doc.
// Also return a boolean to indicate whether a transformation took place, and any error that occurred.
type DocTransformer func(docId string, originalCBDoc []byte) (transformedCBDoc []byte, transformed bool, err error)
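
// noopTransformer is an illustrative sketch (not part of the original commit)
// of the DocTransformer contract above: returning (nil, false, nil) signals
// that no transformation took place and no error occurred, so the repair
// machinery leaves the document untouched.
func noopTransformer(docId string, originalCBDoc []byte) (transformedCBDoc []byte, transformed bool, err error) {
    return nil, false, nil
}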

// A RepairBucket struct is the main API entrypoint to call for repairing documents in buckets
type RepairBucket struct {
    DryRun            bool // If true, will only output what changes it *would* have made, but not make any changes
    RepairedFileTTL   time.Duration
    ViewQueryPageSize int
    Bucket            base.Bucket
    RepairJobs        []DocTransformer
}

func NewRepairBucket(bucket base.Bucket) *RepairBucket {
    return &RepairBucket{
        Bucket:            bucket,
        ViewQueryPageSize: base.DefaultViewQueryPageSize,
        RepairedFileTTL:   kDefaultRepairedFileTTL,
    }
}
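
// exampleRepairBucketUsage is an illustrative sketch (not part of the original
// commit) showing how a caller might wire up a dry-run repair pass using the
// fluent builder methods below and the RepairJobRevTreeCycles job defined at
// the bottom of this file.
func exampleRepairBucketUsage(bucket base.Bucket) ([]RepairBucketResult, error) {
    repairBucket := NewRepairBucket(bucket).
        SetDryRun(true).
        AddRepairJob(RepairJobRevTreeCycles)
    return repairBucket.RepairBucket()
}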

func (r *RepairBucket) SetDryRun(dryRun bool) *RepairBucket {
    r.DryRun = dryRun
    return r
}

func (r *RepairBucket) AddRepairJob(repairJob DocTransformer) *RepairBucket {
    r.RepairJobs = append(r.RepairJobs, repairJob)
    return r
}

func (r *RepairBucket) InitFrom(params RepairBucketParams) *RepairBucket {

    r.SetDryRun(params.DryRun)
    if params.ViewQueryPageSize != nil && *params.ViewQueryPageSize > 0 {
        r.ViewQueryPageSize = *params.ViewQueryPageSize
    }

    if params.RepairedFileTTL != nil && *params.RepairedFileTTL >= 0 {
        r.RepairedFileTTL = time.Duration(*params.RepairedFileTTL) * time.Second
    }

    for _, repairJobParams := range params.RepairJobs {
        switch repairJobParams.RepairJobType {
        case RepairRevTreeCycles:
            r.AddRepairJob(RepairJobRevTreeCycles)
        }
    }

    return r
}
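
// exampleInitFromJSON is an illustrative sketch (not part of the original
// commit) of driving InitFrom from a JSON payload shaped like the struct tags
// on RepairBucketParams above; the TTL of 3600 seconds is an arbitrary value
// chosen for the example.
func exampleInitFromJSON(bucket base.Bucket) (*RepairBucket, error) {
    payload := []byte(`{
        "dry_run": true,
        "repair_output_ttl_seconds": 3600,
        "repair_jobs": [{"type": "RepairRevTreeCycles"}]
    }`)
    var params RepairBucketParams
    if err := json.Unmarshal(payload, &params); err != nil {
        return nil, err
    }
    return NewRepairBucket(bucket).InitFrom(params), nil
}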

/*
    This is how the view is iterated, with a page size (limit) of 3 and six
    docs (doc1 .. doc6) in the view:

        StartKey: ""      -> doc1, doc2, doc3   NumProcessed: 3
        StartKey: "doc3"  -> doc3, doc4, doc5   NumProcessed: 2
        StartKey: "doc5"  -> doc5, doc6         NumProcessed: 1
        StartKey: "doc6"  -> doc6               NumProcessed: 0

    * It starts with an empty start key
    * For the next page, it uses the last key processed as the new start key
    * Since the start key is inclusive, it will see the start key twice (at the end of one page, and again on the next page)
    * If it's iterating a result page and sees a doc with the start key (eg, doc3 above), it will ignore it so it doesn't process it twice
    * Stop condition: NumProcessed is 0, because the only doc in the result set had already been processed
*/
func (r RepairBucket) RepairBucket() (results []RepairBucketResult, err error) {

    base.LogTo("CRUD", "RepairBucket() invoked")
    defer base.LogTo("CRUD", "RepairBucket() finished")

    startKey := ""
    results = []RepairBucketResult{}
    numDocsProcessed := 0

    for {

        options := Body{"stale": false, "reduce": false}
        options["startkey"] = []interface{}{
            true,
            startKey,
        }
        options["limit"] = r.ViewQueryPageSize

        base.LogTo("CRUD", "RepairBucket() querying view with options: %+v", options)
        vres, err := r.Bucket.View(DesignDocSyncHousekeeping, ViewImport, options)
        base.LogTo("CRUD", "RepairBucket() queried view and got %d results", len(vres.Rows))
        if err != nil {
            return results, err
        }

        // Check whether the view returned any more results
        if len(vres.Rows) == 0 {
            // No more results. Return
            return results, nil
        }

        // Keep a counter of how many results were processed, since if none were processed it indicates that
        // we hit the last (empty) page of data. This is needed because the start key is inclusive, and
        // so even on the last page of results, the view query will return a single result with the start key doc.
        numResultsProcessed := 0

        for _, row := range vres.Rows {

            rowKey := row.Key.([]interface{})
            docid := rowKey[1].(string)

            if docid == startKey {
                // Skip this, already processed in previous iteration. Important to do this before numResultsProcessed
                // is incremented.
                continue
            }

            // The next page of the view query should start at the last result in this page. There is a de-duping
            // mechanism to avoid processing this doc twice.
            startKey = docid

            // Increment counter of how many results were processed for detecting stop condition
            numResultsProcessed++

            key := realDocID(docid)
            var backupOrDryRunDocId string

            err = r.Bucket.Update(key, 0, func(currentValue []byte) ([]byte, error) {
                // Be careful: this block can be invoked multiple times if there are races!
                if currentValue == nil {
                    return nil, couchbase.UpdateCancel // someone deleted it?!
                }
                updatedDoc, shouldUpdate, repairJobs, err := r.TransformBucketDoc(key, currentValue)
                if err != nil {
                    return nil, err
                }

                if !shouldUpdate {
                    return nil, couchbase.UpdateCancel
                }

                backupOrDryRunDocId, err = r.WriteRepairedDocsToBucket(key, currentValue, updatedDoc)
                if err != nil {
                    base.LogTo("CRUD", "Repair Doc (dry_run=%v) Writing docs to bucket failed with error: %v. Dumping raw contents.", r.DryRun, err)
                    base.LogTo("CRUD", "Original Doc before repair: %s", currentValue)
                    base.LogTo("CRUD", "Updated doc after repair: %s", updatedDoc)
                }

                result := RepairBucketResult{
                    DryRun:              r.DryRun,
                    BackupOrDryRunDocId: backupOrDryRunDocId,
                    DocId:               key,
                    RepairJobTypes:      repairJobs,
                }

                results = append(results, result)

                if r.DryRun {
                    return nil, couchbase.UpdateCancel
                }
                return updatedDoc, nil
            })

            if err != nil {
                // Ignore couchbase.UpdateCancel (Cas.QUIT) errors. Any other errors should be returned to the caller
                if err != couchbase.UpdateCancel {
                    return results, err
                }
            }

            if backupOrDryRunDocId != "" {
                if r.DryRun {
                    base.LogTo("CRUD", "Repair Doc: dry run result available in Bucket Doc: %v (auto-deletes in 24 hours)", backupOrDryRunDocId)
                } else {
                    base.LogTo("CRUD", "Repair Doc: Doc repaired, original doc backed up in Bucket Doc: %v (auto-deletes in 24 hours)", backupOrDryRunDocId)
                }
            }

        }

        numDocsProcessed += numResultsProcessed

        base.LogTo("CRUD", "RepairBucket() processed %d / %d", numDocsProcessed, vres.TotalRows)

        if numResultsProcessed == 0 {
            // No point in going to the next page, since this page had 0 results. See method comments.
            return results, nil
        }

    }

    // Should never get here, due to early returns above
    return results, nil

}

func (r RepairBucket) WriteRepairedDocsToBucket(docId string, originalDoc, updatedDoc []byte) (backupOrDryRunDocId string, err error) {

    var contentToSave []byte

    if r.DryRun {
        backupOrDryRunDocId = fmt.Sprintf("_sync:repair:dryrun:%v", docId)
        contentToSave = updatedDoc
    } else {
        backupOrDryRunDocId = fmt.Sprintf("_sync:repair:backup:%v", docId)
        contentToSave = originalDoc
    }

    doc, err := unmarshalDocument(docId, contentToSave)
    if err != nil {
        return backupOrDryRunDocId, fmt.Errorf("Error unmarshalling updated/original doc. Err: %v", err)
    }

    // If the RepairedFileTTL is explicitly set to 0 then don't write the doc at all
    if int(r.RepairedFileTTL.Seconds()) == 0 {
        base.LogTo("CRUD", "Repair Doc: Doc %v repaired, TTL set to 0, doc will not be written to bucket", backupOrDryRunDocId)
        return backupOrDryRunDocId, nil
    }

    if err := r.Bucket.Set(backupOrDryRunDocId, base.DurationToCbsExpiry(r.RepairedFileTTL), doc); err != nil {
        return backupOrDryRunDocId, err
    }

    return backupOrDryRunDocId, nil

}

// Loops over all repair jobs and applies them
func (r RepairBucket) TransformBucketDoc(docId string, originalCBDoc []byte) (transformedCBDoc []byte, transformed bool, repairJobs []RepairJobType, err error) {

    transformed = false
    for _, repairJob := range r.RepairJobs {

        repairedDoc, repairedDocTxformed, repairDocErr := repairJob(docId, originalCBDoc)
        if repairDocErr != nil {
            return nil, false, repairJobs, repairDocErr
        }

        if !repairedDocTxformed {
            continue
        }

        // Update output value to indicate this doc was transformed by at least one of the underlying repair jobs
        transformed = true

        // Update output value with the latest result from the repair job, which may be overwritten by later loop iterations
        transformedCBDoc = repairedDoc

        // Update the doc that is being transformed to be the output of the last repair job, so that
        // the next iteration of the loop uses it as input
        originalCBDoc = repairedDoc

        // Hack: since RepairRevTreeCycles is the only type of repair job, hardcode it here.
        // In the future, this will need to be updated so that RepairJob is based on interfaces instead of functions,
        // so that .JobType() can be called on it. Currently there doesn't seem to be a way to do that.
        repairJobs = append(repairJobs, RepairRevTreeCycles)

    }

    return transformedCBDoc, transformed, repairJobs, nil
}

// Repairs rev tree cycles (see SG issue #2847)
func RepairJobRevTreeCycles(docId string, originalCBDoc []byte) (transformedCBDoc []byte, transformed bool, err error) {

    base.LogTo("CRUD+", "RepairJobRevTreeCycles() called with doc id: %v", docId)
    defer base.LogTo("CRUD+", "RepairJobRevTreeCycles() finished. Doc id: %v. transformed: %v. err: %v", docId, transformed, err)

    doc, errUnmarshal := unmarshalDocument(docId, originalCBDoc)
    if errUnmarshal != nil {
        return nil, false, errUnmarshal
    }

    // Check if the rev history has cycles
    containsCycles := doc.History.ContainsCycles()

    if !containsCycles {
        // nothing to repair
        return nil, false, nil
    }

    // Repair it
    if err := doc.History.RepairCycles(); err != nil {
        return nil, false, err
    }

    transformedCBDoc, errMarshal := json.Marshal(doc)
    if errMarshal != nil {
        return nil, false, errMarshal
    }

    return transformedCBDoc, true, nil

}
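
// exampleDirectInvocation is an illustrative sketch (not part of the original
// commit): since RepairJobRevTreeCycles satisfies the DocTransformer signature,
// it can also be applied directly to raw document bytes outside of a full
// RepairBucket pass (the doc id "mydoc" is a placeholder).
func exampleDirectInvocation(rawDoc []byte) (repaired []byte, changed bool, err error) {
    return RepairJobRevTreeCycles("mydoc", rawDoc)
}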