Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dot/telemetry: Implement basic telemetry connection #1497

Merged
merged 20 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = tomlCfg.Global.MetricsPort
}

// TODO: generate random name if one is not assigned (see issue #1496)
// check --name flag and update node configuration
if name := ctx.GlobalString(NameFlag.Name); name != "" {
cfg.Name = name
Expand Down Expand Up @@ -433,6 +434,8 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = uint32(metricsPort)
}

cfg.NoTelemetry = ctx.Bool("no-telemetry")
arijitAD marked this conversation as resolved.
Show resolved Hide resolved

logger.Debug(
"global configuration",
"name", cfg.Name,
Expand Down
14 changes: 14 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ func TestGlobalConfigFromFlags(t *testing.T) {
MetricsPort: uint32(9871),
},
},
{
"Test gossamer --no-telemetry",
[]string{"config", "no-telemetry"},
[]interface{}{testCfgFile.Name(), true},
dot.GlobalConfig{
Name: testCfg.Global.Name,
ID: testCfg.Global.ID,
BasePath: testCfg.Global.BasePath,
LogLvl: log.LvlInfo,
PublishMetrics: testCfg.Global.PublishMetrics,
MetricsPort: testCfg.Global.MetricsPort,
NoTelemetry: true,
},
},
}

for _, c := range testcases {
Expand Down
9 changes: 9 additions & 0 deletions cmd/gossamer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ var (
Name: "metrics-port",
Usage: "Set metric listening port ",
}

// NoTelemetryFlag stops publishing telemetry to default defined in genesis.json
NoTelemetryFlag = cli.BoolFlag{
Name: "no-telemetry",
Usage: "Disable connecting to the Substrate telemetry server",
}
)

// Initialization-only flags
Expand Down Expand Up @@ -295,6 +301,9 @@ var (
// metrics flag
PublishMetricsFlag,
MetricsPortFlag,

// telemetry flags
NoTelemetryFlag,
}
)

Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type GlobalConfig struct {
LogLvl log.Lvl
PublishMetrics bool
MetricsPort uint32
NoTelemetry bool
}

// LogConfig represents the log levels for individual packages
Expand Down
30 changes: 27 additions & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ import (
"os/signal"
"path"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"

"github.com/ChainSafe/chaindb"
gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/services"
log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/prometheus"

"github.com/ChainSafe/chaindb"
log "github.com/ChainSafe/log15"
)

var logger = log.New("pkg", "dot")
Expand Down Expand Up @@ -306,6 +308,28 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
publishMetrics(cfg)
}

gd, err := stateSrvc.Storage.GetGenesisData()
if err != nil {
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
data := &telemetry.ConnectionData{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: stateSrvc.Block.GenesisHash().String(),
SystemName: sysSrvc.SystemName(),
NodeName: cfg.Global.Name,
SystemVersion: sysSrvc.SystemVersion(),
NetworkID: networkSrvc.NetworkState().PeerID,
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
}
telemetry.GetInstance().SendConnection(data)

return node, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type RuntimeAPI interface {
type SystemAPI interface {
SystemName() string
noot marked this conversation as resolved.
Show resolved Hide resolved
SystemVersion() string
NodeName() string
Properties() map[string]interface{}
ChainType() string
ChainName() string
}
4 changes: 2 additions & 2 deletions dot/rpc/modules/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSystemModule(net NetworkAPI, sys SystemAPI, core CoreAPI,

// Chain returns the runtime chain
func (sm *SystemModule) Chain(r *http.Request, req *EmptyRequest, res *string) error {
*res = sm.systemAPI.NodeName()
*res = sm.systemAPI.ChainName()
return nil
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func (sm *SystemModule) NodeRoles(r *http.Request, req *EmptyRequest, res *[]int
// AccountNextIndex Returns the next valid index (aka. nonce) for given account.
func (sm *SystemModule) AccountNextIndex(r *http.Request, req *StringRequest, res *U64Response) error {
if req == nil || len(req.String) == 0 {
return errors.New("Account address must be valid")
return errors.New("account address must be valid")
}
addressPubKey := crypto.PublicAddressToByteArray(common.Address(req.String))

Expand Down
3 changes: 1 addition & 2 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ func (api *mockSystemAPI) SystemVersion() string {
return api.info.SystemVersion
}

func (api *mockSystemAPI) NodeName() string {
func (api *mockSystemAPI) ChainName() string {
return api.genData.Name
}

func (api *mockSystemAPI) Properties() map[string]interface{} {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"os"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"

log "github.com/ChainSafe/log15"
)

Expand Down Expand Up @@ -339,6 +339,7 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
}

// handle consensus digest for authority changes
Expand Down
4 changes: 2 additions & 2 deletions dot/system/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (s *Service) SystemVersion() string {
return s.systemInfo.SystemVersion
}

// NodeName returns the nodeName (chain name)
func (s *Service) NodeName() string {
// ChainName returns the chain name defined in genesis.json
func (s *Service) ChainName() string {
return s.genesisData.Name
}

Expand Down
4 changes: 2 additions & 2 deletions dot/system/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/stretchr/testify/require"
)

func TestService_NodeName(t *testing.T) {
func TestService_ChainName(t *testing.T) {
svc := newTestService()

name := svc.NodeName()
name := svc.ChainName()
require.Equal(t, "gssmr", name)
}

Expand Down
123 changes: 123 additions & 0 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2021 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
package telemetry

import (
"bytes"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
}

// MyJSONFormatter struct for defining JSON Formatter
type MyJSONFormatter struct {
}

// Format function for handling JSON formatting, this overrides default logging formatter to remove
// log level, line number and timestamp
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
serialized, err := json.Marshal(entry.Data)
if err != nil {
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
}
return append(serialized, '\n'), nil
}

var (
once sync.Once
handlerInstance *Handler
)

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler {
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
buf: bytes.Buffer{},
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
})
}
return handlerInstance
}

// AddConnections adds connections to telemetry sever
func (h *Handler) AddConnections(conns []genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
fmt.Printf("Error %v\n", err)
return
}
h.wsConn = append(h.wsConn, c)
}
}

// ConnectionData struct to hold connection data
type ConnectionData struct {
Authority bool
Chain string
GenesisHash string
SystemName string
NodeName string
SystemVersion string
NetworkID string
StartTime string
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
h.buf.Reset()
}
Loading