Skip to content

Commit

Permalink
Sidecar: Use flag to ignore overlap check, add test for it (thanos-io…
Browse files Browse the repository at this point in the history
…#4922)

* draft: add ignore overlap flag

Signed-off-by: metonymic-smokey <[email protected]>

* lint fix

Signed-off-by: metonymic-smokey <[email protected]>

* updated values in test cases

Signed-off-by: metonymic-smokey <[email protected]>

* used existing flag

Signed-off-by: metonymic-smokey <[email protected]>

* draft: added unit test for overlapping uploads

Signed-off-by: metonymic-smokey <[email protected]>

* removed test for each Prom version

Signed-off-by: metonymic-smokey <[email protected]>

* nit fix

Signed-off-by: metonymic-smokey <[email protected]>
  • Loading branch information
metonymic-smokey authored Dec 12, 2021
1 parent 4c1deec commit 0145b42
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 7 deletions.
10 changes: 3 additions & 7 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,10 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
continue
}

if m.Compaction.Level > 1 {
// Skip overlap check if out of order uploads is enabled.
if m.Compaction.Level > 1 && !s.allowOutOfOrderUploads {
if err := checker.IsOverlapping(ctx, m.BlockMeta); err != nil {
if !s.allowOutOfOrderUploads {
return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err)
}
level.Error(s.logger).Log("msg", "found overlap or error during sync, cannot upload compacted block", "err", err)
uploadErrs++
continue
return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err)
}
}

Expand Down
151 changes: 151 additions & 0 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,154 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
testutil.Assert(t, ok == false, "fifth block was reuploaded")
})
}

// TestShipper_SyncOverlapBlocks_e2e is a unit test for the functionality by allowOutOfOrderUploads flag. This allows compacted(compaction level greater than 1) blocks to be uploaded despite overlapping time ranges.
func TestShipper_SyncOverlapBlocks_e2e(t *testing.T) {
p, err := e2eutil.NewPrometheus()
testutil.Ok(t, err)
dir, err := ioutil.TempDir("", "shipper-e2e-test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

bkt := objstore.NewInMemBucket()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

extLset := labels.FromStrings("prometheus", "prom-1")

testutil.Ok(t, p.Start())

logger := log.NewNopLogger()
upctx, upcancel := context.WithTimeout(ctx, 10*time.Second)
defer upcancel()
testutil.Ok(t, p.WaitPrometheusUp(upctx, logger))

p.DisableCompaction()
testutil.Ok(t, p.Restart())

upctx2, upcancel2 := context.WithTimeout(ctx, 10*time.Second)
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

// Here, the allowOutOfOrderUploads flag is set to true, which allows blocks with overlaps to be uploaded.
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, true, metadata.NoneFunc)

// Creating 2 overlapping blocks - both uploaded when OOO uploads allowed.
var (
expBlocks = map[ulid.ULID]struct{}{}
expFiles = map[string][]byte{}
randr = rand.New(rand.NewSource(0))
ids = []ulid.ULID{}
)

id := make([]ulid.ULID, 2)
tmp := make([]string, 2)
m := make([]metadata.Meta, 2)

for i := 0; i < 2; i++ {
id[i] = ulid.MustNew(uint64(i), randr)

bdir := filepath.Join(dir, id[i].String())
tmp := bdir + ".tmp"

testutil.Ok(t, os.Mkdir(tmp, 0777))

m[i] = metadata.Meta{
BlockMeta: tsdb.BlockMeta{
Version: 1,
ULID: id[i],
Stats: tsdb.BlockStats{
NumSamples: 1,
},
Compaction: tsdb.BlockMetaCompaction{
Level: 2,
},
},
Thanos: metadata.Thanos{
Source: metadata.TestSource,
},
}
}

m[0].BlockMeta.MinTime = 10
m[0].BlockMeta.MaxTime = 20

m[1].BlockMeta.MinTime = 15
m[1].BlockMeta.MaxTime = 17

for i := 0; i < 2; i++ {
bdir := filepath.Join(dir, m[i].BlockMeta.ULID.String())
tmp[i] = bdir + ".tmp"

metab, err := json.Marshal(&m[i])
testutil.Ok(t, err)

testutil.Ok(t, ioutil.WriteFile(tmp[i]+"/meta.json", metab, 0666))
testutil.Ok(t, ioutil.WriteFile(tmp[i]+"/index", []byte("indexcontents"), 0666))

// Running shipper while a block is being written to temp dir should not trigger uploads.
b, err := shipper.Sync(ctx)
testutil.Ok(t, err)
testutil.Equals(t, 0, b)

shipMeta, err := ReadMetaFile(dir)
testutil.Ok(t, err)
if len(shipMeta.Uploaded) == 0 {
shipMeta.Uploaded = []ulid.ULID{}
}
testutil.Equals(t, &Meta{Version: MetaVersion1, Uploaded: ids}, shipMeta)

testutil.Ok(t, os.MkdirAll(tmp[i]+"/chunks", 0777))
testutil.Ok(t, ioutil.WriteFile(tmp[i]+"/chunks/0001", []byte("chunkcontents1"), 0666))
testutil.Ok(t, ioutil.WriteFile(tmp[i]+"/chunks/0002", []byte("chunkcontents2"), 0666))

testutil.Ok(t, os.Rename(tmp[i], bdir))

// After rename sync should upload the block.
b, err = shipper.Sync(ctx)
testutil.Ok(t, err)
testutil.Equals(t, 1, b)
ids = append(ids, id[i])

// The external labels must be attached to the meta file on upload.
m[i].Thanos.Labels = extLset.Map()
m[i].Thanos.SegmentFiles = []string{"0001", "0002"}
m[i].Thanos.Files = []metadata.File{
{RelPath: "chunks/0001", SizeBytes: 14},
{RelPath: "chunks/0002", SizeBytes: 14},
{RelPath: "index", SizeBytes: 13},
{RelPath: "meta.json"},
}

buf := bytes.Buffer{}
testutil.Ok(t, m[i].Write(&buf))

expBlocks[id[i]] = struct{}{}
expFiles[id[i].String()+"/meta.json"] = buf.Bytes()
expFiles[id[i].String()+"/index"] = []byte("indexcontents")
expFiles[id[i].String()+"/chunks/0001"] = []byte("chunkcontents1")
expFiles[id[i].String()+"/chunks/0002"] = []byte("chunkcontents2")

// The shipper meta file should show all blocks as uploaded except the compacted one.
shipMeta, err = ReadMetaFile(dir)
testutil.Ok(t, err)
testutil.Equals(t, &Meta{Version: MetaVersion1, Uploaded: ids}, shipMeta)
}

for id := range expBlocks {
ok, _ := bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename))
testutil.Assert(t, ok, "block %s was not uploaded", id)
}

for fn, exp := range expFiles {
rc, err := bkt.Get(ctx, fn)
testutil.Ok(t, err)
act, err := ioutil.ReadAll(rc)
testutil.Ok(t, err)
testutil.Ok(t, rc.Close())
testutil.Equals(t, string(exp), string(act))
}
}

0 comments on commit 0145b42

Please sign in to comment.