Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dot/telemetry: Implement basic telemetry connection #1497

Merged
merged 20 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,17 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
telemetry.GetInstance().SendConnection(cfg.Core.GrandpaAuthority, sysSrvc.ChainName(), stateSrvc.Block.GenesisHash().String(),
sysSrvc.SystemName(), cfg.Global.Name, sysSrvc.SystemVersion(), networkSrvc.NetworkState().PeerID, strconv.FormatInt(time.Now().UnixNano(), 10))
data := &telemetry.ConnectionData{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: stateSrvc.Block.GenesisHash().String(),
SystemName: sysSrvc.SystemName(),
NodeName: cfg.Global.Name,
SystemVersion: sysSrvc.SystemVersion(),
NetworkID: networkSrvc.NetworkState().PeerID,
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
}
telemetry.GetInstance().SendConnection(data)

return node, nil
}
Expand Down
37 changes: 26 additions & 11 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
Expand All @@ -48,8 +49,10 @@ func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
return append(serialized, '\n'), nil
}

var once sync.Once
var handlerInstance *Handler
var (
once sync.Once
handlerInstance *Handler
)

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler {
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -67,30 +70,42 @@ func GetInstance() *Handler {
}

// AddConnections adds connections to telemetry sever
func (h *Handler) AddConnections(conns []interface{}) {
func (h *Handler) AddConnections(conns []genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s", v.([]interface{})[0]), nil)
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
fmt.Printf("Error %v\n", err)
return
}
h.wsConn = append(h.wsConn, c)
}
}

// ConnectionData struct to hold connection data
type ConnectionData struct {
Authority bool
Chain string
GenesisHash string
SystemName string
NodeName string
SystemVersion string
NetworkID string
StartTime string
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(authority bool, chain, genesis_hash, system_name, node_name,
system_version, network_id, start_time string) {
payload := log.Fields{"authority": authority, "chain": chain, "config": "", "genesis_hash": genesis_hash,
"implementation": system_name, "msg": "system.connected", "name": node_name, "network_id": network_id, "startup_time": start_time,
"version": system_version}
func (h *Handler) SendConnection(data *ConnectionData) {
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(best_hash string, height *big.Int) {
payload := log.Fields{"best": best_hash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
Expand Down
21 changes: 17 additions & 4 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
Expand All @@ -23,8 +24,11 @@ func TestMain(m *testing.M) {

time.Sleep(time.Millisecond)
// instantiate telemetry to connect to websocket (test) server
var testEndpoints = []interface{}{}
var testEndpoint1 = []interface{}{"ws://127.0.0.1:8001/", float64(0)}
var testEndpoints []genesis.TelemetryEndpoint
var testEndpoint1 = genesis.TelemetryEndpoint{
Endpoint: "ws://127.0.0.1:8001/",
Verbosity: 0,
}
GetInstance().AddConnections(append(testEndpoints, testEndpoint1))

// Start all tests
Expand All @@ -33,8 +37,17 @@ func TestMain(m *testing.M) {
}
func TestHandler_SendConnection(t *testing.T) {
expected := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","config":"","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`)
GetInstance().SendConnection(false, "chain", "hash", "systemName", "nodeName",
"version", "netID", "startTime")
data := &ConnectionData{
Authority: false,
Chain: "chain",
GenesisHash: "hash",
SystemName: "systemName",
NodeName: "nodeName",
SystemVersion: "version",
NetworkID: "netID",
StartTime: "startTime",
}
GetInstance().SendConnection(data)
time.Sleep(time.Millisecond)
// note, we only check the first 234 bytes because the remaining bytes are the timestamp, which we can't estimate
require.Equal(t, expected, lastMessage[:234])
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.25.0
)
Expand Down
35 changes: 33 additions & 2 deletions lib/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ type Data struct {
ID string
ChainType string
Bootnodes [][]byte
TelemetryEndpoints []interface{}
TelemetryEndpoints []TelemetryEndpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to pointer.
TelemetryEndpoints []*TelemetryEndpoint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

ProtocolID string
Properties map[string]interface{}
ForkBlocks []string
BadBlocks []string
ConsensusEngine string
}

// TelemetryEndpoint struct to hold telemetry endpoint information
type TelemetryEndpoint struct {
Endpoint string
Verbosity int
}

// Fields stores genesis raw data, and human readable runtime data
type Fields struct {
Raw map[string]map[string]string `json:"raw,omitempty"`
Expand All @@ -62,7 +68,7 @@ func (g *Genesis) GenesisData() *Data {
ID: g.ID,
ChainType: g.ChainType,
Bootnodes: common.StringArrayToBytes(g.Bootnodes),
TelemetryEndpoints: g.TelemetryEndpoints,
TelemetryEndpoints: interfaceToTelemetryEndpoint(g.TelemetryEndpoints),
ProtocolID: g.ProtocolID,
Properties: g.Properties,
ForkBlocks: g.ForkBlocks,
Expand Down Expand Up @@ -97,3 +103,28 @@ func (g *Genesis) ToRaw() error {
g.Genesis.Raw["top"] = res
return nil
}

func interfaceToTelemetryEndpoint(endpoints []interface{}) []TelemetryEndpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return []*TelemetryEndpoint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

var res []TelemetryEndpoint
for _, v := range endpoints {
epi, ok := v.([]interface{})
if !ok {
continue
}
eps, ok := epi[0].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verfiy len(epi) == 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check.

if !ok {
continue
}
epv, ok := epi[1].(float64)
if !ok {
continue
}
ep := TelemetryEndpoint{
Endpoint: eps,
Verbosity: int(epv),
}
res = append(res, ep)
}

return res
}