From 5df70af01cce4f51476eb281a399302be5d2c9ce Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Tue, 9 May 2023 19:16:06 -0400 Subject: [PATCH] refactor: Improve rollback on peer P2P collection error (#1461) ## Relevant issue(s) Resolves #1389 ## Description This PR improves the rollback of P2P collection manipulation errors. It ensures that a rollback is applied if the transaction commit fails. --- net/peer.go | 103 ++++++++++++++++++++++++++++------------------------ 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/net/peer.go b/net/peer.go index c4a5a880ee..9911e5fcd6 100644 --- a/net/peer.go +++ b/net/peer.go @@ -763,6 +763,26 @@ type EvtPubSub struct { Peer peer.ID } +// rollbackAddPubSubTopics removes the given topics from the pubsub system. +func (p *Peer) rollbackAddPubSubTopics(topics []string, cause error) error { + for _, topic := range topics { + if err := p.server.removePubSubTopic(topic); err != nil { + return errors.WithStack(err, errors.NewKV("Cause", cause)) + } + } + return cause +} + +// rollbackRemovePubSubTopics adds back the given topics from the pubsub system. +func (p *Peer) rollbackRemovePubSubTopics(topics []string, cause error) error { + for _, topic := range topics { + if err := p.server.addPubSubTopic(topic, true); err != nil { + return errors.WithStack(err, errors.NewKV("Cause", cause)) + } + } + return cause +} + // AddP2PCollections adds the given collectionIDs to the pubsup topics. // // It will error if any of the given collectionIDs are invalid, in such a case some of the @@ -778,11 +798,13 @@ func (p *Peer) AddP2PCollections(collections []string) error { store := p.db.WithTxn(txn) // first let's make sure the collections actually exists + storeCollections := []client.Collection{} for _, col := range collections { - _, err := store.GetCollectionBySchemaID(p.ctx, col) + storeCol, err := store.GetCollectionBySchemaID(p.ctx, col) if err != nil { return err } + storeCollections = append(storeCollections, storeCol) } // Ensure we can add all the collections to the store on the transaction @@ -799,42 +821,34 @@ func (p *Peer) AddP2PCollections(collections []string) error { for _, col := range collections { err = p.server.addPubSubTopic(col, true) if err != nil { - for _, topic := range addedTopics { - e := p.server.removePubSubTopic(topic) - if e != nil { - return errors.WithStack(e, errors.NewKV("Cause", err)) - } - } - return err + return p.rollbackAddPubSubTopics(addedTopics, err) } addedTopics = append(addedTopics, col) } - // If adding the collection topics succeeds, we remove the collections' documents + // After adding the collection topics, we remove the collections' documents // from the pubsub topics to avoid receiving duplicate events. - for _, col := range collections { - c, err := store.GetCollectionBySchemaID(p.ctx, col) - if err != nil { - return err - } - keyChan, err := c.GetAllDocKeys(p.ctx) + removedTopics := []string{} + for _, col := range storeCollections { + keyChan, err := col.GetAllDocKeys(p.ctx) if err != nil { return err } for key := range keyChan { err := p.server.removePubSubTopic(key.Key.String()) if err != nil { - log.Info( - p.ctx, - "Failed to remove doc from pubsub topic", - logging.NewKV("DocKey", key.Key.String()), - logging.NewKV("Cause", err), - ) + return p.rollbackRemovePubSubTopics(removedTopics, err) } + removedTopics = append(removedTopics, key.Key.String()) } } - return txn.Commit(p.ctx) + if err = txn.Commit(p.ctx); err != nil { + err = p.rollbackRemovePubSubTopics(removedTopics, err) + return p.rollbackAddPubSubTopics(addedTopics, err) + } + + return nil } // RemoveP2PCollections removes the given collectionIDs from the pubsup topics. @@ -852,11 +866,13 @@ func (p *Peer) RemoveP2PCollections(collections []string) error { store := p.db.WithTxn(txn) // first let's make sure the collections actually exists + storeCollections := []client.Collection{} for _, col := range collections { - _, err := store.GetCollectionBySchemaID(p.ctx, col) + storeCol, err := store.GetCollectionBySchemaID(p.ctx, col) if err != nil { return err } + storeCollections = append(storeCollections, storeCol) } // Ensure we can remove all the collections to the store on the transaction @@ -873,54 +889,48 @@ func (p *Peer) RemoveP2PCollections(collections []string) error { for _, col := range collections { err = p.server.removePubSubTopic(col) if err != nil { - for _, topic := range removedTopics { - e := p.server.addPubSubTopic(topic, true) - if e != nil { - return errors.WithStack(e, errors.NewKV("Cause", err)) - } - } - return err + return p.rollbackRemovePubSubTopics(removedTopics, err) } + removedTopics = append(removedTopics, col) } - // If removing the collection topics succeeds, we add back the collections' documents + // After removing the collection topics, we add back the collections' documents // to the pubsub topics. - for _, col := range collections { - c, err := store.GetCollectionBySchemaID(p.ctx, col) - if err != nil { - return err - } - keyChan, err := c.GetAllDocKeys(p.ctx) + addedTopics := []string{} + for _, col := range storeCollections { + keyChan, err := col.GetAllDocKeys(p.ctx) if err != nil { return err } for key := range keyChan { err := p.server.addPubSubTopic(key.Key.String(), true) if err != nil { - log.Info( - p.ctx, - "Failed to add doc to pubsub topic", - logging.NewKV("DocKey", key.Key.String()), - logging.NewKV("Cause", err), - ) + return p.rollbackAddPubSubTopics(addedTopics, err) } + addedTopics = append(addedTopics, key.Key.String()) } } - return txn.Commit(p.ctx) + if err = txn.Commit(p.ctx); err != nil { + err = p.rollbackAddPubSubTopics(addedTopics, err) + return p.rollbackRemovePubSubTopics(removedTopics, err) + } + + return nil } -// GetAllP2PCollections gets all the collectionIDs from the pubsup topics +// GetAllP2PCollections gets all the collectionIDs that have been added to the +// pubsub topics from the system store. func (p *Peer) GetAllP2PCollections() ([]client.P2PCollection, error) { txn, err := p.db.NewTxn(p.ctx, false) if err != nil { return nil, err } + defer txn.Discard(p.ctx) store := p.db.WithTxn(txn) collections, err := p.db.GetAllP2PCollections(p.ctx) if err != nil { - txn.Discard(p.ctx) return nil, err } @@ -928,7 +938,6 @@ func (p *Peer) GetAllP2PCollections() ([]client.P2PCollection, error) { for _, colID := range collections { col, err := store.GetCollectionBySchemaID(p.ctx, colID) if err != nil { - txn.Discard(p.ctx) return nil, err } p2pCols = append(p2pCols, client.P2PCollection{