diff --git a/go.mod b/go.mod index 44cde08367..3425d33671 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/coreos/go-oidc/v3 v3.12.0 github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d github.com/couchbase/clog v0.1.0 - github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703 + github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a github.com/couchbase/gocb/v2 v2.9.4 github.com/couchbase/gocbcore/v10 v10.5.4 github.com/couchbase/gomemcached v0.2.1 diff --git a/go.sum b/go.sum index 6477806e68..c2e671b72c 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= -github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703 h1:PSEnn/xmEDjZ8uXjuZIXGYlG9QfLOioqKz5MLUpMvkk= -github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703/go.mod h1:J2dZK3JAfWPKZfRWERC7xVlRPQvMVMde/I1Ed4cQIrM= +github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a h1:tCXbdnE+cHXHEgoyONn5yD2c73CFeVYuxCu6Md4v5w4= +github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a/go.mod h1:+K082iN0fPzrWgNU/58/sMpydLVTTSdnuJ1srwlWTuk= github.com/couchbase/go-couchbase v0.1.1 h1:ClFXELcKj/ojyoTYbsY34QUrrYCBi/1G749sXSCkdhk= github.com/couchbase/go-couchbase v0.1.1/go.mod h1:+/bddYDxXsf9qt0xpDUtRR47A2GjaXmGGAqQ/k3GJ8A= github.com/couchbase/gocb/v2 v2.9.4 h1:PNYu6dqLFwIdHlEfZBzYE9Nh9NDtPu1/KLRF76bupdU= diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 72ef418d0e..42d65782c5 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2185,25 +2185,10 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docMarker, docMarkerVersion) - messages := btc.pullReplication.GetMessages() - - var highestMsgSeq uint32 - var highestSeqMsg blip.Message - // Grab most recent changes message - for _, message := range messages { - messageBody, err := message.Body() - require.NoError(t, err) - if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" { - if highestMsgSeq < uint32(message.SerialNumber()) { - highestMsgSeq = uint32(message.SerialNumber()) - highestSeqMsg = message - } - } - } + changesMsg := btc.getMostRecentChangesMessage() var messageBody []interface{} - err = highestSeqMsg.ReadJSONBody(&messageBody) - assert.NoError(t, err) + require.NoError(t, changesMsg.ReadJSONBody(&messageBody)) require.Len(t, messageBody, 3) require.Len(t, messageBody[0], 4) // Rev 2 of doc, being sent as removal from channel A require.Len(t, messageBody[1], 4) // Rev 3 of doc, being sent as removal from channel B @@ -2288,25 +2273,9 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, markerID, markerVersion) - messages := btc.pullReplication.GetMessages() - - var highestMsgSeq uint32 - var highestSeqMsg blip.Message - // Grab most recent changes message - for _, message := range messages { - messageBody, err := message.Body() - require.NoError(t, err) - if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" { - if highestMsgSeq < uint32(message.SerialNumber()) { - highestMsgSeq = uint32(message.SerialNumber()) - highestSeqMsg = message - } - } - } - + changesMsg := btc.getMostRecentChangesMessage() var messageBody []interface{} - err = highestSeqMsg.ReadJSONBody(&messageBody) - assert.NoError(t, err) + require.NoError(t, changesMsg.ReadJSONBody(&messageBody)) require.Len(t, messageBody, 1) require.Len(t, messageBody[0], 3) // marker doc require.Equal(t, "docmarker", messageBody[0].([]interface{})[1]) diff --git a/rest/blip_client_test.go b/rest/blip_client_test.go index 54a3023105..a4bb95f7a3 100644 --- a/rest/blip_client_test.go +++ b/rest/blip_client_test.go @@ -989,6 +989,29 @@ func (btc *BlipTesterClient) waitForReplicationMessage(collection *db.DatabaseCo return btc.pushReplication.WaitForMessage(serialNumber + 1) } +// getMostRecentChangesMessage returns the most recent non nil changes message received from the pull replication. This represents the latest set of changes. +func (btc *BlipTesterClient) getMostRecentChangesMessage() *blip.Message { + var highestMsgSeq uint32 + var highestSeqMsg *blip.Message + // Grab most recent changes message + for _, message := range btc.pullReplication.GetMessages() { + if message.Properties["Profile"] != db.MessageChanges { + continue + } + messageBody, err := message.Body() + require.NoError(btc.TB(), err) + if string(messageBody) == "null" { + continue + } + if highestMsgSeq >= uint32(message.SerialNumber()) { + continue + } + highestMsgSeq = uint32(message.SerialNumber()) + highestSeqMsg = message + } + return highestSeqMsg +} + // SingleCollection returns a single collection blip tester if the RestTester database is configured with only one collection. Otherwise, throw a fatal test error. func (btcRunner *BlipTestClientRunner) SingleCollection(clientID uint32) *BlipTesterCollectionClient { if btcRunner.clients[clientID].nonCollectionAwareClient != nil { @@ -1476,15 +1499,15 @@ func (btr *BlipTesterReplicator) GetMessage(serialNumber blip.MessageNumber) (ms } // GetMessages returns a copy of all messages stored in the Client keyed by serial number -func (btr *BlipTesterReplicator) GetMessages() map[blip.MessageNumber]blip.Message { +func (btr *BlipTesterReplicator) GetMessages() map[blip.MessageNumber]*blip.Message { btr.messagesLock.RLock() defer btr.messagesLock.RUnlock() - messages := make(map[blip.MessageNumber]blip.Message, len(btr.messages)) + messages := make(map[blip.MessageNumber]*blip.Message, len(btr.messages)) for k, v := range btr.messages { // Read the body before copying, since it might be read asynchronously _, _ = v.Body() - messages[k] = *v + messages[k] = v } return messages diff --git a/rest/revocation_test.go b/rest/revocation_test.go index 25167cafeb..0e0d087737 100644 --- a/rest/revocation_test.go +++ b/rest/revocation_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/couchbase/go-blip" "github.com/couchbase/sync_gateway/auth" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/channels" @@ -2372,25 +2371,9 @@ func TestRevocationNoRev(t *testing.T) { _ = btcRunner.WaitForVersion(btc.id, waitMarkerID, waitMarkerVersion) - messages := btc.pullReplication.GetMessages() - - var highestMsgSeq uint32 - var highestSeqMsg blip.Message - // Grab most recent changes message - for _, message := range messages { - messageBody, err := message.Body() - require.NoError(t, err) - if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" { - if highestMsgSeq < uint32(message.SerialNumber()) { - highestMsgSeq = uint32(message.SerialNumber()) - highestSeqMsg = message - } - } - } - + changesMsg := btc.getMostRecentChangesMessage() var messageBody []interface{} - err := highestSeqMsg.ReadJSONBody(&messageBody) - require.NoError(t, err) + require.NoError(t, changesMsg.ReadJSONBody(&messageBody)) require.Len(t, messageBody, 2) require.Len(t, messageBody[0], 4)