Skip to content

Commit

Permalink
avoid send duplicated metrics data (close #215)
Browse files Browse the repository at this point in the history
also fix several other minor issues.
  • Loading branch information
localvar committed Aug 31, 2021
1 parent 7e2fb5b commit 8d0facd
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
18 changes: 6 additions & 12 deletions pkg/object/easemonitormetrics/easemonitormetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type (
client atomic.Value
clientMutex sync.Mutex

latestTimestamp int64

done chan struct{}
}

Expand Down Expand Up @@ -249,9 +247,12 @@ func (emm *EaseMonitorMetrics) closeClient() {
}

func (emm *EaseMonitorMetrics) run() {
var latestTimestamp int64

for {
select {
case <-emm.done:
emm.closeClient()
return
case <-time.After(statussynccontroller.SyncStatusPaceInUnixSeconds * time.Second):
client, err := emm.getClient()
Expand All @@ -263,18 +264,15 @@ func (emm *EaseMonitorMetrics) run() {

records := emm.ssc.GetStatusesRecords()
for _, record := range records {
if record.UnixTimestamp <= emm.latestTimestamp {
if record.UnixTimestamp <= latestTimestamp {
continue
}
messages := emm.record2Messages(record)
latestTimestamp = record.UnixTimestamp

messages := emm.record2Messages(record)
for _, message := range messages {
client.Input() <- message
}

if err != nil {
emm.latestTimestamp = record.UnixTimestamp
}
}
}
}
Expand Down Expand Up @@ -375,7 +373,6 @@ func (emm *EaseMonitorMetrics) httpPipeline2Metrics(baseFields *GlobalFields, pi
reqMetrics = append(reqMetrics, req)
codeMetrics = append(codeMetrics, codes...)
}

}

return
Expand Down Expand Up @@ -471,10 +468,7 @@ func (emm *EaseMonitorMetrics) Status() *supervisor.Status {

// Close closes EaseMonitorMetrics.
func (emm *EaseMonitorMetrics) Close() {
// NOTE: close the channel first in case of
// using closed client in the run().
close(emm.done)
emm.closeClient()
}

func getHostIPv4() string {
Expand Down
5 changes: 1 addition & 4 deletions pkg/object/statussynccontroller/statussynccontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,6 @@ func (ssc *StatusSyncController) GetStatusesRecords() []*StatusesRecord {
defer ssc.StatusesRecordsMutex.RUnlock()

records := make([]*StatusesRecord, len(ssc.statusesRecords))
for i, record := range ssc.statusesRecords {
records[i] = record
}

copy(records, ssc.statusesRecords)
return records
}
6 changes: 5 additions & 1 deletion pkg/util/timetool/distributedtimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (dt *DistributedTimer) run() {
case <-dt.done:
return
case now := <-time.After(dt.nextDurationFunc()):
dt.C <- now
// use a select to avoid block
select {
case dt.C <- now:
default:
}
}
}
}
Expand Down

0 comments on commit 8d0facd

Please sign in to comment.