Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (dot/telemetry): refactor telemetry to reduce CPU usage #1597

Merged
merged 24 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5a7d108
start new telemetry with channels
edwardmack May 17, 2021
3d2e5ce
remove old telemetry code
edwardmack May 18, 2021
e09a762
add tests
edwardmack May 18, 2021
64f9a54
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 18, 2021
8791a42
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 18, 2021
94b555c
lint
edwardmack May 18, 2021
a8bbce2
go fmt
edwardmack May 18, 2021
a2c3e01
fix anti-pattern returning unexported types
edwardmack May 18, 2021
416b241
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 19, 2021
5b0e29c
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 25, 2021
2389682
added context, send websocket messages is goroutine (broken)
edwardmack May 25, 2021
86841f7
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 25, 2021
4798c14
move mutex to handler struct
edwardmack May 25, 2021
4a2d693
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 26, 2021
5d84332
embed mutex inside Handler
edwardmack May 26, 2021
f20eaeb
clean-up formatting to one NewKeyValue per line
edwardmack May 26, 2021
28a5ef8
go fmt
edwardmack May 26, 2021
9e15a35
remove empty body anti-pattern
edwardmack May 26, 2021
14f67ac
go fmt
edwardmack May 26, 2021
8f9d4f1
add test for concurrent connections
edwardmack May 26, 2021
e5690ed
remove context from Handler
edwardmack May 26, 2021
53d8278
move mutex to telemetryConnection struct, make
edwardmack May 27, 2021
cdf1f3d
add logging
edwardmack May 27, 2021
83c05c0
add timeout and error to SendMessage, fix typos, logging
edwardmack May 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,12 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut))
telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
}

}
}

Expand All @@ -330,14 +333,14 @@ func (s *Service) sentBlockIntervalTelemetry() {
continue
}

telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
BestHash: best.Hash(),
BestHeight: best.Number,
FinalizedHash: finalized.Hash(),
FinalizedHeight: finalized.Number,
TXCount: 0, // todo (ed) determine where to get tx count
UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
})
telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
time.Sleep(s.telemetryInterval)
}
}
Expand Down
22 changes: 11 additions & 11 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,17 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
data := &telemetry.ConnectionData{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: stateSrvc.Block.GenesisHash().String(),
SystemName: sysSrvc.SystemName(),
NodeName: cfg.Global.Name,
SystemVersion: sysSrvc.SystemVersion(),
NetworkID: networkSrvc.NetworkState().PeerID,
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
}
telemetry.GetInstance().SendConnection(data)

telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
telemetry.NewKeyValue("msg", "system.connected"),
telemetry.NewKeyValue("name", cfg.Global.Name),
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))

return node, nil
}
Expand Down
7 changes: 6 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
// todo(ed) add timer to avoid a lot of sends
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this cause a problem?

Copy link
Contributor

@noot noot May 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, is block.import used for all new builds (ie ones we built as well?) if so this might be better placed in BlockState.AddBlock. however if it's strictly imported blocks this is fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it hasn't yet.

}

// handle consensus digest for authority changes
Expand Down
192 changes: 76 additions & 116 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,38 @@
package telemetry

import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
sync.RWMutex
type telemetryConnection struct {
wsconn *websocket.Conn
verbosity int
}

// MyJSONFormatter struct for defining JSON Formatter
type MyJSONFormatter struct {
// Message struct to hold telemetry message data
type Message struct {
values map[string]interface{}
}

// Format function for handling JSON formatting, this overrides default logging formatter to remove
// log level, line number and timestamp
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
serialised, err := json.Marshal(entry.Data)
if err != nil {
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
}
return append(serialised, '\n'), nil
// Handler struct for holding telemetry related things
type Handler struct {
msg chan Message
ctx context.Context
connections []telemetryConnection
sync.Mutex
}

// KeyValue object to hold key value pairs used in telemetry messages
type KeyValue struct {
key string
value interface{}
}

var (
Expand All @@ -57,126 +57,86 @@ var (
)

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler {
func GetInstance() *Handler { //nolint
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
buf: bytes.Buffer{},
msg: make(chan Message, 256),
ctx: context.Background(),
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
go handlerInstance.sender()
go handlerInstance.startListening()
})
}
return handlerInstance
}

// AddConnections adds connections to telemetry sever
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
// NewTelemetryMessage builds a telemetry message
func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint
mvals := make(map[string]interface{})
for _, v := range values {
mvals[v.key] = v.value
}
return &Message{
values: mvals,
}
}

// NewKeyValue builds a key value pair for telemetry messages
func NewKeyValue(key string, value interface{}) *KeyValue { //nolint
return &KeyValue{
key: key,
value: value,
}
}

// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data
func (t *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
// todo (ed) try reconnecting if there is an error connecting
fmt.Printf("Error %v\n", err)
continue
}
h.wsConn = append(h.wsConn, c)
}
}

// ConnectionData struct to hold connection data
type ConnectionData struct {
Authority bool
Chain string
GenesisHash string
SystemName string
NodeName string
SystemVersion string
NetworkID string
StartTime string
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// NetworkData struct to hold network data telemetry information
type NetworkData struct {
peers int
rateIn float64
rateOut float64
}

// NewNetworkData creates networkData struct
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData {
return &NetworkData{
peers: peers,
rateIn: rateIn,
rateOut: rateOut,
tConn := telemetryConnection{
wsconn: c,
verbosity: v.Verbosity,
}
t.connections = append(t.connections, tConn)
}
}

// SendNetworkData send network data system.interval message to telemetry connection
func (h *Handler) SendNetworkData(data *NetworkData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// BlockIntervalData struct to hold data for block system.interval message
type BlockIntervalData struct {
BestHash common.Hash
BestHeight *big.Int
FinalizedHash common.Hash
FinalizedHeight *big.Int
TXCount int
UsedStateCacheSize int
// SendMessage sends Message to connected telemetry listeners
func (t *Handler) SendMessage(msg *Message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (t *Handler) SendMessage(msg *Message) {
func (h *Handler) SendMessage(msg *Message) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

t.msg <- *msg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improvement: add a timeout and error return for this method. I see that that it's a buffered channel of length 256, so this shouldn't happen, but it's a good practice so that this function call isn't blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, I've added timeout and return error.

}

// SendBlockIntervalData send block data system interval information to telemetry connection
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint
"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint
"used_state_cache_size": data.UsedStateCacheSize}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

func (h *Handler) sender() {
func (t *Handler) startListening() {
for {
h.RLock()
line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter
h.RUnlock()
if err != nil {
continue
select {
case msg := <-t.msg:
go func() {
t.Lock()
for _, v := range t.connections {
v.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) // nolint
}
t.Unlock()
}()
case <-t.ctx.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context is not required since we are never canceling it. This will never be triggered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed context

return
}
}
}

for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, line)
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
func msgToBytes(message Message) []byte {
res := make(map[string]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe create a helper struct to construct the json response.

type response struct {
  ID int `json:"id"`
  ....
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, created.

res["id"] = 1 // todo (ed) determine how this is used
res["payload"] = message.values
res["ts"] = time.Now()
resB, err := json.Marshal(res)
if err != nil {
return nil
}
return resB
}
Loading