Skip to content

Commit

Permalink
chore: abciclient metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Jan 11, 2024
1 parent 73f5c57 commit e86d67b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 41 deletions.
23 changes: 8 additions & 15 deletions abci/client/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 43 additions & 12 deletions abci/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"

sync "github.com/sasha-s/go-deadlock"

Expand All @@ -28,24 +29,61 @@ var (
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Number of messages in ABCI Socket queue
PendingSendMessages metrics.Gauge `metrics_labels:"message_type"`
QueuedMessages metrics.Gauge `metrics_labels:"type, priority"`

// Number of messages received from ABCI Socket
SocketReceiveMessagesTotal metrics.Counter `metrics_labels:"message_type"`
// Number of messages sent to ABCI Socket
SocketSendMessagesTotal metrics.Counter `metrics_labels:"message_type"`
SentMessagesTotal metrics.Counter `metrics_labels:"type, priority"`

// labels cache
labels metricsLabelCache
}

type metricsLabelCache struct {
mtx *sync.RWMutex
mtx sync.RWMutex
messageLabelNames map[reflect.Type]string
}

func (m *Metrics) EnqueuedMessage(reqres *requestAndResponse) {
priority := strconv.Itoa(int(reqres.priority()))
typ := "nil"
if reqres != nil && reqres.Request != nil {
typ = m.labels.ValueToMetricLabel(reqres.Request.Value)
}

m.QueuedMessages.With("type", typ, "priority", priority).Add(1)
}

func (m *Metrics) DequeuedMessage(reqres *requestAndResponse) {
priority := strconv.Itoa(int(reqres.priority()))
typ := "nil"
if reqres != nil && reqres.Request != nil {
typ = m.labels.ValueToMetricLabel(reqres.Request.Value)
}

m.QueuedMessages.With("type", typ, "priority", priority).Add(-1)
}

func (m *Metrics) SentMessage(reqres *requestAndResponse) {
priority := strconv.Itoa(int(reqres.priority()))
typ := "nil"
if reqres != nil && reqres.Request != nil {
typ = m.labels.ValueToMetricLabel(reqres.Request.Value)
}

m.SentMessagesTotal.With("type", typ, "priority", priority).Add(1)
}

// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang
// type that is passed in.
// This method uses a map on the Metrics struct so that each label name only needs
// to be produced once to prevent expensive string operations.
func (m *metricsLabelCache) ValueToMetricLabel(i interface{}) string {
if m.messageLabelNames == nil {
m.mtx.Lock()
m.messageLabelNames = map[reflect.Type]string{}
m.mtx.Unlock()
}

t := reflect.TypeOf(i)
m.mtx.RLock()

Expand All @@ -63,10 +101,3 @@ func (m *metricsLabelCache) ValueToMetricLabel(i interface{}) string {
m.messageLabelNames[t] = l
return l
}

func newMetricsLabelCache() *metricsLabelCache {
return &metricsLabelCache{
mtx: &sync.RWMutex{},
messageLabelNames: map[reflect.Type]string{},
}
}
38 changes: 24 additions & 14 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,26 @@ func (cli *socketClient) Error() error {
return cli.err
}

//----------------------------------------
// Add the request to the pending messages queue.
//
// Note that you still need to wake up sendRequestsRoutine writing to `cli.reqSignal`
func (cli *socketClient) enqueue(reqres *requestAndResponse) {
cli.mtx.Lock()
cli.reqQueue.Push(reqres, reqres.priority())
cli.mtx.Unlock()

cli.metrics.EnqueuedMessage(reqres)
}

// Remove the first request from the queue and return it.
func (cli *socketClient) dequeue() *requestAndResponse {
cli.mtx.Lock()
reqres := cli.reqQueue.PopItem()
cli.mtx.Unlock()

cli.metrics.DequeuedMessage(reqres)
return reqres
}

func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer) {
bw := bufio.NewWriter(conn)
Expand All @@ -129,11 +148,9 @@ func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer
case <-ctx.Done():
return
case <-cli.reqSignal:
cli.mtx.Lock()
reqres := cli.reqQueue.PopItem()
cli.mtx.Unlock()
reqres := cli.dequeue()

// N.B. We must enqueue before sending out the request, otherwise the
// N.B. We must track request before sending it out, otherwise the
// server may reply before we do it, and the receiver will fail for an
// unsolicited reply.
cli.trackRequest(reqres)
Expand All @@ -142,6 +159,7 @@ func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer
cli.stopForError(fmt.Errorf("write to buffer: %w", err))
return
}
cli.metrics.SentMessage(reqres)

if err := bw.Flush(); err != nil {
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
Expand Down Expand Up @@ -220,15 +238,7 @@ func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*ty
}

reqres := makeReqRes(req)

cli.mtx.Lock()
cli.reqQueue.Push(reqres, reqres.priority())
size := cli.reqQueue.Size()
cli.mtx.Unlock()

if size > 0 && size%1000 == 0 {
cli.logger.Error("suspicious abci.socketClient queue size", "size", size)
}
cli.enqueue(reqres)

select {
case cli.reqSignal <- struct{}{}:
Expand Down
1 change: 1 addition & 0 deletions scripts/txs/random.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ for i in `seq 1 $N`; do
VALUE="$i"
echo $(toHex $KEY=$VALUE)
curl 127.0.0.1:$PORT/broadcast_tx_sync?tx=0x$(toHex $KEY=$VALUE)
echo
done


0 comments on commit e86d67b

Please sign in to comment.