Skip to content

Commit

Permalink
add timeout and error to SendMessage, fix typos, logging
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed May 28, 2021
1 parent cdf1f3d commit 83c05c0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 22 deletions.
10 changes: 8 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,14 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
}
}
}
Expand All @@ -333,14 +336,17 @@ func (s *Service) sentBlockIntervalTelemetry() {
continue
}

telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
time.Sleep(s.telemetryInterval)
}
}
Expand Down
6 changes: 4 additions & 2 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)

telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
Expand All @@ -361,7 +361,9 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))

if err != nil {
logger.Debug("problem sending system.connected telemetry message", "err", err)
}
return node, nil
}

Expand Down
6 changes: 4 additions & 2 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,14 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
// todo(ed) add timer to avoid a lot of sends
if err != nil {
logger.Debug("problem sending block.import telemetry message", "error", err)
}
}

// handle consensus digest for authority changes
Expand Down
46 changes: 30 additions & 16 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package telemetry

import (
"encoding/json"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -90,48 +91,61 @@ func NewKeyValue(key string, value interface{}) *KeyValue { //nolint
}

// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data
func (t *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
// todo (ed) try reconnecting if there is an error connecting
t.log.Warn("issue adding telemetry connection", err)
h.log.Debug("issue adding telemetry connection", "error", err)
continue
}
tConn := &telemetryConnection{
wsconn: c,
verbosity: v.Verbosity,
}
t.connections = append(t.connections, tConn)
h.connections = append(h.connections, tConn)
}
}

// SendMessage sends Message to connected telemetry listeners
func (t *Handler) SendMessage(msg *Message) {
t.msg <- *msg
func (h *Handler) SendMessage(msg *Message) error {
select {
case h.msg <- *msg:

case <-time.After(time.Second * 1):
return errors.New("timeout sending message")
}
return nil
}

func (t *Handler) startListening() {
func (h *Handler) startListening() {
for {
msg := <-t.msg
msg := <-h.msg
go func() {
for _, v := range t.connections {
v.Lock()
err := v.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg))
for _, conn := range h.connections {
conn.Lock()
err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg))
if err != nil {
t.log.Warn("issue while sending telemetry message", err)
h.log.Warn("issue while sending telemetry message", "error", err)
}
v.Unlock()
conn.Unlock()
}
}()
}
}

type response struct {
ID int `json:"id"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"ts"`
}

func msgToBytes(message Message) []byte {
res := make(map[string]interface{})
res["id"] = 1 // todo (ed) determine how this is used
res["payload"] = message.values
res["ts"] = time.Now()
res := response{
ID: 1, // todo (ed) determine how this is used
Payload: message.values,
Timestamp: time.Now(),
}
resB, err := json.Marshal(res)
if err != nil {
return nil
Expand Down

0 comments on commit 83c05c0

Please sign in to comment.