-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix (dot/telemetry): refactor telemetry to reduce CPU usage #1597
Changes from 23 commits
5a7d108
3d2e5ce
e09a762
64f9a54
8791a42
94b555c
a8bbce2
a2c3e01
416b241
5b0e29c
2389682
86841f7
4798c14
4a2d693
5d84332
f20eaeb
28a5ef8
9e15a35
14f67ac
8f9d4f1
e5690ed
53d8278
cdf1f3d
83c05c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -17,38 +17,37 @@ | |||||
package telemetry | ||||||
|
||||||
import ( | ||||||
"bytes" | ||||||
"encoding/json" | ||||||
"fmt" | ||||||
"math/big" | ||||||
"sync" | ||||||
"time" | ||||||
|
||||||
"github.com/ChainSafe/gossamer/lib/common" | ||||||
"github.com/ChainSafe/gossamer/lib/genesis" | ||||||
log "github.com/ChainSafe/log15" | ||||||
"github.com/gorilla/websocket" | ||||||
log "github.com/sirupsen/logrus" | ||||||
) | ||||||
|
||||||
// Handler struct for holding telemetry related things | ||||||
type Handler struct { | ||||||
buf bytes.Buffer | ||||||
wsConn []*websocket.Conn | ||||||
sync.RWMutex | ||||||
type telemetryConnection struct { | ||||||
wsconn *websocket.Conn | ||||||
verbosity int | ||||||
sync.Mutex | ||||||
} | ||||||
|
||||||
// MyJSONFormatter struct for defining JSON Formatter | ||||||
type MyJSONFormatter struct { | ||||||
// Message struct to hold telemetry message data | ||||||
type Message struct { | ||||||
values map[string]interface{} | ||||||
} | ||||||
|
||||||
// Format function for handling JSON formatting, this overrides default logging formatter to remove | ||||||
// log level, line number and timestamp | ||||||
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) { | ||||||
serialised, err := json.Marshal(entry.Data) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err) | ||||||
} | ||||||
return append(serialised, '\n'), nil | ||||||
// Handler struct for holding telemetry related things | ||||||
type Handler struct { | ||||||
msg chan Message | ||||||
connections []*telemetryConnection | ||||||
log log.Logger | ||||||
} | ||||||
|
||||||
// KeyValue object to hold key value pairs used in telemetry messages | ||||||
type KeyValue struct { | ||||||
key string | ||||||
value interface{} | ||||||
} | ||||||
|
||||||
var ( | ||||||
|
@@ -57,126 +56,85 @@ var ( | |||||
) | ||||||
|
||||||
// GetInstance singleton pattern to for accessing TelemetryHandler | ||||||
func GetInstance() *Handler { | ||||||
func GetInstance() *Handler { //nolint | ||||||
if handlerInstance == nil { | ||||||
once.Do( | ||||||
func() { | ||||||
handlerInstance = &Handler{ | ||||||
buf: bytes.Buffer{}, | ||||||
msg: make(chan Message, 256), | ||||||
log: log.New("pkg", "telemetry"), | ||||||
} | ||||||
log.SetOutput(&handlerInstance.buf) | ||||||
log.SetFormatter(new(MyJSONFormatter)) | ||||||
go handlerInstance.sender() | ||||||
go handlerInstance.startListening() | ||||||
}) | ||||||
} | ||||||
return handlerInstance | ||||||
} | ||||||
|
||||||
// AddConnections adds connections to telemetry sever | ||||||
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { | ||||||
// NewTelemetryMessage builds a telemetry message | ||||||
func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint | ||||||
mvals := make(map[string]interface{}) | ||||||
for _, v := range values { | ||||||
mvals[v.key] = v.value | ||||||
} | ||||||
return &Message{ | ||||||
values: mvals, | ||||||
} | ||||||
} | ||||||
|
||||||
// NewKeyValue builds a key value pair for telemetry messages | ||||||
func NewKeyValue(key string, value interface{}) *KeyValue { //nolint | ||||||
return &KeyValue{ | ||||||
key: key, | ||||||
value: value, | ||||||
} | ||||||
} | ||||||
|
||||||
// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data | ||||||
func (t *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { | ||||||
for _, v := range conns { | ||||||
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil) | ||||||
if err != nil { | ||||||
fmt.Printf("Error %v\n", err) | ||||||
// todo (ed) try reconnecting if there is an error connecting | ||||||
t.log.Warn("issue adding telemetry connection", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logger needs even arguments (apart from the initial message string) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, updated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||||||
continue | ||||||
} | ||||||
h.wsConn = append(h.wsConn, c) | ||||||
tConn := &telemetryConnection{ | ||||||
wsconn: c, | ||||||
verbosity: v.Verbosity, | ||||||
} | ||||||
t.connections = append(t.connections, tConn) | ||||||
} | ||||||
} | ||||||
|
||||||
// ConnectionData struct to hold connection data | ||||||
type ConnectionData struct { | ||||||
Authority bool | ||||||
Chain string | ||||||
GenesisHash string | ||||||
SystemName string | ||||||
NodeName string | ||||||
SystemVersion string | ||||||
NetworkID string | ||||||
StartTime string | ||||||
} | ||||||
|
||||||
// SendConnection sends connection request message to telemetry connection | ||||||
func (h *Handler) SendConnection(data *ConnectionData) { | ||||||
h.Lock() | ||||||
defer h.Unlock() | ||||||
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash, | ||||||
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime, | ||||||
"version": data.SystemVersion} | ||||||
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) | ||||||
telemetryLogger.Print() | ||||||
} | ||||||
|
||||||
// SendBlockImport sends block imported message to telemetry connection | ||||||
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) { | ||||||
h.Lock() | ||||||
defer h.Unlock() | ||||||
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"} | ||||||
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) | ||||||
telemetryLogger.Print() | ||||||
// SendMessage sends Message to connected telemetry listeners | ||||||
func (t *Handler) SendMessage(msg *Message) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||||||
t.msg <- *msg | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. improvement: add a timeout and error return for this method. I see that that it's a buffered channel of length 256, so this shouldn't happen, but it's a good practice so that this function call isn't blocking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion, I've added timeout and return error. |
||||||
} | ||||||
|
||||||
// NetworkData struct to hold network data telemetry information | ||||||
type NetworkData struct { | ||||||
peers int | ||||||
rateIn float64 | ||||||
rateOut float64 | ||||||
} | ||||||
|
||||||
// NewNetworkData creates networkData struct | ||||||
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData { | ||||||
return &NetworkData{ | ||||||
peers: peers, | ||||||
rateIn: rateIn, | ||||||
rateOut: rateOut, | ||||||
func (t *Handler) startListening() { | ||||||
for { | ||||||
msg := <-t.msg | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, is possible to have a context on
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A added context to the Handler struct. |
||||||
go func() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you care about sending messages in order to all the connections? It doesn't look like you're waiting for the goroutine to finish to broadcast to all connections. This will essentially just spam it out, and it may be out of sequence. Maybe add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I don't think order matters with these messages. |
||||||
for _, v := range t.connections { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||||||
v.Lock() | ||||||
err := v.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) | ||||||
if err != nil { | ||||||
t.log.Warn("issue while sending telemetry message", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also please change to debug or just ignore, I can see the node getting spammed with this sometimes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||||||
} | ||||||
v.Unlock() | ||||||
} | ||||||
}() | ||||||
} | ||||||
} | ||||||
|
||||||
// SendNetworkData send network data system.interval message to telemetry connection | ||||||
func (h *Handler) SendNetworkData(data *NetworkData) { | ||||||
h.Lock() | ||||||
defer h.Unlock() | ||||||
payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers} | ||||||
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) | ||||||
telemetryLogger.Print() | ||||||
} | ||||||
|
||||||
// BlockIntervalData struct to hold data for block system.interval message | ||||||
type BlockIntervalData struct { | ||||||
BestHash common.Hash | ||||||
BestHeight *big.Int | ||||||
FinalizedHash common.Hash | ||||||
FinalizedHeight *big.Int | ||||||
TXCount int | ||||||
UsedStateCacheSize int | ||||||
} | ||||||
|
||||||
// SendBlockIntervalData send block data system interval information to telemetry connection | ||||||
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) { | ||||||
h.Lock() | ||||||
defer h.Unlock() | ||||||
payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint | ||||||
"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint | ||||||
"used_state_cache_size": data.UsedStateCacheSize} | ||||||
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) | ||||||
telemetryLogger.Print() | ||||||
} | ||||||
|
||||||
func (h *Handler) sender() { | ||||||
for { | ||||||
h.RLock() | ||||||
line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter | ||||||
h.RUnlock() | ||||||
if err != nil { | ||||||
continue | ||||||
} | ||||||
|
||||||
for _, c := range h.wsConn { | ||||||
err := c.WriteMessage(websocket.TextMessage, line) | ||||||
if err != nil { | ||||||
// TODO (ed) determine how to handle this error | ||||||
fmt.Printf("ERROR connecting to telemetry %v\n", err) | ||||||
} | ||||||
} | ||||||
func msgToBytes(message Message) []byte { | ||||||
res := make(map[string]interface{}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe create a helper struct to construct the json response.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea, created. |
||||||
res["id"] = 1 // todo (ed) determine how this is used | ||||||
res["payload"] = message.values | ||||||
res["ts"] = time.Now() | ||||||
resB, err := json.Marshal(res) | ||||||
if err != nil { | ||||||
return nil | ||||||
} | ||||||
return resB | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this cause a problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, is
block.import
used for all new builds (ie ones we built as well?) if so this might be better placed inBlockState.AddBlock
. however if it's strictly imported blocks this is fineThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, it hasn't yet.