From 72875fde7a31f46a1fe9d58faf487ae383c0b0d8 Mon Sep 17 00:00:00 2001 From: SpencerMalone Date: Wed, 3 Apr 2019 11:36:34 -0400 Subject: [PATCH] Adding mapping cache, only reload config on changes Signed-off-by: SpencerMalone --- exporter_test.go | 2 +- main.go | 19 ++-- pkg/mapper/mapper.go | 57 +++++++++-- pkg/mapper/mapper_benchmark_test.go | 153 ++++++++++++++++++++++++---- pkg/mapper/mapper_cache.go | 50 +++++++++ pkg/mapper/mapper_test.go | 4 +- 6 files changed, 244 insertions(+), 41 deletions(-) create mode 100644 pkg/mapper/mapper_cache.go diff --git a/exporter_test.go b/exporter_test.go index f353c965..c3e23ddd 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -196,7 +196,7 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config) + err := testMapper.InitFromYAMLString(config, false) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } diff --git a/main.go b/main.go index 7127afbe..ddc75243 100644 --- a/main.go +++ b/main.go @@ -88,7 +88,7 @@ func tcpAddrFromString(addr string) *net.TCPAddr { } } -func watchConfig(fileName string, mapper *mapper.MetricMapper) { +func watchConfig(fileName string, mapper *mapper.MetricMapper, useMetricCache bool) { watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) @@ -103,13 +103,18 @@ func watchConfig(fileName string, mapper *mapper.MetricMapper) { select { case ev := <-watcher.Event: log.Infof("Config file changed (%s), attempting reload", ev) - err = mapper.InitFromFile(fileName) + reloaded, err := mapper.InitFromFile(fileName, useMetricCache) if err != nil { log.Errorln("Error reloading config:", err) configLoads.WithLabelValues("failure").Inc() } else { - log.Infoln("Config reloaded successfully") - configLoads.WithLabelValues("success").Inc() + if reloaded == true { + log.Infoln("Config reloaded successfully") + configLoads.WithLabelValues("success").Inc() + } else { + log.Infoln("Config reload skipped") + configLoads.WithLabelValues("skipped").Inc() + } } // Re-add the file watcher since it can get lost on some changes. E.g. // saving a file with vim results in a RENAME-MODIFY-DELETE event @@ -144,6 +149,8 @@ func main() { mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() + + useMetricCache = kingpin.Flag("statsd.enable-mapping-cache", "Should you use the metric mapping cache. Increases throughput at the cost of memory").Default("true").Bool() ) log.AddFlags(kingpin.CommandLine) @@ -197,7 +204,7 @@ func main() { mapper := &mapper.MetricMapper{MappingsCount: mappingsCount} if *mappingConfig != "" { - err := mapper.InitFromFile(*mappingConfig) + _, err := mapper.InitFromFile(*mappingConfig, *useMetricCache) if err != nil { log.Fatal("Error loading config:", err) } @@ -207,7 +214,7 @@ func main() { log.Fatal("Error dumping FSM:", err) } } - go watchConfig(*mappingConfig, mapper) + go watchConfig(*mappingConfig, mapper, *useMetricCache) } exporter := NewExporter(mapper) exporter.Listen(events) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 4cb4fbe1..b535b0d6 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -14,6 +14,7 @@ package mapper import ( + "crypto/sha256" "fmt" "io/ioutil" "regexp" @@ -49,7 +50,10 @@ type MetricMapper struct { FSM *fsm.FSM doFSM bool doRegex bool - mutex sync.Mutex + useCache bool + cache *MetricMapperCache + mutex sync.RWMutex + sha string MappingsCount prometheus.Gauge } @@ -83,7 +87,7 @@ var defaultQuantiles = []metricObjective{ {Quantile: 0.99, Error: 0.001}, } -func (m *MetricMapper) InitFromYAMLString(fileContents string) error { +func (m *MetricMapper) InitFromYAMLString(fileContents string, useCache bool) error { var n MetricMapper if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil { @@ -189,6 +193,8 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { m.Defaults = n.Defaults m.Mappings = n.Mappings + m.cache = NewMetricMapperCache() + m.useCache = useCache if n.doFSM { var mappings []string for _, mapping := range n.Mappings { @@ -206,19 +212,39 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { if m.MappingsCount != nil { m.MappingsCount.Set(float64(len(n.Mappings))) } - return nil } -func (m *MetricMapper) InitFromFile(fileName string) error { +func (m *MetricMapper) InitFromFile(fileName string, useCache bool) (bool, error) { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { - return err + return false, err } - return m.InitFromYAMLString(string(mappingStr)) + h := sha256.New() + h.Write(mappingStr) + newSha := string(h.Sum(nil)) + + if newSha != m.sha { + err = m.InitFromYAMLString(string(mappingStr), useCache) + if err != nil { + return false, err + } + m.sha = newSha + return true, nil + } + + return false, nil } func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) { + m.mutex.RLock() + defer m.mutex.RUnlock() + if m.useCache { + result, cached := m.cache.Get(statsdMetric) + if cached { + return result.Mapping, result.Labels, result.Matched + } + } // glob matching if m.doFSM { finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType)) @@ -230,17 +256,22 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy for index, formatter := range result.labelFormatters { labels[result.labelKeys[index]] = formatter.Format(captures) } + + if m.useCache { + m.cache.AddMatch(statsdMetric, result, labels) + } + return result, labels, true } else if !m.doRegex { // if there's no regex match type, return immediately + if m.useCache { + m.cache.AddMiss(statsdMetric) + } return nil, nil, false } } // regex matching - m.mutex.Lock() - defer m.mutex.Unlock() - for _, mapping := range m.Mappings { // if a rule don't have regex matching type, the regex field is unset if mapping.regex == nil { @@ -268,8 +299,14 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy labels[label] = string(value) } + if m.useCache { + m.cache.AddMatch(statsdMetric, &mapping, labels) + } + return &mapping, labels, true } - + if m.useCache { + m.cache.AddMiss(statsdMetric) + } return nil, nil, false } diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index 0388c59e..f1e109fb 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -104,7 +104,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -168,7 +168,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -239,7 +239,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -303,7 +303,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -330,7 +330,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -358,7 +358,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -384,7 +384,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -411,7 +411,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -437,7 +437,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -464,7 +464,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -501,7 +501,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -539,7 +539,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -560,7 +560,28 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } +} + +func BenchmarkGlob10RulesCached(b *testing.B) { + config := `--- +mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) + mappings := []string{ + "metric100.a", + } + + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, true) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -583,7 +604,30 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } +} + +func BenchmarkRegex10RulesAverageCached(b *testing.B) { + config := `--- +defaults: + match_type: regex +mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) + mappings := []string{ + "metric5.a", + } + + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, true) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -604,7 +648,28 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } +} + +func BenchmarkGlob100RulesCached(b *testing.B) { + config := `--- +mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) + mappings := []string{ + "metric100.a", + } + + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, true) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -625,7 +690,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -648,7 +713,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -671,7 +736,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -694,7 +759,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -715,7 +780,28 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } +} + +func BenchmarkGlob100RulesMultipleCapturesCached(b *testing.B) { + config := `--- +mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) + mappings := []string{ + "metric50.a.b.c.d.e.f.g.h.i.j.k.l", + } + + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, true) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -738,7 +824,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -761,7 +847,30 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) + err := mapper.InitFromYAMLString(config, false) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } +} + +func BenchmarkRegex100RulesMultipleCapturesWorstCached(b *testing.B) { + config := `--- +defaults: + match_type: regex +mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) + mappings := []string{ + "metric100.a.b.c.d.e.f.g.h.i.j.k.l", + } + + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, true) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go new file mode 100644 index 00000000..789b3833 --- /dev/null +++ b/pkg/mapper/mapper_cache.go @@ -0,0 +1,50 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mapper + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type MetricMapperCacheResult struct { + Mapping *MetricMapping + Matched bool + Labels prometheus.Labels +} + +type MetricMapperCache struct { + cache sync.Map +} + +func NewMetricMapperCache() *MetricMapperCache { + return &MetricMapperCache{} +} + +func (m *MetricMapperCache) Get(metricString string) (*MetricMapperCacheResult, bool) { + if result, ok := m.cache.Load(metricString); ok { + return result.(*MetricMapperCacheResult), true + } else { + return nil, false + } +} + +func (m *MetricMapperCache) AddMatch(metricString string, mapping *MetricMapping, labels prometheus.Labels) { + m.cache.Store(metricString, &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels}) +} + +func (m *MetricMapperCache) AddMiss(metricString string) { + m.cache.Store(metricString, &MetricMapperCacheResult{Matched: false}) +} diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 8d1ea490..8c97de47 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -669,7 +669,7 @@ mappings: mapper := MetricMapper{} for i, scenario := range scenarios { - err := mapper.InitFromYAMLString(scenario.config) + err := mapper.InitFromYAMLString(scenario.config, false) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) } @@ -769,7 +769,7 @@ mappings: for i, scenario := range scenarios { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(scenario.config) + err := mapper.InitFromYAMLString(scenario.config, false) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) }