Skip to content

Commit

Permalink
Correct CAS retry handling for import based on import type (#2676)
Browse files Browse the repository at this point in the history
* Correct CAS retry handling for import based on import type

Feed-based imports shouldn't be retried on CAS failures - the subsequent mutation will be imported if needed.

On-demand imports need to be retried on CAS failure, so that the subsequent SG operation is working against the correct version of the document.

* Update comments based on PR review, add accel change to manifest
  • Loading branch information
adamcfraser authored Jun 23, 2017
1 parent ea21142 commit 8338da6
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 15 deletions.
67 changes: 66 additions & 1 deletion base/bucket_gocb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,6 @@ func TestXattrWriteCasUpsert(t *testing.T) {
if err != nil {
t.Errorf("Error doing GetWithXattr: %+v", err)
}
// TODO: Cas check fails, pending xattr code to make it to gocb master
log.Printf("TestWriteCasXATTR retrieved: %s, %s", retrievedVal2, retrievedXattr2)
assert.Equals(t, getCas, cas)
assert.Equals(t, retrievedVal2["body_field"], val2["body_field"])
Expand All @@ -679,6 +678,72 @@ func TestXattrWriteCasUpsert(t *testing.T) {

}

// TestXattrWriteCasWithXattrCasFailure. Validates cas check when using WriteCasWithXattr
func TestXattrWriteCasWithXattrCasFailure(t *testing.T) {

SkipXattrTestsIfNotEnabled(t)

bucket := GetBucketOrPanic()

key := "TestWriteCasXATTRSimple"
xattrName := "_sync"
val := make(map[string]interface{})
val["sg_field"] = "sg_value"

xattrVal := make(map[string]interface{})
xattrVal["seq"] = float64(123)
xattrVal["rev"] = "1-1234"

var existsVal map[string]interface{}
_, err := bucket.Get(key, existsVal)
if err == nil {
log.Printf("Key should not exist yet, expected error but got nil. Doing cleanup, assuming couchbase bucket testing")
err = bucket.DeleteWithXattr(key, xattrName)
}

cas := uint64(0)
cas, err = bucket.WriteCasWithXattr(key, xattrName, 0, cas, val, xattrVal)
assertNoError(t, err, "WriteCasWithXattr error")
log.Printf("Post-write, cas is %d", cas)

var retrievedVal map[string]interface{}
var retrievedXattr map[string]interface{}
getCas, err := bucket.GetWithXattr(key, xattrName, &retrievedVal, &retrievedXattr)
if err != nil {
t.Errorf("Error doing GetWithXattr: %+v", err)
}
log.Printf("TestWriteCasXATTR retrieved: %s, %s", retrievedVal, retrievedXattr)
assert.Equals(t, getCas, cas)
assert.Equals(t, retrievedVal["sg_field"], val["sg_field"])
assert.Equals(t, retrievedXattr["seq"], xattrVal["seq"])
assert.Equals(t, retrievedXattr["rev"], xattrVal["rev"])

// Simulate an SDK update
updatedVal := make(map[string]interface{})
updatedVal["sdk_field"] = "abc"
bucket.Set(key, 0, updatedVal)

// Attempt to update with the previous CAS
val["sg_field"] = "sg_value_mod"
xattrVal["rev"] = "2-1234"
_, err = bucket.WriteCasWithXattr(key, xattrName, 0, getCas, val, xattrVal)
assert.Equals(t, err, gocb.ErrKeyExists)

// Retrieve again, ensure we get the SDK value, SG xattr
retrievedVal = nil
retrievedXattr = nil
_, err = bucket.GetWithXattr(key, xattrName, &retrievedVal, &retrievedXattr)
if err != nil {
t.Errorf("Error doing GetWithXattr: %+v", err)
}
log.Printf("TestWriteCasXATTR retrieved: %s, %s", retrievedVal, retrievedXattr)
assert.Equals(t, retrievedVal["sg_field"], nil)
assert.Equals(t, retrievedVal["sdk_field"], updatedVal["sdk_field"])
assert.Equals(t, retrievedXattr["seq"], xattrVal["seq"])
assert.Equals(t, retrievedXattr["rev"], "1-1234")

}

// TestWriteCasXATTRRaw. Validates basic write of document and xattr as raw bytes.
func TestXattrWriteCasRaw(t *testing.T) {

Expand Down
12 changes: 8 additions & 4 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
type sgErrorCode uint16

const (
alreadyImported = sgErrorCode(0x00)
importCancelled = sgErrorCode(0x01)
alreadyImported = sgErrorCode(0x00)
importCancelled = sgErrorCode(0x01)
importCasFailure = sgErrorCode(0x02)
)

type SGError struct {
code sgErrorCode
}

var (
ErrAlreadyImported = &SGError{alreadyImported}
ErrImportCancelled = &SGError{importCancelled}
ErrImportCancelled = &SGError{importCancelled}
ErrAlreadyImported = &SGError{alreadyImported}
ErrImportCasFailure = &SGError{importCasFailure}
)

func (e SGError) Error() string {
Expand All @@ -41,6 +43,8 @@ func (e SGError) Error() string {
return "Document already imported"
case importCancelled:
return "Import cancelled"
case importCasFailure:
return "CAS failure during import"
default:
return "Unknown error"
}
Expand Down
8 changes: 6 additions & 2 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,13 @@ func (c *changeCache) DocChanged(event sgbucket.TapEvent) {
rawBody = nil
}
db := Database{DatabaseContext: c.context, user: nil}
_, err := db.ImportDocRaw(docID, rawBody, isDelete)
_, err := db.ImportDocRaw(docID, rawBody, isDelete, event.Cas, ImportFromFeed)
if err != nil {
base.Warn("Unable to import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", docID, err)
if err == base.ErrImportCasFailure {
base.LogTo("Import+", "Not importing mutation - document %s has been subsequently updated and will be imported based on that mutation.", docID)
} else {
base.Warn("Unable to import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", docID, err)
}
}
}
return
Expand Down
35 changes: 29 additions & 6 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (db *DatabaseContext) GetDoc(docid string) (doc *document, err error) {
isDelete := rawDoc == nil
db := Database{DatabaseContext: db, user: nil}
var importErr error
doc, importErr = db.ImportDocRaw(docid, rawDoc, isDelete)
doc, importErr = db.ImportDocRaw(docid, rawDoc, isDelete, cas, ImportOnDemand)
if importErr != nil {
return nil, importErr
}
Expand Down Expand Up @@ -486,7 +486,7 @@ func (db *Database) Put(docid string, body Body) (newRevID string, err error) {
// Use an admin-scoped database for import
importDb := Database{DatabaseContext: db.DatabaseContext, user: nil}
var importErr error
doc, importErr = importDb.ImportDoc(docid, doc.body, isDelete)
doc, importErr = importDb.ImportDoc(docid, doc.body, isDelete, doc.Cas, ImportOnDemand)
if importErr != nil {
return nil, nil, importErr
}
Expand Down Expand Up @@ -547,7 +547,7 @@ func (db *Database) PutExistingRev(docid string, body Body, docHistory []string)
// Use an admin-scoped database for import
importDb := Database{DatabaseContext: db.DatabaseContext, user: nil}
var importErr error
doc, importErr = importDb.ImportDoc(docid, doc.body, isDelete)
doc, importErr = importDb.ImportDoc(docid, doc.body, isDelete, doc.Cas, ImportOnDemand)
if importErr != nil {
return nil, nil, importErr
}
Expand Down Expand Up @@ -590,8 +590,15 @@ func (db *Database) PutExistingRev(docid string, body Body, docHistory []string)
return err
}

type ImportMode uint8

const (
ImportFromFeed = ImportMode(iota) // Feed-based import. Attempt to import once - cancels import on cas write failure of the imported doc.
ImportOnDemand // On-demand import. Reattempt import on cas write failure of the imported doc until either the import succeeds, or existing doc is an SG write.
)

// Imports a document that was written by someone other than sync gateway.
func (db *Database) ImportDocRaw(docid string, value []byte, isDelete bool) (docOut *document, err error) {
func (db *Database) ImportDocRaw(docid string, value []byte, isDelete bool, cas uint64, mode ImportMode) (docOut *document, err error) {

var body Body
if isDelete {
Expand All @@ -604,10 +611,10 @@ func (db *Database) ImportDocRaw(docid string, value []byte, isDelete bool) (doc
}
}

return db.ImportDoc(docid, body, isDelete)
return db.ImportDoc(docid, body, isDelete, cas, mode)
}

func (db *Database) ImportDoc(docid string, body Body, isDelete bool) (docOut *document, err error) {
func (db *Database) ImportDoc(docid string, body Body, isDelete bool, importCas uint64, mode ImportMode) (docOut *document, err error) {

base.LogTo("Import+", "Attempting to import doc %q...", docid)
var newRev string
Expand Down Expand Up @@ -635,6 +642,19 @@ func (db *Database) ImportDoc(docid string, body Body, isDelete bool) (docOut *d
return nil, nil, base.ErrAlreadyImported
}

// If there's a cas mismatch, the doc has been updated since the version that triggered the import. This is an SDK write (since we checked
// for SG write above). How to handle depends on import mode.
if doc.Cas != importCas {
// If this is a feed import, cancel on cas failure (doc has been updated )
if mode == ImportFromFeed {
return nil, nil, base.ErrImportCasFailure
}
// If this is an on-demand import, we want to switch to importing the current version doc
if mode == ImportOnDemand {
body = doc.body
}
}

// The active rev is the parent for an import
parentRev := doc.CurrentRev
generation, _ := ParseRevID(parentRev)
Expand All @@ -660,6 +680,9 @@ func (db *Database) ImportDoc(docid string, body Body, isDelete bool) (docOut *d
base.LogTo("Import+", "Imported %s (delete=%v) as rev %s", docid, isDelete, newRev)
case base.ErrImportCancelled:
// Import was cancelled (SG purge) - don't return error.
case base.ErrImportCasFailure:
// Import was cancelled due to CAS failure.
return nil, err
default:
base.LogTo("Import", "Error importing doc %q: %v", docid, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *syncData) IsSGWrite(cas uint64) bool {
func (doc *document) IsSGWrite() bool {
result := doc.syncData.IsSGWrite(doc.Cas)
if result == false {
base.LogTo("Import+", "Doc %s is not an SG write, based on cas. cas:%x syncCas:%q", doc.ID, doc.Cas, doc.syncData.Cas)
base.LogTo("CRUD+", "Doc %s is not an SG write, based on cas. cas:%x syncCas:%q", doc.ID, doc.Cas, doc.syncData.Cas)
}
return result
}
Expand Down
2 changes: 1 addition & 1 deletion manifest/default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


<!-- Sync Gateway Accel-->
<project groups="notdefault,sg-accel" name="sync-gateway-accel" path="godeps/src/github.com/couchbaselabs/sync-gateway-accel" remote="couchbaselabs_private" revision="7e4f6b042e7a829acc777edc07ffcb8efffcfb81"/>
<project groups="notdefault,sg-accel" name="sync-gateway-accel" path="godeps/src/github.com/couchbaselabs/sync-gateway-accel" remote="couchbaselabs_private" revision="b5185b1407e6893e3a871e1f4b76eed448ef6870"/>

<!-- Dependencies specific to Sync Gateway Accel-->
<project groups="notdefault,sg-accel" name="cbgt" path="godeps/src/github.com/couchbase/cbgt" remote="couchbase" revision="4aaaca921a7ef64900f3b569c33ad87e9e8df065"/>
Expand Down
64 changes: 64 additions & 0 deletions rest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"encoding/json"
"fmt"
"log"
"sync"
"testing"
"time"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
Expand Down Expand Up @@ -115,6 +117,68 @@ func TestXattrImportOldDoc(t *testing.T) {

}

// Test cas failure during WriteUpdate, triggering import of SDK write.
// Disabled, as test depends on artificial latency in PutDoc to reliably hit the CAS failure on the SG write. Scenario fully covered
// by functional test.
func DisableTestXattrImportOnCasFailure(t *testing.T) {

SkipImportTestsIfNotEnabled(t)

rt := RestTester{}
defer rt.Close()

bucket := rt.Bucket()
rt.SendAdminRequest("PUT", "/_logging", `{"ImportCas":true}`)

// 1. SG Write
key := "TestCasFailureImport"
docBody := make(map[string]interface{})
docBody["test"] = "TestCasFailureImport"
docBody["SG_write_count"] = "1"

response := rt.SendAdminRequest("PUT", "/db/TestCasFailureImport", `{"test":"TestCasFailureImport", "write_type":"SG_1"}`)
assert.Equals(t, response.Code, 201)
log.Printf("insert response: %s", response.Body.Bytes())
var body db.Body
json.Unmarshal(response.Body.Bytes(), &body)
assert.Equals(t, body["rev"], "1-111c27be37c17f18ae8fe9faa3bb4e0e")
revId := body["rev"].(string)

// Attempt a second SG write, to be interrupted by an SDK update. Should return a conflict
var wg sync.WaitGroup
wg.Add(1)
go func() {
response = rt.SendAdminRequest("PUT", fmt.Sprintf("/db/%s?rev=%s", key, revId), `{"write_type":"SG_2"}`)
assert.Equals(t, response.Code, 409)
log.Printf("SG CAS failure write response: %s", response.Body.Bytes())
wg.Done()
}()

// Concurrent SDK writes for 10 seconds, one per second
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
sdkBody := make(map[string]interface{})
sdkBody["test"] = "TestCasFailureImport"
sdkBody["SDK_write_count"] = i
err := bucket.Set(key, 0, sdkBody)
assertNoError(t, err, "Unexpected error doing SDK write")
}

// wait for SG write to happen
wg.Wait()

// Get to see where we ended up
response = rt.SendAdminRequest("GET", "/db/TestCasFailureImport", "")
assert.Equals(t, response.Code, 200)
log.Printf("Final get: %s", response.Body.Bytes())

// Get raw to see where the rev tree ended up
response = rt.SendAdminRequest("GET", "/db/_raw/TestCasFailureImport", "")
assert.Equals(t, response.Code, 200)
log.Printf("Final get raw: %s", response.Body.Bytes())

}

// Attempt to delete then recreate a document through SG
func TestXattrResurrectViaSG(t *testing.T) {

Expand Down

0 comments on commit 8338da6

Please sign in to comment.