Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dot/telemetry: Implement basic telemetry connection #1497

Merged
merged 20 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = tomlCfg.Global.MetricsPort
}

// TODO: generate random name if one is not assigned (see issue #1496)
// check --name flag and update node configuration
if name := ctx.GlobalString(NameFlag.Name); name != "" {
cfg.Name = name
Expand Down Expand Up @@ -433,6 +434,8 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = uint32(metricsPort)
}

cfg.NoTelemetry = ctx.Bool("no-telemetry")
arijitAD marked this conversation as resolved.
Show resolved Hide resolved

logger.Debug(
"global configuration",
"name", cfg.Name,
Expand Down
14 changes: 14 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ func TestGlobalConfigFromFlags(t *testing.T) {
MetricsPort: uint32(9871),
},
},
{
"Test gossamer --no-telemetry",
[]string{"config", "no-telemetry"},
[]interface{}{testCfgFile.Name(), true},
dot.GlobalConfig{
Name: testCfg.Global.Name,
ID: testCfg.Global.ID,
BasePath: testCfg.Global.BasePath,
LogLvl: log.LvlInfo,
PublishMetrics: testCfg.Global.PublishMetrics,
MetricsPort: testCfg.Global.MetricsPort,
NoTelemetry: true,
},
},
}

for _, c := range testcases {
Expand Down
9 changes: 9 additions & 0 deletions cmd/gossamer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ var (
Name: "metrics-port",
Usage: "Set metric listening port ",
}

// NoTelemetryFlag stops publishing telemetry to default defined in genesis.json
NoTelemetryFlag = cli.BoolFlag{
Name: "no-telemetry",
Usage: "Disable connecting to the Substrate telemetry server",
}
)

// Initialization-only flags
Expand Down Expand Up @@ -295,6 +301,9 @@ var (
// metrics flag
PublishMetricsFlag,
MetricsPortFlag,

// telemetry flags
NoTelemetryFlag,
}
)

Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type GlobalConfig struct {
LogLvl log.Lvl
PublishMetrics bool
MetricsPort uint32
NoTelemetry bool
}

// LogConfig represents the log levels for individual packages
Expand Down
21 changes: 18 additions & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ import (
"os/signal"
"path"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"

"github.com/ChainSafe/chaindb"
gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/services"
log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/prometheus"

"github.com/ChainSafe/chaindb"
log "github.com/ChainSafe/log15"
)

var logger = log.New("pkg", "dot")
Expand Down Expand Up @@ -298,6 +300,19 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
publishMetrics(cfg)
}

gd, err := stateSrvc.Storage.GetGenesisData()
if err != nil {
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
telemetry.GetInstance().SendConnection(cfg.Core.GrandpaAuthority, sysSrvc.ChainName(), stateSrvc.Block.GenesisHash().String(),
sysSrvc.SystemName(), cfg.Global.Name, sysSrvc.SystemVersion(), networkSrvc.NetworkState().PeerID, strconv.FormatInt(time.Now().UnixNano(), 10))

return node, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type RuntimeAPI interface {
type SystemAPI interface {
SystemName() string
noot marked this conversation as resolved.
Show resolved Hide resolved
SystemVersion() string
NodeName() string
Properties() map[string]interface{}
ChainType() string
ChainName() string
}
4 changes: 2 additions & 2 deletions dot/rpc/modules/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSystemModule(net NetworkAPI, sys SystemAPI, core CoreAPI,

// Chain returns the runtime chain
func (sm *SystemModule) Chain(r *http.Request, req *EmptyRequest, res *string) error {
*res = sm.systemAPI.NodeName()
*res = sm.systemAPI.ChainName()
return nil
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func (sm *SystemModule) NodeRoles(r *http.Request, req *EmptyRequest, res *[]int
// AccountNextIndex Returns the next valid index (aka. nonce) for given account.
func (sm *SystemModule) AccountNextIndex(r *http.Request, req *StringRequest, res *U64Response) error {
if req == nil || len(req.String) == 0 {
return errors.New("Account address must be valid")
return errors.New("account address must be valid")
}
addressPubKey := crypto.PublicAddressToByteArray(common.Address(req.String))

Expand Down
3 changes: 1 addition & 2 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,9 @@ func (api *mockSystemAPI) SystemVersion() string {
return api.info.SystemVersion
}

func (api *mockSystemAPI) NodeName() string {
func (api *mockSystemAPI) ChainName() string {
return api.genData.Name
}

func (api *mockSystemAPI) Properties() map[string]interface{} {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"os"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"

log "github.com/ChainSafe/log15"
)

Expand Down Expand Up @@ -334,6 +334,7 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
}

// handle consensus digest for authority changes
Expand Down
4 changes: 2 additions & 2 deletions dot/system/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (s *Service) SystemVersion() string {
return s.systemInfo.SystemVersion
}

// NodeName returns the nodeName (chain name)
func (s *Service) NodeName() string {
// ChainName returns the chain name defined in genesis.json
func (s *Service) ChainName() string {
return s.genesisData.Name
}

Expand Down
4 changes: 2 additions & 2 deletions dot/system/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/stretchr/testify/require"
)

func TestService_NodeName(t *testing.T) {
func TestService_ChainName(t *testing.T) {
svc := newTestService()

name := svc.NodeName()
name := svc.ChainName()
require.Equal(t, "gssmr", name)
}

Expand Down
108 changes: 108 additions & 0 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2021 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
package telemetry

import (
"bytes"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
}

// MyJSONFormatter struct for defining JSON Formatter
type MyJSONFormatter struct {
}

// Format function for handling JSON formatting, this overrides default logging formatter to remove
// log level, line number and timestamp
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
serialized, err := json.Marshal(entry.Data)
if err != nil {
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
}
return append(serialized, '\n'), nil
}

var once sync.Once
noot marked this conversation as resolved.
Show resolved Hide resolved
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
var handlerInstance *Handler

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler {
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
buf: bytes.Buffer{},
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
})
}
return handlerInstance
}

// AddConnections adds connections to telemetry sever
func (h *Handler) AddConnections(conns []interface{}) {
Copy link
Contributor

@noot noot Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before AddConnections in comment pls

arijitAD marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s", v.([]interface{})[0]), nil)
if err != nil {
fmt.Printf("Error %v\n", err)
}
h.wsConn = append(h.wsConn, c)
}
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(authority bool, chain, genesis_hash, system_name, node_name,
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
system_version, network_id, start_time string) {
payload := log.Fields{"authority": authority, "chain": chain, "config": "", "genesis_hash": genesis_hash,
"implementation": system_name, "msg": "system.connected", "name": node_name, "network_id": network_id, "startup_time": start_time,
"version": system_version}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(best_hash string, height *big.Int) {
payload := log.Fields{"best": best_hash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
h.buf.Reset()
}
65 changes: 65 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package telemetry

import (
"log"
"math/big"
"net/http"
"os"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)

var upgrader = websocket.Upgrader{}
var lastMessage []byte

func TestMain(m *testing.M) {
// start server to listen for websocket connections
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
http.HandleFunc("/", listen)
go http.ListenAndServe("127.0.0.1:8001", nil)

time.Sleep(time.Millisecond)
// instantiate telemetry to connect to websocket (test) server
var testEndpoints = []interface{}{}
var testEndpoint1 = []interface{}{"ws://127.0.0.1:8001/", float64(0)}
GetInstance().AddConnections(append(testEndpoints, testEndpoint1))

// Start all tests
code := m.Run()
os.Exit(code)
}
func TestHandler_SendConnection(t *testing.T) {
expected := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","config":"","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`)
GetInstance().SendConnection(false, "chain", "hash", "systemName", "nodeName",
"version", "netID", "startTime")
time.Sleep(time.Millisecond)
// note, we only check the first 234 bytes because the remaining bytes are the timestamp, which we can't estimate
require.Equal(t, expected, lastMessage[:234])
}

func TestHandler_SendBlockImport(t *testing.T) {
expected := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`)
GetInstance().SendBlockImport("hash", big.NewInt(2))
time.Sleep(time.Millisecond)
// note, we only check the first 101 bytes because the remaining bytes are the timestamp, which we can't estimate
require.Equal(t, expected, lastMessage[:101])
}

func listen(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error %v\n", err)
}
defer c.Close()
for {
_, msg, err := c.ReadMessage()
if err != nil {
log.Printf("read err %v", err)
break
}
lastMessage = msg
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/perlin-network/life v0.0.0-20191203030451-05c0e0f7eaea
github.com/rs/cors v1.7.0 // indirect
github.com/sirupsen/logrus v1.2.0
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca // indirect
github.com/urfave/cli v1.20.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ=
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk=
Expand Down Expand Up @@ -601,6 +602,7 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
Expand Down
Loading