From 84efeb4480e72bb10dd58668eec8a5d321ce8c42 Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Fri, 18 Aug 2017 09:28:48 -0400 Subject: [PATCH 1/3] monitor: backport writePoints method --- monitor/service.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/monitor/service.go b/monitor/service.go index 69ca20e8ecc..9e425765f12 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -125,6 +125,16 @@ func (m *Monitor) Open() error { return nil } +func (m *Monitor) writePoints(p models.Points) error { + m.mu.RLock() + defer m.mu.RUnlock() + + if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, p); err != nil { + m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err)) + } + return nil +} + // Close closes the monitor system. func (m *Monitor) Close() error { if !m.open() { @@ -424,14 +434,7 @@ func (m *Monitor) storeStatistics() { points = append(points, pt) } - func() { - m.mu.RLock() - defer m.mu.RUnlock() - - if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil { - m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err)) - } - }() + m.writePoints(points) case <-m.done: m.Logger.Info(fmt.Sprintf("terminating storage of statistics")) return From 2f5888d436a638e1204de59f12a6f270fd709028 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 16 Aug 2017 13:09:25 -0600 Subject: [PATCH 2/3] Batch up writes for monitor service The monitor service was writing one big batch for all stats. If this batch was large, it causes some slower and more expensive write paths to be taken that incur a lot of memory allocations. This changes the monitor service to write in batches of up to 5000 points which should avoid the slower paths. --- monitor/service.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/monitor/service.go b/monitor/service.go index 9e425765f12..6d0ea9abd04 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -424,17 +424,26 @@ func (m *Monitor) storeStatistics() { return } - points := make(models.Points, 0, len(stats)) + // Write all stats in batches + batch := make(models.Points, 0, 5000) for _, s := range stats { pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now) if err != nil { m.Logger.Info(fmt.Sprintf("Dropping point %v: %v", s.Name, err)) return } - points = append(points, pt) + batch = append(batch, pt) + if len(batch) == cap(batch) { + m.writePoints(batch) + batch = batch[:0] + + } } - m.writePoints(points) + // Write the last batch + if len(batch) > 0 { + m.writePoints(batch) + } case <-m.done: m.Logger.Info(fmt.Sprintf("terminating storage of statistics")) return From b3a6027c35e66bd07503b1b6670503661b92c76d Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Fri, 18 Aug 2017 09:31:27 -0400 Subject: [PATCH 3/3] updated CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7e0da7daf1..9339be6a858 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ - [#8706](https://github.com/influxdata/influxdb/pull/8706): Cursor leak, resulting in an accumulation of `.tsm.tmp` files after compactions. - [#8713](https://github.com/influxdata/influxdb/issues/8713): Deadlock when dropping measurement and writing +### Features + +- [#8711](https://github.com/influxdata/influxdb/pull/8711): Batch up writes for monitor service + ## v1.3.3 [2017-08-10] ### Bugfixes