Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (dot/telemetry): refactor telemetry to reduce CPU usage #1597

Merged
merged 24 commits into from
May 28, 2021
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5a7d108
start new telemetry with channels
edwardmack May 17, 2021
3d2e5ce
remove old telemetry code
edwardmack May 18, 2021
e09a762
add tests
edwardmack May 18, 2021
64f9a54
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 18, 2021
8791a42
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 18, 2021
94b555c
lint
edwardmack May 18, 2021
a8bbce2
go fmt
edwardmack May 18, 2021
a2c3e01
fix anti-pattern returning unexported types
edwardmack May 18, 2021
416b241
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 19, 2021
5b0e29c
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 25, 2021
2389682
added context, send websocket messages is goroutine (broken)
edwardmack May 25, 2021
86841f7
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 25, 2021
4798c14
move mutex to handler struct
edwardmack May 25, 2021
4a2d693
Merge branch 'development' into ed/tel_channelRefactor
edwardmack May 26, 2021
5d84332
embed mutex inside Handler
edwardmack May 26, 2021
f20eaeb
clean-up formatting to one NewKeyValue per line
edwardmack May 26, 2021
28a5ef8
go fmt
edwardmack May 26, 2021
9e15a35
remove empty body anti-pattern
edwardmack May 26, 2021
14f67ac
go fmt
edwardmack May 26, 2021
8f9d4f1
add test for concurrent connections
edwardmack May 26, 2021
e5690ed
remove context from Handler
edwardmack May 26, 2021
53d8278
move mutex to telemetryConnection struct, make
edwardmack May 27, 2021
cdf1f3d
add logging
edwardmack May 27, 2021
83c05c0
add timeout and error to SendMessage, fix typos, logging
edwardmack May 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,32 @@ func TestHandler_SendMulti(t *testing.T) {
require.Contains(t, string(actual[3]), string(expected[3]))
}

func TestListenerConcurrency(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test. This helped me to debug the issue.

You can take a lock at the connection level which is optimized than taking the lock at the handler level because you can send multiple messages concurrently to different connections.
This issue is connection in Handler.connections is value and not pointer. Hence it creates multiple instances of lock for the same connection.

Add lock at the connection level but change Handler.connections to a pointer.

type telemetryConnection struct {
	wsconn    *websocket.Conn
	verbosity int
	sync.Mutex
}

// Handler struct for holding telemetry related things
type Handler struct {
	msg         chan Message
	connections []*telemetryConnection
}

func (t *Handler) startListening() {
	for {
		msg := <-t.msg
		go func() {
			for _, v := range t.connections {
				v.Lock()
				_ = v.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg))
				v.Unlock()
			}
		}()
	}
}

Also, I think users should get notified in logs if they are unable to write messages to the telemetry server. So that they can take action accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for debugging this, it makes sense now. I've updated to fix it, and added logging to catch any error and warn the user.

const qty = 1000
var wg sync.WaitGroup
wg.Add(qty)

resultCh = make(chan []byte)
for i := 0; i < qty; i++ {
go func() {
GetInstance().SendMessage(NewTelemetryMessage(
NewKeyValue("best", "hash"),
NewKeyValue("height", big.NewInt(2)),
NewKeyValue("msg", "block.import"),
NewKeyValue("origin", "NetworkInitialSync")))
wg.Done()
}()
}
wg.Wait()
counter := 0
for range resultCh {
counter++
if counter == qty {
break
}
}
}

func listen(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand Down