Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream/multiconnection: Default process reporter #1734

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/websocketroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (m *WebsocketRoutineManager) websocketRoutine() {
continue
}

if m.verbose {
ws.SetProcessReportManager(&stream.DefaultProcessReporterManager{})
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
104 changes: 104 additions & 0 deletions exchanges/stream/reporting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package stream

import (
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/log"
)

// ProcessReporterManager defines an interface for managing ProcessReporter instances across connections, this will
// create a new ProcessReporter instance for each new connection reader.
type ProcessReporterManager interface {
New(conn Connection) ProcessReporter
}

// ProcessReporter defines an interface for reporting processed data from a connection
type ProcessReporter interface {
// Report logs the processing time for a received data packet and updates metrics.
// read is the time the data was read from the connection.
// data is the raw data received from the connection.
// err is any error that occurred while processing the data.
Report(read time.Time, data []byte, err error)
// close closes the process reporter and handles any cleanup.
Close()
}

// SetProcessReportManager sets the ProcessReporterManager for the Websocket instance which will be used to create new ProcessReporter instances.
// This will track metrics for processing websocket data.
func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) {
w.m.Lock()
defer w.m.Unlock()
w.processReporter = m
}

// DefaultProcessReporter is a default implementation of ProcessReporter

Check failure on line 35 in exchanges/stream/reporting.go

View workflow job for this annotation

GitHub Actions / lint

exported: comment on exported type DefaultProcessReporterManager should be of the form "DefaultProcessReporterManager ..." (with optional leading article) (revive)
type DefaultProcessReporterManager struct{}

// New returns a new DefaultProcessReporter instance for a connection
func (d DefaultProcessReporterManager) New(conn Connection) ProcessReporter {
reporter := &DefaultProcessReporter{ch: make(chan struct{})}
go reporter.collectMetrics(conn)
return reporter
}

// DefaultProcessReporter provides a thread-safe implementation of the ProcessReporter interface.
// It tracks operation metrics, including the number of operations, average processing time, and peak processing time.
type DefaultProcessReporter struct {
operations int64
totalProcessingTime time.Duration
peakProcessingTime time.Duration
ch chan struct{}
m sync.Mutex
}

// Report logs the processing time for a received data packet and updates metrics.
func (r *DefaultProcessReporter) Report(read time.Time, _ []byte, _ error) {
processingDuration := time.Since(read)
r.m.Lock()
defer r.m.Unlock()
r.operations++
r.totalProcessingTime += processingDuration
if processingDuration > r.peakProcessingTime {
r.peakProcessingTime = processingDuration
}
}

// Close closes the process reporter
func (r *DefaultProcessReporter) Close() {
r.m.Lock()
defer r.m.Unlock()
if r.ch != nil {
close(r.ch)
}
}

// collectMetrics runs in a separate goroutine to periodically log aggregated metrics.
func (r *DefaultProcessReporter) collectMetrics(conn Connection) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-r.ch:
return
case <-ticker.C:
r.m.Lock()
if r.operations > 0 {
avgOperationsPerSecond := r.operations / 60
avgProcessingTime := r.totalProcessingTime / time.Duration(r.operations)
peakTime := r.peakProcessingTime

// Reset metrics for the next interval.
r.operations, r.totalProcessingTime, r.peakProcessingTime = 0, 0, 0
Copy link
Collaborator

@gloriousCode gloriousCode Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there'd be value in not resetting some so you can know the slowest ever etc. Trudging through logs looking for the highest number doesn't sound so good, but I don't really feel strongly to make you change it

edit: I SEE YOUR EYES. I think there is still great value in highlighting the slowest thing across all time. But I will add it for myself


r.m.Unlock()

// Log metrics outside of the critical section to avoid blocking other threads.
log.Debugf(log.WebsocketMgr, "%v: Operations/Second: %d, Avg Processing/Operation: %v, Peak: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, peakTime)
} else {
r.m.Unlock()
}
}
}
}
15 changes: 14 additions & 1 deletion exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,14 +1075,27 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List)
// Reader reads and handles data from a specific connection
func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error) {
defer w.Wg.Done()
var reporter ProcessReporter
if w.processReporter != nil {
reporter = w.processReporter.New(conn)
}
for {
resp := conn.ReadMessage()
readAt := time.Now()
if resp.Raw == nil {
if reporter != nil {
reporter.Close()
}
return // Connection has been closed
}
if err := handler(ctx, resp.Raw); err != nil {
err := handler(ctx, resp.Raw)
if err != nil {
w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err)
}

if reporter != nil {
reporter.Report(readAt, resp.Raw, err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions exchanges/stream/websocket_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Websocket struct {
// rateLimitDefinitions contains the rate limiters shared between Websocket and REST connections for all potential
// endpoints.
rateLimitDefinitions request.RateLimitDefinitions
processReporter ProcessReporterManager
}

// WebsocketSetup defines variables for setting up a websocket connection
Expand Down
Loading