Skip to content

Commit 62e6140

Browse files
committed
Unregister metrics generated from logs after a config reload.
1 parent 7bbbf23 commit 62e6140

17 files changed

+117
-3
lines changed

clients/pkg/logentry/metric/metricvec.go

+6
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
8484
return ok
8585
}
8686

87+
func (c *metricVec) DeleteAll() {
88+
c.mtx.Lock()
89+
defer c.mtx.Unlock()
90+
c.metrics = map[model.Fingerprint]prometheus.Metric{}
91+
}
92+
8793
// prune will remove all metrics which implement the Expirable interface and have expired
8894
// it does not take out a lock on the metrics map so whoever calls this function should do so.
8995
func (c *metricVec) prune() {

clients/pkg/logentry/stages/decolorize.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
3333
func (m *decolorizeStage) Name() string {
3434
return StageTypeDecolorize
3535
}
36+
37+
// Cleanup implements Stage.
38+
func (*decolorizeStage) Cleanup() {
39+
// no-op
40+
}

clients/pkg/logentry/stages/drop.go

+5
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
266266
func (m *dropStage) Name() string {
267267
return StageTypeDrop
268268
}
269+
270+
// Cleanup implements Stage.
271+
func (*dropStage) Cleanup() {
272+
// no-op
273+
}

clients/pkg/logentry/stages/eventlogmessage.go

+5
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
142142
return StageTypeEventLogMessage
143143
}
144144

145+
// Cleanup implements Stage.
146+
func (*eventLogMessageStage) Cleanup() {
147+
// no-op
148+
}
149+
145150
// Sanitize a input string to convert it into a valid prometheus label
146151
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
147152
func SanitizeFullLabelName(input string) string {

clients/pkg/logentry/stages/extensions.go

+5
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func (c *cri) Name() string {
5959
return "cri"
6060
}
6161

62+
// Cleanup implements Stage.
63+
func (*cri) Cleanup() {
64+
// no-op
65+
}
66+
6267
// implements Stage interface
6368
func (c *cri) Run(entry chan Entry) chan Entry {
6469
entry = c.base.Run(entry)

clients/pkg/logentry/stages/geoip.go

+5
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
123123
return StageTypeGeoIP
124124
}
125125

126+
// Cleanup implements Stage.
127+
func (*geoIPStage) Cleanup() {
128+
// no-op
129+
}
130+
126131
func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
127132
var ip net.IP
128133
if g.cfgs.Source != nil {

clients/pkg/logentry/stages/json.go

+5
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
188188
func (j *jsonStage) Name() string {
189189
return StageTypeJSON
190190
}
191+
192+
// Cleanup implements Stage.
193+
func (*jsonStage) Cleanup() {
194+
// no-op
195+
}

clients/pkg/logentry/stages/limit.go

+5
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
138138
return StageTypeLimit
139139
}
140140

141+
// Cleanup implements Stage.
142+
func (*limitStage) Cleanup() {
143+
// no-op
144+
}
145+
141146
func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
142147
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
143148
"A count of all log lines dropped as a result of a pipeline stage",

clients/pkg/logentry/stages/match.go

+5
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
206206
func (m *matcherStage) Name() string {
207207
return StageTypeMatch
208208
}
209+
210+
// Cleanup implements Stage.
211+
func (*matcherStage) Cleanup() {
212+
// no-op
213+
}

clients/pkg/logentry/stages/metrics.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
128128
metrics[name] = collector
129129
}
130130
}
131-
return toStage(&metricStage{
131+
return &metricStage{
132132
logger: logger,
133133
cfg: *cfgs,
134134
metrics: metrics,
135-
}), nil
135+
}, nil
136136
}
137137

138138
// metricStage creates and updates prometheus metrics based on extracted pipeline data
@@ -142,6 +142,19 @@ type metricStage struct {
142142
metrics map[string]prometheus.Collector
143143
}
144144

145+
func (m *metricStage) Run(in chan Entry) chan Entry {
146+
out := make(chan Entry)
147+
go func() {
148+
defer close(out)
149+
150+
for e := range in {
151+
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
152+
out <- e
153+
}
154+
}()
155+
return out
156+
}
157+
145158
// Process implements Stage
146159
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
147160
for name, collector := range m.metrics {
@@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
178191
return StageTypeMetric
179192
}
180193

194+
// Cleanup implements Stage.
195+
func (m *metricStage) Cleanup() {
196+
for _, collector := range m.metrics {
197+
switch vec := collector.(type) {
198+
case *metric.Counters:
199+
vec.DeleteAll()
200+
case *metric.Gauges:
201+
vec.DeleteAll()
202+
case *metric.Histograms:
203+
vec.DeleteAll()
204+
}
205+
}
206+
}
207+
181208
// recordCounter will update a counter metric
182209
// nolint:goconst
183210
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {

clients/pkg/logentry/stages/metrics_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
127127
strings.NewReader(expectedMetrics)); err != nil {
128128
t.Fatalf("mismatch metrics: %v", err)
129129
}
130+
131+
pl.Cleanup()
132+
133+
if err := testutil.GatherAndCompare(registry,
134+
strings.NewReader("")); err != nil {
135+
t.Fatalf("mismatch metrics: %v", err)
136+
}
130137
}
131138

132139
func TestNegativeGauge(t *testing.T) {
@@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
435442
if err != nil {
436443
t.Fatalf("failed to create stage with metrics: %v", err)
437444
}
438-
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
445+
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
439446
}
440447

441448
var (

clients/pkg/logentry/stages/multiline.go

+5
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
229229
func (m *multilineStage) Name() string {
230230
return StageTypeMultiline
231231
}
232+
233+
// Cleanup implements Stage.
234+
func (*multilineStage) Cleanup() {
235+
// no-op
236+
}

clients/pkg/logentry/stages/pack.go

+5
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
218218
func (m *packStage) Name() string {
219219
return StageTypePack
220220
}
221+
222+
// Cleanup implements Stage.
223+
func (*packStage) Cleanup() {
224+
// no-op
225+
}

clients/pkg/logentry/stages/pipeline.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ type Pipeline struct {
3030
dropCount *prometheus.CounterVec
3131
}
3232

33+
// Cleanup implements Stage.
34+
func (p *Pipeline) Cleanup() {
35+
for _, s := range p.stages {
36+
s.Cleanup()
37+
}
38+
}
39+
3340
// NewPipeline creates a new log entry pipeline from a configuration
3441
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
3542
st := []Stage{}
@@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
169176
return api.NewEntryHandler(handlerIn, func() {
170177
once.Do(func() { close(handlerIn) })
171178
wg.Wait()
179+
p.Cleanup()
172180
})
173181
}
174182

clients/pkg/logentry/stages/sampling.go

+5
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
111111
func (m *samplingStage) Name() string {
112112
return StageTypeSampling
113113
}
114+
115+
// Cleanup implements Stage.
116+
func (*samplingStage) Cleanup() {
117+
// no-op
118+
}

clients/pkg/logentry/stages/stage.go

+6
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Entry struct {
6262
type Stage interface {
6363
Name() string
6464
Run(chan Entry) chan Entry
65+
Cleanup()
6566
}
6667

6768
func (entry *Entry) copy() *Entry {
@@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
228229
}
229230
return creator(params)
230231
}
232+
233+
// Cleanup implements Stage.
234+
func (*stageProcessor) Cleanup() {
235+
// no-op
236+
}

clients/pkg/logentry/stages/structuredmetadata.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
3333
return StageTypeStructuredMetadata
3434
}
3535

36+
// Cleanup implements Stage.
37+
func (*structuredMetadataStage) Cleanup() {
38+
// no-op
39+
}
40+
3641
func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
3742
return RunWith(in, func(e Entry) Entry {
3843
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {

0 commit comments

Comments
 (0)