From 10521d4e086043fe7bdfea8b8e3b78c97014e2fb Mon Sep 17 00:00:00 2001 From: Yaroslav Kirillov Date: Wed, 31 Jan 2024 11:59:12 +0500 Subject: [PATCH] New metric holder (#411) * New metric holder * Rename metric.Ctl constructor * Fix prometheus import name * Fix build * Add tests * Refactor metric controller * Remove updateTime & beautify * Fix nowTime & tests * Refactor metric controller * Add xtime package * Use xtime in metric holder * Rework holder & wrappers * Rework * Fix go mod * Some performance optimizations to the MetricHolder (#568) * Some performance optimizations * Rename RenameReleaser -> MetricDeleter * Benchmark prometheus.MetricVec * Replace parallel benchmark with the loop * Fix string copy * Add tests for the HeldMetric * Fix naming & build (#569) * Fix naming & build * Rollback comment * Update MetricHolder test * Add metric_hold_duration to pipeline settings * go mod tidy * Fix tests * Fix after merge * Fix tests --- fd/file.d.go | 4 +- fd/util.go | 11 + go.mod | 2 +- metric/controller.go | 113 +++++----- metric/held_counter.go | 43 ++++ metric/held_gauge.go | 58 +++++ metric/held_histogram.go | 40 ++++ metric/held_metric.go | 155 ++++++++++++++ metric/held_metric_test.go | 131 ++++++++++++ metric/holder.go | 56 +++++ pipeline/antispam/antispammer.go | 16 +- pipeline/batch.go | 8 +- pipeline/batch_test.go | 4 +- pipeline/metrics_holder.go | 213 ------------------- pipeline/pipeline.go | 152 +++++++++---- pipeline/pipeline_test.go | 17 +- pipeline/pipeline_whitebox_test.go | 5 +- pipeline/processor.go | 42 +++- plugin/action/mask/mask.go | 3 +- plugin/action/parse_re2/parse_re2.go | 4 +- plugin/action/throttle/limiters_map.go | 6 +- plugin/action/throttle/throttle.go | 3 +- plugin/input/dmesg/dmesg.go | 7 +- plugin/input/file/file.go | 11 +- plugin/input/file/provider.go | 20 +- plugin/input/file/watcher.go | 4 +- plugin/input/file/watcher_test.go | 2 +- plugin/input/file/worker_test.go | 16 +- plugin/input/http/http.go | 12 +- plugin/input/journalctl/journalctl.go | 15 +- plugin/input/journalctl/reader.go | 8 +- plugin/input/kafka/kafka.go | 9 +- plugin/output/clickhouse/clickhouse.go | 9 +- plugin/output/elasticsearch/elasticsearch.go | 7 +- plugin/output/file/helpers_test.go | 1 + plugin/output/gelf/gelf.go | 7 +- plugin/output/kafka/kafka.go | 4 +- plugin/output/postgres/postgres.go | 13 +- plugin/output/postgres/postgres_test.go | 12 +- plugin/output/s3/s3.go | 7 +- plugin/output/s3/s3_test.go | 9 +- plugin/output/splunk/splunk.go | 5 +- test/test.go | 3 +- xtime/xtime.go | 38 ++++ 44 files changed, 859 insertions(+), 446 deletions(-) create mode 100644 metric/held_counter.go create mode 100644 metric/held_gauge.go create mode 100644 metric/held_histogram.go create mode 100644 metric/held_metric.go create mode 100644 metric/held_metric_test.go create mode 100644 metric/holder.go delete mode 100644 pipeline/metrics_holder.go create mode 100644 xtime/xtime.go diff --git a/fd/file.d.go b/fd/file.d.go index b45d5f1db..65e21ce84 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -60,8 +60,8 @@ func (f *FileD) Start() { } func (f *FileD) initMetrics() { - f.metricCtl = metric.New("file_d", f.registry) - f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version") + f.metricCtl = metric.NewCtl("file_d", f.registry) + f.versionMetric = f.metricCtl.RegisterCounterVec("version", "", "version") f.versionMetric.WithLabelValues(buildinfo.Version).Inc() } diff --git a/fd/util.go b/fd/util.go index 3ace3be9c..b5284c607 100644 --- a/fd/util.go +++ b/fd/util.go @@ -25,6 +25,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { decoder := "auto" isStrict := false eventTimeout := pipeline.DefaultEventTimeout + metricHoldDuration := pipeline.DefaultMetricHoldDuration if settings != nil { val := settings.Get("capacity").MustInt() @@ -85,6 +86,15 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { antispamExceptions.Prepare() isStrict = settings.Get("is_strict").MustBool() + + str = settings.Get("metric_hold_duration").MustString() + if str != "" { + i, err := time.ParseDuration(str) + if err != nil { + logger.Fatalf("can't parse pipeline metric hold duration: %s", err.Error()) + } + metricHoldDuration = i + } } return &pipeline.Settings{ @@ -98,6 +108,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { EventTimeout: eventTimeout, StreamField: streamField, IsStrict: isStrict, + MetricHoldDuration: metricHoldDuration, } } diff --git a/go.mod b/go.mod index af94cc504..f14dabe63 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 github.com/alicebob/miniredis/v2 v2.30.5 github.com/bitly/go-simplejson v0.5.1 + github.com/cespare/xxhash/v2 v2.2.0 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/golang/mock v1.6.0 @@ -48,7 +49,6 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect diff --git a/metric/controller.go b/metric/controller.go index bf55af3f0..36964bb39 100644 --- a/metric/controller.go +++ b/metric/controller.go @@ -20,81 +20,77 @@ type Ctl struct { subsystem string register *prometheus.Registry - counters map[string]*prometheus.CounterVec - counterMx *sync.Mutex - - gauges map[string]*prometheus.GaugeVec - gaugeMx *sync.Mutex - - histograms map[string]*prometheus.HistogramVec - histogramMx *sync.Mutex + metrics map[string]prometheus.Collector + mu sync.RWMutex } -func New(subsystem string, registry *prometheus.Registry) *Ctl { +func NewCtl(subsystem string, registry *prometheus.Registry) *Ctl { ctl := &Ctl{ - subsystem: subsystem, - counters: make(map[string]*prometheus.CounterVec), - counterMx: new(sync.Mutex), - gauges: make(map[string]*prometheus.GaugeVec), - gaugeMx: new(sync.Mutex), - histograms: make(map[string]*prometheus.HistogramVec), - histogramMx: new(sync.Mutex), - register: registry, + subsystem: subsystem, + register: registry, + metrics: make(map[string]prometheus.Collector), } return ctl } -func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prometheus.CounterVec { - mc.counterMx.Lock() - defer mc.counterMx.Unlock() +func (mc *Ctl) RegisterCounter(name, help string) prometheus.Counter { + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }) - if metric, hasCounter := mc.counters[name]; hasCounter { - return metric - } + return mc.registerMetric(name, counter).(prometheus.Counter) +} - promCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ +func (mc *Ctl) RegisterCounterVec(name, help string, labels ...string) *prometheus.CounterVec { + counterVec := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: PromNamespace, Subsystem: mc.subsystem, Name: name, Help: help, }, labels) - mc.counters[name] = promCounter - mc.register.Unregister(promCounter) - mc.register.MustRegister(promCounter) - return promCounter + return mc.registerMetric(name, counterVec).(*prometheus.CounterVec) } -func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prometheus.GaugeVec { - mc.gaugeMx.Lock() - defer mc.gaugeMx.Unlock() +func (mc *Ctl) RegisterGauge(name, help string) prometheus.Gauge { + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }) - if metric, hasGauge := mc.gauges[name]; hasGauge { - return metric - } + return mc.registerMetric(name, gauge).(prometheus.Gauge) +} - promGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ +func (mc *Ctl) RegisterGaugeVec(name, help string, labels ...string) *prometheus.GaugeVec { + gaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: PromNamespace, Subsystem: mc.subsystem, Name: name, Help: help, }, labels) - mc.gauges[name] = promGauge - mc.register.Unregister(promGauge) - mc.register.MustRegister(promGauge) - return promGauge + return mc.registerMetric(name, gaugeVec).(*prometheus.GaugeVec) } -func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec { - mc.histogramMx.Lock() - defer mc.histogramMx.Unlock() +func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64) prometheus.Histogram { + histogram := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + Buckets: buckets, + }) - if metric, hasHistogram := mc.histograms[name]; hasHistogram { - return metric - } + return mc.registerMetric(name, histogram).(prometheus.Histogram) +} - promHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{ +func (mc *Ctl) RegisterHistogramVec(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec { + histogramVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: PromNamespace, Subsystem: mc.subsystem, Name: name, @@ -102,8 +98,25 @@ func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels .. Buckets: buckets, }, labels) - mc.histograms[name] = promHistogram - mc.register.Unregister(promHistogram) - mc.register.MustRegister(promHistogram) - return promHistogram + return mc.registerMetric(name, histogramVec).(*prometheus.HistogramVec) +} + +func (mc *Ctl) registerMetric(name string, newMetric prometheus.Collector) prometheus.Collector { + mc.mu.RLock() + metric, has := mc.metrics[name] + mc.mu.RUnlock() + if has { + return metric + } + + mc.mu.Lock() + defer mc.mu.Unlock() + metric, has = mc.metrics[name] + if !has { + metric = newMetric + mc.metrics[name] = metric + mc.register.MustRegister(metric) + } + + return metric } diff --git a/metric/held_counter.go b/metric/held_counter.go new file mode 100644 index 000000000..5449e6113 --- /dev/null +++ b/metric/held_counter.go @@ -0,0 +1,43 @@ +package metric + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type HeldCounter struct { + *heldMetric[prometheus.Counter] +} + +func (h HeldCounter) Inc() { + h.metric.Inc() + h.updateUsage() +} + +func (h HeldCounter) Add(v float64) { + h.metric.Add(v) + h.updateUsage() +} + +type HeldCounterVec struct { + store *heldMetricsStore[prometheus.Counter] + vec *prometheus.CounterVec +} + +func NewHeldCounterVec(cv *prometheus.CounterVec) HeldCounterVec { + return HeldCounterVec{ + vec: cv, + store: newHeldMetricsStore[prometheus.Counter](), + } +} + +func (h HeldCounterVec) WithLabelValues(lvs ...string) HeldCounter { + return HeldCounter{ + heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues), + } +} + +func (h HeldCounterVec) DeleteOldMetrics(holdDuration time.Duration) { + h.store.DeleteOldMetrics(holdDuration, h.vec) +} diff --git a/metric/held_gauge.go b/metric/held_gauge.go new file mode 100644 index 000000000..bb684a190 --- /dev/null +++ b/metric/held_gauge.go @@ -0,0 +1,58 @@ +package metric + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type HeldGauge struct { + *heldMetric[prometheus.Gauge] +} + +func (h HeldGauge) Set(v float64) { + h.metric.Set(v) + h.updateUsage() +} + +func (h HeldGauge) Inc() { + h.metric.Inc() + h.updateUsage() +} + +func (h HeldGauge) Dec() { + h.metric.Dec() + h.updateUsage() +} + +func (h HeldGauge) Add(v float64) { + h.metric.Add(v) + h.updateUsage() +} + +func (h HeldGauge) Sub(v float64) { + h.metric.Sub(v) + h.updateUsage() +} + +type HeldGaugeVec struct { + store *heldMetricsStore[prometheus.Gauge] + vec *prometheus.GaugeVec +} + +func NewHeldGaugeVec(gv *prometheus.GaugeVec) HeldGaugeVec { + return HeldGaugeVec{ + vec: gv, + store: newHeldMetricsStore[prometheus.Gauge](), + } +} + +func (h HeldGaugeVec) WithLabelValues(lvs ...string) HeldGauge { + return HeldGauge{ + heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues), + } +} + +func (h HeldGaugeVec) DeleteOldMetrics(holdDuration time.Duration) { + h.store.DeleteOldMetrics(holdDuration, h.vec) +} diff --git a/metric/held_histogram.go b/metric/held_histogram.go new file mode 100644 index 000000000..fb6324e35 --- /dev/null +++ b/metric/held_histogram.go @@ -0,0 +1,40 @@ +package metric + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type HeldHistogram struct { + *heldMetric[prometheus.Histogram] +} + +func (h HeldHistogram) Observe(v float64) { + h.metric.Observe(v) + h.updateUsage() +} + +type HeldHistogramVec struct { + store *heldMetricsStore[prometheus.Histogram] + vec *prometheus.HistogramVec +} + +func NewHeldHistogramVec(hv *prometheus.HistogramVec) HeldHistogramVec { + return HeldHistogramVec{ + vec: hv, + store: newHeldMetricsStore[prometheus.Histogram](), + } +} + +func (h HeldHistogramVec) WithLabelValues(lvs ...string) HeldHistogram { + return HeldHistogram{ + heldMetric: h.store.GetOrCreate(lvs, func(s ...string) prometheus.Histogram { + return h.vec.WithLabelValues(s...).(prometheus.Histogram) + }), + } +} + +func (h HeldHistogramVec) DeleteOldMetrics(holdDuration time.Duration) { + h.store.DeleteOldMetrics(holdDuration, h.vec) +} diff --git a/metric/held_metric.go b/metric/held_metric.go new file mode 100644 index 000000000..7e30d77d5 --- /dev/null +++ b/metric/held_metric.go @@ -0,0 +1,155 @@ +package metric + +import ( + "slices" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/ozontech/file.d/xtime" + "github.com/prometheus/client_golang/prometheus" +) + +type heldMetric[T prometheus.Metric] struct { + labels []string + lastUsage atomic.Int64 // unixnano timestamp + metric T +} + +func newHeldMetric[T prometheus.Metric](labels []string, metric T) *heldMetric[T] { + // copy labels because they are unsafe converted bytes + // TODO: replace with [][]byte to make it explicit + labelsCopy := make([]string, len(labels)) + for i := range labels { + labelsCopy[i] = strings.Clone(labels[i]) + } + + hl := &heldMetric[T]{ + labels: labelsCopy, + lastUsage: atomic.Int64{}, + metric: metric, + } + hl.updateUsage() + return hl +} + +var updateThreshold = (time.Second * 10).Nanoseconds() + +func (h *heldMetric[T]) updateUsage() { + now := xtime.GetInaccurateUnixNano() + + // optimize atomic writes, + // because it is not important for us to have the newest state + if lastUsage := h.lastUsage.Load(); now-lastUsage > updateThreshold { + h.lastUsage.Store(now) + } +} + +type heldMetricsStore[T prometheus.Metric] struct { + mu sync.RWMutex + metricsByHash map[uint64][]*heldMetric[T] +} + +func newHeldMetricsStore[T prometheus.Metric]() *heldMetricsStore[T] { + return &heldMetricsStore[T]{ + mu: sync.RWMutex{}, + metricsByHash: make(map[uint64][]*heldMetric[T]), + } +} + +func (h *heldMetricsStore[T]) GetOrCreate(labels []string, createMetricFn func(...string) T) *heldMetric[T] { + hash := computeStringsHash(labels) + // fast path - metric exists + h.mu.RLock() + hMetric, ok := h.getHeldMetricByHash(labels, hash) + h.mu.RUnlock() + if ok { + return hMetric + } + // slow path - create new metric + return h.tryCreate(labels, hash, createMetricFn) +} + +func (h *heldMetricsStore[T]) getHeldMetricByHash(labels []string, hash uint64) (*heldMetric[T], bool) { + hMetrics, ok := h.metricsByHash[hash] + if !ok { + return nil, false + } + if len(hMetrics) == 1 { + return hMetrics[0], true + } + + if i := findHeldMetricIndex(hMetrics, labels); i != -1 { + return hMetrics[i], true + } + return nil, false +} + +func (h *heldMetricsStore[T]) tryCreate(labels []string, hash uint64, createMetricFn func(...string) T) *heldMetric[T] { + h.mu.Lock() + defer h.mu.Unlock() + + hMetric, ok := h.getHeldMetricByHash(labels, hash) + if ok { + return hMetric + } + + hMetric = newHeldMetric[T](labels, createMetricFn(labels...)) + h.metricsByHash[hash] = append(h.metricsByHash[hash], hMetric) + return hMetric +} + +type metricDeleter interface { + DeleteLabelValues(...string) bool +} + +func (h *heldMetricsStore[T]) DeleteOldMetrics(holdDuration time.Duration, deleter metricDeleter) { + now := xtime.GetInaccurateUnixNano() + + h.mu.Lock() + defer h.mu.Unlock() + + for hash, hMetrics := range h.metricsByHash { + releasedMetrics := slices.DeleteFunc(hMetrics, func(hMetric *heldMetric[T]) bool { + lastUsage := hMetric.lastUsage.Load() + diff := now - lastUsage + isObsolete := diff > holdDuration.Nanoseconds() + if isObsolete { + deleter.DeleteLabelValues(hMetric.labels...) + *hMetric = heldMetric[T]{} // release objects in the structure + } + return isObsolete + }) + + if len(releasedMetrics) == 0 { + delete(h.metricsByHash, hash) + } + } +} + +func findHeldMetricIndex[T prometheus.Metric](hMetrics []*heldMetric[T], labels []string) int { + idx := -1 + for i := range hMetrics { + if slices.Equal(hMetrics[i].labels, labels) { + idx = i + break + } + } + return idx +} + +func computeStringsHash(s []string) uint64 { + var hash uint64 + if len(s) == 1 { + hash = xxhash.Sum64String(s[0]) + } else { + digest := xxhash.New() + for i := range s { + _, _ = digest.WriteString(s[i]) + } + hash = digest.Sum64() + } + return hash +} diff --git a/metric/held_metric_test.go b/metric/held_metric_test.go new file mode 100644 index 000000000..4e35574a0 --- /dev/null +++ b/metric/held_metric_test.go @@ -0,0 +1,131 @@ +package metric + +import ( + "strings" + "testing" + "time" + "unsafe" + + "github.com/ozontech/file.d/xtime" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestLabelExpiration(t *testing.T) { + r := require.New(t) + + ctl := NewCtl("test", prometheus.NewRegistry()) + promCounter := ctl.RegisterCounterVec("errors", "", "level") + + c := NewHeldCounterVec(promCounter) + + now := time.Now().UnixNano() + xtime.SetNowTime(now) + + c.WithLabelValues("error").Inc() + c.WithLabelValues("warn").Inc() + c.WithLabelValues("info").Inc() + + r.Equal(3, len(c.store.metricsByHash)) + c.DeleteOldMetrics(time.Minute) + r.Equal(3, len(c.store.metricsByHash)) + + // update usage of the metric with "info" label + { + newNow := now + (time.Second * 30).Nanoseconds() + xtime.SetNowTime(newNow) + c.WithLabelValues("info").Inc() + } + + // set new time to expire "warn" and "error" metrics + xtime.SetNowTime(now + time.Minute.Nanoseconds() + time.Nanosecond.Nanoseconds()) + + c.DeleteOldMetrics(time.Minute) + + hash := computeStringsHash([]string{"info"}) + r.Equal(1, len(c.store.metricsByHash)) + + infoMetric := c.store.metricsByHash[hash][0] + r.Equal([]string{"info"}, infoMetric.labels) +} + +func TestUnsafeStringInMetric(t *testing.T) { + r := require.New(t) + + bytes := []byte("hello world") + unsafeString := unsafe.String(unsafe.SliceData(bytes), len(bytes)) + + labels := []string{unsafeString} + m := newHeldMetric[prometheus.Counter]([]string{unsafeString}, prometheus.NewCounter(prometheus.CounterOpts{})) + + bytes[0] = '1' + labels[0] = "new" + + r.Equal([]string{"hello world"}, m.labels) +} + +var holderBenchCases = []struct { + Labels []string + LabelValues [][]string +}{ + { + Labels: []string{"l1"}, + LabelValues: [][]string{ + {"test1"}, + {"test2"}, + {"test3"}, + }, + }, + { + Labels: []string{"l1", "l2"}, + LabelValues: [][]string{ + {"first1", "second1"}, + {"first2", "second2"}, + {"first3", "second3"}, + }, + }, + { + Labels: []string{"l1", "l2", "l3"}, + LabelValues: [][]string{ + {"first1", "second1", "third1"}, + {"first2", "second2", "third2"}, + {"first3", "second3", "third3"}, + }, + }, +} + +func BenchmarkMetricHolder(b *testing.B) { + for _, benchCase := range holderBenchCases { + ctl := NewCtl("test", prometheus.NewRegistry()) + holder := NewHolder(time.Minute) + + promCounter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...) + counter := holder.AddCounterVec(promCounter) + + name := strings.Join(benchCase.Labels, "_") + + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for _, labels := range benchCase.LabelValues { + counter.WithLabelValues(labels...).Inc() + } + } + }) + } +} + +func BenchmarkPromVec(b *testing.B) { + for _, benchCase := range holderBenchCases { + ctl := NewCtl("test", prometheus.NewRegistry()) + counter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...) + name := strings.Join(benchCase.Labels, "_") + + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for _, labels := range benchCase.LabelValues { + counter.WithLabelValues(labels...).Inc() + } + } + }) + } +} diff --git a/metric/holder.go b/metric/holder.go new file mode 100644 index 000000000..68c37385a --- /dev/null +++ b/metric/holder.go @@ -0,0 +1,56 @@ +package metric + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type heldMetricVec interface { + DeleteOldMetrics(holdDuration time.Duration) +} + +type Holder struct { + holdDuration time.Duration + heldMetrics []heldMetricVec +} + +// NewHolder returns new metric holder. The holdDuration must be more than 1m. +func NewHolder(holdDuration time.Duration) *Holder { + if holdDuration < time.Minute { + panic("hold duration must be greater than 1m") + } + return &Holder{ + holdDuration: holdDuration, + heldMetrics: make([]heldMetricVec, 0), + } +} + +func (h *Holder) Maintenance() { + h.DeleteOldMetrics() +} + +func (h *Holder) AddCounterVec(counterVec *prometheus.CounterVec) HeldCounterVec { + hcv := NewHeldCounterVec(counterVec) + h.heldMetrics = append(h.heldMetrics, hcv) + return hcv +} + +func (h *Holder) AddGaugeVec(gaugeVec *prometheus.GaugeVec) HeldGaugeVec { + hgv := NewHeldGaugeVec(gaugeVec) + h.heldMetrics = append(h.heldMetrics, hgv) + return hgv +} + +func (h *Holder) AddHistogramVec(histogramVec *prometheus.HistogramVec) HeldHistogramVec { + hhv := NewHeldHistogramVec(histogramVec) + h.heldMetrics = append(h.heldMetrics, hhv) + return hhv +} + +// DeleteOldMetrics delete old metric labels, that aren't in use since last update. +func (h *Holder) DeleteOldMetrics() { + for i := range h.heldMetrics { + h.heldMetrics[i].DeleteOldMetrics(h.holdDuration) + } +} diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go index 4173c2870..8977b0078 100644 --- a/pipeline/antispam/antispammer.go +++ b/pipeline/antispam/antispammer.go @@ -27,8 +27,8 @@ type Antispammer struct { logger *zap.Logger // antispammer metrics - activeMetric *prometheus.GaugeVec - banMetric *prometheus.GaugeVec + activeMetric prometheus.Gauge + banMetric prometheus.Gauge exceptionMetric *prometheus.CounterVec } @@ -66,14 +66,14 @@ func NewAntispammer(o Options) *Antispammer { banMetric: o.MetricsController.RegisterGauge("antispam_banned", "How many times a source was banned", ), - exceptionMetric: o.MetricsController.RegisterCounter("antispam_exceptions", + exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions", "How many times an exception match with an event", "name", ), } // not enabled by default - a.activeMetric.WithLabelValues().Set(0) + a.activeMetric.Set(0) return a } @@ -119,8 +119,8 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b x := src.counter.Inc() if x == int32(a.threshold) { src.counter.Swap(int32(a.unbanIterations * a.threshold)) - a.activeMetric.WithLabelValues().Set(1) - a.banMetric.WithLabelValues().Inc() + a.activeMetric.Set(1) + a.banMetric.Inc() a.logger.Warn("source has been banned", zap.Uint64("id", id), zap.String("name", name)) } @@ -147,7 +147,7 @@ func (a *Antispammer) Maintenance() { } if isMore && x < a.threshold { - a.banMetric.WithLabelValues().Dec() + a.banMetric.Dec() a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID)) } @@ -163,7 +163,7 @@ func (a *Antispammer) Maintenance() { } if allUnbanned { - a.activeMetric.WithLabelValues().Set(0) + a.activeMetric.Set(0) } else { a.logger.Info("there are banned sources") } diff --git a/pipeline/batch.go b/pipeline/batch.go index 5b3a2dec7..4fb0380fa 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -153,7 +153,7 @@ type ( func NewBatcher(opts BatcherOptions) *Batcher { // nolint: gocritic // hugeParam is ok here ctl := opts.MetricCtl - jobsDone := ctl.RegisterCounter("batcher_jobs_done_total", "", "status") + jobsDone := ctl.RegisterCounterVec("batcher_jobs_done_total", "", "status") freeBatches := make(chan *Batch, opts.Workers) fullBatches := make(chan *Batch, opts.Workers) @@ -168,9 +168,9 @@ func NewBatcher(opts BatcherOptions) *Batcher { // nolint: gocritic // hugeParam freeBatches: freeBatches, fullBatches: fullBatches, opts: opts, - batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong).WithLabelValues(), - commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed).WithLabelValues(), - workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", "").WithLabelValues(), + batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong), + commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed), + workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", ""), batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"), batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"), } diff --git a/pipeline/batch_test.go b/pipeline/batch_test.go index d3e79bb14..8ae5fae57 100644 --- a/pipeline/batch_test.go +++ b/pipeline/batch_test.go @@ -67,7 +67,7 @@ func TestBatcher(t *testing.T) { Workers: 8, BatchSizeCount: batchSize, FlushTimeout: time.Second, - MetricCtl: metric.New("", prometheus.NewRegistry()), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), }) ctx := context.TODO() @@ -138,7 +138,7 @@ func TestBatcherMaxSize(t *testing.T) { Workers: 8, BatchSizeBytes: batchSize, FlushTimeout: time.Minute, - MetricCtl: metric.New("", prometheus.NewRegistry()), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), }) batcher.Start(context.Background()) diff --git a/pipeline/metrics_holder.go b/pipeline/metrics_holder.go deleted file mode 100644 index 254cc025e..000000000 --- a/pipeline/metrics_holder.go +++ /dev/null @@ -1,213 +0,0 @@ -package pipeline - -import ( - "fmt" - "strconv" - "sync" - "time" - - "github.com/ozontech/file.d/buildinfo" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" -) - -const ( - namespace = "file_d" - subsystemPrefix = "pipeline_" -) - -// metricHolder has nextMetricsGen method that creates new prometheus.CounterOpts, -// differs form previous only by prometheus.CounterOpts.ConstLabels: {"gen": incValue}. -// This decision help throw away from memory old metric\s, that wasn't written for a while. -// TODO create better mechanism that'll delete only old metric\s, that aren't in use for N minutes. -type metricsHolder struct { - pipelineName string - metricsGen int // generation is used to drop unused metrics from counters. - metricsGenTime time.Time - metricsGenInterval time.Duration - metrics []metrics - registry *prometheus.Registry -} - -type counter struct { - count *prometheus.CounterVec - // totalCounter is a map of eventStatus to counter for `/info` endpoint. - totalCounter map[string]*atomic.Uint64 - size *prometheus.CounterVec -} - -type metrics struct { - name string - labels []string - skipStatus bool - - root mNode - - current counter - previous counter -} - -type mNode struct { - childs map[string]mNode - mu *sync.RWMutex - self string -} - -func newMetricsHolder(pipelineName string, registry *prometheus.Registry, - metricsGenInterval time.Duration) *metricsHolder { - return &metricsHolder{ - pipelineName: pipelineName, - registry: registry, - - metrics: make([]metrics, 0), - metricsGenInterval: metricsGenInterval, - } -} - -func (m *metricsHolder) AddAction(metricName string, metricLabels []string, skipStatus bool) { - m.metrics = append(m.metrics, metrics{ - name: metricName, - labels: metricLabels, - skipStatus: skipStatus, - root: mNode{ - childs: make(map[string]mNode), - mu: &sync.RWMutex{}, - }, - current: counter{nil, make(map[string]*atomic.Uint64), nil}, - previous: counter{nil, make(map[string]*atomic.Uint64), nil}, - }) -} - -func (m *metricsHolder) start() { - m.nextMetricsGen() -} - -func (c *counter) register(registry *prometheus.Registry) { - registry.MustRegister(c.count) - registry.MustRegister(c.size) -} - -func (c *counter) unregister(registry *prometheus.Registry) { - registry.Unregister(c.count) - registry.Unregister(c.size) -} - -func (m *metricsHolder) nextMetricsGen() { - metricsGen := strconv.Itoa(m.metricsGen % 3) // 2 (for key variance) + 1 (since we must register first) == 3 - for index := range m.metrics { - metrics := &m.metrics[index] - if metrics.name == "" { - continue - } - - labels := make([]string, 0, len(metrics.labels)+1) - if !metrics.skipStatus { - labels = append(labels, "status") - } - labels = append(labels, metrics.labels...) - - cnt := counter{nil, make(map[string]*atomic.Uint64), nil} - for _, st := range allEventStatuses() { - cnt.totalCounter[string(st)] = atomic.NewUint64(0) - } - opts := prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystemPrefix + m.pipelineName, - Name: metrics.name + "_events_count_total", - Help: fmt.Sprintf("how many events processed by pipeline %q and #%d action", m.pipelineName, index), - ConstLabels: map[string]string{"gen": metricsGen, "version": buildinfo.Version}, - } - cnt.count = prometheus.NewCounterVec(opts, labels) - opts = prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystemPrefix + m.pipelineName, - Name: metrics.name + "_events_size_total", - Help: fmt.Sprintf("total size of events processed by pipeline %q and #%d action", m.pipelineName, index), - ConstLabels: map[string]string{"gen": metricsGen, "version": buildinfo.Version}, - } - cnt.size = prometheus.NewCounterVec(opts, labels) - - obsolete := metrics.previous - - metrics.previous = metrics.current - metrics.current = cnt - - metrics.current.register(m.registry) - if obsolete.count != nil { - obsolete.unregister(m.registry) - } - } - - m.metricsGen++ - m.metricsGenTime = time.Now() -} - -func (m *metricsHolder) count(event *Event, actionIndex int, eventStatus eventStatus, valuesBuf []string) []string { - if len(m.metrics) == 0 { - return valuesBuf - } - - metrics := &m.metrics[actionIndex] - if metrics.name == "" { - return valuesBuf - } - - if metrics.skipStatus && eventStatus == eventStatusReceived { - return valuesBuf - } - - valuesBuf = valuesBuf[:0] - if !metrics.skipStatus { - valuesBuf = append(valuesBuf, string(eventStatus)) - } - - mn := metrics.root - for _, field := range metrics.labels { - val := DefaultFieldValue - - node := event.Root.Dig(field) - if node != nil { - val = node.AsString() - } - - mn.mu.RLock() - nextMN, has := mn.childs[val] - mn.mu.RUnlock() - - if !has { - mn.mu.Lock() - nextMN, has = mn.childs[val] - if !has { - key := DefaultFieldValue - if node != nil { - key = string(node.AsBytes()) // make string from []byte to make map string keys works good - } - - nextMN = mNode{ - childs: make(map[string]mNode), - self: key, - mu: &sync.RWMutex{}, - } - mn.childs[key] = nextMN - } - mn.mu.Unlock() - } - - valuesBuf = append(valuesBuf, nextMN.self) - mn = nextMN - } - - metrics.current.totalCounter[string(eventStatus)].Inc() - metrics.current.count.WithLabelValues(valuesBuf...).Inc() - metrics.current.size.WithLabelValues(valuesBuf...).Add(float64(event.Size)) - - return valuesBuf -} - -func (m *metricsHolder) maintenance() { - if time.Since(m.metricsGenTime) < m.metricsGenInterval { - return - } - - m.nextMetricsGen() -} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 854ee9974..b02394b43 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -33,11 +33,11 @@ const ( DefaultEventTimeout = time.Second * 30 DefaultFieldValue = "not_set" DefaultStreamName = StreamName("not_set") + DefaultMetricHoldDuration = time.Minute * 30 EventSeqIDError = uint64(0) antispamUnbanIterations = 4 - metricsGenInterval = time.Hour ) type finalizeFn = func(event *Event, notifyInput bool, backEvent bool) @@ -87,16 +87,18 @@ type Pipeline struct { inputInfo *InputPluginInfo antispamer *antispam.Antispammer - actionInfos []*ActionPluginStaticInfo - Procs []*processor - procCount *atomic.Int32 - activeProcs *atomic.Int32 - actionParams PluginDefaultParams + actionInfos []*ActionPluginStaticInfo + actionMetrics actionMetrics + actionParams PluginDefaultParams + + Procs []*processor + procCount *atomic.Int32 + activeProcs *atomic.Int32 output OutputPlugin outputInfo *OutputPluginInfo - metricsHolder *metricsHolder + metricHolder *metric.Holder // some debugging stuff logger *zap.Logger @@ -111,16 +113,15 @@ type Pipeline struct { readOps atomic.Int64 // all pipeline`s metrics - - inUseEventsMetric *prometheus.GaugeVec - eventPoolCapacityMetric *prometheus.GaugeVec - inputEventsCountMetric *prometheus.CounterVec - inputEventSizeMetric *prometheus.CounterVec - outputEventsCountMetric *prometheus.CounterVec - outputEventSizeMetric *prometheus.CounterVec - readOpsEventsSizeMetric *prometheus.CounterVec - wrongEventCRIFormatMetric *prometheus.CounterVec - maxEventSizeExceededMetric *prometheus.CounterVec + inUseEventsMetric prometheus.Gauge + eventPoolCapacityMetric prometheus.Gauge + inputEventsCountMetric prometheus.Counter + inputEventSizeMetric prometheus.Counter + outputEventsCountMetric prometheus.Counter + outputEventSizeMetric prometheus.Counter + readOpsEventsSizeMetric prometheus.Counter + wrongEventCRIFormatMetric prometheus.Counter + maxEventSizeExceededMetric prometheus.Counter eventPoolLatency prometheus.Observer } @@ -135,11 +136,12 @@ type Settings struct { MaxEventSize int StreamField string IsStrict bool + MetricHoldDuration time.Duration } // New creates new pipeline. Consider using `SetupHTTPHandlers` next. func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline { - metricCtl := metric.New("pipeline_"+name, registry) + metricCtl := metric.NewCtl("pipeline_"+name, registry) lg := logger.Instance.Named(name).Desugar() @@ -154,10 +156,13 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli PipelineSettings: settings, MetricCtl: metricCtl, }, - - metricsHolder: newMetricsHolder(name, registry, metricsGenInterval), - streamer: newStreamer(settings.EventTimeout), - eventPool: newEventPool(settings.Capacity, settings.AvgEventSize), + actionMetrics: actionMetrics{ + m: make(map[string]*actionMetric), + mu: new(sync.RWMutex), + }, + metricHolder: metric.NewHolder(settings.MetricHoldDuration), + streamer: newStreamer(settings.EventTimeout), + eventPool: newEventPool(settings.Capacity, settings.AvgEventSize), antispamer: antispam.NewAntispammer(antispam.Options{ MaintenanceInterval: settings.MaintenanceInterval, Threshold: settings.AntispamThreshold, @@ -199,7 +204,7 @@ func (p *Pipeline) IncReadOps() { } func (p *Pipeline) IncMaxEventSizeExceeded() { - p.maxEventSizeExceededMetric.WithLabelValues().Inc() + p.maxEventSizeExceededMetric.Inc() } func (p *Pipeline) registerMetrics() { @@ -214,12 +219,11 @@ func (p *Pipeline) registerMetrics() { p.wrongEventCRIFormatMetric = m.RegisterCounter("wrong_event_cri_format", "Wrong event CRI format counter") p.maxEventSizeExceededMetric = m.RegisterCounter("max_event_size_exceeded", "Max event size exceeded counter") p.eventPoolLatency = m.RegisterHistogram("event_pool_latency_seconds", - "How long we are wait an event from the pool", metric.SecondsBucketsDetailedNano). - WithLabelValues() + "How long we are wait an event from the pool", metric.SecondsBucketsDetailedNano) } func (p *Pipeline) setDefaultMetrics() { - p.eventPoolCapacityMetric.WithLabelValues().Set(float64(p.settings.Capacity)) + p.eventPoolCapacityMetric.Set(float64(p.settings.Capacity)) } // SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. @@ -265,7 +269,6 @@ func (p *Pipeline) Start() { } p.initProcs() - p.metricsHolder.start() outputParams := &OutputPluginParams{ PluginDefaultParams: p.actionParams, @@ -367,7 +370,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes } else if dec == decoder.CRI { row, err = decoder.DecodeCRI(bytes) if err != nil { - p.wrongEventCRIFormatMetric.WithLabelValues().Inc() + p.wrongEventCRIFormatMetric.Inc() p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes)) return EventSeqIDError } @@ -535,9 +538,73 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) { p.eventPool.back(event) } +type actionMetric struct { + count metric.HeldCounterVec + size metric.HeldCounterVec + // totalCounter is a map of eventStatus to counter for `/info` endpoint. + totalCounter map[string]*atomic.Uint64 +} + +type actionMetrics struct { + m map[string]*actionMetric + mu *sync.RWMutex +} + +func (am *actionMetrics) set(name string, m *actionMetric) { + if name == "" { + return + } + + am.mu.Lock() + am.m[name] = m + am.mu.Unlock() +} + +func (am *actionMetrics) get(name string) *actionMetric { + if name == "" { + return nil + } + + am.mu.RLock() + defer am.mu.RUnlock() + return am.m[name] +} + func (p *Pipeline) AddAction(info *ActionPluginStaticInfo) { p.actionInfos = append(p.actionInfos, info) - p.metricsHolder.AddAction(info.MetricName, info.MetricLabels, info.MetricSkipStatus) + + mCtl := p.actionParams.MetricCtl + + labels := make([]string, 0, len(info.MetricLabels)+1) + if !info.MetricSkipStatus { + labels = append(labels, "status") + } + labels = append(labels, info.MetricLabels...) + + count := mCtl.RegisterCounterVec( + info.MetricName+"_events_count_total", + fmt.Sprintf("how many events processed by pipeline %q and #%d action", p.Name, len(p.actionInfos)-1), + labels..., + ) + heldCount := p.metricHolder.AddCounterVec(count) + + size := mCtl.RegisterCounterVec( + info.MetricName+"_events_size_total", + fmt.Sprintf("total size of events processed by pipeline %q and #%d action", p.Name, len(p.actionInfos)-1), + labels..., + ) + heldSize := p.metricHolder.AddCounterVec(size) + + totalCounter := make(map[string]*atomic.Uint64) + for _, st := range allEventStatuses() { + totalCounter[string(st)] = atomic.NewUint64(0) + } + + p.actionMetrics.set(info.MetricName, &actionMetric{ + count: heldCount, + size: heldSize, + totalCounter: totalCounter, + }) } func (p *Pipeline) initProcs() { @@ -560,7 +627,7 @@ func (p *Pipeline) initProcs() { func (p *Pipeline) newProc(id int) *processor { proc := newProcessor( id, - p.metricsHolder, + &p.actionMetrics, p.activeProcs, p.output, p.streamer, @@ -669,17 +736,17 @@ func (p *Pipeline) incMetrics(inputEvents, inputSize, outputEvents, outputSize, deltaReads, } - p.inputEventsCountMetric.WithLabelValues().Add(myDeltas.deltaInputEvents) - p.inputEventSizeMetric.WithLabelValues().Add(myDeltas.deltaInputSize) - p.outputEventsCountMetric.WithLabelValues().Add(myDeltas.deltaOutputEvents) - p.outputEventSizeMetric.WithLabelValues().Add(myDeltas.deltaOutputSize) - p.readOpsEventsSizeMetric.WithLabelValues().Add(myDeltas.deltaReads) + p.inputEventsCountMetric.Add(myDeltas.deltaInputEvents) + p.inputEventSizeMetric.Add(myDeltas.deltaInputSize) + p.outputEventsCountMetric.Add(myDeltas.deltaOutputEvents) + p.outputEventSizeMetric.Add(myDeltas.deltaOutputSize) + p.readOpsEventsSizeMetric.Add(myDeltas.deltaReads) return myDeltas } func (p *Pipeline) setMetrics(inUseEvents int64) { - p.inUseEventsMetric.WithLabelValues().Set(float64(inUseEvents)) + p.inUseEventsMetric.Set(float64(inUseEvents)) } func (p *Pipeline) maintenance() { @@ -696,7 +763,7 @@ func (p *Pipeline) maintenance() { } p.antispamer.Maintenance() - p.metricsHolder.maintenance() + p.metricHolder.Maintenance() myDeltas := p.incMetrics(inputEvents, inputSize, outputEvents, outputSize, readOps) p.setMetrics(p.eventPool.inUseEvents.Load()) @@ -776,14 +843,7 @@ func (p *Pipeline) serveActionInfo(info *ActionPluginStaticInfo) func(http.Respo return } - var actionMetric *metrics - for i := range p.metricsHolder.metrics { - m := &p.metricsHolder.metrics[i] - if m.name == info.MetricName { - actionMetric = m - break - } - } + am := p.actionMetrics.get(info.MetricName) var events []Event for _, status := range []eventStatus{ @@ -791,7 +851,7 @@ func (p *Pipeline) serveActionInfo(info *ActionPluginStaticInfo) func(http.Respo eventStatusDiscarded, eventStatusPassed, } { - c := actionMetric.current.totalCounter[string(status)] + c := am.totalCounter[string(status)] if c == nil { c = atomic.NewUint64(0) } diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 6dce69506..2722a781a 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -29,8 +29,9 @@ func TestInUnparsableMessages(t *testing.T) { name := "invalid_json" message := []byte("{wHo Is Json: YoU MeAn SoN oF JoHn???") pipelineSettings := &pipeline.Settings{ - Capacity: 5, - Decoder: "json", + Capacity: 5, + Decoder: "json", + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, } offset := int64(666) sourceID := pipeline.SourceID(3<<16 + int(10)) @@ -91,8 +92,9 @@ func TestInInvalidMessages(t *testing.T) { name: "empty_message", message: []byte(""), pipelineSettings: &pipeline.Settings{ - Capacity: 5, - Decoder: "json", + Capacity: 5, + Decoder: "json", + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, }, offset: int64(666), sourceID: pipeline.SourceID(1<<16 + int(1)), @@ -101,9 +103,10 @@ func TestInInvalidMessages(t *testing.T) { name: "too_long_message", message: []byte("{\"value\":\"i'm longer than 1 byte\""), pipelineSettings: &pipeline.Settings{ - Capacity: 5, - Decoder: "json", - MaxEventSize: 1, + Capacity: 5, + Decoder: "json", + MaxEventSize: 1, + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, }, offset: int64(666), sourceID: pipeline.SourceID(2<<16 + int(3)), diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index f5958b63c..b2508ae12 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -10,8 +10,9 @@ import ( func TestPipeline_streamEvent(t *testing.T) { settings := &Settings{ - Capacity: 5, - Decoder: "json", + Capacity: 5, + Decoder: "json", + MetricHoldDuration: DefaultMetricHoldDuration, } p := New("test", settings, prometheus.NewRegistry()) diff --git a/pipeline/processor.go b/pipeline/processor.go index 694d12d33..e3f354e50 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -51,16 +51,16 @@ func allEventStatuses() []eventStatus { // processor is a goroutine which doing pipeline actions type processor struct { - id int - streamer *streamer - metricsHolder *metricsHolder - output OutputPlugin - finalize finalizeFn + id int + streamer *streamer + output OutputPlugin + finalize finalizeFn activeCounter *atomic.Int32 actions []ActionPlugin actionInfos []*ActionPluginStaticInfo + actionMetrics *actionMetrics busyActions []bool busyActionsTotal int actionWatcher *actionWatcher @@ -73,7 +73,7 @@ type processor struct { func newProcessor( id int, - metricsHolder *metricsHolder, + actionMetrics *actionMetrics, activeCounter *atomic.Int32, output OutputPlugin, streamer *streamer, @@ -83,7 +83,7 @@ func newProcessor( processor := &processor{ id: id, streamer: streamer, - metricsHolder: metricsHolder, + actionMetrics: actionMetrics, output: output, finalize: finalizeFn, @@ -263,7 +263,33 @@ func (p *processor) countEvent(event *Event, actionIndex int, status eventStatus if event.IsTimeoutKind() { return } - p.metricsValues = p.metricsHolder.count(event, actionIndex, status, p.metricsValues) + + actionInfo := p.actionInfos[actionIndex] + am := p.actionMetrics.get(actionInfo.MetricName) + + if am == nil || (actionInfo.MetricSkipStatus && status == eventStatusReceived) { + return + } + + p.metricsValues = p.metricsValues[:0] + if !actionInfo.MetricSkipStatus { + p.metricsValues = append(p.metricsValues, string(status)) + } + + for _, field := range actionInfo.MetricLabels { + val := DefaultFieldValue + + node := event.Root.Dig(field) + if node != nil { + val = node.AsString() + } + + p.metricsValues = append(p.metricsValues, val) + } + + am.totalCounter[string(status)].Inc() + am.count.WithLabelValues(p.metricsValues...).Inc() + am.size.WithLabelValues(p.metricsValues...).Add(float64(event.Size)) } func (p *processor) isMatch(index int, event *Event) bool { diff --git a/plugin/action/mask/mask.go b/plugin/action/mask/mask.go index 6cf96e7d4..3e9ad4f02 100644 --- a/plugin/action/mask/mask.go +++ b/plugin/action/mask/mask.go @@ -59,7 +59,6 @@ type Plugin struct { logger *zap.Logger // plugin metrics - maskAppliedMetric *prometheus.CounterVec } @@ -185,7 +184,7 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string labelNames = append(labelNames, label) } - return ctl.RegisterCounter(name, help, labelNames...) + return ctl.RegisterCounterVec(name, help, labelNames...) } func compileMasks(masks []Mask, logger *zap.Logger) ([]Mask, *regexp.Regexp) { diff --git a/plugin/action/parse_re2/parse_re2.go b/plugin/action/parse_re2/parse_re2.go index 558c28f10..6808b74ea 100644 --- a/plugin/action/parse_re2/parse_re2.go +++ b/plugin/action/parse_re2/parse_re2.go @@ -20,7 +20,7 @@ type Plugin struct { re *regexp.Regexp // plugin metrics - eventNotMatchingPatternMetric *prometheus.CounterVec + eventNotMatchingPatternMetric prometheus.Counter } // ! config-params @@ -73,7 +73,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { sm := p.re.FindSubmatch(jsonNode.AsBytes()) if len(sm) == 0 { - p.eventNotMatchingPatternMetric.WithLabelValues().Inc() + p.eventNotMatchingPatternMetric.Inc() return pipeline.ActionPass } diff --git a/plugin/action/throttle/limiters_map.go b/plugin/action/throttle/limiters_map.go index 2b656af9b..4d8cfa9eb 100644 --- a/plugin/action/throttle/limiters_map.go +++ b/plugin/action/throttle/limiters_map.go @@ -71,7 +71,7 @@ type limitersMapConfig struct { limiterCfg *limiterConfig - mapSizeMetric *prometheus.GaugeVec + mapSizeMetric prometheus.Gauge } // limitersMap is auxiliary type for storing the map of strings to limiters with additional info for cleanup @@ -91,7 +91,7 @@ type limitersMap struct { limiterCfg *limiterConfig - mapSizeMetric *prometheus.GaugeVec + mapSizeMetric prometheus.Gauge } func newLimitersMap(lmCfg limitersMapConfig, redisOpts *redis.Options) *limitersMap { @@ -212,7 +212,7 @@ func (l *limitersMap) maintenance(ctx context.Context) { delete(l.lims, key) } mapSize := float64(len(l.lims)) - l.mapSizeMetric.WithLabelValues().Set(mapSize) + l.mapSizeMetric.Set(mapSize) l.mu.Unlock() } } diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go index 976bc0029..381eadc8a 100644 --- a/plugin/action/throttle/throttle.go +++ b/plugin/action/throttle/throttle.go @@ -45,7 +45,8 @@ type Plugin struct { limiterBuf []byte rules []*rule - limitersMapSizeMetric *prometheus.GaugeVec + // plugin metrics + limitersMapSizeMetric prometheus.Gauge } // ! config-params diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index c33c40b4d..2f11d245f 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -28,8 +28,7 @@ type Plugin struct { logger *zap.SugaredLogger // plugin metrics - - offsetErrorsMetric *prometheus.CounterVec + offsetErrorsMetric prometheus.Counter } // ! config-params @@ -65,7 +64,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.state = &state{} if err := offset.LoadYAML(p.config.OffsetsFile, p.state); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Error("can't load offset file: %s", err.Error()) } @@ -126,7 +125,7 @@ func (p *Plugin) Commit(event *pipeline.Event) { p.state.TS = event.Offset if err := offset.SaveYAML(p.config.OffsetsFile, p.state); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Errorf("can't save offset file: %s", err.Error()) } } diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index f0bbddbdc..2f4ed55f6 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -55,11 +55,10 @@ type Plugin struct { jobProvider *jobProvider // plugin metrics - - possibleOffsetCorruptionMetric *prometheus.CounterVec - alreadyWrittenEventsSkippedMetric *prometheus.CounterVec - errorOpenFileMetric *prometheus.CounterVec - notifyChannelLengthMetric *prometheus.GaugeVec + possibleOffsetCorruptionMetric prometheus.Counter + alreadyWrittenEventsSkippedMetric prometheus.Counter + errorOpenFileMetric prometheus.Counter + notifyChannelLengthMetric prometheus.Gauge } type persistenceMode int @@ -263,7 +262,7 @@ func (p *Plugin) PassEvent(event *pipeline.Event) bool { // and file-d went down after commit pass := event.Offset > savedOffset if !pass { - p.alreadyWrittenEventsSkippedMetric.WithLabelValues().Inc() + p.alreadyWrittenEventsSkippedMetric.Inc() return false } diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 103aae727..8c2fd7117 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -54,9 +54,8 @@ type jobProvider struct { logger *zap.SugaredLogger // provider metrics - - possibleOffsetCorruptionMetric *prometheus.CounterVec - errorOpenFileMetric *prometheus.CounterVec + possibleOffsetCorruptionMetric prometheus.Counter + errorOpenFileMetric prometheus.Counter } type Job struct { @@ -101,15 +100,14 @@ type symlinkInfo struct { } type metricCollection struct { - possibleOffsetCorruptionMetric *prometheus.CounterVec - errorOpenFileMetric *prometheus.CounterVec - notifyChannelLengthMetric *prometheus.GaugeVec + possibleOffsetCorruptionMetric prometheus.Counter + errorOpenFileMetric prometheus.Counter + notifyChannelLengthMetric prometheus.Gauge } func newMetricCollection( - possibleOffsetCorruptionMetric *prometheus.CounterVec, - errorOpenFileMetric *prometheus.CounterVec, - notifyChannelLengthMetric *prometheus.GaugeVec, + possibleOffsetCorruptionMetric, errorOpenFileMetric prometheus.Counter, + notifyChannelLengthMetric prometheus.Gauge, ) *metricCollection { return &metricCollection{ possibleOffsetCorruptionMetric: possibleOffsetCorruptionMetric, @@ -216,7 +214,7 @@ func (jp *jobProvider) commit(event *pipeline.Event) { } if value == 0 && event.Offset >= 16*1024*1024 { - jp.possibleOffsetCorruptionMetric.WithLabelValues().Inc() + jp.possibleOffsetCorruptionMetric.Inc() jp.logger.Errorf("it maybe an offset corruption: committing=%d, current=%d, event id=%d, source=%d:%s", event.Offset, value, event.SeqID, event.SourceID, event.SourceName) } @@ -303,7 +301,7 @@ func (jp *jobProvider) refreshFile(stat os.FileInfo, filename string, symlink st file, err := os.Open(filename) if err != nil { jp.logger.Warnf("file was already moved from creation place %s: %s", filename, err.Error()) - jp.errorOpenFileMetric.WithLabelValues().Inc() + jp.errorOpenFileMetric.Inc() return } diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 674aef60d..6f69aa792 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -30,7 +30,7 @@ func NewWatcher( dirPattern string, notifyFn notifyFn, shouldWatchWrites bool, - notifyChannelLengthMetric *prometheus.GaugeVec, + notifyChannelLengthMetric prometheus.Gauge, logger *zap.SugaredLogger, ) *watcher { return &watcher{ @@ -39,7 +39,7 @@ func NewWatcher( dirPattern: dirPattern, notifyFn: notifyFn, shouldWatchWrites: shouldWatchWrites, - notifyChannelLengthMetric: notifyChannelLengthMetric.WithLabelValues(), + notifyChannelLengthMetric: notifyChannelLengthMetric, logger: logger, } } diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go index 181820537..3b7715bc4 100644 --- a/plugin/input/file/watcher_test.go +++ b/plugin/input/file/watcher_test.go @@ -34,7 +34,7 @@ func TestWatcher(t *testing.T) { notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) { shouldCreate.Inc() } - ctl := metric.New("test", prometheus.NewRegistry()) + ctl := metric.NewCtl("test", prometheus.NewRegistry()) w := NewWatcher( path, tt.filenamePattern, diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index 6431bd079..1388631ba 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -92,11 +92,11 @@ func TestWorkerWork(t *testing.T) { shouldSkip: *atomic.NewBool(false), mu: &sync.Mutex{}, } - ctl := metric.New("test", prometheus.NewRegistry()) + ctl := metric.NewCtl("test", prometheus.NewRegistry()) metrics := newMetricCollection( - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterGauge("worker", "help_test"), + ctl.RegisterCounter("worker1", "help_test"), + ctl.RegisterCounter("worker2", "help_test"), + ctl.RegisterGauge("worker3", "help_test"), ) jp := NewJobProvider(&Config{}, metrics, &zap.SugaredLogger{}) jp.jobsChan = make(chan *Job, 2) @@ -225,11 +225,11 @@ func TestWorkerWorkMultiData(t *testing.T) { mu: &sync.Mutex{}, } - ctl := metric.New("test", prometheus.NewRegistry()) + ctl := metric.NewCtl("test", prometheus.NewRegistry()) metrics := newMetricCollection( - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterGauge("worker", "help_test"), + ctl.RegisterCounter("worker1", "help_test"), + ctl.RegisterCounter("worker2", "help_test"), + ctl.RegisterGauge("worker3", "help_test"), ) jp := NewJobProvider(&Config{}, metrics, &zap.SugaredLogger{}) jp.jobsChan = make(chan *Job, 2) diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index e297d45b8..504e66a19 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -210,18 +210,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { - p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "").WithLabelValues() - p.requestsInProgress = ctl.RegisterGauge("requests_in_progress", "").WithLabelValues() - p.processBulkSeconds = ctl.RegisterHistogram("process_bulk_seconds", "", metric.SecondsBucketsDetailed).WithLabelValues() - p.errorsTotal = ctl.RegisterCounter("input_http_errors", "Total http errors").WithLabelValues() + p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "") + p.requestsInProgress = ctl.RegisterGauge("requests_in_progress", "") + p.processBulkSeconds = ctl.RegisterHistogram("process_bulk_seconds", "", metric.SecondsBucketsDetailed) + p.errorsTotal = ctl.RegisterCounter("input_http_errors", "Total http errors") if p.config.Auth.Strategy_ != StrategyDisabled { - httpAuthTotal := ctl.RegisterCounter("http_auth_success_total", "", "secret_name") + httpAuthTotal := ctl.RegisterCounterVec("http_auth_success_total", "", "secret_name") p.successfulAuthTotal = make(map[string]prometheus.Counter, len(p.config.Auth.Secrets)) for key := range p.config.Auth.Secrets { p.successfulAuthTotal[key] = httpAuthTotal.WithLabelValues(key) } - p.failedAuthTotal = ctl.RegisterCounter("http_auth_fails_total", "").WithLabelValues() + p.failedAuthTotal = ctl.RegisterCounter("http_auth_fails_total", "") } } diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go index 9e8a2ff45..34802600b 100644 --- a/plugin/input/journalctl/journalctl.go +++ b/plugin/input/journalctl/journalctl.go @@ -27,10 +27,9 @@ type Plugin struct { logger *zap.Logger // plugin metrics - - offsetErrorsMetric *prometheus.CounterVec - journalCtlStopErrorMetric *prometheus.CounterVec - readerErrorsMetric *prometheus.CounterVec + offsetErrorsMetric prometheus.Counter + journalCtlStopErrorMetric prometheus.Counter + readerErrorsMetric prometheus.Counter } type Config struct { @@ -89,7 +88,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa offInfo := &offsetInfo{} if err := offset.LoadYAML(p.config.OffsetsFile, offInfo); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Error("can't load offset file", zap.Error(err)) } p.offInfo.Store(offInfo) @@ -116,13 +115,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Stop() { err := p.reader.stop() if err != nil { - p.journalCtlStopErrorMetric.WithLabelValues().Inc() + p.journalCtlStopErrorMetric.Inc() p.logger.Error("can't stop journalctl cmd", zap.Error(err)) } offsets := *p.offInfo.Load() if err := offset.SaveYAML(p.config.OffsetsFile, offsets); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Error("can't save offset file", zap.Error(err)) } } @@ -133,7 +132,7 @@ func (p *Plugin) Commit(event *pipeline.Event) { p.offInfo.Store(&offInfo) if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Error("can't save offset file", zap.Error(err)) } } diff --git a/plugin/input/journalctl/reader.go b/plugin/input/journalctl/reader.go index 15f8722a8..2c9ccab0b 100644 --- a/plugin/input/journalctl/reader.go +++ b/plugin/input/journalctl/reader.go @@ -25,7 +25,7 @@ type journalReader struct { args []string // reader metrics - readerErrorsMetric *prometheus.CounterVec + readerErrorsMetric prometheus.Counter } func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { @@ -47,13 +47,13 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { break } if err != nil { - r.readerErrorsMetric.WithLabelValues().Inc() + r.readerErrorsMetric.Inc() r.config.logger.Error(err.Error()) continue } _, err = config.output.Write(bytes) if err != nil { - r.readerErrorsMetric.WithLabelValues().Inc() + r.readerErrorsMetric.Inc() r.config.logger.Error(err.Error()) } @@ -64,7 +64,7 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { } } -func newJournalReader(config *journalReaderConfig, readerErrorsCounter *prometheus.CounterVec) *journalReader { +func newJournalReader(config *journalReaderConfig, readerErrorsCounter prometheus.Counter) *journalReader { res := &journalReader{ config: config, readerErrorsMetric: readerErrorsCounter, diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index 7dbf21311..ad8281c51 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -53,9 +53,8 @@ type Plugin struct { idByTopic map[string]int // plugin metrics - - commitErrorsMetric *prometheus.CounterVec - consumeErrorsMetric *prometheus.CounterVec + commitErrorsMetric prometheus.Counter + consumeErrorsMetric prometheus.Counter } type OffsetType byte @@ -190,7 +189,7 @@ func (p *Plugin) consume(ctx context.Context) { for { err := p.consumerGroup.Consume(ctx, p.config.Topics, p) if err != nil { - p.consumeErrorsMetric.WithLabelValues().Inc() + p.consumeErrorsMetric.Inc() p.logger.Errorf("can't consume from kafka: %s", err.Error()) } @@ -207,7 +206,7 @@ func (p *Plugin) Stop() { func (p *Plugin) Commit(event *pipeline.Event) { session := p.session if session == nil { - p.commitErrorsMetric.WithLabelValues().Inc() + p.commitErrorsMetric.Inc() p.logger.Errorf("no kafka consumer session for event commit") return } diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 7cbec378f..3e82a10b5 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -54,9 +54,8 @@ type Plugin struct { requestID atomic.Int64 // plugin metrics - - insertErrorsMetric *prometheus.CounterVec - queriesCountMetric *prometheus.CounterVec + insertErrorsMetric prometheus.Counter + queriesCountMetric prometheus.Counter } type Setting struct { @@ -492,7 +491,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { if err == nil { break } - p.insertErrorsMetric.WithLabelValues().Inc() + p.insertErrorsMetric.Inc() time.Sleep(p.config.Retention_) p.logger.Error("an attempt to insert a batch failed", zap.Error(err)) } @@ -504,7 +503,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { } func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error { - defer p.queriesCountMetric.WithLabelValues().Inc() + defer p.queriesCountMetric.Inc() ctx, cancel := context.WithTimeout(p.ctx, p.config.InsertTimeout_) defer cancel() diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index 52a84bb88..176b0cfbc 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -51,8 +51,7 @@ type Plugin struct { mu *sync.Mutex // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter indexingErrorsMetric prometheus.Counter } @@ -235,7 +234,7 @@ func (p *Plugin) Out(event *pipeline.Event) { func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounter("output_elasticsearch_send_error", "Total elasticsearch send errors") - p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors").WithLabelValues() + p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors") } func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { @@ -258,7 +257,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for { if err := p.send(data.outBuf); err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err)) } else { break diff --git a/plugin/output/file/helpers_test.go b/plugin/output/file/helpers_test.go index efe667da4..2265e6b90 100644 --- a/plugin/output/file/helpers_test.go +++ b/plugin/output/file/helpers_test.go @@ -54,6 +54,7 @@ func newPipeline(t *testing.T, configOutput *Config) *pipeline.Pipeline { AvgEventSize: 2048, StreamField: "stream", Decoder: "json", + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, } p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry()) diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 25f2a6eee..448bddc16 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -45,8 +45,7 @@ type Plugin struct { controller pipeline.OutputPluginController // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -252,7 +251,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error()) time.Sleep(time.Second) continue @@ -262,7 +261,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { _, err := data.gelf.send(outBuf) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error()) _ = data.gelf.close() data.gelf = nil diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 383752a66..a1c2c0c7a 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -39,7 +39,7 @@ type Plugin struct { batcher *pipeline.Batcher // plugin metrics - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -219,7 +219,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for _, e := range errs { p.logger.Errorf("can't write batch: %s", e.Err.Error()) } - p.sendErrorMetric.WithLabelValues().Add(float64(len(errs))) + p.sendErrorMetric.Add(float64(len(errs))) p.controller.Error("some events from batch were not written") } } diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index f4ca1eed3..ad206e78c 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -71,10 +71,9 @@ type Plugin struct { pool PgxIface // plugin metrics - - discardedEventMetric *prometheus.CounterVec - duplicatedEventMetric *prometheus.CounterVec - writtenEventMetric *prometheus.CounterVec + discardedEventMetric prometheus.Counter + duplicatedEventMetric prometheus.Counter + writtenEventMetric prometheus.Counter } type ConfigColumn struct { @@ -272,7 +271,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { switch { case errors.Is(err, ErrEventDoesntHaveField), errors.Is(err, ErrEventFieldHasWrongType), errors.Is(err, ErrTimestampFromDistantPastOrFuture): - p.discardedEventMetric.WithLabelValues().Inc() + p.discardedEventMetric.Inc() if p.config.StrictFields || p.config.Strict { p.logger.Fatal(err) } @@ -286,7 +285,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { // passes here only if event valid. if _, ok := uniqueEventsMap[uniqueID]; ok { - p.duplicatedEventMetric.WithLabelValues().Inc() + p.duplicatedEventMetric.Inc() p.logger.Infof("event duplicated. Fields: %v, values: %v", pgFields, fieldValues) } else { if uniqueID != "" { @@ -323,7 +322,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { time.Sleep(p.config.Retention_) continue } - p.writtenEventMetric.WithLabelValues().Add(float64(len(uniqueEventsMap))) + p.writtenEventMetric.Add(float64(len(uniqueEventsMap))) break } diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 0800ae61c..41f5eddcc 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -90,7 +90,7 @@ func TestPrivateOut(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -165,7 +165,7 @@ func TestPrivateOutWithRetry(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -218,7 +218,7 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { logger: testLogger, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -298,7 +298,7 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{ {Root: root}, @@ -364,7 +364,7 @@ func TestPrivateOutWrongTypeInField(t *testing.T) { logger: testLogger, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -469,7 +469,7 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te ctx: ctx, } - p.registerMetrics(metric.New("test", prometheus.NewRegistry())) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry())) batch := pipeline.NewPreparedBatch([]*pipeline.Event{ {Root: root}, diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index fcc03200c..b8d0050b5 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -150,8 +150,7 @@ type Plugin struct { compressor compressor // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter uploadFileMetric *prometheus.CounterVec rnd rand.Rand @@ -271,7 +270,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounter("output_s3_send_error", "Total s3 send errors") - p.uploadFileMetric = ctl.RegisterCounter("output_s3_upload_file", "Total files upload", "bucket_name") + p.uploadFileMetric = ctl.RegisterCounterVec("output_s3_upload_file", "Total files upload", "bucket_name") } func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.OutputPluginParams, factory objStoreFactory) { @@ -564,7 +563,7 @@ func (p *Plugin) uploadToS3(compressedDTO fileDTO) error { ) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() return fmt.Errorf("could not upload file: %s into bucket: %s, error: %s", compressedDTO.fileName, compressedDTO.bucketName, err.Error()) } return nil diff --git a/plugin/output/s3/s3_test.go b/plugin/output/s3/s3_test.go index b6239c8d1..b84ce3fca 100644 --- a/plugin/output/s3/s3_test.go +++ b/plugin/output/s3/s3_test.go @@ -393,10 +393,11 @@ func newPipeline(t *testing.T, configOutput *Config, objStoreF objStoreFactory) Capacity: 4096, MaintenanceInterval: time.Second * 10, // MaintenanceInterval: time.Second * 100000, - AntispamThreshold: 0, - AvgEventSize: 2048, - StreamField: "stream", - Decoder: "json", + AntispamThreshold: 0, + AvgEventSize: 2048, + StreamField: "stream", + Decoder: "json", + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, } p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry()) diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index f1425338a..95f605991 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -36,8 +36,7 @@ type Plugin struct { controller pipeline.OutputPluginController // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -165,7 +164,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for { err := p.send(outBuf) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) time.Sleep(time.Second) diff --git a/test/test.go b/test/test.go index 8a2e62946..0b3ad7045 100644 --- a/test/test.go +++ b/test/test.go @@ -122,6 +122,7 @@ func NewPipeline(actions []*pipeline.ActionPluginStaticInfo, pipelineOpts ...str AvgEventSize: 2048, StreamField: "stream", Decoder: "json", + MetricHoldDuration: pipeline.DefaultMetricHoldDuration, } pName := "test_pipeline" @@ -227,7 +228,7 @@ func newDefaultParams() pipeline.PluginDefaultParams { return pipeline.PluginDefaultParams{ PipelineName: "test_pipeline", PipelineSettings: &pipeline.Settings{}, - MetricCtl: metric.New("test", prometheus.NewRegistry()), + MetricCtl: metric.NewCtl("test", prometheus.NewRegistry()), } } diff --git a/xtime/xtime.go b/xtime/xtime.go new file mode 100644 index 000000000..5ee809e80 --- /dev/null +++ b/xtime/xtime.go @@ -0,0 +1,38 @@ +package xtime + +import ( + "sync/atomic" + "time" +) + +func init() { + SetNowTime(time.Now().UnixNano()) + ticker := time.NewTicker(updateTimeInterval) + go func() { + for t := range ticker.C { + SetNowTime(t.UnixNano()) + } + }() +} + +const updateTimeInterval = time.Second + +var nowTime atomic.Int64 + +func GetInaccurateUnixNano() int64 { + return nowTime.Load() +} + +func GetInaccurateTime() time.Time { + return time.Unix(0, GetInaccurateUnixNano()) +} + +// SetNowTime sets the current time. +// Function should be used only in tests. +// +// An alternative to this approach is to store and redefine +// a function through the fields of the tested struct. +// But in this case, the inlining function GetInaccurateUnixNano is lost. +func SetNowTime(unixNano int64) { + nowTime.Store(unixNano) +}