Skip to content

Commit dbc0e24

Browse files
authored
chore(index-gateway): Improve instrumentation of index download/sync (#13194)
This PR contains changes to improve the instrumentation and observability of the download/sync operations that happen on the index gateways (index shipper). This includes fixes to the download latency and wait time metrics. Signed-off-by: Christian Haudum <[email protected]>
1 parent 82fbb2f commit dbc0e24

File tree

4 files changed

+72
-50
lines changed

4 files changed

+72
-50
lines changed

pkg/storage/stores/shipper/indexshipper/downloads/index_set.go

+23-17
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type IndexSet interface {
4141
LastUsedAt() time.Time
4242
UpdateLastUsedAt()
4343
Sync(ctx context.Context) (err error)
44-
AwaitReady(ctx context.Context) error
44+
AwaitReady(ctx context.Context, reason string) error
4545
}
4646

4747
// indexSet is a collection of multiple files created for a same table by various ingesters.
@@ -62,8 +62,7 @@ type indexSet struct {
6262
cancelFunc context.CancelFunc // helps with cancellation of initialization if we are asked to stop.
6363
}
6464

65-
func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc,
66-
logger log.Logger) (IndexSet, error) {
65+
func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc, logger log.Logger) (IndexSet, error) {
6766
if baseIndexSet.IsUserBasedIndexSet() && userID == "" {
6867
return nil, fmt.Errorf("userID must not be empty")
6968
} else if !baseIndexSet.IsUserBasedIndexSet() && userID != "" {
@@ -75,10 +74,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
7574
return nil, err
7675
}
7776

78-
maxConcurrent := runtime.GOMAXPROCS(0) / 2
79-
if maxConcurrent == 0 {
80-
maxConcurrent = 1
81-
}
77+
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)
8278

8379
is := indexSet{
8480
openIndexFileFunc: openIndexFileFunc,
@@ -101,25 +97,25 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
10197
func (t *indexSet) Init(forQuerying bool) (err error) {
10298
// Using background context to avoid cancellation of download when request times out.
10399
// We would anyways need the files for serving next requests.
104-
ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
105-
t.cancelFunc = cancelFunc
100+
ctx := context.Background()
101+
ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout)
106102

107-
logger := spanlogger.FromContextWithFallback(ctx, t.logger)
103+
logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init")
108104

109105
defer func() {
110106
if err != nil {
111-
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.tableName), "err", err)
107+
level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err)
112108
t.err = err
113109

114110
// cleaning up files due to error to avoid returning invalid results.
115111
for fileName := range t.index {
116112
if err := t.cleanupDB(fileName); err != nil {
117-
level.Error(t.logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
113+
level.Error(logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
118114
}
119115
}
120116
}
121-
t.cancelFunc()
122117
t.indexMtx.markReady()
118+
t.cancelFunc()
123119
}()
124120

125121
dirEntries, err := os.ReadDir(t.cacheLocation)
@@ -137,12 +133,12 @@ func (t *indexSet) Init(forQuerying bool) (err error) {
137133
// if we fail to open an index file, lets skip it and let sync operation re-download the file from storage.
138134
idx, err := t.openIndexFileFunc(fullPath)
139135
if err != nil {
140-
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err)
136+
level.Error(logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err)
141137
// Sometimes files get corrupted when the process gets killed in the middle of a download operation which can cause problems in reading the file.
142138
// Implementation of openIndexFileFunc should take care of gracefully handling corrupted files.
143139
// Let us just remove the file and let the sync operation re-download it.
144140
if err := os.Remove(fullPath); err != nil {
145-
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath))
141+
level.Error(logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath))
146142
}
147143
continue
148144
}
@@ -406,8 +402,18 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListC
406402
return
407403
}
408404

409-
func (t *indexSet) AwaitReady(ctx context.Context) error {
410-
return t.indexMtx.awaitReady(ctx)
405+
func (t *indexSet) AwaitReady(ctx context.Context, reason string) error {
406+
start := time.Now()
407+
err := t.indexMtx.awaitReady(ctx)
408+
level.Info(t.logger).Log(
409+
"msg", "waited for index set to become ready",
410+
"reason", reason,
411+
"table", t.tableName,
412+
"user", t.userID,
413+
"wait_time", time.Since(start),
414+
"err", err,
415+
)
416+
return err
411417
}
412418

413419
func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) {

pkg/storage/stores/shipper/indexshipper/downloads/metrics.go

+13
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ type metrics struct {
1414
queryTimeTableDownloadDurationSeconds *prometheus.CounterVec
1515
tablesSyncOperationTotal *prometheus.CounterVec
1616
tablesDownloadOperationDurationSeconds prometheus.Gauge
17+
18+
// new metrics that will supersede the incorrect old types
19+
queryWaitTime *prometheus.HistogramVec
20+
tableSyncLatency *prometheus.HistogramVec
1721
}
1822

1923
func newMetrics(r prometheus.Registerer) *metrics {
@@ -30,6 +34,15 @@ func newMetrics(r prometheus.Registerer) *metrics {
3034
Name: "tables_download_operation_duration_seconds",
3135
Help: "Time (in seconds) spent in downloading updated files for all the tables",
3236
}),
37+
38+
queryWaitTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
39+
Name: "query_wait_time_seconds",
40+
Help: "Time (in seconds) spent waiting for index files to be queryable at query time",
41+
}, []string{"table"}),
42+
tableSyncLatency: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
43+
Name: "table_sync_latency_seconds",
44+
Help: "Time (in seconds) spent in downloading updated files for all the tables",
45+
}, []string{"table", "status"}),
3346
}
3447

3548
return m

pkg/storage/stores/shipper/indexshipper/downloads/table.go

+21-27
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time.
2626
const (
27-
downloadTimeout = 5 * time.Minute
27+
downloadTimeout = 1 * time.Minute
2828
maxDownloadConcurrency = 50
2929
)
3030

@@ -57,12 +57,8 @@ type table struct {
5757
// NewTable just creates an instance of table without trying to load files from local storage or object store.
5858
// It is used for initializing table at query time.
5959
func NewTable(name, cacheLocation string, storageClient storage.Client, openIndexFileFunc index.OpenIndexFileFunc, metrics *metrics) Table {
60-
maxConcurrent := runtime.GOMAXPROCS(0) / 2
61-
if maxConcurrent == 0 {
62-
maxConcurrent = 1
63-
}
64-
65-
table := table{
60+
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)
61+
return &table{
6662
name: name,
6763
cacheLocation: cacheLocation,
6864
storageClient: storageClient,
@@ -74,8 +70,6 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, openInde
7470
maxConcurrent: maxConcurrent,
7571
indexSets: map[string]IndexSet{},
7672
}
77-
78-
return &table
7973
}
8074

8175
// LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store.
@@ -91,10 +85,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
9185
return nil, err
9286
}
9387

94-
maxConcurrent := runtime.GOMAXPROCS(0) / 2
95-
if maxConcurrent == 0 {
96-
maxConcurrent = 1
97-
}
88+
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)
9889

9990
table := table{
10091
name: name,
@@ -296,6 +287,8 @@ func (t *table) Sync(ctx context.Context) error {
296287
// forQuerying must be set to true only getting the index for querying since
297288
// it captures the amount of time it takes to download the index at query time.
298289
func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
290+
logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name))
291+
299292
t.indexSetsMtx.RLock()
300293
indexSet, ok := t.indexSets[id]
301294
t.indexSetsMtx.RUnlock()
@@ -318,28 +311,29 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
318311
}
319312

320313
// instantiate the index set, add it to the map
321-
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc,
322-
loggerWithUserID(t.logger, id))
314+
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger)
323315
if err != nil {
324316
return nil, err
325317
}
326318
t.indexSets[id] = indexSet
327319

328-
// initialize the index set in async mode, it would be upto the caller to wait for its readiness using IndexSet.AwaitReady()
320+
// initialize the index set in async mode
321+
// it is up to the caller to wait for its readiness using IndexSet.AwaitReady()
329322
go func() {
323+
start := time.Now()
324+
err := indexSet.Init(forQuerying)
325+
duration := time.Since(start)
326+
327+
level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil)
328+
330329
if forQuerying {
331-
start := time.Now()
332-
defer func() {
333-
duration := time.Since(start)
334-
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())
335-
logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))
336-
level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
337-
}()
330+
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())
331+
t.metrics.queryWaitTime.WithLabelValues(t.name).Observe(duration.Seconds())
332+
level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
338333
}
339334

340-
err := indexSet.Init(forQuerying)
341335
if err != nil {
342-
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
336+
level.Error(logger).Log("msg", "failed to init user index set", "err", err)
343337
t.cleanupBrokenIndexSet(ctx, id)
344338
}
345339
}()
@@ -372,7 +366,7 @@ func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) erro
372366
if err != nil {
373367
return err
374368
}
375-
err = commonIndexSet.AwaitReady(ctx)
369+
err = commonIndexSet.AwaitReady(ctx, "ensure query readiness")
376370
if err != nil {
377371
return err
378372
}
@@ -401,7 +395,7 @@ func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error
401395
return err
402396
}
403397

404-
return indexSet.AwaitReady(ctx)
398+
return indexSet.AwaitReady(ctx, "download user indexes")
405399
})
406400
}
407401

pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/go-kit/log"
1313
"github.com/go-kit/log/level"
14+
"github.com/pkg/errors"
1415
"github.com/prometheus/client_golang/prometheus"
1516
"github.com/prometheus/common/model"
1617

@@ -131,7 +132,7 @@ func (tm *tableManager) loop() {
131132
case <-syncTicker.C:
132133
err := tm.syncTables(tm.ctx)
133134
if err != nil {
134-
level.Error(tm.logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
135+
level.Error(tm.logger).Log("msg", "error syncing local index files with storage", "err", err)
135136
}
136137

137138
// we need to keep ensuring query readiness to download every days new table which would otherwise be downloaded only during queries.
@@ -180,10 +181,13 @@ func (tm *tableManager) ForEach(ctx context.Context, tableName, userID string, c
180181

181182
func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
182183
// if table is already there, use it.
184+
start := time.Now()
183185
tm.tablesMtx.RLock()
184186
table, ok := tm.tables[tableName]
185187
tm.tablesMtx.RUnlock()
186188

189+
level.Info(tm.logger).Log("msg", "get or create table", "found", ok, "table", tableName, "wait_for_lock", time.Since(start))
190+
187191
if !ok {
188192
tm.tablesMtx.Lock()
189193
defer tm.tablesMtx.Unlock()
@@ -192,7 +196,7 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
192196
table, ok = tm.tables[tableName]
193197
if !ok {
194198
// table not found, creating one.
195-
level.Info(tm.logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName))
199+
level.Info(tm.logger).Log("msg", "downloading all files for table", "table", tableName)
196200

197201
tablePath := filepath.Join(tm.cfg.CacheDir, tableName)
198202
err := util.EnsureDirectory(tablePath)
@@ -227,11 +231,16 @@ func (tm *tableManager) syncTables(ctx context.Context) error {
227231

228232
level.Info(tm.logger).Log("msg", "syncing tables")
229233

230-
for _, table := range tm.tables {
234+
for name, table := range tm.tables {
235+
level.Debug(tm.logger).Log("msg", "syncing table", "table", name)
236+
start := time.Now()
231237
err := table.Sync(ctx)
238+
duration := float64(time.Since(start))
232239
if err != nil {
233-
return err
240+
tm.metrics.tableSyncLatency.WithLabelValues(name, statusFailure).Observe(duration)
241+
return errors.Wrapf(err, "failed to sync table '%s'", name)
234242
}
243+
tm.metrics.tableSyncLatency.WithLabelValues(name, statusSuccess).Observe(duration)
235244
}
236245

237246
return nil
@@ -244,10 +253,10 @@ func (tm *tableManager) cleanupCache() error {
244253
level.Info(tm.logger).Log("msg", "cleaning tables cache")
245254

246255
for name, table := range tm.tables {
247-
level.Info(tm.logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name))
256+
level.Debug(tm.logger).Log("msg", "cleaning up expired table", "table", name)
248257
isEmpty, err := table.DropUnusedIndex(tm.cfg.CacheTTL, time.Now())
249258
if err != nil {
250-
return err
259+
return errors.Wrapf(err, "failed to clean up expired table '%s'", name)
251260
}
252261

253262
if isEmpty {

0 commit comments

Comments
 (0)