From 4ee1be60fac16be078f3c9cfb585f0d8267f8db0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Fri, 23 Jul 2021 11:27:28 -0300 Subject: [PATCH] feat(dot/network) add `network_is_major_syncing` metric (#1697) * chore: transform goal into atomic * add tests * remove debug from polkadot config * chore: add comment * chore: remove comments * change var name Co-authored-by: Timothy Wu * update var name * chore: add metrics port * chore: add clean up to close dbs while testing Co-authored-by: Timothy Wu --- chain/polkadot/config.toml | 1 + dot/config.go | 1 + dot/network/block_announce.go | 4 +-- dot/network/service.go | 16 +++++++++++ dot/network/service_test.go | 18 +++++++++++++ dot/network/sync.go | 50 +++++++++++++++++++++++++---------- dot/node.go | 1 + lib/grandpa/grandpa_test.go | 2 ++ 8 files changed, 76 insertions(+), 17 deletions(-) diff --git a/chain/polkadot/config.toml b/chain/polkadot/config.toml index a7166b5a9e..6ec7c85783 100644 --- a/chain/polkadot/config.toml +++ b/chain/polkadot/config.toml @@ -1,6 +1,7 @@ [global] basepath = "~/.gossamer/polkadot" log = "info" +metrics-port = 9876 [log] core = "" diff --git a/dot/config.go b/dot/config.go index e5572e10ab..4b626e9f4b 100644 --- a/dot/config.go +++ b/dot/config.go @@ -238,6 +238,7 @@ func PolkadotConfig() *Config { LogLvl: polkadot.DefaultLvl, RetainBlocks: gssmr.DefaultRetainBlocks, Pruning: pruner.Mode(gssmr.DefaultPruningMode), + MetricsPort: gssmr.DefaultMetricsPort, }, Log: LogConfig{ CoreLvl: polkadot.DefaultLvl, diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index bf8e655195..34371ffdf4 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -241,9 +241,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err return nil } - go func() { - s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer) - }() + go s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer) return nil } diff --git a/dot/network/service.go b/dot/network/service.go index 96d7bc8c32..04cb19c749 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -47,6 +47,8 @@ const ( transactionsID = "/transactions/1" maxMessageSize = 1024 * 63 // 63kb for now + + gssmrIsMajorSyncMetric = "gossamer/network/is_major_syncing" ) var ( @@ -692,6 +694,20 @@ func (s *Service) NodeRoles() byte { return s.cfg.Roles } +// CollectGauge will be used to collect coutable metrics from network service +func (s *Service) CollectGauge() map[string]int64 { + var isSynced int64 + if !s.syncer.IsSynced() { + isSynced = 1 + } else { + isSynced = 0 + } + + return map[string]int64{ + gssmrIsMajorSyncMetric: isSynced, + } +} + // HighestBlock returns the highest known block number func (s *Service) HighestBlock() int64 { return s.syncQueue.goal diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 4f9c0c944b..ffa5d85564 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -344,3 +344,21 @@ func TestHandleConn(t *testing.T) { require.True(t, ok) require.Equal(t, 1, aScore) } + +func TestSerivceIsMajorSyncMetrics(t *testing.T) { + mocksyncer := new(MockSyncer) + + node := &Service{ + syncer: mocksyncer, + } + + mocksyncer.On("IsSynced").Return(false).Once() + m := node.CollectGauge() + + require.Equal(t, int64(1), m[gssmrIsMajorSyncMetric]) + + mocksyncer.On("IsSynced").Return(true).Once() + m = node.CollectGauge() + + require.Equal(t, int64(0), m[gssmrIsMajorSyncMetric]) +} diff --git a/dot/network/sync.go b/dot/network/sync.go index 5ff957ec36..87e286b961 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -23,6 +23,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "time" "github.com/ChainSafe/gossamer/dot/types" @@ -194,6 +195,7 @@ func (q *syncQueue) syncAtHead() { t := time.NewTicker(q.slotDuration * 2) defer t.Stop() + for { select { // sleep for average block time TODO: make this configurable from slot duration @@ -207,10 +209,13 @@ func (q *syncQueue) syncAtHead() { continue } + goal := atomic.LoadInt64(&q.goal) + // we aren't at the head yet, sleep - if curr.Number.Int64() < q.goal && curr.Number.Cmp(prev.Number) > 0 { + if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 { prev = curr q.s.noGossip = true + q.s.syncer.SetSyncing(true) continue } @@ -247,10 +252,12 @@ func (q *syncQueue) handleResponseQueue() { } q.responseLock.Lock() + goal := atomic.LoadInt64(&q.goal) + if len(q.responses) == 0 { q.responseLock.Unlock() - if len(q.requestCh) == 0 && head.Int64() < q.goal { + if len(q.requestCh) == 0 && head.Int64() < goal { q.pushRequest(uint64(head.Int64()+1), blockRequestBufferSize, "") } continue @@ -328,6 +335,9 @@ func (q *syncQueue) prunePeers() { } func (q *syncQueue) benchmark() { + t := time.NewTimer(time.Second * 5) + defer t.Stop() + for { if q.ctx.Err() != nil { return @@ -338,19 +348,27 @@ func (q *syncQueue) benchmark() { continue } - if before.Number.Int64() >= q.goal { + goal := atomic.LoadInt64(&q.goal) + + if before.Number.Int64() >= goal { finalised, err := q.s.blockState.GetFinalisedHeader(0, 0) //nolint if err != nil { continue } logger.Info("💤 node waiting", "peer count", len(q.s.host.peers()), "head", before.Number, "finalised", finalised.Number) - time.Sleep(time.Second * 5) + + // reset the counter and then wait 5 seconds + t.Reset(time.Second * 5) + <-t.C + continue } q.benchmarker.begin(before.Number.Uint64()) - time.Sleep(time.Second * 5) + + t.Reset(time.Second * 5) + <-t.C after, err := q.s.blockState.BestBlockHeader() if err != nil { @@ -361,7 +379,7 @@ func (q *syncQueue) benchmark() { logger.Info("🚣 currently syncing", "peer count", len(q.s.host.peers()), - "goal", q.goal, + "goal", goal, "average blocks/second", q.benchmarker.mostRecentAverage(), "overall average", q.benchmarker.average(), ) @@ -418,11 +436,13 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { return } - if q.goal < best.Int64() { - q.goal = best.Int64() + goal := atomic.LoadInt64(&q.goal) + if goal < best.Int64() { + atomic.StoreInt64(&q.goal, best.Int64()) } - if q.goal-int64(start) < int64(blockRequestSize) { + goal = atomic.LoadInt64(&q.goal) + if goal-int64(start) < int64(blockRequestSize) { start := best.Int64() + 1 req := createBlockRequest(start, 0) @@ -443,7 +463,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start = start - m + 1 for i := 0; i < numRequests; i++ { - if start > uint64(q.goal) { + if start > uint64(goal) { return } @@ -833,11 +853,12 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID) return } - if bestNum.Int64() >= int64(blockNum) || q.goal >= int64(blockNum) { + goal := atomic.LoadInt64(&q.goal) + if bestNum.Int64() >= int64(blockNum) || goal >= int64(blockNum) { return } - q.goal = int64(blockNum) + atomic.StoreInt64(&q.goal, int64(blockNum)) q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from) } @@ -856,8 +877,9 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) return } - if header.Number.Int64() > q.goal { - q.goal = header.Number.Int64() + goal := atomic.LoadInt64(&q.goal) + if header.Number.Int64() > goal { + atomic.StoreInt64(&q.goal, header.Number.Int64()) } req := createBlockRequestWithHash(header.Hash(), blockRequestSize) diff --git a/dot/node.go b/dot/node.go index 16cea8e72e..7cdde1aecf 100644 --- a/dot/node.go +++ b/dot/node.go @@ -335,6 +335,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, c := metrics.NewCollector(context.Background()) c.AddGauge(fg) c.AddGauge(stateSrvc) + c.AddGauge(networkSrvc) go c.Start() diff --git a/lib/grandpa/grandpa_test.go b/lib/grandpa/grandpa_test.go index ae325d76da..645faeda0e 100644 --- a/lib/grandpa/grandpa_test.go +++ b/lib/grandpa/grandpa_test.go @@ -68,6 +68,8 @@ func newTestState(t *testing.T) *state.Service { db, err := utils.SetupDatabase(testDatadirPath, true) require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + gen, genTrie, _ := genesis.NewTestGenesisWithTrieAndHeader(t) block, err := state.NewBlockStateFromGenesis(db, testHeader) require.NoError(t, err)