|
| 1 | +package audit |
| 2 | + |
import (
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	progressbar "github.com/schollz/progressbar/v3"
	"go.uber.org/atomic"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/compactor"
	"github.com/grafana/loki/v3/pkg/compactor/retention"
	"github.com/grafana/loki/v3/pkg/storage"
	loki_storage "github.com/grafana/loki/v3/pkg/storage"
	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
	indexshipper_storage "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
	shipperutil "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
)
| 27 | + |
| 28 | +const ( |
| 29 | + TsFormat = time.RFC3339Nano |
| 30 | +) |
| 31 | + |
| 32 | +func Run(ctx context.Context, cloudIndexPath, table string, cfg Config, logger log.Logger) (int, int, error) { |
| 33 | + level.Info(logger).Log("msg", "auditing index", "index", cloudIndexPath, "table", table, "tenant", cfg.Tenant, "working_dir", cfg.WorkingDir) |
| 34 | + |
| 35 | + objClient, err := GetObjectClient(cfg) |
| 36 | + if err != nil { |
| 37 | + return 0, 0, err |
| 38 | + } |
| 39 | + |
| 40 | + localFile, err := DownloadIndexFile(ctx, cfg, cloudIndexPath, objClient, logger) |
| 41 | + if err != nil { |
| 42 | + return 0, 0, err |
| 43 | + } |
| 44 | + |
| 45 | + compactedIdx, err := ParseCompactexIndex(ctx, localFile, table, cfg) |
| 46 | + if err != nil { |
| 47 | + return 0, 0, err |
| 48 | + } |
| 49 | + defer compactedIdx.Cleanup() |
| 50 | + |
| 51 | + return ValidateCompactedIndex(ctx, objClient, compactedIdx, cfg.Concurrency, logger) |
| 52 | +} |
| 53 | + |
| 54 | +func GetObjectClient(cfg Config) (client.ObjectClient, error) { |
| 55 | + periodCfg := cfg.SchemaConfig.Configs[len(cfg.SchemaConfig.Configs)-1] // only check the last period. |
| 56 | + |
| 57 | + objClient, err := loki_storage.NewObjectClient(periodCfg.ObjectType, cfg.StorageConfig, storage.NewClientMetrics()) |
| 58 | + if err != nil { |
| 59 | + return nil, fmt.Errorf("couldn't create object client: %w", err) |
| 60 | + } |
| 61 | + |
| 62 | + return objClient, nil |
| 63 | +} |
| 64 | + |
| 65 | +func DownloadIndexFile(ctx context.Context, cfg Config, cloudIndexPath string, objClient client.ObjectClient, logger log.Logger) (string, error) { |
| 66 | + splitPath := strings.Split(cloudIndexPath, "/") |
| 67 | + localFileName := splitPath[len(splitPath)-1] |
| 68 | + decompress := indexshipper_storage.IsCompressedFile(cloudIndexPath) |
| 69 | + if decompress { |
| 70 | + // get rid of the last extension, which is .gz |
| 71 | + localFileName = strings.TrimSuffix(localFileName, path.Ext(localFileName)) |
| 72 | + } |
| 73 | + localFilePath := path.Join(cfg.WorkingDir, localFileName) |
| 74 | + if err := shipperutil.DownloadFileFromStorage(localFilePath, decompress, false, logger, func() (io.ReadCloser, error) { |
| 75 | + r, _, err := objClient.GetObject(ctx, cloudIndexPath) |
| 76 | + return r, err |
| 77 | + }); err != nil { |
| 78 | + return "", fmt.Errorf("couldn't download file %q from storage: %w", cloudIndexPath, err) |
| 79 | + } |
| 80 | + |
| 81 | + level.Info(logger).Log("msg", "file successfully downloaded from storage", "path", cloudIndexPath) |
| 82 | + return localFileName, nil |
| 83 | +} |
| 84 | + |
| 85 | +func ParseCompactexIndex(ctx context.Context, localFilePath, table string, cfg Config) (compactor.CompactedIndex, error) { |
| 86 | + periodCfg := cfg.SchemaConfig.Configs[len(cfg.SchemaConfig.Configs)-1] // only check the last period. |
| 87 | + idxCompactor := tsdb.NewIndexCompactor() |
| 88 | + compactedIdx, err := idxCompactor.OpenCompactedIndexFile(ctx, localFilePath, table, cfg.Tenant, cfg.WorkingDir, periodCfg, util_log.Logger) |
| 89 | + if err != nil { |
| 90 | + return nil, fmt.Errorf("couldn't open compacted index file %q: %w", localFilePath, err) |
| 91 | + } |
| 92 | + return compactedIdx, nil |
| 93 | +} |
| 94 | + |
| 95 | +func ValidateCompactedIndex(ctx context.Context, objClient client.ObjectClient, compactedIdx compactor.CompactedIndex, parallelism int, logger log.Logger) (int, int, error) { |
| 96 | + var missingChunks, foundChunks atomic.Int32 |
| 97 | + foundChunks.Store(0) |
| 98 | + missingChunks.Store(0) |
| 99 | + bar := progressbar.NewOptions(-1, |
| 100 | + progressbar.OptionShowCount(), |
| 101 | + progressbar.OptionSetDescription("Chunks validated"), |
| 102 | + ) |
| 103 | + |
| 104 | + g, ctx := errgroup.WithContext(ctx) |
| 105 | + g.SetLimit(parallelism) |
| 106 | + compactedIdx.ForEachChunk(ctx, func(ce retention.ChunkEntry) (deleteChunk bool, err error) { //nolint:errcheck |
| 107 | + bar.Add(1) // nolint:errcheck |
| 108 | + g.Go(func() error { |
| 109 | + exists, err := CheckChunkExistance(string(ce.ChunkID), objClient) |
| 110 | + if err != nil || !exists { |
| 111 | + missingChunks.Add(1) |
| 112 | + logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(ce.ChunkID)) |
| 113 | + return nil |
| 114 | + } |
| 115 | + foundChunks.Add(1) |
| 116 | + return nil |
| 117 | + }) |
| 118 | + |
| 119 | + return false, nil |
| 120 | + }) |
| 121 | + g.Wait() // nolint:errcheck |
| 122 | + |
| 123 | + return int(foundChunks.Load()), int(missingChunks.Load()), nil |
| 124 | +} |
| 125 | + |
| 126 | +func CheckChunkExistance(key string, objClient client.ObjectClient) (bool, error) { |
| 127 | + exists, err := objClient.ObjectExists(context.Background(), key) |
| 128 | + if err != nil { |
| 129 | + return false, err |
| 130 | + } |
| 131 | + |
| 132 | + return exists, nil |
| 133 | +} |
0 commit comments