diff --git a/docs/sources/operations/storage/boltdb-shipper.md b/docs/sources/operations/storage/boltdb-shipper.md index 5a0275b6b2d70..da482731e299f 100644 --- a/docs/sources/operations/storage/boltdb-shipper.md +++ b/docs/sources/operations/storage/boltdb-shipper.md @@ -86,5 +86,14 @@ Frequency for checking updates can be configured with `resync_interval` config. To avoid keeping downloaded index files forever there is a ttl for them which defaults to 24 hours, which means if index files for a period are not used for 24 hours they would be removed from cache location. ttl can be configured using `cache_ttl` config. +### Write Deduplication disabled + +Loki does write deduplication of chunks and index using Chunks and WriteDedupe cache respectively, configured with [ChunkStoreConfig](../../configuration/README.md#chunk_store_config). +The problem with write deduplication when using `boltdb-shipper` though is ingesters only keep uploading boltdb files periodically to make them available to all the other services which means there would be a brief period where some of the services would not have received updated index yet. +The problem due to that is if an ingester which first wrote the chunks and index goes down and all the other ingesters which were part of replication scheme skipped writing those chunks and index due to deduplication, we would end up missing those logs from query responses since only the ingester which had the index went down. +This problem would be faced even during rollouts which is quite common. + +To avoid this, Loki disables deduplication of index when the replication factor is greater than 1 and `boltdb-shipper` is an active or upcoming index type. +While using `boltdb-shipper` please avoid configuring WriteDedupe cache since it is used purely for the index deduplication, so it would not be used anyways. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1bff071dd04a4..6792495ddc815 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -9,6 +9,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/cortex" cortex_querier "github.com/cortexproject/cortex/pkg/querier" @@ -178,7 +179,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort // We want ingester to also query the store when using boltdb-shipper - pc := activePeriodConfig(t.cfg.SchemaConfig) + pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)] if pc.IndexType == local.BoltDBShipperType { t.cfg.Ingester.QueryStore = true mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge) @@ -240,7 +241,7 @@ func (t *Loki) initTableManager() (services.Service, error) { } func (t *Loki) initStore() (_ services.Service, err error) { - if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType { + if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == local.BoltDBShipperType { t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID switch t.cfg.Target { case Ingester: @@ -254,6 +255,13 @@ func (t *Loki) initStore() (_ services.Service, err error) { } } + // If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache. + // This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data. + if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && usingBoltdbShipper(t.cfg.SchemaConfig) { + t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true + t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{} + } + t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer) if err != nil { return @@ -329,9 +337,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { return t.memberlistKV, nil } -// activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now -// Note: Another periodic config can be applicable in future which can change index type -func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig { +// activePeriodConfig returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now. +// Note: Another PeriodicConfig might be applicable for future logs which can change index type. +func activePeriodConfig(cfg chunk.SchemaConfig) int { now := model.Now() i := sort.Search(len(cfg.Configs), func(i int) bool { return cfg.Configs[i].From.Time > now @@ -339,7 +347,18 @@ func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig { if i > 0 { i-- } - return cfg.Configs[i] + return i +} + +// usingBoltdbShipper check whether current or the next index type is boltdb-shipper, returns true if yes. +func usingBoltdbShipper(cfg chunk.SchemaConfig) bool { + activePCIndex := activePeriodConfig(cfg) + if cfg.Configs[activePCIndex].IndexType == local.BoltDBShipperType || + (len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == local.BoltDBShipperType) { + return true + } + + return false } func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) { diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index d25f82953f1ab..92ba815da5a92 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -18,22 +18,43 @@ func TestActiveIndexType(t *testing.T) { IndexType: "first", }} - assert.Equal(t, cfg.Configs[0], activePeriodConfig(cfg)) + assert.Equal(t, 0, activePeriodConfig(cfg)) // add a newer PeriodConfig in the past which should be considered cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)}, IndexType: "second", }) - assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg)) + assert.Equal(t, 1, activePeriodConfig(cfg)) // add a newer PeriodConfig in the future which should not be considered cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, IndexType: "third", }) - assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg)) + assert.Equal(t, 1, activePeriodConfig(cfg)) +} + +func TestUsingBoltdbShipper(t *testing.T) { + var cfg chunk.SchemaConfig + // just one PeriodConfig in the past using boltdb-shipper + cfg.Configs = []chunk.PeriodConfig{{ + From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, + IndexType: "boltdb-shipper", + }} + assert.Equal(t, true, usingBoltdbShipper(cfg)) + + // just one PeriodConfig in the past not using boltdb-shipper + cfg.Configs[0].IndexType = "boltdb" + assert.Equal(t, false, usingBoltdbShipper(cfg)) + + // add a newer PeriodConfig in the future using boltdb-shipper + cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ + From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, + IndexType: "boltdb-shipper", + }) + assert.Equal(t, true, usingBoltdbShipper(cfg)) } func Test_calculateMaxLookBack(t *testing.T) {