diff --git a/pkg/object/easemonitormetrics/easemonitormetrics.go b/pkg/object/easemonitormetrics/easemonitormetrics.go index 230eaf2172..5c3fc0f5dd 100644 --- a/pkg/object/easemonitormetrics/easemonitormetrics.go +++ b/pkg/object/easemonitormetrics/easemonitormetrics.go @@ -66,8 +66,6 @@ type ( client atomic.Value clientMutex sync.Mutex - latestTimestamp int64 - done chan struct{} } @@ -249,9 +247,12 @@ func (emm *EaseMonitorMetrics) closeClient() { } func (emm *EaseMonitorMetrics) run() { + var latestTimestamp int64 + for { select { case <-emm.done: + emm.closeClient() return case <-time.After(statussynccontroller.SyncStatusPaceInUnixSeconds * time.Second): client, err := emm.getClient() @@ -263,18 +264,15 @@ func (emm *EaseMonitorMetrics) run() { records := emm.ssc.GetStatusesRecords() for _, record := range records { - if record.UnixTimestamp <= emm.latestTimestamp { + if record.UnixTimestamp <= latestTimestamp { continue } - messages := emm.record2Messages(record) + latestTimestamp = record.UnixTimestamp + messages := emm.record2Messages(record) for _, message := range messages { client.Input() <- message } - - if err != nil { - emm.latestTimestamp = record.UnixTimestamp - } } } } @@ -375,7 +373,6 @@ func (emm *EaseMonitorMetrics) httpPipeline2Metrics(baseFields *GlobalFields, pi reqMetrics = append(reqMetrics, req) codeMetrics = append(codeMetrics, codes...) } - } return @@ -471,10 +468,7 @@ func (emm *EaseMonitorMetrics) Status() *supervisor.Status { // Close closes EaseMonitorMetrics. func (emm *EaseMonitorMetrics) Close() { - // NOTE: close the channel first in case of - // using closed client in the run(). close(emm.done) - emm.closeClient() } func getHostIPv4() string { diff --git a/pkg/object/statussynccontroller/statussynccontroller.go b/pkg/object/statussynccontroller/statussynccontroller.go index 51aca1df3c..c7747657d6 100644 --- a/pkg/object/statussynccontroller/statussynccontroller.go +++ b/pkg/object/statussynccontroller/statussynccontroller.go @@ -234,9 +234,6 @@ func (ssc *StatusSyncController) GetStatusesRecords() []*StatusesRecord { defer ssc.StatusesRecordsMutex.RUnlock() records := make([]*StatusesRecord, len(ssc.statusesRecords)) - for i, record := range ssc.statusesRecords { - records[i] = record - } - + copy(records, ssc.statusesRecords) return records } diff --git a/pkg/util/timetool/distributedtimer.go b/pkg/util/timetool/distributedtimer.go index 6020e981fe..2d21b68ce1 100644 --- a/pkg/util/timetool/distributedtimer.go +++ b/pkg/util/timetool/distributedtimer.go @@ -57,7 +57,11 @@ func (dt *DistributedTimer) run() { case <-dt.done: return case now := <-time.After(dt.nextDurationFunc()): - dt.C <- now + // use a select to avoid block + select { + case dt.C <- now: + default: + } } } }