Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some metrics for measuring performance and failures in boltdb shipper #2034

Merged
merged 3 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"net/http"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -126,7 +128,7 @@ func New(cfg Config) (*Loki, error) {
}

loki.setupAuthMiddleware()
storage.RegisterCustomIndexClients(cfg.StorageConfig)
storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

serviceMap, err := loki.initModuleServices(cfg.Target)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
return filtered
}

func RegisterCustomIndexClients(cfg Config) {
func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
Expand All @@ -222,7 +222,10 @@ func RegisterCustomIndexClients(cfg Config) {
return nil, err
}

boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(
cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory},
objectClient, cfg.BoltDBShipperConfig, registerer)

return boltDBIndexClientWithShipper, err
}, func() (client chunk.TableClient, e error) {
objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
BoltDBShipperConfig: boltdbShipperConfig,
}

RegisterCustomIndexClients(config)
RegisterCustomIndexClients(config, nil)

store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/stores/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"go.etcd.io/bbolt"
)

Expand All @@ -15,13 +17,13 @@ type BoltdbIndexClientWithShipper struct {
}

// NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB.
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) {
boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg)
if err != nil {
return nil, err
}

shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient)
shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient, registerer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,7 +57,9 @@ func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.In
}
}

return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error {
return b.QueryDB(ctx, db, query, callback)
return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(b.shipper.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error {
return b.QueryDB(ctx, db, query, callback)
})
})
}
15 changes: 15 additions & 0 deletions pkg/storage/stores/local/downloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
Expand Down Expand Up @@ -160,6 +161,9 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc
fc.Lock()
defer fc.Unlock()

startTime := time.Now()
totalFilesSize := int64(0)

objects, _, err := s.storageClient.List(ctx, period+"/")
if err != nil {
return err
Expand Down Expand Up @@ -192,9 +196,20 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc
return err
}

stat, err := os.Stat(filePath)
if err != nil {
return err
}

totalFilesSize += stat.Size()

fc.files[uploader] = df
}

duration := time.Since(startTime).Seconds()
s.metrics.initialFilesDownloadDurationSeconds.WithLabelValues(period).Set(duration)
s.metrics.initialFilesDownloadSizeBytes.WithLabelValues(period).Set(float64(totalFilesSize))

return nil
}

Expand Down
57 changes: 57 additions & 0 deletions pkg/storage/stores/local/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package local

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)

const (
statusFailure = "failure"
statusSuccess = "success"
)

type boltDBShipperMetrics struct {
// metrics for measuring performance of downloading of files per period initially i.e for the first time
initialFilesDownloadDurationSeconds *prometheus.GaugeVec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if these are good metrics - won't they be too irregular? I expect it'll be hard to query these in prom if they're not regularly populated. I wonder if gauges which aren't updated regularly are still scraped regularly, hrm...

Also I'm wary of creating labels based on the table name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that these metrics are irregular but they are there just to help understand slow queries which would most likely happen when downloading large index file. Without this metric we would have to rely on logs to find if query is slow because of downloading of files or some other contention is there.
Regarding labels based on table names, there would not be too many tables at a time. Also without that label, we would not have much visibility because otherwise the same metric would be updated for all the table downloads and the last one which was updated before prometheus scrapes would be visible, which would not be much of a use. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other option we have is adding a span in traces and getting rid of this metric.

Copy link
Member

@owen-d owen-d May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concerns about sparsely populated gauges have been alleviated (this is handled via our instrumentation lib).

However, it seems weird to be creating per-table gauges that are only written on startup/first pull. This is even more costly if tables are/eventually become per-tenant. Could this instead be calculated across all tables? I find it hard to imagine needing this level of granularity: we'll be pulling tables from the same object stores. I suspect having one gauge across all tables will be enough to help us and if we need to find one problematic table, we can use logs (we'll need to ensure per-table downloads are logged).

Did I explain that well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand your concern. The problem here is that IndexClients receive per-table queries, so we can't have a metric across all tables downloaded for a query. The best option we have here is adding a span to the trace.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking some shared struct which keeps individual download times/bytes for each table, then updates one gauge whenever a new table is downloaded. The gauge can be calculated from the sum of download times/bytes from each table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like

type DownloadTableMetrics struct{
  gauge *prometheus.Gauge
  tables map[string]struct{
    dur time.Duration
    bytes int
  }
}

Then when downloading a table you can do

downloadTableMetrics.Add(tableName, downloadTime, byteCt), which will internally add that value then recalculate/update its underlying gauge.

initialFilesDownloadSizeBytes *prometheus.GaugeVec

// duration in seconds spent in serving request on index managed by BoltDB Shipper
requestDurationSeconds *prometheus.HistogramVec

filesDownloadOperationTotal *prometheus.CounterVec
filesUploadOperationTotal *prometheus.CounterVec
}

// newBoltDBShipperMetrics creates all metrics tracked by the BoltDB shipper
// and registers them with the supplied registerer. promauto.With tolerates a
// nil registerer (used by tests), in which case the metrics are created but
// never registered.
func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics {
	// Hoist the promauto factory so each metric constructor below reads the
	// same way and the registerer is mentioned exactly once.
	factory := promauto.With(r)

	return &boltDBShipperMetrics{
		// Gauge per period: how long the first-time download of that
		// period's index files took.
		initialFilesDownloadDurationSeconds: factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "loki_boltdb_shipper",
			Name:      "initial_files_download_duration_seconds",
			Help:      "Time (in seconds) spent in downloading of files per period, initially i.e for the first time",
		}, []string{"period"}),
		// Gauge per period: total bytes pulled during that first download.
		initialFilesDownloadSizeBytes: factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "loki_boltdb_shipper",
			Name:      "initial_files_download_size_bytes",
			Help:      "Size of files (in bytes) downloaded per period, initially i.e for the first time",
		}, []string{"period"}),
		// Request latency histogram, labelled by operation and status code,
		// using the weaveworks instrument library's default buckets so it is
		// consumable by its HistogramCollector.
		requestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "loki_boltdb_shipper",
			Name:      "request_duration_seconds",
			Help:      "Time (in seconds) spent serving requests when using boltdb shipper",
			Buckets:   instrument.DefBuckets,
		}, []string{"operation", "status_code"}),
		// Counters for download/upload operations, labelled success/failure.
		filesDownloadOperationTotal: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki_boltdb_shipper",
			Name:      "files_download_operation_total",
			Help:      "Total number of download operations done by status",
		}, []string{"status"}),
		filesUploadOperationTotal: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki_boltdb_shipper",
			Name:      "files_upload_operation_total",
			Help:      "Total number of upload operations done by status",
		}, []string{"status"}),
	}
}
22 changes: 19 additions & 3 deletions pkg/storage/stores/local/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
pkg_util "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/stores/util"
Expand Down Expand Up @@ -88,12 +89,13 @@ type Shipper struct {
uploadedFilesMtime map[string]time.Time
uploadedFilesMtimeMtx sync.RWMutex

done chan struct{}
wait sync.WaitGroup
done chan struct{}
wait sync.WaitGroup
metrics *boltDBShipperMetrics
}

// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) {
func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter, registerer prometheus.Registerer) (*Shipper, error) {
err := chunk_util.EnsureDirectory(cfg.CacheLocation)
if err != nil {
return nil, err
Expand All @@ -106,13 +108,16 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette
storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix),
done: make(chan struct{}),
uploadedFilesMtime: map[string]time.Time{},
metrics: newBoltDBShipperMetrics(registerer),
}

shipper.uploader, err = shipper.getUploaderName()
if err != nil {
return nil, err
}

level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode))

shipper.wait.Add(1)
go shipper.loop()

Expand Down Expand Up @@ -164,15 +169,21 @@ func (s *Shipper) loop() {
for {
select {
case <-resyncTicker.C:
status := statusSuccess
err := s.syncLocalWithStorage(context.Background())
if err != nil {
status = statusFailure
level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
}
s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A defer could do a better job, but that's fine!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @cyriltovena here -- defer would help with some code deduplication.

case <-uploadFilesTicker.C:
status := statusSuccess
err := s.uploadFiles(context.Background())
if err != nil {
status = statusFailure
level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err)
}
s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc()
case <-cacheCleanupTicker.C:
err := s.cleanupCache()
if err != nil {
Expand All @@ -190,10 +201,13 @@ func (s *Shipper) Stop() {
s.wait.Wait()

// Push all boltdb files to storage before returning
status := statusSuccess
err := s.uploadFiles(context.Background())
if err != nil {
status = statusFailure
level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err)
}
s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc()

s.downloadedPeriodsMtx.Lock()
defer s.downloadedPeriodsMtx.Unlock()
Expand Down Expand Up @@ -275,8 +289,10 @@ func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *
s.downloadedPeriodsMtx.Unlock()

if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil {
s.metrics.filesDownloadOperationTotal.WithLabelValues(statusFailure).Inc()
return err
}
s.metrics.filesDownloadOperationTotal.WithLabelValues(statusSuccess).Inc()
}

}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/local/uploads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca
})
require.NoError(t, err)

boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(
local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig, nil)
require.NoError(t, err)

return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)
Expand Down