From ad5f5b7e54b6bb6bf6b6d365b343e9444bee0f60 Mon Sep 17 00:00:00 2001 From: lisaifei Date: Sun, 10 Apr 2022 18:50:06 +0800 Subject: [PATCH] =?UTF-8?q?[PDR-16694][bug]=20kafka=20metric=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E9=87=87=E9=9B=86=E9=87=87=E9=9B=86=E9=A2=91=E7=8E=87?= =?UTF-8?q?=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mgr/metric_runner.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/mgr/metric_runner.go b/mgr/metric_runner.go index b463355c3..19b3235d7 100644 --- a/mgr/metric_runner.go +++ b/mgr/metric_runner.go @@ -33,7 +33,8 @@ const ( ) const ( - defaultCollectInterval = 30 + defaultCollectInterval = 30 + defaultTimeoutCountToReset = 2 ) type MetricConfig struct { @@ -54,6 +55,7 @@ type MetricRunner struct { startedWG *sync.WaitGroup collectors []metric.Collector + timeoutCount map[string]int senders []sender.Sender transformers map[string][]transforms.Transformer commonTrans []transforms.Transformer @@ -104,6 +106,7 @@ func NewMetricRunner(rc RunnerConfig, wg *sync.WaitGroup, sr *sender.Registry) ( rc.SendersConfig[i][KeyRunnerName] = rc.RunnerName } collectors := make([]metric.Collector, 0) + timeoutCount := make(map[string]int) transformers := make(map[string][]transforms.Transformer) for _, m := range rc.MetricConfig { @@ -132,6 +135,7 @@ func NewMetricRunner(rc RunnerConfig, wg *sync.WaitGroup, sr *sender.Registry) ( } collectors = append(collectors, c) + timeoutCount[c.Name()] = 0 // 配置文件中明确标明 false 的 attr 加入 discard transformer config := c.Config() @@ -238,6 +242,7 @@ func NewMetricRunner(rc RunnerConfig, wg *sync.WaitGroup, sr *sender.Registry) ( rsMutex: new(sync.RWMutex), collectInterval: interval, collectors: collectors, + timeoutCount: timeoutCount, transformers: transformers, commonTrans: commonTransformers, senders: senders, @@ -301,8 +306,9 @@ func (r *MetricRunner) Run() { dataCnt := 0 datas := make([]Data, 0) metricTime := time.Now() - tags[metric.Timestamp] = metricTime.Format(time.RFC3339Nano) + tags[metric.Timestamp] = metricTime.UnixNano() / 1e6 for _, c := range r.collectors { + before := time.Now() metricName := c.Name() tmpdatas, err := c.Collect() if err != nil { @@ -352,6 +358,23 @@ func (r *MetricRunner) Run() { datas = append(datas, data) dataCnt++ } + + // 处理读取超时 + if time.Now().Sub(before) > r.collectInterval { + log.Warnf("collecter <%v> exec timeout %d seconds", time.Now().Sub(before).Seconds()) + r.timeoutCount[metricName]++ + if reset, ok := c.(Resetable); ok && r.timeoutCount[metricName] >= defaultTimeoutCountToReset { + if err = reset.Reset(); err != nil { + log.Errorf("collecter <%v> reset fail: %v", metricTime, err) + continue + } else { + log.Infof("collecter <%v> reset success", metricTime) + r.timeoutCount[metricName] = 0 + } + } + } else { + r.timeoutCount[metricName] = 0 + } } if r.isBlock {