Skip to content

Commit

Permalink
feat(dot/telemetry): implement telemetry message network_state (#1618)
Browse files Browse the repository at this point in the history
* refactor telemetry messages to map format

* add basic network state telemetry message

* refactor message sender to handle interface{} types

* refactor telemetry messages to be structs

* lint

* go fmt

* lint

* move msg building logic outside msg sending loop

* make telemetry messages an interface

* Lookup transactions count from TransactionsState

* address comments

* fix mocks for tests

* lint

* refactor TelemetryMessage to Message

* update mock handler to return result

* add TransactionsCount to mockhandler

* move logic to build new network state message

* lint

* fix interface

* update mockhandler

* lint
  • Loading branch information
edwardmack authored Jun 30, 2021
1 parent bb281d4 commit a81844e
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 120 deletions.
9 changes: 7 additions & 2 deletions chain/dev/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"id": "dev",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": null,
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/dev/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -32,4 +37,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
8 changes: 7 additions & 1 deletion chain/gssmr/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
"id": "gssmr",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/gssmr/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -40,4 +46,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
5 changes: 5 additions & 0 deletions dot/core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo

return len(msg.Extrinsics) > 0, nil
}

// TransactionsCount returns number for pending transactions in pool
func (s *Service) TransactionsCount() int {
return len(s.transactionState.PendingInPool())
}
6 changes: 3 additions & 3 deletions dot/core/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

. "github.com/ChainSafe/gossamer/dot/core/mocks"
. "github.com/ChainSafe/gossamer/dot/core/mocks" // nolint
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
Expand All @@ -38,7 +38,7 @@ import (

func TestService_ProcessBlockAnnounceMessage(t *testing.T) {
// TODO: move to sync package
net := new(MockNetwork)
net := new(MockNetwork) // nolint

cfg := &Config{
Network: net,
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestService_HandleTransactionMessage(t *testing.T) {
ks := keystore.NewGlobalKeystore()
ks.Acco.Insert(kp)

bp := new(MockBlockProducer)
bp := new(MockBlockProducer) // nolint
blockC := make(chan types.Block)
bp.On("GetBlockChannel", nil).Return(blockC)

Expand Down
14 changes: 14 additions & 0 deletions dot/network/mock_transaction_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 19 additions & 14 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"io"
"math/big"
"os"
"sync"
"time"
Expand Down Expand Up @@ -315,11 +316,12 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount()))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}

err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(s.host.h, s.Peers()))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand All @@ -333,19 +335,22 @@ func (s *Service) sentBlockIntervalTelemetry() {
if err != nil {
continue
}
bestHash := best.Hash()

finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
finalizedHash := finalized.Hash()

err = telemetry.GetInstance().SendMessage(telemetry.NewBlockIntervalTM(
&bestHash,
best.Number,
&finalizedHash,
finalized.Number,
big.NewInt(int64(s.transactionHandler.TransactionsCount())),
big.NewInt(0), // todo (ed) determine where to get used_state_cache_size
))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand Down
1 change: 1 addition & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}

Expand Down
1 change: 1 addition & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ type Syncer interface {
// TransactionHandler is the interface used by the transactions sub-protocol
type TransactionHandler interface {
HandleTransactionMessage(*TransactionMessage) (bool, error)
TransactionsCount() int
}
1 change: 1 addition & 0 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMockSyncer() *MockSyncer {
func NewMockTransactionHandler() *MockTransactionHandler {
mocktxhandler := new(MockTransactionHandler)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil)
mocktxhandler.On("TransactionsCount").Return(0)
return mocktxhandler
}

Expand Down
1 change: 1 addition & 0 deletions dot/network/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestHandleTransactionMessage(t *testing.T) {
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)

config := &Config{
BasePath: basePath,
Expand Down
21 changes: 10 additions & 11 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,17 +345,16 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
telemetry.NewKeyValue("msg", "system.connected"),
telemetry.NewKeyValue("name", cfg.Global.Name),
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))
genesisHash := stateSrvc.Block.GenesisHash()
err = telemetry.GetInstance().SendMessage(telemetry.NewSystemConnectedTM(
cfg.Core.GrandpaAuthority,
sysSrvc.ChainName(),
&genesisHash,
sysSrvc.SystemName(),
cfg.Global.Name,
networkSrvc.NetworkState().PeerID,
strconv.FormatInt(time.Now().UnixNano(), 10),
sysSrvc.SystemVersion()))
if err != nil {
logger.Debug("problem sending system.connected telemetry message", "err", err)
}
Expand Down
12 changes: 6 additions & 6 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,13 @@ func (s *Service) handleBlock(block *types.Block) error {

logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( // nolint
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
blockHash := block.Header.Hash()
err = telemetry.GetInstance().SendMessage(telemetry.NewBlockImportTM(
&blockHash,
block.Header.Number,
"NetworkInitialSync"))
if err != nil {
logger.Trace("problem sending block.import telemetry message", "error", err)
logger.Debug("problem sending block.import telemetry message", "error", err)
}

return nil
Expand Down
Loading

0 comments on commit a81844e

Please sign in to comment.