diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index eafe0ea88f35d..c8d490db90ce1 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" @@ -126,7 +128,7 @@ func New(cfg Config) (*Loki, error) { } loki.setupAuthMiddleware() - storage.RegisterCustomIndexClients(cfg.StorageConfig) + storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer) serviceMap, err := loki.initModuleServices(cfg.Target) if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 3df298c8356fd..595f9a44a0a57 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -206,7 +206,7 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk. return filtered } -func RegisterCustomIndexClients(cfg Config) { +func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) { // BoltDB Shipper is supposed to be run as a singleton. // This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used // in tests for creating multiple instances of it at a time. @@ -222,7 +222,10 @@ func RegisterCustomIndexClients(cfg Config) { return nil, err } - boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig) + boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper( + cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, + objectClient, cfg.BoltDBShipperConfig, registerer) + return boltDBIndexClientWithShipper, err }, func() (client chunk.TableClient, e error) { objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 3e8e851b35759..df0d978e1b75e 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -460,7 +460,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { BoltDBShipperConfig: boltdbShipperConfig, } - RegisterCustomIndexClients(config) + RegisterCustomIndexClients(config, nil) store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{ Configs: []chunk.PeriodConfig{ diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go index ef60ff4e57268..2fa21615fd46d 100644 --- a/pkg/storage/stores/local/boltdb_index_client.go +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -6,6 +6,8 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" "go.etcd.io/bbolt" ) @@ -15,13 +17,13 @@ type BoltdbIndexClientWithShipper struct { } // NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB. -func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) { +func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg) if err != nil { return nil, err } - shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient) + shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient, registerer) if err != nil { return nil, err } @@ -55,7 +57,9 @@ func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.In } } - return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { - return b.QueryDB(ctx, db, query, callback) + return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(b.shipper.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { + return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { + return b.QueryDB(ctx, db, query, callback) + }) }) } diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go index bd7112ddbaa66..48319eae24225 100644 --- a/pkg/storage/stores/local/downloads.go +++ b/pkg/storage/stores/local/downloads.go @@ -7,6 +7,7 @@ import ( "os" "path" "strings" + "time" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" @@ -156,20 +157,31 @@ func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination // downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL // While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock -func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error { +func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) (err error) { fc.Lock() defer fc.Unlock() + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() + }() + + startTime := time.Now() + totalFilesSize := int64(0) + objects, _, err := s.storageClient.List(ctx, period+"/") if err != nil { - return err + return } level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects)) folderPath, err := s.getFolderPathForPeriod(period, true) if err != nil { - return err + return } for _, object := range objects { @@ -181,21 +193,33 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc filePath := path.Join(folderPath, uploader) df := downloadedFiles{} - err := s.getFileFromStorage(ctx, object.Key, filePath) + err = s.getFileFromStorage(ctx, object.Key, filePath) if err != nil { - return err + return } df.mtime = object.ModifiedAt df.boltdb, err = local.OpenBoltdbFile(filePath) if err != nil { - return err + return + } + + var stat os.FileInfo + stat, err = os.Stat(filePath) + if err != nil { + return } + totalFilesSize += stat.Size() + fc.files[uploader] = df } - return nil + duration := time.Since(startTime).Seconds() + s.metrics.filesDownloadDurationSeconds.add(period, duration) + s.metrics.filesDownloadSizeBytes.add(period, totalFilesSize) + + return } func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) { diff --git a/pkg/storage/stores/local/metrics.go b/pkg/storage/stores/local/metrics.go new file mode 100644 index 0000000000000..426b0ba8fc576 --- /dev/null +++ b/pkg/storage/stores/local/metrics.go @@ -0,0 +1,101 @@ +package local + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" +) + +const ( + statusFailure = "failure" + statusSuccess = "success" +) + +type downloadPeriodDurationMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]float64 +} + +func (m *downloadPeriodDurationMetric) add(period string, downloadDuration float64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadDuration + + totalDuration := float64(0) + for _, dur := range m.periods { + totalDuration += dur + } + + m.gauge.Set(totalDuration) +} + +type downloadPeriodBytesMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]int64 +} + +func (m *downloadPeriodBytesMetric) add(period string, downloadedBytes int64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadedBytes + + totalDownloadedBytes := int64(0) + for _, downloadedBytes := range m.periods { + totalDownloadedBytes += downloadedBytes + } + + m.gauge.Set(float64(totalDownloadedBytes)) +} + +type boltDBShipperMetrics struct { + // metrics for measuring performance of downloading of files per period initially i.e for the first time + filesDownloadDurationSeconds *downloadPeriodDurationMetric + filesDownloadSizeBytes *downloadPeriodBytesMetric + + // duration in seconds spent in serving request on index managed by BoltDB Shipper + requestDurationSeconds *prometheus.HistogramVec + + filesDownloadOperationTotal *prometheus.CounterVec + filesUploadOperationTotal *prometheus.CounterVec +} + +func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { + m := &boltDBShipperMetrics{ + filesDownloadDurationSeconds: &downloadPeriodDurationMetric{ + periods: map[string]float64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_duration_seconds", + Help: "Time (in seconds) spent in downloading of files per period, initially i.e for the first time", + })}, + filesDownloadSizeBytes: &downloadPeriodBytesMetric{ + periods: map[string]int64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_size_bytes", + Help: "Size of files (in bytes) downloaded per period, initially i.e for the first time", + })}, + requestDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_boltdb_shipper", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using boltdb shipper", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + filesDownloadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "files_download_operation_total", + Help: "Total number of download operations done by status", + }, []string{"status"}), + filesUploadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "files_upload_operation_total", + Help: "Total number of upload operations done by status", + }, []string{"status"}), + } + + return m +} diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go index 1856e3698a6e6..bd60ea2be1ba4 100644 --- a/pkg/storage/stores/local/shipper.go +++ b/pkg/storage/stores/local/shipper.go @@ -14,6 +14,7 @@ import ( chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" pkg_util "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage/stores/util" @@ -88,12 +89,13 @@ type Shipper struct { uploadedFilesMtime map[string]time.Time uploadedFilesMtimeMtx sync.RWMutex - done chan struct{} - wait sync.WaitGroup + done chan struct{} + wait sync.WaitGroup + metrics *boltDBShipperMetrics } // NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) { +func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter, registerer prometheus.Registerer) (*Shipper, error) { err := chunk_util.EnsureDirectory(cfg.CacheLocation) if err != nil { return nil, err @@ -106,6 +108,7 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix), done: make(chan struct{}), uploadedFilesMtime: map[string]time.Time{}, + metrics: newBoltDBShipperMetrics(registerer), } shipper.uploader, err = shipper.getUploaderName() @@ -113,6 +116,8 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette return nil, err } + level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode)) + shipper.wait.Add(1) go shipper.loop() @@ -229,17 +234,25 @@ func (s *Shipper) cleanupCache() error { // syncLocalWithStorage syncs all the periods that we have in the cache with the storage // i.e download new and updated files and remove files which were delete from the storage. -func (s *Shipper) syncLocalWithStorage(ctx context.Context) error { +func (s *Shipper) syncLocalWithStorage(ctx context.Context) (err error) { s.downloadedPeriodsMtx.RLock() defer s.downloadedPeriodsMtx.RUnlock() + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() + }() + for period := range s.downloadedPeriods { if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil { return err } } - return nil + return } // deleteFileFromCache removes a file from cache. diff --git a/pkg/storage/stores/local/uploads.go b/pkg/storage/stores/local/uploads.go index b44e52928900e..1990f3dcf025e 100644 --- a/pkg/storage/stores/local/uploads.go +++ b/pkg/storage/stores/local/uploads.go @@ -16,14 +16,22 @@ import ( // uploadFiles uploads all new and updated files to storage. // It uploads the files from configured boltdb dir where ingester writes the index. -func (s *Shipper) uploadFiles(ctx context.Context) error { +func (s *Shipper) uploadFiles(ctx context.Context) (err error) { if s.cfg.Mode == ShipperModeReadOnly { - return nil + return } + defer func() { + status := statusSuccess + if err != nil { + status = statusFailure + } + s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() + }() + filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory) if err != nil { - return err + return } for _, fileInfo := range filesInfo { @@ -40,9 +48,9 @@ func (s *Shipper) uploadFiles(ctx context.Context) error { continue } - err := s.uploadFile(ctx, fileInfo.Name()) + err = s.uploadFile(ctx, fileInfo.Name()) if err != nil { - return err + return } s.uploadedFilesMtimeMtx.Lock() @@ -50,7 +58,7 @@ func (s *Shipper) uploadFiles(ctx context.Context) error { s.uploadedFilesMtimeMtx.Unlock() } - return nil + return } // uploadFile uploads one of the files locally written by ingesters to storage. diff --git a/pkg/storage/stores/local/uploads_test.go b/pkg/storage/stores/local/uploads_test.go index 40404ae2773c2..b21249dfbe776 100644 --- a/pkg/storage/stores/local/uploads_test.go +++ b/pkg/storage/stores/local/uploads_test.go @@ -38,7 +38,8 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca }) require.NoError(t, err) - boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig) + boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper( + local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig, nil) require.NoError(t, err) return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)