From 0d434bf4d08737ae9c4c1c132f3597dea4e5fbfa Mon Sep 17 00:00:00 2001
From: Jernej Kos
Date: Mon, 14 Mar 2022 21:20:48 +0100
Subject: [PATCH] go/worker/storage: Remove separate storage sync status store

Previously the worker maintained a separate store that kept information about
the progress of storage sync. Since it was a separate store, this could cause
problems if it got out of sync (e.g. due to partial manual copies). This
should make the process more robust as there is only one source of truth.
---
 .changelog/4565.internal.md                |  6 ++
 go/consensus/tendermint/abci/prune.go      |  6 +-
 go/consensus/tendermint/abci/prune_test.go | 21 +++---
 go/consensus/tendermint/abci/state.go      |  5 +-
 go/oasis-node/cmd/node/node.go             |  1 -
 go/storage/mkvs/checkpoint/checkpointer.go |  5 +-
 go/storage/mkvs/db/api/api.go              | 14 ++--
 go/storage/mkvs/db/badger/badger.go        |  9 ++-
 go/storage/mkvs/tree_test.go               | 14 ++--
 go/worker/client/service.go                |  5 +-
 go/worker/storage/committee/node.go        | 79 +++++++++++-----------
 go/worker/storage/worker.go                | 24 ++-----
 12 files changed, 80 insertions(+), 109 deletions(-)
 create mode 100644 .changelog/4565.internal.md

diff --git a/.changelog/4565.internal.md b/.changelog/4565.internal.md
new file mode 100644
index 00000000000..bc090432b20
--- /dev/null
+++ b/.changelog/4565.internal.md
@@ -0,0 +1,6 @@
+go/worker/storage: Remove separate storage sync status store
+
+Previously the worker maintained a separate store that kept information about
+the progress of storage sync. Since it was a separate store, this could cause
+problems if it got out of sync (e.g. due to partial manual copies). This
+should make the process more robust as there is only one source of truth.
diff --git a/go/consensus/tendermint/abci/prune.go b/go/consensus/tendermint/abci/prune.go
index 874a968ec64..d098daf762a 100644
--- a/go/consensus/tendermint/abci/prune.go
+++ b/go/consensus/tendermint/abci/prune.go
@@ -122,11 +122,7 @@ type genericPruner struct {
 
 func (p *genericPruner) Initialize() error {
 	// Figure out the eldest version currently present in the tree.
-	var err error
-	if p.earliestVersion, err = p.ndb.GetEarliestVersion(context.Background()); err != nil {
-		return fmt.Errorf("failed to get earliest version: %w", err)
-	}
-
+	p.earliestVersion = p.ndb.GetEarliestVersion()
 	// Initially, the earliest version is the last retained version.
 	p.lastRetainedVersion = p.earliestVersion
diff --git a/go/consensus/tendermint/abci/prune_test.go b/go/consensus/tendermint/abci/prune_test.go
index c50adc425f4..1293424b2ed 100644
--- a/go/consensus/tendermint/abci/prune_test.go
+++ b/go/consensus/tendermint/abci/prune_test.go
@@ -46,12 +46,11 @@ func TestPruneKeepN(t *testing.T) {
 		require.NoError(err, "Finalize")
 	}
 
-	earliestVersion, err := ndb.GetEarliestVersion(ctx)
-	require.NoError(err, "GetEarliestVersion")
+	earliestVersion := ndb.GetEarliestVersion()
 	require.EqualValues(1, earliestVersion, "earliest version should be correct")
-	latestVersion, err := ndb.GetLatestVersion(ctx)
-	require.NoError(err, "GetLatestVersion")
+	latestVersion, exists := ndb.GetLatestVersion()
 	require.EqualValues(11, latestVersion, "latest version should be correct")
+	require.True(exists, "latest version should exist")
 
 	pruner, err := newStatePruner(&PruneConfig{
 		Strategy: PruneKeepN,
@@ -59,12 +58,11 @@ func TestPruneKeepN(t *testing.T) {
 	}, ndb)
 	require.NoError(err, "newStatePruner failed")
 
-	earliestVersion, err = ndb.GetEarliestVersion(ctx)
-	require.NoError(err, "GetEarliestVersion")
+	earliestVersion = ndb.GetEarliestVersion()
 	require.EqualValues(1, earliestVersion, "earliest version should be correct")
-	latestVersion, err = ndb.GetLatestVersion(ctx)
-	require.NoError(err, "GetLatestVersion")
+	latestVersion, exists = ndb.GetLatestVersion()
 	require.EqualValues(11, latestVersion, "latest version should be correct")
+	require.True(exists, "latest version should exist")
 
 	lastRetainedVersion := pruner.GetLastRetainedVersion()
 	require.EqualValues(1, lastRetainedVersion, "last retained version should be correct")
@@ -72,12 +70,11 @@ func TestPruneKeepN(t *testing.T) {
 	err = pruner.Prune(ctx, 11)
 	require.NoError(err, "Prune")
 
-	earliestVersion, err = ndb.GetEarliestVersion(ctx)
-	require.NoError(err, "GetEarliestVersion")
+	earliestVersion = ndb.GetEarliestVersion()
 	require.EqualValues(9, earliestVersion, "earliest version should be correct")
-	latestVersion, err = ndb.GetLatestVersion(ctx)
-	require.NoError(err, "GetLatestVersion")
+	latestVersion, exists = ndb.GetLatestVersion()
 	require.EqualValues(11, latestVersion, "latest version should be correct")
+	require.True(exists, "latest version should exist")
 
 	lastRetainedVersion = pruner.GetLastRetainedVersion()
 	require.EqualValues(9, lastRetainedVersion, "last retained version should be correct")
diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go
index ec9dbad60c5..3da1fcc932f 100644
--- a/go/consensus/tendermint/abci/state.go
+++ b/go/consensus/tendermint/abci/state.go
@@ -498,10 +498,7 @@ func InitStateStorage(ctx context.Context, cfg *ApplicationConfig) (storage.Loca
 	}()
 
 	// Figure out the latest version/hash if any, and use that as the block height/hash.
-	latestVersion, err := ndb.GetLatestVersion(ctx)
-	if err != nil {
-		return nil, nil, nil, err
-	}
+	latestVersion, _ := ndb.GetLatestVersion()
 	roots, err := ndb.GetRootsForVersion(ctx, latestVersion)
 	if err != nil {
 		return nil, nil, nil, err
 	}
diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go
index ce510a30332..5bc5d5899e7 100644
--- a/go/oasis-node/cmd/node/node.go
+++ b/go/oasis-node/cmd/node/node.go
@@ -351,7 +351,6 @@ func (n *Node) initRuntimeWorkers() error {
 		n.CommonWorker,
 		n.RegistrationWorker,
 		n.Genesis,
-		n.commonStore,
 	)
 	if err != nil {
 		return err
 	}
diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go
index f3c4c2406f7..b9521c72ee7 100644
--- a/go/storage/mkvs/checkpoint/checkpointer.go
+++ b/go/storage/mkvs/checkpoint/checkpointer.go
@@ -218,10 +218,7 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para
 	sort.Slice(cpVersions, func(i, j int) bool { return cpVersions[i] < cpVersions[j] })
 
 	// Make sure to not start earlier than the earliest version.
-	earlyVersion, err := c.ndb.GetEarliestVersion(ctx)
-	if err != nil {
-		return fmt.Errorf("checkpointer: failed to get earliest version: %w", err)
-	}
+	earlyVersion := c.ndb.GetEarliestVersion()
 	firstCheckpointVersion := lastCheckpointVersion + 1 // We can checkpoint the next version.
 	if firstCheckpointVersion < earlyVersion {
 		firstCheckpointVersion = earlyVersion
 	}
diff --git a/go/storage/mkvs/db/api/api.go b/go/storage/mkvs/db/api/api.go
index 6443d3257f5..14f066d0668 100644
--- a/go/storage/mkvs/db/api/api.go
+++ b/go/storage/mkvs/db/api/api.go
@@ -88,10 +88,12 @@ type NodeDB interface {
 	GetWriteLog(ctx context.Context, startRoot, endRoot node.Root) (writelog.Iterator, error)
 
 	// GetLatestVersion returns the most recent version in the node database.
-	GetLatestVersion(ctx context.Context) (uint64, error)
+	//
+	// The boolean flag signifies whether any version exists, to disambiguate version zero.
+	GetLatestVersion() (uint64, bool)
 
 	// GetEarliestVersion returns the earliest version in the node database.
-	GetEarliestVersion(ctx context.Context) (uint64, error)
+	GetEarliestVersion() uint64
 
 	// GetRootsForVersion returns a list of roots stored under the given version.
 	GetRootsForVersion(ctx context.Context, version uint64) ([]node.Root, error)
@@ -216,12 +218,12 @@ func (d *nopNodeDB) GetWriteLog(ctx context.Context, startRoot, endRoot node.Roo
 	return nil, ErrWriteLogNotFound
 }
 
-func (d *nopNodeDB) GetLatestVersion(ctx context.Context) (uint64, error) {
-	return 0, nil
+func (d *nopNodeDB) GetLatestVersion() (uint64, bool) {
+	return 0, false
 }
 
-func (d *nopNodeDB) GetEarliestVersion(ctx context.Context) (uint64, error) {
-	return 0, nil
+func (d *nopNodeDB) GetEarliestVersion() uint64 {
+	return 0
 }
 
 func (d *nopNodeDB) GetRootsForVersion(ctx context.Context, version uint64) ([]node.Root, error) {
diff --git a/go/storage/mkvs/db/badger/badger.go b/go/storage/mkvs/db/badger/badger.go
index b1648422346..e0f1b7cfc83 100644
--- a/go/storage/mkvs/db/badger/badger.go
+++ b/go/storage/mkvs/db/badger/badger.go
@@ -468,13 +468,12 @@ func (d *badgerNodeDB) GetWriteLog(ctx context.Context, startRoot, endRoot node.
 	return nil, api.ErrWriteLogNotFound
 }
 
-func (d *badgerNodeDB) GetLatestVersion(ctx context.Context) (uint64, error) {
-	version, _ := d.meta.getLastFinalizedVersion()
-	return version, nil
+func (d *badgerNodeDB) GetLatestVersion() (uint64, bool) {
+	return d.meta.getLastFinalizedVersion()
 }
 
-func (d *badgerNodeDB) GetEarliestVersion(ctx context.Context) (uint64, error) {
-	return d.meta.getEarliestVersion(), nil
+func (d *badgerNodeDB) GetEarliestVersion() uint64 {
+	return d.meta.getEarliestVersion()
 }
 
 func (d *badgerNodeDB) GetRootsForVersion(ctx context.Context, version uint64) (roots []node.Root, err error) {
diff --git a/go/storage/mkvs/tree_test.go b/go/storage/mkvs/tree_test.go
index 45f875b6169..5f587618182 100644
--- a/go/storage/mkvs/tree_test.go
+++ b/go/storage/mkvs/tree_test.go
@@ -1318,12 +1318,11 @@ func testPruneBasic(t *testing.T, ndb db.NodeDB, factory NodeDBFactory) {
 	err = ndb.Finalize(ctx, []node.Root{root3})
 	require.NoError(t, err, "Finalize")
 
-	earliestVersion, err := ndb.GetEarliestVersion(ctx)
-	require.NoError(t, err, "GetEarliestVersion")
+	earliestVersion := ndb.GetEarliestVersion()
 	require.EqualValues(t, 0, earliestVersion, "earliest version should be correct")
-	latestVersion, err := ndb.GetLatestVersion(ctx)
-	require.NoError(t, err, "GetLatestVersion")
+	latestVersion, exists := ndb.GetLatestVersion()
 	require.EqualValues(t, 2, latestVersion, "latest version should be correct")
+	require.True(t, exists, "latest version should exist")
 
 	// Prune version 0.
 	err = ndb.Prune(ctx, 0)
@@ -1335,12 +1334,11 @@ func testPruneBasic(t *testing.T, ndb db.NodeDB, factory NodeDBFactory) {
 	require.NoError(t, err, "ndb.New")
 	defer ndb.Close()
 
-	earliestVersion, err = ndb.GetEarliestVersion(ctx)
-	require.NoError(t, err, "GetEarliestVersion")
+	earliestVersion = ndb.GetEarliestVersion()
 	require.EqualValues(t, 1, earliestVersion, "earliest version should be correct")
-	latestVersion, err = ndb.GetLatestVersion(ctx)
-	require.NoError(t, err, "GetLatestVersion")
+	latestVersion, exists = ndb.GetLatestVersion()
 	require.EqualValues(t, 2, latestVersion, "latest version should be correct")
+	require.True(t, exists, "latest version should exist")
 
 	// Keys must still be available in version 2.
 	tree = NewWithRoot(nil, ndb, node.Root{Namespace: testNs, Version: 2, Type: node.RootTypeState, Hash: rootHash3})
diff --git a/go/worker/client/service.go b/go/worker/client/service.go
index 8cad609f592..10b727697bd 100644
--- a/go/worker/client/service.go
+++ b/go/worker/client/service.go
@@ -139,10 +139,7 @@ func (s *service) GetLastRetainedBlock(ctx context.Context, runtimeID common.Nam
 	// we don't actually have state available. This may be because there is only a later checkpoint
 	// available.
 	if lsb, ok := rt.Storage().(storage.LocalBackend); ok {
-		version, err := lsb.NodeDB().GetEarliestVersion(ctx)
-		if err != nil {
-			return nil, err
-		}
+		version := lsb.NodeDB().GetEarliestVersion()
 
 		if version > blk.Header.Round {
 			blk, err = rt.History().GetBlock(ctx, version)
diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go
index 7be376944f7..54a03004cea 100644
--- a/go/worker/storage/committee/node.go
+++ b/go/worker/storage/committee/node.go
@@ -13,7 +13,6 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/common/logging"
 	"github.com/oasisprotocol/oasis-core/go/common/node"
-	"github.com/oasisprotocol/oasis-core/go/common/persistent"
 	"github.com/oasisprotocol/oasis-core/go/common/pubsub"
 	"github.com/oasisprotocol/oasis-core/go/common/workerpool"
 	consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
@@ -112,11 +111,6 @@ type finalizeResult struct {
 	err error
 }
 
-// watcherState is the (persistent) watcher state.
-type watcherState struct {
-	LastBlock blockSummary `json:"last_block"`
-}
-
 type roundWaiter struct {
 	round uint64
 	ch    chan uint64
@@ -140,8 +134,6 @@ type Node struct { // nolint: maligned
 
 	fetchPool *workerpool.Pool
 
-	stateStore *persistent.ServiceStore
-
 	workerCommonCfg workerCommon.Config
 
 	checkpointer checkpoint.Checkpointer
@@ -149,7 +141,7 @@ type Node struct { // nolint: maligned
 	checkpointSyncForced bool
 
 	syncedLock   sync.RWMutex
-	syncedState  watcherState
+	syncedState  blockSummary
 	roundWaiters []roundWaiter
 
 	blockCh *channels.InfiniteChannel
@@ -168,7 +160,6 @@ type Node struct { // nolint: maligned
 func NewNode(
 	commonNode *committee.Node,
 	fetchPool *workerpool.Pool,
-	store *persistent.ServiceStore,
 	roleProvider registration.RoleProvider,
 	rpcRoleProvider registration.RoleProvider,
 	workerCommonCfg workerCommon.Config,
@@ -192,8 +183,6 @@ func NewNode(
 
 		fetchPool: fetchPool,
 
-		stateStore: store,
-
 		checkpointSyncCfg: checkpointSyncCfg,
 
 		blockCh: channels.NewInfiniteChannel(),
@@ -210,12 +199,8 @@ func NewNode(
 		return nil, fmt.Errorf("bad checkpoint sync configuration: %w", err)
 	}
 
-	n.syncedState.LastBlock.Round = defaultUndefinedRound
-	rtID := commonNode.Runtime.ID()
-	err := store.GetCBOR(rtID[:], &n.syncedState)
-	if err != nil && err != persistent.ErrNotFound {
-		return nil, fmt.Errorf("storage worker: failed to restore sync state: %w", err)
-	}
+	// Initialize sync state.
+	n.syncedState.Round = defaultUndefinedRound
 
 	n.ctx, n.ctxCancel = context.WithCancel(context.Background())
 
@@ -256,6 +241,7 @@ func NewNode(
 			return blk.Header.StorageRoots(), nil
 		},
 	}
+	var err error
 	n.checkpointer, err = checkpoint.NewCheckpointer(
 		n.ctx,
 		localStorage.NodeDB(),
@@ -263,7 +249,7 @@ func NewNode(
 		*checkpointerCfg,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("storage worker: failed to create checkpointer: %w", err)
+		return nil, fmt.Errorf("failed to create checkpointer: %w", err)
 	}
 }
@@ -328,7 +314,7 @@ func (n *Node) GetStatus(ctx context.Context) (*api.Status, error) {
 	defer n.syncedLock.RUnlock()
 
 	return &api.Status{
-		LastFinalizedRound: n.syncedState.LastBlock.Round,
+		LastFinalizedRound: n.syncedState.Round,
 	}, nil
 }
 
@@ -353,8 +339,8 @@ func (n *Node) WaitForRound(round uint64, root *storageApi.Root) (<-chan uint64,
 	n.syncedLock.Lock()
 	defer n.syncedLock.Unlock()
 
-	if round <= n.syncedState.LastBlock.Round || (root != nil && n.localStorage.NodeDB().HasRoot(*root)) {
-		retCh <- n.syncedState.LastBlock.Round
+	if round <= n.syncedState.Round || (root != nil && n.localStorage.NodeDB().HasRoot(*root)) {
+		retCh <- n.syncedState.Round
 		close(retCh)
 		return retCh, nil
 	}
@@ -419,7 +405,7 @@ func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) {
 	defer n.syncedLock.RUnlock()
 
 	var io, state storageApi.Root
-	for _, root := range n.syncedState.LastBlock.Roots {
+	for _, root := range n.syncedState.Roots {
 		switch root.Type {
 		case storageApi.RootTypeIO:
 			io = root
@@ -428,7 +414,7 @@ func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) {
 		}
 	}
 
-	return n.syncedState.LastBlock.Round, io, state
+	return n.syncedState.Round, io, state
 }
 
 func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) {
@@ -505,10 +491,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e
 
 	// Check what the latest finalized version in the database is as we may be using a database
 	// from a previous version or network.
-	latestVersion, err := n.localStorage.NodeDB().GetLatestVersion(n.ctx)
-	if err != nil {
-		return fmt.Errorf("failed to get latest version: %w", err)
-	}
+	latestVersion, _ := n.localStorage.NodeDB().GetLatestVersion()
 
 	stateRoot := storageApi.Root{
 		Namespace: rt.ID,
@@ -532,7 +515,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e
 			"latest_version", latestVersion,
 		)
 		for v := latestVersion; v < stateRoot.Version; v++ {
-			err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{
+			err := n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{
 				Namespace: rt.ID,
 				RootType:  storageApi.RootTypeState,
 				SrcRound:  v,
@@ -589,17 +572,13 @@ func (n *Node) flushSyncedState(summary *blockSummary) uint64 {
 	n.syncedLock.Lock()
 	defer n.syncedLock.Unlock()
 
-	n.syncedState.LastBlock = *summary
-	rtID := n.commonNode.Runtime.ID()
-	if err := n.stateStore.PutCBOR(rtID[:], &n.syncedState); err != nil {
-		n.logger.Error("can't store watcher state to database", "err", err)
-	}
+	n.syncedState = *summary
 
 	// Wake up any round waiters.
 	filtered := make([]roundWaiter, 0, len(n.roundWaiters))
 	for _, w := range n.roundWaiters {
-		if w.round <= n.syncedState.LastBlock.Round {
-			w.ch <- n.syncedState.LastBlock.Round
+		if w.round <= n.syncedState.Round {
+			w.ch <- n.syncedState.Round
 			close(w.ch)
 		} else {
 			filtered = append(filtered, w)
@@ -607,7 +586,7 @@ func (n *Node) flushSyncedState(summary *blockSummary) uint64 {
 	}
 	n.roundWaiters = filtered
 
-	return n.syncedState.LastBlock.Round
+	return n.syncedState.Round
 }
 
 func (n *Node) watchQuit() {
@@ -727,7 +706,7 @@ func (n *Node) consensusCheckpointSyncer() {
 			// We may have not yet synced the corresponding runtime round locally. In this case
 			// we need to wait until this is the case.
 			n.syncedLock.RLock()
-			lastSyncedRound := n.syncedState.LastBlock.Round
+			lastSyncedRound := n.syncedState.Round
 			n.syncedLock.RUnlock()
 			if blk.Header.Round > lastSyncedRound {
 				n.logger.Debug("runtime round not available yet for checkpoint, waiting",
@@ -791,6 +770,7 @@ func (n *Node) worker() { // nolint: gocyclo
 
 	n.logger.Info("starting committee node")
 
+	// Determine genesis block.
 	genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{
 		RuntimeID: n.commonNode.Runtime.ID(),
 		Height:    consensus.HeightLatest,
@@ -801,17 +781,36 @@ func (n *Node) worker() { // nolint: gocyclo
 	}
 	n.undefinedRound = genesisBlock.Header.Round - 1
 
+	// Determine last finalized storage version.
+	if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty {
+		var blk *block.Block
+		blk, err = n.commonNode.Runtime.History().GetBlock(n.ctx, version)
+		switch err {
+		case nil:
+			// Set last synced version to last finalized storage version.
+			n.flushSyncedState(summaryFromBlock(blk))
+		default:
+			// Failed to fetch historic block. This is fine when the network just went through a
+			// dump/restore upgrade and we don't have any information before genesis. We treat the
+			// database as unsynced and will proceed to either use checkpoints or sync iteratively.
+			n.logger.Warn("failed to fetch historic block",
+				"err", err,
+				"round", version,
+			)
+		}
+	}
+
 	var fetcherGroup sync.WaitGroup
 
 	n.syncedLock.RLock()
-	cachedLastRound := n.syncedState.LastBlock.Round
+	cachedLastRound := n.syncedState.Round
 	n.syncedLock.RUnlock()
 	if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round {
 		cachedLastRound = n.undefinedRound
 	}
 
-	isInitialStartup := (cachedLastRound == n.undefinedRound)
 	// Initialize genesis from the runtime descriptor.
+	isInitialStartup := (cachedLastRound == n.undefinedRound)
 	if isInitialStartup {
 		var rt *registryApi.Runtime
 		rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx)
diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go
index d0ac58c5621..4f72a4b3b26 100644
--- a/go/worker/storage/worker.go
+++ b/go/worker/storage/worker.go
@@ -9,7 +9,6 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/common/grpc"
 	"github.com/oasisprotocol/oasis-core/go/common/logging"
 	"github.com/oasisprotocol/oasis-core/go/common/node"
-	"github.com/oasisprotocol/oasis-core/go/common/persistent"
 	"github.com/oasisprotocol/oasis-core/go/common/workerpool"
 	genesis "github.com/oasisprotocol/oasis-core/go/genesis/api"
 	runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
@@ -21,8 +20,6 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
 )
 
-var workerStorageDBBucketName = "worker/storage/watchers"
-
 // Worker is a worker handling storage operations.
 type Worker struct {
 	enabled bool
 
@@ -34,9 +31,8 @@ type Worker struct {
 	initCh chan struct{}
 	quitCh chan struct{}
 
-	runtimes   map[common.Namespace]*committee.Node
-	watchState *persistent.ServiceStore
-	fetchPool  *workerpool.Pool
+	runtimes  map[common.Namespace]*committee.Node
+	fetchPool *workerpool.Pool
 }
 
 // New constructs a new storage worker.
@@ -45,7 +41,6 @@ func New(
 	commonWorker *workerCommon.Worker,
 	registration *registration.Worker,
 	genesis genesis.Provider,
-	commonStore *persistent.CommonStore,
 ) (*Worker, error) {
 	var enabled bool
 	switch commonWorker.RuntimeRegistry.Mode() {
@@ -70,16 +65,9 @@ func New(
 		return s, nil
 	}
 
-	var err error
-
 	s.fetchPool = workerpool.New("storage_fetch")
 	s.fetchPool.Resize(viper.GetUint(cfgWorkerFetcherCount))
 
-	s.watchState, err = commonStore.GetServiceStore(workerStorageDBBucketName)
-	if err != nil {
-		return nil, err
-	}
-
 	var checkpointerCfg *checkpoint.CheckpointerConfig
 	if viper.GetBool(CfgWorkerCheckpointerEnabled) {
 		checkpointerCfg = &checkpoint.CheckpointerConfig{
@@ -88,9 +76,9 @@ func New(
 	}
 
 	// Start storage node for every runtime.
-	for _, rt := range s.commonWorker.GetRuntimes() {
+	for id, rt := range s.commonWorker.GetRuntimes() {
 		if err := s.registerRuntime(commonWorker.DataDir, rt, checkpointerCfg); err != nil {
-			return nil, err
+			return nil, fmt.Errorf("failed to create storage worker for runtime %s: %w", id, err)
 		}
 	}
 
@@ -131,7 +119,6 @@ func (w *Worker) registerRuntime(dataDir string, commonNode *committeeCommon.Nod
 	node, err := committee.NewNode(
 		commonNode,
 		w.fetchPool,
-		w.watchState,
 		rp,
 		rpRPC,
 		w.commonWorker.GetConfig(),
@@ -231,9 +218,6 @@ func (w *Worker) Stop() {
 	if w.fetchPool != nil {
 		w.fetchPool.Stop()
 	}
-	if w.watchState != nil {
-		w.watchState.Close()
-	}
 }
 
 // Quit returns a channel that will be closed when the service terminates.
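-- 

Usage sketch for the new NodeDB version accessors: GetLatestVersion no longer
takes a context or returns an error; the boolean result distinguishes an empty
database from one whose latest finalized version is 0. The helper below is a
hypothetical caller, not part of this patch (latestOrGenesis and the
genesisVersion fallback are illustrative names; api refers to the
go/storage/mkvs/db/api package):

	// latestOrGenesis returns the latest finalized version in ndb, falling
	// back to the given genesis version while the database has no versions.
	// The boolean result disambiguates "no versions yet" from version 0.
	func latestOrGenesis(ndb api.NodeDB, genesisVersion uint64) uint64 {
		latest, exists := ndb.GetLatestVersion()
		if !exists {
			return genesisVersion
		}
		return latest
	}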
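The startup flow added to (*Node).worker above is what replaces the removed
watcher store: the last synced round is reconstructed from the node database
plus the runtime block history. Condensed sketch of that flow (logging and
the dump/restore fallback elided; names as in node.go):

	// Derive the sync state from the node DB, the single source of truth.
	if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty {
		if blk, err := n.commonNode.Runtime.History().GetBlock(n.ctx, version); err == nil {
			// Adopt the block matching the last finalized storage version.
			n.flushSyncedState(summaryFromBlock(blk))
		}
		// On error the database is treated as unsynced and the worker falls
		// back to checkpoint sync or iterative round sync.
	}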