Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve account polling (#129) #135

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ require (
github.com/antonfisher/nested-logrus-formatter v1.3.1
github.com/fsnotify/fsnotify v1.6.0
github.com/nats-io/jsm.go v0.0.33
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.16.0
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb
github.com/nats-io/nats.go v1.23.0
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/crypto v0.5.0
)

require (
Expand All @@ -25,13 +25,13 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.15.5 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -42,8 +42,8 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
33 changes: 16 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.5 h1:qyCLMz2JCrKADihKOh9FxnW3houKeNsp2h5OEz0QSEA=
github.com/klauspost/compress v1.15.5/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand All @@ -194,14 +194,14 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jsm.go v0.0.33 h1:mNxlZEnSiHo9BwAFpjZYuopVvtwVUdtoAana2ovyWOU=
github.com/nats-io/jsm.go v0.0.33/go.mod h1:1ySvWrDbPo/Rs1v0Ccoy7QjZKBGfVhvmolfJRBX+fCg=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb h1:d7JNXl3VkPEedckKHK4M0j3OHF2wAk4LACxyRs76OaY=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb/go.mod h1:AT/C9XuOPGsozg2dfiS+9vK0Ge4jheffj8uL/kMGPtw=
github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE=
github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
Expand Down Expand Up @@ -286,11 +286,10 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -441,13 +440,13 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
226 changes: 86 additions & 140 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,178 +527,124 @@ func (sc *StatzCollector) poll() error {

func (sc *StatzCollector) pollAccountInfo() error {
nc := sc.nc
accs, err := sc.getAccounts(nc)
accs, err := sc.getAccStatz(nc)
if err != nil {
return err
}

accStats := make([]accountStats, 0, len(accs))
accStats := make(map[string]accountStats, len(accs))
for _, acc := range accs {
sts := accountStats{accountID: acc}
sts := accountStats{accountID: acc.Account}

accInfo, err := sc.getAccountInfo(nc, acc)
if err != nil {
sc.logger.Warnf("could not get info for account %q: %s", acc, err)
sts.leafCount = float64(acc.LeafNodes)
sts.subCount = float64(acc.NumSubs)
sts.connCount = float64(acc.Conns)
sts.bytesSent = float64(acc.Sent.Bytes)
sts.bytesRecv = float64(acc.Received.Bytes)
sts.msgsSent = float64(acc.Sent.Msgs)
sts.msgsRecv = float64(acc.Received.Msgs)

accStats[acc.Account] = sts
}
jsInfos := sc.getJSInfos(nc, accs)
for _, jsInfo := range jsInfos {
sts, ok := accStats[jsInfo.Id]
if !ok {
continue
}
sts.leafCount = float64(accInfo.LeafCnt)
sts.subCount = float64(accInfo.SubCnt)

if accInfo.JetStream {
sts.jetstreamEnabled = 1.0

jsInfo, err := sc.getJSInfo(nc, acc)
if err != nil {
sc.logger.Warnf("could not get JetStream info for account %q: %s", acc, err)
} else {
sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
}
}
sts.jetstreamEnabled = 1.0
sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
}

agg, err := sc.getConnzAggregate(nc, acc)
if err != nil {
sc.logger.Warnf("could not get connection statistics for account %q: %s", acc, err)
} else {
sts.connCount = agg.numConns
sts.bytesSent = agg.bytesSent
sts.bytesRecv = agg.bytesRecv
sts.msgsSent = agg.msgsSent
sts.msgsRecv = agg.msgsRecv
}

accStats = append(accStats, sts)
accStats[jsInfo.Id] = sts
}

sc.Lock()
sc.accStats = accStats
sc.accStats = make([]accountStats, 0, len(accStats))
for _, acc := range accStats {
sc.accStats = append(sc.accStats, acc)
}
sc.Unlock()

return nil
}

func (sc *StatzCollector) getAccounts(nc *nats.Conn) ([]string, error) {
const subj = "$SYS.REQ.SERVER.PING.ACCOUNTZ"
msg, err := nc.Request(subj, nil, sc.pollTimeout)
if err != nil {
return nil, err
}

var r server.ServerAPIResponse
var d server.Accountz
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return nil, err
}

sort.Strings(d.Accounts)
return d.Accounts, nil
}
func (sc *StatzCollector) getAccountInfo(nc *nats.Conn, account string) (server.AccountInfo, error) {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.INFO", account)
msg, err := nc.Request(subj, nil, sc.pollTimeout)
func (sc *StatzCollector) getJSInfos(nc *nats.Conn, accounts []*server.AccountStat) []*server.AccountDetail {
inbox := nc.NewRespInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
return server.AccountInfo{}, err
sc.logger.Warnf("Error creating nats subscription: %s", err)
return nil
}

var r server.ServerAPIResponse
var d server.AccountInfo
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return server.AccountInfo{}, err
}

return d, nil
}

func (sc *StatzCollector) getJSInfo(nc *nats.Conn, account string) (server.AccountDetail, error) {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", account)
defer sub.Unsubscribe()
opts := []byte(`{"streams": true, "consumer": true, "config": true}`)
msg, err := nc.Request(subj, opts, sc.pollTimeout)
if err != nil {
return server.AccountDetail{}, err
reqDispatched := len(accounts)
for _, acc := range accounts {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", acc.Account)
if err := nc.PublishRequest(subj, inbox, opts); err != nil {
reqDispatched--
sc.logger.Warnf("Unable to request JetStream info for account %s: %s", acc.Account, err.Error())
continue
}
}

var r server.ServerAPIResponse
var d server.AccountDetail
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return server.AccountDetail{}, err
res := make([]*server.AccountDetail, 0, len(accounts))
for i := 0; i < reqDispatched; i++ {
msg, err := sub.NextMsg(sc.pollTimeout)
if err != nil {
sc.logger.Warnf("Error fetching JetStream info: %s", err)
continue
}
var r server.ServerAPIResponse
var d server.AccountDetail
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
sc.logger.Warnf("Error deserializing JetStream info: %s", err)
continue
}
if r.Error != nil {
if strings.Contains(r.Error.Description, "jetstream not enabled") {
// jetstream is not enabled on server
return nil
}
continue
}
res = append(res, &d)
}

return d, nil
return res
}

type connzAggregate struct {
bytesSent float64
bytesRecv float64
msgsSent float64
msgsRecv float64
numConns float64
}

func (sc *StatzCollector) getConnzAggregate(nc *nats.Conn, account string) (connzAggregate, error) {
// TODO: Replace with "$SYS.REQ.ACCOUNT.%s.CONNS" after NATS 2.8.4.
// CONNS returns bytes sent/recv at the account level without needing the
// following code.
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", account)

rep := nc.NewRespInbox()

msg := nats.NewMsg(subj)
msg.Reply = rep
msg.Data = nil

s, err := nc.SubscribeSync(msg.Reply)
func (sc *StatzCollector) getAccStatz(nc *nats.Conn) ([]*server.AccountStat, error) {
req := &server.AccountStatzOptions{
IncludeUnused: true,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return connzAggregate{}, err
return nil, err
}
defer s.Unsubscribe()

if err := nc.PublishMsg(msg); err != nil {
return connzAggregate{}, err
const subj = "$SYS.REQ.ACCOUNT.PING.STATZ"
msg, err := nc.Request(subj, reqJSON, sc.pollTimeout)
if err != nil {
return nil, err
}

var agg connzAggregate
var r server.ServerAPIResponse
var d server.Connz
var d server.AccountStatz
r.Data = &d

for i := 0; i < sc.numServers; i++ {
m, err := s.NextMsg(sc.pollTimeout)
if err != nil && err == nats.ErrTimeout {
break
}
if err != nil {
return connzAggregate{}, err
}

if err := json.Unmarshal(m.Data, &r); err != nil {
return connzAggregate{}, err
}

agg.numConns += float64(d.NumConns)

for _, c := range d.Conns {
agg.bytesSent += float64(c.InBytes)
agg.bytesRecv += float64(c.OutBytes)
agg.msgsSent += float64(c.InMsgs)
agg.msgsRecv += float64(c.OutMsgs)
}
if err := json.Unmarshal(msg.Data, &r); err != nil {
return nil, err
}

return agg, nil
return d.Accounts, nil
}

// Describe is the Prometheus interface to describe metrics for
Expand Down