diff --git a/client/db.go b/client/db.go index 4838773dde..30f123d286 100644 --- a/client/db.go +++ b/client/db.go @@ -60,7 +60,7 @@ type DB interface { // Peerstore returns the peerstore where known host information is stored. // // It sits within the rootstore returned by [Root]. - Peerstore() datastore.DSBatching + Peerstore() datastore.DSReaderWriter // Headstore returns the headstore where the current heads of the database are stored. // diff --git a/client/mocks/db.go b/client/mocks/db.go index 73cf4b3665..024d2ea31c 100644 --- a/client/mocks/db.go +++ b/client/mocks/db.go @@ -1564,19 +1564,19 @@ func (_c *DB_PeerInfo_Call) RunAndReturn(run func() peer.AddrInfo) *DB_PeerInfo_ } // Peerstore provides a mock function with given fields: -func (_m *DB) Peerstore() datastore.DSBatching { +func (_m *DB) Peerstore() datastore.DSReaderWriter { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for Peerstore") } - var r0 datastore.DSBatching - if rf, ok := ret.Get(0).(func() datastore.DSBatching); ok { + var r0 datastore.DSReaderWriter + if rf, ok := ret.Get(0).(func() datastore.DSReaderWriter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(datastore.DSBatching) + r0 = ret.Get(0).(datastore.DSReaderWriter) } } @@ -1600,12 +1600,12 @@ func (_c *DB_Peerstore_Call) Run(run func()) *DB_Peerstore_Call { return _c } -func (_c *DB_Peerstore_Call) Return(_a0 datastore.DSBatching) *DB_Peerstore_Call { +func (_c *DB_Peerstore_Call) Return(_a0 datastore.DSReaderWriter) *DB_Peerstore_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_Peerstore_Call) RunAndReturn(run func() datastore.DSBatching) *DB_Peerstore_Call { +func (_c *DB_Peerstore_Call) RunAndReturn(run func() datastore.DSReaderWriter) *DB_Peerstore_Call { _c.Call.Return(run) return _c } diff --git a/client/replicator.go b/client/replicator.go index 8df204906f..730d3e2609 100644 --- a/client/replicator.go +++ b/client/replicator.go @@ -10,10 +10,26 @@ package client -import "github.com/libp2p/go-libp2p/core/peer" +import ( + "time" + + "github.com/libp2p/go-libp2p/core/peer" +) // Replicator is a peer that a set of local collections are replicated to. type Replicator struct { - Info peer.AddrInfo - Schemas []string + Info peer.AddrInfo + Schemas []string + Status ReplicatorStatus + LastStatusChange time.Time } + +// ReplicatorStatus is the status of a Replicator. +type ReplicatorStatus uint8 + +const ( + // ReplicatorStatusActive is the status of a Replicator that is actively replicating. + ReplicatorStatusActive ReplicatorStatus = iota + // ReplicatorStatusInactive is the status of a Replicator that is inactive/offline. 
+ ReplicatorStatusInactive +) diff --git a/datastore/mocks/txn.go b/datastore/mocks/txn.go index ea923d5de4..a7bd1c6fd2 100644 --- a/datastore/mocks/txn.go +++ b/datastore/mocks/txn.go @@ -533,19 +533,19 @@ func (_c *Txn_OnSuccessAsync_Call) RunAndReturn(run func(func())) *Txn_OnSuccess } // Peerstore provides a mock function with given fields: -func (_m *Txn) Peerstore() datastore.DSBatching { +func (_m *Txn) Peerstore() datastore.DSReaderWriter { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for Peerstore") } - var r0 datastore.DSBatching - if rf, ok := ret.Get(0).(func() datastore.DSBatching); ok { + var r0 datastore.DSReaderWriter + if rf, ok := ret.Get(0).(func() datastore.DSReaderWriter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(datastore.DSBatching) + r0 = ret.Get(0).(datastore.DSReaderWriter) } } @@ -569,12 +569,12 @@ func (_c *Txn_Peerstore_Call) Run(run func()) *Txn_Peerstore_Call { return _c } -func (_c *Txn_Peerstore_Call) Return(_a0 datastore.DSBatching) *Txn_Peerstore_Call { +func (_c *Txn_Peerstore_Call) Return(_a0 datastore.DSReaderWriter) *Txn_Peerstore_Call { _c.Call.Return(_a0) return _c } -func (_c *Txn_Peerstore_Call) RunAndReturn(run func() datastore.DSBatching) *Txn_Peerstore_Call { +func (_c *Txn_Peerstore_Call) RunAndReturn(run func() datastore.DSReaderWriter) *Txn_Peerstore_Call { _c.Call.Return(run) return _c } diff --git a/datastore/multi.go b/datastore/multi.go index cbbf80e23f..5a2c934852 100644 --- a/datastore/multi.go +++ b/datastore/multi.go @@ -12,7 +12,6 @@ package datastore import ( ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" ) var ( @@ -31,7 +30,7 @@ type multistore struct { data DSReaderWriter enc Blockstore head DSReaderWriter - peer DSBatching + peer DSReaderWriter system DSReaderWriter dag Blockstore } @@ -46,7 +45,7 @@ func MultiStoreFrom(rootstore ds.Datastore) MultiStore { data: prefix(rootRW, dataStoreKey), enc: newBlockstore(prefix(rootRW, encStoreKey)), head: prefix(rootRW, headStoreKey), - peer: namespace.Wrap(rootstore, peerStoreKey), + peer: prefix(rootRW, peerStoreKey), system: prefix(rootRW, systemStoreKey), dag: newBlockstore(prefix(rootRW, blockStoreKey)), } @@ -70,7 +69,7 @@ func (ms multistore) Headstore() DSReaderWriter { } // Peerstore implements MultiStore. -func (ms multistore) Peerstore() DSBatching { +func (ms multistore) Peerstore() DSReaderWriter { return ms.peer } diff --git a/datastore/store.go b/datastore/store.go index 641cd10b1a..7f4048ec90 100644 --- a/datastore/store.go +++ b/datastore/store.go @@ -47,7 +47,7 @@ type MultiStore interface { // Peerstore is a wrapped root DSReaderWriter as a ds.Batching, embedded into a DSBatching // under the /peers namespace - Peerstore() DSBatching + Peerstore() DSReaderWriter // Blockstore is a wrapped root DSReaderWriter as a Blockstore, embedded into a Blockstore // under the /blocks namespace @@ -81,8 +81,3 @@ type IPLDStorage interface { storage.ReadableStorage storage.WritableStorage } - -// DSBatching wraps the Batching interface from go-datastore -type DSBatching interface { - ds.Batching -} diff --git a/event/event.go b/event/event.go index 698cb8dc90..5ae882c6bb 100644 --- a/event/event.go +++ b/event/event.go @@ -35,6 +35,8 @@ const ( PeerInfoName = Name("peer-info") // ReplicatorName is the name of the replicator event. ReplicatorName = Name("replicator") + // ReplicatorFailureName is the name of the replicator failure event. 
+ ReplicatorFailureName = Name("replicator-failure") // P2PTopicCompletedName is the name of the network p2p topic update completed event. P2PTopicCompletedName = Name("p2p-topic-completed") // ReplicatorCompletedName is the name of the replicator completed event. @@ -68,8 +70,12 @@ type Update struct { // also formed this update. Block []byte - // IsCreate is true if this update is the creation of a new document. - IsCreate bool + // IsRetry is true if this update is a retry of a previously failed update. + IsRetry bool + + // Success is a channel that will receive a boolean value indicating if the update was successful. + // It is used during retries. + Success chan bool } // Merge is a notification that a merge can be performed up to the provided CID. @@ -137,3 +143,11 @@ type Replicator struct { // and those collections have documents to be replicated. Docs <-chan Update } + +// ReplicatorFailure is an event that is published when a replicator fails to replicate a document. +type ReplicatorFailure struct { + // PeerID is the id of the peer that failed to replicate the document. + PeerID peer.ID + // DocID is the unique immutable identifier of the document that failed to replicate. + DocID string +} diff --git a/http/client.go b/http/client.go index 777cf4a733..ca43181c3c 100644 --- a/http/client.go +++ b/http/client.go @@ -493,7 +493,7 @@ func (c *Client) Encstore() datastore.Blockstore { panic("client side database") } -func (c *Client) Peerstore() datastore.DSBatching { +func (c *Client) Peerstore() datastore.DSReaderWriter { panic("client side database") } diff --git a/http/client_tx.go b/http/client_tx.go index daacb4128e..4a993d66d9 100644 --- a/http/client_tx.go +++ b/http/client_tx.go @@ -99,7 +99,7 @@ func (c *Transaction) Headstore() datastore.DSReaderWriter { panic("client side transaction") } -func (c *Transaction) Peerstore() datastore.DSBatching { +func (c *Transaction) Peerstore() datastore.DSReaderWriter { panic("client side transaction") } diff --git a/internal/core/key.go b/internal/core/key.go index 60601795b2..883f12a240 100644 --- a/internal/core/key.go +++ b/internal/core/key.go @@ -58,8 +58,10 @@ const ( FIELD_ID_SEQ = "/seq/field" PRIMARY_KEY = "/pk" DATASTORE_DOC_VERSION_FIELD_ID = "v" - REPLICATOR = "/replicator/id" P2P_COLLECTION = "/p2p/collection" + REPLICATOR = "/rep/id" + REPLICATOR_RETRY_ID = "/rep/retry/id" + REPLICATOR_RETRY_DOC = "/rep/retry/doc" ) // Key is an interface that represents a key in the database. @@ -946,3 +948,73 @@ func bytesPrefixEnd(b []byte) []byte { // maximal byte string (i.e. already \xff...). 
return b } + +type ReplicatorRetryIDKey struct { + PeerID string +} + +var _ Key = (*ReplicatorRetryIDKey)(nil) + +func NewReplicatorRetryIDKey(peerID string) ReplicatorRetryIDKey { + return ReplicatorRetryIDKey{ + PeerID: peerID, + } +} + +func NewReplicatorRetryIDKeyFromString(key string) (ReplicatorRetryIDKey, error) { + keyArr := strings.Split(key, "/") + if len(keyArr) != 5 { + return ReplicatorRetryIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key)) + } + return NewReplicatorRetryIDKey(keyArr[4]), nil +} + +func (k ReplicatorRetryIDKey) ToString() string { + return REPLICATOR_RETRY_ID + "/" + k.PeerID +} + +func (k ReplicatorRetryIDKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k ReplicatorRetryIDKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} + +type ReplicatorRetryDocIDKey struct { + PeerID string + DocID string +} + +var _ Key = (*ReplicatorRetryDocIDKey)(nil) + +func NewReplicatorRetryDocIDKey(peerID, docID string) ReplicatorRetryDocIDKey { + return ReplicatorRetryDocIDKey{ + PeerID: peerID, + DocID: docID, + } +} + +func NewReplicatorRetryDocIDKeyFromString(key string) (ReplicatorRetryDocIDKey, error) { + keyArr := strings.Split(key, "/") + if len(keyArr) != 6 { + return ReplicatorRetryDocIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key)) + } + return NewReplicatorRetryDocIDKey(keyArr[4], keyArr[5]), nil +} + +func (k ReplicatorRetryDocIDKey) ToString() string { + keyString := REPLICATOR_RETRY_DOC + "/" + k.PeerID + if k.DocID != "" { + keyString += "/" + k.DocID + } + return keyString +} + +func (k ReplicatorRetryDocIDKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k ReplicatorRetryDocIDKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} diff --git a/internal/db/collection.go b/internal/db/collection.go index 8f78e51429..04a3efff65 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -727,7 +727,6 @@ func (c *collection) save( Cid: link.Cid, SchemaRoot: c.Schema().Root, Block: headNode, - IsCreate: isCreate, } txn.OnSuccess(func() { c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) diff --git a/internal/db/config.go b/internal/db/config.go index 3d69e833c4..f2fc942ae2 100644 --- a/internal/db/config.go +++ b/internal/db/config.go @@ -11,6 +11,8 @@ package db import ( + "time" + "github.com/sourcenetwork/immutable" ) @@ -20,7 +22,24 @@ const ( ) type dbOptions struct { - maxTxnRetries immutable.Option[int] + maxTxnRetries immutable.Option[int] + RetryIntervals []time.Duration +} + +// defaultOptions returns the default db options. +func defaultOptions() *dbOptions { + return &dbOptions{ + RetryIntervals: []time.Duration{ + // exponential backoff retry intervals + time.Second * 30, + time.Minute, + time.Minute * 2, + time.Minute * 4, + time.Minute * 8, + time.Minute * 16, + time.Minute * 32, + }, + } } // Option is a funtion that sets a config value on the db. 
@@ -32,3 +51,12 @@ func WithMaxRetries(num int) Option { opts.maxTxnRetries = immutable.Some(num) } } + +// WithRetryInterval sets the retry intervals used when retrying failed replications. +func WithRetryInterval(interval []time.Duration) Option { + return func(opt *dbOptions) { + if len(interval) > 0 { + opt.RetryIntervals = interval + } + } +} diff --git a/internal/db/config_test.go b/internal/db/config_test.go index a52d494a21..7f73c2e755 100644 --- a/internal/db/config_test.go +++ b/internal/db/config_test.go @@ -12,6 +12,7 @@ package db import ( "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -22,3 +23,9 @@ func TestWithMaxRetries(t *testing.T) { assert.True(t, d.maxTxnRetries.HasValue()) assert.Equal(t, 10, d.maxTxnRetries.Value()) } + +func TestWithRetryInterval(t *testing.T) { + d := dbOptions{} + WithRetryInterval([]time.Duration{time.Minute, time.Hour})(&d) + assert.Equal(t, []time.Duration{time.Minute, time.Hour}, d.RetryIntervals) +} diff --git a/internal/db/db.go b/internal/db/db.go index d5872cef0c..8ab3145586 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -18,6 +18,7 @@ import ( "context" "sync" "sync/atomic" + "time" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" @@ -80,6 +81,15 @@ type db struct { // The peer ID and network address information for the current node // if network is enabled. The `atomic.Value` should hold a `peer.AddrInfo` struct. peerInfo atomic.Value + + // To be able to close the context passed to NewDB on DB close, + // we need to keep a reference to the cancel function. Otherwise, + // some goroutines might leak. + ctxCancel context.CancelFunc + + retryIntervals []time.Duration + retryChan chan event.ReplicatorFailure + retryDone chan retryStatus } // NewDB creates a new instance of the DB using the given options. @@ -107,20 +117,25 @@ func newDB( return nil, err } - db := &db{ - rootstore: rootstore, - multistore: multistore, - acp: acp, - lensRegistry: lens, - parser: parser, - options: options, - events: event.NewBus(commandBufferSize, eventBufferSize), + opts := defaultOptions() + for _, opt := range options { + opt(opts) } - // apply options - var opts dbOptions - for _, opt := range options { - opt(&opts) + ctx, cancel := context.WithCancel(ctx) + + db := &db{ + rootstore: rootstore, + multistore: multistore, + acp: acp, + lensRegistry: lens, + parser: parser, + options: options, + events: event.NewBus(commandBufferSize, eventBufferSize), + retryChan: make(chan event.ReplicatorFailure, 100), + retryDone: make(chan retryStatus), + retryIntervals: opts.RetryIntervals, + ctxCancel: cancel, } if opts.maxTxnRetries.HasValue() { @@ -136,11 +151,12 @@ func newDB( return nil, err } - sub, err := db.events.Subscribe(event.MergeName, event.PeerInfoName) + sub, err := db.events.Subscribe(event.MergeName, event.PeerInfoName, event.ReplicatorFailureName) if err != nil { return nil, err } go db.handleMessages(ctx, sub) + go db.handleReplicatorRetries(ctx) return db, nil } @@ -173,7 +189,7 @@ func (db *db) Encstore() datastore.Blockstore { } // Peerstore returns the internal DAG store which contains IPLD blocks.
-func (db *db) Peerstore() datastore.DSBatching { +func (db *db) Peerstore() datastore.DSReaderWriter { return db.multistore.Peerstore() } @@ -370,6 +386,8 @@ func (db *db) PrintDump(ctx context.Context) error { func (db *db) Close() { log.Info("Closing DefraDB process...") + db.ctxCancel() + db.events.Close() err := db.rootstore.Close() diff --git a/internal/db/errors.go b/internal/db/errors.go index 612d5ddb40..3a77a7b9dd 100644 --- a/internal/db/errors.go +++ b/internal/db/errors.go @@ -148,6 +148,8 @@ var ( ErrSelfReferenceWithoutSelf = errors.New(errSelfReferenceWithoutSelf) ErrColNotMaterialized = errors.New(errColNotMaterialized) ErrMaterializedViewAndACPNotSupported = errors.New(errMaterializedViewAndACPNotSupported) + ErrContextDone = errors.New("context done") + ErrTimeoutDocRetry = errors.New("timeout while retrying doc") ) // NewErrFailedToGetHeads returns a new error indicating that the heads of a document diff --git a/internal/db/messages.go b/internal/db/messages.go index 04b81cd210..11235287a8 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -78,6 +78,9 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { log.ErrorContextE(ctx, "Failed to load replicators", err) } }) + case event.ReplicatorFailure: + // ReplicatorFailure is a notification that a replicator has failed to replicate a document. + db.retryChan <- evt } } } diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index b66ab4f2cf..3b94403e1c 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -13,8 +13,10 @@ package db import ( "context" "encoding/json" + "time" - dsq "github.com/ipfs/go-datastore/query" + "github.com/fxamacker/cbor/v2" + "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" @@ -23,9 +25,15 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/merkle/clock" ) +const ( + retryLoopInterval = 2 * time.Second + retryTimeout = 10 * time.Second +) + func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error { txn, err := db.NewTxn(ctx, false) if err != nil { @@ -50,12 +58,12 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error { storedRep := client.Replicator{} storedSchemas := make(map[string]struct{}) repKey := core.NewReplicatorKey(rep.Info.ID.String()) - hasOldRep, err := txn.Systemstore().Has(ctx, repKey.ToDS()) + hasOldRep, err := txn.Peerstore().Has(ctx, repKey.ToDS()) if err != nil { return err } if hasOldRep { - repBytes, err := txn.Systemstore().Get(ctx, repKey.ToDS()) + repBytes, err := txn.Peerstore().Get(ctx, repKey.ToDS()) if err != nil { return err } @@ -68,6 +76,7 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error { } } else { storedRep.Info = rep.Info + storedRep.LastStatusChange = time.Now() } var collections []client.Collection @@ -113,7 +122,7 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error { return err } - err = txn.Systemstore().Put(ctx, repKey.ToDS(), newRepBytes) + err = txn.Peerstore().Put(ctx, repKey.ToDS(), newRepBytes) if err != nil { return err } @@ -214,14 +223,14 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error storedRep := client.Replicator{} storedSchemas := 
make(map[string]struct{}) repKey := core.NewReplicatorKey(rep.Info.ID.String()) - hasOldRep, err := txn.Systemstore().Has(ctx, repKey.ToDS()) + hasOldRep, err := txn.Peerstore().Has(ctx, repKey.ToDS()) if err != nil { return err } if !hasOldRep { return ErrReplicatorNotFound } - repBytes, err := txn.Systemstore().Get(ctx, repKey.ToDS()) + repBytes, err := txn.Peerstore().Get(ctx, repKey.ToDS()) if err != nil { return err } @@ -245,7 +254,7 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error } // make sure the replicator exists in the datastore key := core.NewReplicatorKey(rep.Info.ID.String()) - _, err = txn.Systemstore().Get(ctx, key.ToDS()) + _, err = txn.Peerstore().Get(ctx, key.ToDS()) if err != nil { return err } @@ -265,7 +274,7 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error // Persist the replicator to the store, deleting it if no schemas remain key := core.NewReplicatorKey(rep.Info.ID.String()) if len(rep.Schemas) == 0 { - err := txn.Systemstore().Delete(ctx, key.ToDS()) + err := txn.Peerstore().Delete(ctx, key.ToDS()) if err != nil { return err } @@ -274,7 +283,7 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error if err != nil { return err } - err = txn.Systemstore().Put(ctx, key.ToDS(), repBytes) + err = txn.Peerstore().Put(ctx, key.ToDS(), repBytes) if err != nil { return err } @@ -298,10 +307,10 @@ func (db *db) GetAllReplicators(ctx context.Context) ([]client.Replicator, error defer txn.Discard(ctx) // create collection system prefix query - query := dsq.Query{ + query := query.Query{ Prefix: core.NewReplicatorKey("").ToString(), } - results, err := txn.Systemstore().Query(ctx, query) + results, err := txn.Peerstore().Query(ctx, query) if err != nil { return nil, err } @@ -335,3 +344,377 @@ func (db *db) loadAndPublishReplicators(ctx context.Context) error { } return nil } + +// retryStatus is used to communicate if the retry was successful or not. +type retryStatus struct { + PeerID string + Success bool +} + +// handleReplicatorRetries manages retries for failed replication attempts. 
+func (db *db) handleReplicatorRetries(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case r := <-db.retryChan: + err := db.handleReplicatorFailure(ctx, r) + if err != nil { + log.ErrorContextE(ctx, "Failed to handle replicator failure", err) + } + + case r := <-db.retryDone: + err := db.handleCompletedReplicatorRetry(ctx, r) + if err != nil { + log.ErrorContextE(ctx, "Failed to handle completed replicator retry", err) + } + + case <-time.After(retryLoopInterval): + db.retryReplicators(ctx) + } + } +} + +func (db *db) handleReplicatorFailure(ctx context.Context, r event.ReplicatorFailure) error { + err := db.updateReplicatorStatus(ctx, r.PeerID.String(), false) + if err != nil { + return err + } + err = db.createIfNotExistsReplicatorRetry(ctx, r.PeerID.String()) + if err != nil { + return err + } + docIDKey := core.NewReplicatorRetryDocIDKey(r.PeerID.String(), r.DocID) + err = db.Peerstore().Put(ctx, docIDKey.ToDS(), []byte{}) + if err != nil { + return err + } + return nil +} + +func (db *db) handleCompletedReplicatorRetry(ctx context.Context, r retryStatus) error { + if r.Success { + done, err := db.deleteReplicatorRetryIfNoMoreDocs(ctx, r.PeerID) + if err != nil { + return err + } + if done { + err := db.updateReplicatorStatus(ctx, r.PeerID, true) + if err != nil { + return err + } + } + } else { + err := db.setReplicatorNextRetry(ctx, r.PeerID) + if err != nil { + return err + } + } + return nil +} + +// updateReplicatorStatus updates the status of a replicator in the peerstore. +func (db *db) updateReplicatorStatus(ctx context.Context, peerID string, active bool) error { + key := core.NewReplicatorKey(peerID) + repBytes, err := db.Peerstore().Get(ctx, key.ToDS()) + if err != nil { + return err + } + rep := client.Replicator{} + err = json.Unmarshal(repBytes, &rep) + if err != nil { + return err + } + switch active { + case true: + if rep.Status == client.ReplicatorStatusInactive { + rep.LastStatusChange = time.Time{} + } + rep.Status = client.ReplicatorStatusActive + case false: + if rep.Status == client.ReplicatorStatusActive { + rep.LastStatusChange = time.Now() + } + rep.Status = client.ReplicatorStatusInactive + } + b, err := json.Marshal(rep) + if err != nil { + return err + } + return db.Peerstore().Put(ctx, key.ToDS(), b) +} + +type retryInfo struct { + NextRetry time.Time + NumRetries int + Retrying bool +} + +func (db *db) createIfNotExistsReplicatorRetry(ctx context.Context, peerID string) error { + key := core.NewReplicatorRetryIDKey(peerID) + exists, err := db.Peerstore().Has(ctx, key.ToDS()) + if err != nil { + return err + } + if !exists { + r := retryInfo{ + NextRetry: time.Now().Add(db.retryIntervals[0]), + NumRetries: 0, + } + b, err := cbor.Marshal(r) + if err != nil { + return err + } + err = db.Peerstore().Put(ctx, key.ToDS(), b) + if err != nil { + return err + } + return nil + } + return nil +} + +func (db *db) retryReplicators(ctx context.Context) { + q := query.Query{ + Prefix: core.REPLICATOR_RETRY_ID, + } + results, err := db.Peerstore().Query(ctx, q) + if err != nil { + log.ErrorContextE(ctx, "Failed to query replicator retries", err) + return + } + defer closeQueryResults(results) + now := time.Now() + for result := range results.Next() { + rInfo := retryInfo{} + err = cbor.Unmarshal(result.Value, &rInfo) + if err != nil { + log.ErrorContextE(ctx, "Failed to unmarshal replicator retry info", err) + continue + } + // If the next retry time has passed and the replicator is not already retrying.
+ if now.After(rInfo.NextRetry) && !rInfo.Retrying { + key, err := core.NewReplicatorRetryIDKeyFromString(result.Key) + if err != nil { + log.ErrorContextE(ctx, "Failed to parse replicator retry ID key", err) + continue + } + + // The replicator might have been deleted by the time we reach this point. + // If it no longer exists, we delete the retry key and all retry docs. + exists, err := db.Peerstore().Has(ctx, core.NewReplicatorKey(key.PeerID).ToDS()) + if err != nil { + log.ErrorContextE(ctx, "Failed to check if replicator exists", err) + continue + } + if !exists { + err = db.deleteReplicatorRetryAndDocs(ctx, key.PeerID) + if err != nil { + log.ErrorContextE(ctx, "Failed to delete replicator retry and docs", err) + } + continue + } + + err = db.setReplicatorAsRetrying(ctx, key, rInfo) + if err != nil { + log.ErrorContextE(ctx, "Failed to set replicator as retrying", err) + continue + } + go db.retryReplicator(ctx, key.PeerID) + } + } +} + +func (db *db) setReplicatorAsRetrying(ctx context.Context, key core.ReplicatorRetryIDKey, rInfo retryInfo) error { + rInfo.Retrying = true + rInfo.NumRetries++ + b, err := cbor.Marshal(rInfo) + if err != nil { + return err + } + return db.Peerstore().Put(ctx, key.ToDS(), b) +} + +func (db *db) setReplicatorNextRetry(ctx context.Context, peerID string) error { + key := core.NewReplicatorRetryIDKey(peerID) + b, err := db.Peerstore().Get(ctx, key.ToDS()) + if err != nil { + return err + } + rInfo := retryInfo{} + err = cbor.Unmarshal(b, &rInfo) + if err != nil { + return err + } + if rInfo.NumRetries >= len(db.retryIntervals) { + rInfo.NextRetry = time.Now().Add(db.retryIntervals[len(db.retryIntervals)-1]) + } else { + rInfo.NextRetry = time.Now().Add(db.retryIntervals[rInfo.NumRetries]) + } + rInfo.Retrying = false + b, err = cbor.Marshal(rInfo) + if err != nil { + return err + } + return db.Peerstore().Put(ctx, key.ToDS(), b) +} + +func (db *db) retryReplicator(ctx context.Context, peerID string) { + log.InfoContext(ctx, "Retrying replicator", corelog.String("PeerID", peerID)) + key := core.NewReplicatorRetryDocIDKey(peerID, "") + q := query.Query{ + Prefix: key.ToString(), + } + results, err := db.Peerstore().Query(ctx, q) + if err != nil { + log.ErrorContextE(ctx, "Failed to query retry docs", err) + return + } + defer closeQueryResults(results) + for result := range results.Next() { + select { + case <-ctx.Done(): + return + default: + } + key, err := core.NewReplicatorRetryDocIDKeyFromString(result.Key) + if err != nil { + log.ErrorContextE(ctx, "Failed to parse retry doc key", err) + continue + } + err = db.retryDoc(ctx, key.DocID) + if err != nil { + log.ErrorContextE(ctx, "Failed to retry doc", err) + db.retryDone <- retryStatus{ + PeerID: peerID, + Success: false, + } + // if one doc fails, stop retrying the rest and just wait for the next retry + return + } + err = db.Peerstore().Delete(ctx, key.ToDS()) + if err != nil { + log.ErrorContextE(ctx, "Failed to delete retry docID", err) + } + } + db.retryDone <- retryStatus{ + PeerID: peerID, + Success: true, + } +} + +func (db *db) retryDoc(ctx context.Context, docID string) error { + ctx, txn, err := ensureContextTxn(ctx, db, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + headStoreKey := core.HeadStoreKey{ + DocID: docID, + FieldID: core.COMPOSITE_NAMESPACE, + } + headset := clock.NewHeadSet(txn.Headstore(), headStoreKey) + cids, _, err := headset.List(ctx) + if err != nil { + return err + } + + for _, c := range cids { + select { + case <-ctx.Done(): + return 
ErrContextDone + default: + } + rawblk, err := db.Blockstore().Get(ctx, c) + if err != nil { + return err + } + blk, err := coreblock.GetFromBytes(rawblk.RawData()) + if err != nil { + return err + } + schema, err := db.getSchemaByVersionID(ctx, blk.Delta.GetSchemaVersionID()) + if err != nil { + return err + } + successChan := make(chan bool) + updateEvent := event.Update{ + DocID: docID, + Cid: c, + SchemaRoot: schema.Root, + Block: rawblk.RawData(), + IsRetry: true, + Success: successChan, + } + db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + + select { + case success := <-successChan: + if !success { + return errors.New("pushlog failed") + } + case <-time.After(retryTimeout): + return ErrTimeoutDocRetry + } + } + return nil +} + +// deleteReplicatorRetryIfNoMoreDocs deletes the replicator retry key if there are no more docs to retry. +// It returns true if there are no more docs to retry, false otherwise. +func (db *db) deleteReplicatorRetryIfNoMoreDocs(ctx context.Context, peerID string) (bool, error) { + key := core.NewReplicatorRetryDocIDKey(peerID, "") + q := query.Query{ + Prefix: key.ToString(), + KeysOnly: true, + } + results, err := db.Peerstore().Query(ctx, q) + if err != nil { + return false, err + } + defer closeQueryResults(results) + entries, err := results.Rest() + if err != nil { + return false, err + } + if len(entries) == 0 { + key := core.NewReplicatorRetryIDKey(peerID) + return true, db.Peerstore().Delete(ctx, key.ToDS()) + } + // If there are still docs to retry, we run the retry right away. + go db.retryReplicator(ctx, peerID) + return false, nil +} + +// deleteReplicatorRetryAndDocs deletes the replicator retry and all retry docs. +func (db *db) deleteReplicatorRetryAndDocs(ctx context.Context, peerID string) error { + key := core.NewReplicatorRetryIDKey(peerID) + err := db.Peerstore().Delete(ctx, key.ToDS()) + if err != nil { + return err + } + docKey := core.NewReplicatorRetryDocIDKey(peerID, "") + q := query.Query{ + Prefix: docKey.ToString(), + KeysOnly: true, + } + results, err := db.Peerstore().Query(ctx, q) + if err != nil { + return err + } + defer closeQueryResults(results) + for result := range results.Next() { + err = db.Peerstore().Delete(ctx, core.NewReplicatorRetryDocIDKey(peerID, result.Key).ToDS()) + if err != nil { + return err + } + } + return nil +} + +func closeQueryResults(results query.Results) { + err := results.Close() + if err != nil { + log.ErrorE("Failed to close query results", err) + } +} diff --git a/net/client.go b/net/client.go index 35c1de139d..e2c31e0746 100644 --- a/net/client.go +++ b/net/client.go @@ -30,7 +30,20 @@ var ( // pushLog creates a pushLog request and sends it to another node // over libp2p grpc connection -func (s *server) pushLog(evt event.Update, pid peer.ID) error { +func (s *server) pushLog(evt event.Update, pid peer.ID) (err error) { + defer func() { + if err != nil && !evt.IsRetry { + s.peer.bus.Publish(event.NewMessage(event.ReplicatorFailureName, event.ReplicatorFailure{ + DocID: evt.DocID, + PeerID: pid, + })) + } + // Success is not nil when the pushLog is called from a retry + if evt.Success != nil { + evt.Success <- err == nil + } + }() + client, err := s.dial(pid) // grpc dial over P2P stream if err != nil { return NewErrPushLog(err) } diff --git a/net/client_test.go b/net/client_test.go index 629b176605..410ced82a7 100644 --- a/net/client_test.go +++ b/net/client_test.go @@ -95,7 +95,7 @@ func TestPushlogWithInvalidPeerID(t *testing.T) { require.Contains(t, err.Error(), "failed to
parse peer ID") } -func TestPushlogW_WithValidPeerID_NoError(t *testing.T) { +func TestPushlog_WithValidPeerID_NoError(t *testing.T) { ctx := context.Background() db1, p1 := newTestPeer(ctx, t) defer db1.Close() diff --git a/net/peer.go b/net/peer.go index 7b855a1ca2..2218865ccd 100644 --- a/net/peer.go +++ b/net/peer.go @@ -22,7 +22,6 @@ import ( "github.com/ipfs/boxo/blockservice" "github.com/ipfs/boxo/bootstrap" blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" gostream "github.com/libp2p/go-libp2p-gostream" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -237,13 +236,7 @@ func (p *Peer) handleMessageLoop() { switch evt := msg.Data.(type) { case event.Update: - var err error - if evt.IsCreate { - err = p.handleDocCreateLog(evt) - } else { - err = p.handleDocUpdateLog(evt) - } - + err := p.handleLog(evt) if err != nil { log.ErrorE("Error while handling broadcast log", err) } @@ -253,6 +246,7 @@ func (p *Peer) handleMessageLoop() { case event.Replicator: p.server.updateReplicators(evt) + default: // ignore other events continue @@ -260,77 +254,31 @@ func (p *Peer) handleMessageLoop() { } } -// RegisterNewDocument registers a new document with the peer node. -func (p *Peer) RegisterNewDocument( - ctx context.Context, - docID client.DocID, - c cid.Cid, - rawBlock []byte, - schemaRoot string, -) error { - // register topic - err := p.server.addPubSubTopic(docID.String(), !p.server.hasPubSubTopic(schemaRoot), nil) - if err != nil { - log.ErrorE( - "Failed to create new pubsub topic", - err, - corelog.String("DocID", docID.String()), - ) - return err - } - - req := &pushLogRequest{ - DocID: docID.String(), - CID: c.Bytes(), - SchemaRoot: schemaRoot, - Creator: p.host.ID().String(), - Block: rawBlock, - } - - return p.server.publishLog(ctx, schemaRoot, req) -} - -func (p *Peer) handleDocCreateLog(evt event.Update) error { - docID, err := client.NewDocIDFromString(evt.DocID) +func (p *Peer) handleLog(evt event.Update) error { + _, err := client.NewDocIDFromString(evt.DocID) if err != nil { return NewErrFailedToGetDocID(err) } - // We need to register the document before pushing to the replicators if we want to - // ensure that we have subscribed to the topic. 
- err = p.RegisterNewDocument(p.ctx, docID, evt.Cid, evt.Block, evt.SchemaRoot) - if err != nil { - return err - } - // push to each peer (replicator) - p.pushLogToReplicators(evt) - - return nil -} - -func (p *Peer) handleDocUpdateLog(evt event.Update) error { // push to each peer (replicator) p.pushLogToReplicators(evt) - _, err := client.NewDocIDFromString(evt.DocID) - if err != nil { - return NewErrFailedToGetDocID(err) - } - - req := &pushLogRequest{ - DocID: evt.DocID, - CID: evt.Cid.Bytes(), - SchemaRoot: evt.SchemaRoot, - Creator: p.host.ID().String(), - Block: evt.Block, - } + if !evt.IsRetry { + req := &pushLogRequest{ + DocID: evt.DocID, + CID: evt.Cid.Bytes(), + SchemaRoot: evt.SchemaRoot, + Creator: p.host.ID().String(), + Block: evt.Block, + } - if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil { - return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID) - } + if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil { + return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID) + } - if err := p.server.publishLog(p.ctx, evt.SchemaRoot, req); err != nil { - return NewErrPublishingToSchemaTopic(err, evt.Cid.String(), evt.SchemaRoot) + if err := p.server.publishLog(p.ctx, evt.SchemaRoot, req); err != nil { + return NewErrPublishingToSchemaTopic(err, evt.Cid.String(), evt.SchemaRoot) + } } return nil @@ -344,26 +292,12 @@ func (p *Peer) pushLogToReplicators(lg event.Update) { log.ErrorE("Failed to notify new blocks", err) } - // push to each peer (replicator) - peers := make(map[string]struct{}) - for _, peer := range p.ps.ListPeers(lg.DocID) { - peers[peer.String()] = struct{}{} - } - for _, peer := range p.ps.ListPeers(lg.SchemaRoot) { - peers[peer.String()] = struct{}{} - } - p.server.mu.Lock() reps, exists := p.server.replicators[lg.SchemaRoot] p.server.mu.Unlock() if exists { for pid := range reps { - // Don't push if pid is in the list of peers for the topic. - // It will be handled by the pubsub system. 
- if _, ok := peers[pid.String()]; ok { - continue - } go func(peerID peer.ID) { if err := p.server.pushLog(lg, peerID); err != nil { log.ErrorE( diff --git a/net/peer_test.go b/net/peer_test.go index 10af3a3ab4..40249192ea 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -13,6 +13,7 @@ package net import ( "context" "testing" + "time" "github.com/ipfs/go-cid" mh "github.com/multiformats/go-multihash" @@ -67,7 +68,13 @@ func newTestPeer(ctx context.Context, t *testing.T) (client.DB, *Peer) { store := memory.NewDatastore(ctx) acpLocal := acp.NewLocalACP() acpLocal.Init(context.Background(), "") - db, err := db.NewDB(ctx, store, immutable.Some[acp.ACP](acpLocal), nil) + db, err := db.NewDB( + ctx, + store, + immutable.Some[acp.ACP](acpLocal), + nil, + db.WithRetryInterval([]time.Duration{time.Second}), + ) require.NoError(t, err) n, err := NewPeer( @@ -134,60 +141,7 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { require.NoError(t, err) } -func TestRegisterNewDocument_NoError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - _, err := db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) - require.NoError(t, err) - - cid, err := createCID(doc) - require.NoError(t, err) - - err = p.RegisterNewDocument(ctx, doc.ID(), cid, emptyBlock(), col.SchemaRoot()) - require.NoError(t, err) -} - -func TestRegisterNewDocument_RPCTopicAlreadyRegisteredError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - _, err := db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) - require.NoError(t, err) - - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), doc.ID().String(), true) - require.NoError(t, err) - - cid, err := createCID(doc) - require.NoError(t, err) - - err = p.RegisterNewDocument(ctx, doc.ID(), cid, emptyBlock(), col.SchemaRoot()) - require.Equal(t, err.Error(), "creating topic: joining topic: topic already exists") -} - -func TestHandleDocCreateLog_NoError(t *testing.T) { +func TestHandleLog_NoError(t *testing.T) { ctx := context.Background() db, p := newTestPeer(ctx, t) defer db.Close() @@ -214,7 +168,7 @@ func TestHandleDocCreateLog_NoError(t *testing.T) { b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) require.NoError(t, err) - err = p.handleDocCreateLog(event.Update{ + err = p.handleLog(event.Update{ DocID: doc.ID().String(), Cid: headCID, SchemaRoot: col.SchemaRoot(), @@ -223,88 +177,19 @@ func TestHandleDocCreateLog_NoError(t *testing.T) { require.NoError(t, err) } -func TestHandleDocCreateLog_WithInvalidDocID_NoError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - err := p.handleDocCreateLog(event.Update{ - DocID: "some-invalid-key", - }) - require.ErrorContains(t, err, "failed to get DocID from broadcast message: selected encoding not supported") -} - -func TestHandleDocCreateLog_WithExistingTopic_TopicExistsError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - _, err := 
db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) - require.NoError(t, err) - - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bae-7fca96a2-5f01-5558-a81f-09b47587f26d", true) - require.NoError(t, err) - - err = p.handleDocCreateLog(event.Update{ - DocID: doc.ID().String(), - SchemaRoot: col.SchemaRoot(), - }) - require.ErrorContains(t, err, "topic already exists") -} - -func TestHandleDocUpdateLog_NoError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - _, err := db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) - require.NoError(t, err) - - cid, err := createCID(doc) - require.NoError(t, err) - - err = p.handleDocUpdateLog(event.Update{ - DocID: doc.ID().String(), - Cid: cid, - SchemaRoot: col.SchemaRoot(), - }) - require.NoError(t, err) -} - -func TestHandleDoUpdateLog_WithInvalidDocID_NoError(t *testing.T) { +func TestHandleLog_WithInvalidDocID_NoError(t *testing.T) { ctx := context.Background() db, p := newTestPeer(ctx, t) defer db.Close() defer p.Close() - err := p.handleDocUpdateLog(event.Update{ + err := p.handleLog(event.Update{ DocID: "some-invalid-key", }) require.ErrorContains(t, err, "failed to get DocID from broadcast message: selected encoding not supported") } -func TestHandleDocUpdateLog_WithExistingDocIDTopic_TopicExistsError(t *testing.T) { +func TestHandleLog_WithExistingTopic_TopicExistsError(t *testing.T) { ctx := context.Background() db, p := newTestPeer(ctx, t) defer db.Close() @@ -322,21 +207,17 @@ func TestHandleDocUpdateLog_WithExistingDocIDTopic_TopicExistsError(t *testing.T doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) require.NoError(t, err) - cid, err := createCID(doc) - require.NoError(t, err) - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bae-7fca96a2-5f01-5558-a81f-09b47587f26d", true) require.NoError(t, err) - err = p.handleDocUpdateLog(event.Update{ + err = p.handleLog(event.Update{ DocID: doc.ID().String(), - Cid: cid, SchemaRoot: col.SchemaRoot(), }) require.ErrorContains(t, err, "topic already exists") } -func TestHandleDocUpdateLog_WithExistingSchemaTopic_TopicExistsError(t *testing.T) { +func TestHandleLog_WithExistingSchemaTopic_TopicExistsError(t *testing.T) { ctx := context.Background() db, p := newTestPeer(ctx, t) defer db.Close() @@ -360,7 +241,7 @@ func TestHandleDocUpdateLog_WithExistingSchemaTopic_TopicExistsError(t *testing. 
_, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bafkreia7ljiy5oief4dp5xsk7t7zlgfjzqh3537hw7rtttjzchybfxtn4u", true) require.NoError(t, err) - err = p.handleDocUpdateLog(event.Update{ + err = p.handleLog(event.Update{ DocID: doc.ID().String(), Cid: cid, SchemaRoot: col.SchemaRoot(), diff --git a/net/server.go b/net/server.go index 2e4939c77f..74cc16f22f 100644 --- a/net/server.go +++ b/net/server.go @@ -266,10 +266,6 @@ func (s *server) removeAllPubsubTopics() error { // publishLog publishes the given PushLogRequest object on the PubSub network via the // corresponding topic func (s *server) publishLog(ctx context.Context, topic string, req *pushLogRequest) error { - log.InfoContext(ctx, "Publish log", - corelog.String("PeerID", s.peer.PeerID().String()), - corelog.String("Topic", topic)) - if s.peer.ps == nil { // skip if we aren't running with a pubsub net return nil } @@ -277,13 +273,21 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pushLogReque t, ok := s.topics[topic] s.mu.Unlock() if !ok { - err := s.addPubSubTopic(topic, false, nil) + subscribe := true + if topic != req.SchemaRoot && s.hasPubSubTopic(req.SchemaRoot) { + subscribe = false + } + err := s.addPubSubTopic(topic, subscribe, nil) if err != nil { return errors.Wrap(fmt.Sprintf("failed to created single use topic %s", topic), err) } return s.publishLog(ctx, topic, req) } + log.InfoContext(ctx, "Publish log", + corelog.String("PeerID", s.peer.PeerID().String()), + corelog.String("Topic", topic)) + data, err := cbor.Marshal(req) if err != nil { return errors.Wrap("failed to marshal pubsub message", err) diff --git a/tests/bench/query/planner/utils.go b/tests/bench/query/planner/utils.go index b9e077867b..0ab739ac20 100644 --- a/tests/bench/query/planner/utils.go +++ b/tests/bench/query/planner/utils.go @@ -137,7 +137,7 @@ func (*dummyTxn) Rootstore() datastore.DSReaderWriter { return nil } func (*dummyTxn) Datastore() datastore.DSReaderWriter { return nil } func (*dummyTxn) Encstore() datastore.Blockstore { return nil } func (*dummyTxn) Headstore() datastore.DSReaderWriter { return nil } -func (*dummyTxn) Peerstore() datastore.DSBatching { return nil } +func (*dummyTxn) Peerstore() datastore.DSReaderWriter { return nil } func (*dummyTxn) Blockstore() datastore.Blockstore { return nil } func (*dummyTxn) Systemstore() datastore.DSReaderWriter { return nil } func (*dummyTxn) Commit(ctx context.Context) error { return nil } diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index b3261f09a8..6983aa1797 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -531,7 +531,7 @@ func (w *Wrapper) Headstore() ds.Read { return w.node.DB.Headstore() } -func (w *Wrapper) Peerstore() datastore.DSBatching { +func (w *Wrapper) Peerstore() datastore.DSReaderWriter { return w.node.DB.Peerstore() } diff --git a/tests/clients/cli/wrapper_tx.go b/tests/clients/cli/wrapper_tx.go index e3bf41d818..d4b0f244a5 100644 --- a/tests/clients/cli/wrapper_tx.go +++ b/tests/clients/cli/wrapper_tx.go @@ -83,7 +83,7 @@ func (w *Transaction) Headstore() datastore.DSReaderWriter { return w.tx.Headstore() } -func (w *Transaction) Peerstore() datastore.DSBatching { +func (w *Transaction) Peerstore() datastore.DSReaderWriter { return w.tx.Peerstore() } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index ae6cd61529..35a386e18a 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -252,7 +252,7 @@ func (w *Wrapper) Headstore() ds.Read { return 
w.node.DB.Headstore() } -func (w *Wrapper) Peerstore() datastore.DSBatching { +func (w *Wrapper) Peerstore() datastore.DSReaderWriter { return w.node.DB.Peerstore() } diff --git a/tests/clients/http/wrapper_tx.go b/tests/clients/http/wrapper_tx.go index baf841871a..8778dcdd7d 100644 --- a/tests/clients/http/wrapper_tx.go +++ b/tests/clients/http/wrapper_tx.go @@ -77,7 +77,7 @@ func (w *TxWrapper) Headstore() datastore.DSReaderWriter { return w.server.Headstore() } -func (w *TxWrapper) Peerstore() datastore.DSBatching { +func (w *TxWrapper) Peerstore() datastore.DSReaderWriter { return w.server.Peerstore() } diff --git a/tests/integration/db.go b/tests/integration/db.go index b9c1b3791d..784ff6952f 100644 --- a/tests/integration/db.go +++ b/tests/integration/db.go @@ -37,9 +37,9 @@ const ( ) const ( - badgerIMType DatabaseType = "badger-in-memory" - defraIMType DatabaseType = "defra-memory-datastore" - badgerFileType DatabaseType = "badger-file-system" + BadgerIMType DatabaseType = "badger-in-memory" + DefraIMType DatabaseType = "defra-memory-datastore" + BadgerFileType DatabaseType = "badger-file-system" ) var ( @@ -165,10 +165,10 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) { var path string switch s.dbt { - case badgerIMType: + case BadgerIMType: opts = append(opts, node.WithBadgerInMemory(true)) - case badgerFileType: + case BadgerFileType: switch { case databaseDir != "": // restarting database @@ -185,7 +185,7 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) { opts = append(opts, node.WithStorePath(path), node.WithACPPath(path)) - case defraIMType: + case DefraIMType: opts = append(opts, node.WithStoreType(node.MemoryStore)) default: diff --git a/tests/integration/net/simple/replicator/with_create_test.go b/tests/integration/net/simple/replicator/with_create_test.go index 1eab640b83..d8bad791b6 100644 --- a/tests/integration/net/simple/replicator/with_create_test.go +++ b/tests/integration/net/simple/replicator/with_create_test.go @@ -585,3 +585,73 @@ func TestP2POneToOneReplicatorOrderIndependentDirectCreate(t *testing.T) { testUtils.ExecuteTestCase(t, test) } + +func TestP2POneToOneReplicator_ManyDocsWithTargetNodeTemporarilyOffline_ShouldSucceed(t *testing.T) { + test := testUtils.TestCase{ + SupportedDatabaseTypes: immutable.Some( + []testUtils.DatabaseType{ + // This test only supports file type databases since it requires the ability to + // stop and start a node without losing data. 
+ testUtils.BadgerFileType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + Name: String + Age: Int + } + `, + }, + testUtils.ConfigureReplicator{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.Close{ + NodeID: immutable.Some(1), + }, + testUtils.CreateDoc{ + // Create John on the first (source) node only, and allow the value to sync + NodeID: immutable.Some(0), + Doc: `{ + "Name": "John", + "Age": 21 + }`, + }, + testUtils.CreateDoc{ + // Create Fred on the first (source) node only, and allow the value to sync + NodeID: immutable.Some(0), + Doc: `{ + "Name": "Fred", + "Age": 22 + }`, + }, + testUtils.Start{ + NodeID: immutable.Some(1), + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + Age + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "Age": int64(22), + }, + { + "Age": int64(21), + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index f102294e97..3103d674ca 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -60,6 +60,13 @@ type TestCase struct { // differences between view types, or we need to temporarily document a bug. SupportedViewTypes immutable.Option[[]ViewType] + // If provided a value, SupportedDatabaseTypes will cause this test to be skipped + // if the active database type is not within the given set. + // + // This is to only be used in the very rare cases where we really do want behavioural + // differences between database types, or we need to temporarily document a bug. + SupportedDatabaseTypes immutable.Option[[]DatabaseType] + // Configuration for KMS to be used in the test KMS KMS } @@ -93,6 +100,22 @@ type ConfigureNode func() []net.NodeOpt // Restart is an action that will close and then start all nodes. type Restart struct{} +// Close is an action that will close a node. +type Close struct { + // NodeID may hold the ID (index) of a node to close. + // + // If a value is not provided the close will be applied to all nodes. + NodeID immutable.Option[int] +} + +// Start is an action that will start a node that has been previously closed. +type Start struct { + // NodeID may hold the ID (index) of a node to start. + // + // If a value is not provided the start will be applied to all nodes. + NodeID immutable.Option[int] +} + // SchemaUpdate is an action that will update the database schema. // // WARNING: getCollectionNames will not work with schemas ending in `type`, e.g. 
`user_type` diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 6aac10e5e4..744f874423 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -178,13 +178,13 @@ func ExecuteTestCase( var databases []DatabaseType if badgerInMemory { - databases = append(databases, badgerIMType) + databases = append(databases, BadgerIMType) } if badgerFile { - databases = append(databases, badgerFileType) + databases = append(databases, BadgerFileType) } if inMemoryStore { - databases = append(databases, defraIMType) + databases = append(databases, DefraIMType) } var kmsList []KMSType @@ -203,6 +203,7 @@ func ExecuteTestCase( require.NotEmpty(t, databases) require.NotEmpty(t, clients) + databases = skipIfDatabaseTypeUnsupported(t, databases, testCase.SupportedDatabaseTypes) clients = skipIfClientTypeUnsupported(t, clients, testCase.SupportedClientTypes) ctx := context.Background() @@ -251,7 +252,7 @@ func executeTestCase( // It is very important that the databases are always closed, otherwise resources will leak // as tests run. This is particularly important for file based datastores. - defer closeNodes(s) + defer closeNodes(s, Close{}) // Documents and Collections may already exist in the database if actions have been split // by the change detector so we should fetch them here at the start too (if they exist). @@ -292,6 +293,12 @@ func performAction( case Restart: restartNodes(s) + case Close: + closeNodes(s, action) + + case Start: + startNodes(s, action) + case ConnectPeers: connectPeers(s, action) @@ -458,7 +465,7 @@ func benchmarkAction( actionIndex int, bench Benchmark, ) { - if s.dbt == defraIMType { + if s.dbt == DefraIMType { // Benchmarking makes no sense for test in-memory storage return } @@ -557,8 +564,9 @@ func getCollectionNamesFromSchema(result map[string]int, schema string, nextInde // closeNodes closes all the given nodes, ensuring that resources are properly released. func closeNodes( s *state, + action Close, ) { - for _, node := range s.nodes { + for _, node := range getNodes(action.NodeID, s.nodes) { node.Close() } } @@ -723,20 +731,18 @@ func setStartingNodes( } } -func restartNodes( - s *state, -) { - if s.dbt == badgerIMType || s.dbt == defraIMType { - return - } - closeNodes(s) - +func startNodes(s *state, action Start) { + nodes := getNodes(action.NodeID, s.nodes) // We need to restart the nodes in reverse order, to avoid dial backoff issues. - for i := len(s.nodes) - 1; i >= 0; i-- { + for i := len(nodes) - 1; i >= 0; i-- { + nodeIndex := i + if action.NodeID.HasValue() { + nodeIndex = action.NodeID.Value() + } originalPath := databaseDir - databaseDir = s.dbPaths[i] + databaseDir = s.dbPaths[nodeIndex] node, _, err := setupNode(s) - require.Nil(s.t, err) + require.NoError(s.t, err) databaseDir = originalPath if len(s.nodeConfigs) == 0 { @@ -744,22 +750,22 @@ func restartNodes( // basic (i.e. no P2P stuff) and can be yielded now. c, err := setupClient(s, node) require.NoError(s.t, err) - s.nodes[i] = c + s.nodes[nodeIndex] = c eventState, err := newEventState(c.Events()) require.NoError(s.t, err) - s.nodeEvents[i] = eventState + s.nodeEvents[nodeIndex] = eventState continue } // We need to make sure the node is configured with its old address, otherwise // a new one may be selected and reconnnection to it will fail. 
var addresses []string - for _, addr := range s.nodeAddresses[i].Addrs { + for _, addr := range s.nodeAddresses[nodeIndex].Addrs { addresses = append(addresses, addr.String()) } - nodeOpts := s.nodeConfigs[i] + nodeOpts := s.nodeConfigs[nodeIndex] nodeOpts = append(nodeOpts, net.WithListenAddresses(addresses...)) node.Peer, err = net.NewPeer(s.ctx, node.DB.Blockstore(), node.DB.Encstore(), node.DB.Events(), nodeOpts...) @@ -767,11 +773,11 @@ c, err := setupClient(s, node) require.NoError(s.t, err) - s.nodes[i] = c + s.nodes[nodeIndex] = c eventState, err := newEventState(c.Events()) require.NoError(s.t, err) - s.nodeEvents[i] = eventState + s.nodeEvents[nodeIndex] = eventState waitForNetworkSetupEvents(s, i) } @@ -780,6 +786,16 @@ // will reference the old (closed) database instances. refreshCollections(s) refreshIndexes(s) +} + +func restartNodes( + s *state, +) { + if s.dbt == BadgerIMType || s.dbt == DefraIMType { + return + } + closeNodes(s, Close{}) + startNodes(s, Start{}) reconnectPeers(s) } @@ -840,7 +856,7 @@ func configureNode( netNodeOpts := action() netNodeOpts = append(netNodeOpts, net.WithPrivateKey(privateKey)) - nodeOpts := []node.Option{node.WithDisableP2P(false)} + nodeOpts := []node.Option{node.WithDisableP2P(false), db.WithRetryInterval([]time.Duration{time.Millisecond * 1})} for _, opt := range netNodeOpts { nodeOpts = append(nodeOpts, opt) } @@ -2456,6 +2472,31 @@ func skipIfACPTypeUnsupported(t testing.TB, supporteACPTypes immutable.Option[[] } } + +func skipIfDatabaseTypeUnsupported( + t testing.TB, + databases []DatabaseType, + supportedDatabaseTypes immutable.Option[[]DatabaseType], +) []DatabaseType { + if !supportedDatabaseTypes.HasValue() { + return databases + } + filteredDatabases := []DatabaseType{} + for _, supportedType := range supportedDatabaseTypes.Value() { + for _, database := range databases { + if supportedType == database { + filteredDatabases = append(filteredDatabases, database) + break + } + } + } + + if len(filteredDatabases) == 0 { + t.Skipf("test does not support any given database type. Type: %v", supportedDatabaseTypes.Value()) + } + + return filteredDatabases +} + // skipIfNetworkTest skips the current test if the given actions // contain network actions and skipNetworkTests is true. func skipIfNetworkTest(t testing.TB, actions []any) {