-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathcache_test.go
126 lines (107 loc) · 3.32 KB
/
cache_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package bloomshipper
import (
"context"
"io/fs"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
)
type mockCache[K comparable, V any] struct {
sync.Mutex
cache map[K]V
}
func (m *mockCache[K, V]) Store(_ context.Context, keys []K, values []V) error {
m.Lock()
defer m.Unlock()
for i := range keys {
m.cache[keys[i]] = values[i]
}
return nil
}
func (m *mockCache[K, V]) Fetch(_ context.Context, keys []K) (found []K, values []V, missing []K, err error) {
m.Lock()
defer m.Unlock()
for _, key := range keys {
buf, ok := m.cache[key]
if ok {
found = append(found, key)
values = append(values, buf)
} else {
missing = append(missing, key)
}
}
return
}
func (m *mockCache[K, V]) Stop() {
}
func (m *mockCache[K, V]) GetCacheType() stats.CacheType {
return "mock"
}
func newTypedMockCache[K comparable, V any]() *mockCache[K, V] {
return &mockCache[K, V]{
cache: make(map[K]V),
}
}
func Test_LoadBlocksDirIntoCache(t *testing.T) {
logger := log.NewNopLogger()
wd := t.TempDir()
// plain file
fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz"))
fp.Close()
// invalid directory
invalidDir := "not/a/valid/blockdir"
_ = os.MkdirAll(filepath.Join(wd, invalidDir), 0o755)
// empty block directories
emptyDir1 := "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, emptyDir1), 0o755)
emptyDir2 := "bloom/table_1/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-ef01"
_ = os.MkdirAll(filepath.Join(wd, emptyDir2), 0o755)
emptyDir3 := "bloom/table_1/tenant/blocks/0000000000020000-000000000002ffff/0-3600000-2345"
_ = os.MkdirAll(filepath.Join(wd, emptyDir3), 0o755)
// valid block directory
validDir := "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, validDir), 0o755)
for _, fn := range []string{"bloom", "series"} {
fp, _ = os.Create(filepath.Join(wd, validDir, fn))
fp.Close()
}
cfg := config.BlocksCacheConfig{
SoftLimit: 1 << 20,
HardLimit: 2 << 20,
TTL: time.Hour,
PurgeInterval: time.Hour,
}
c := NewFsBlocksCache(cfg, nil, log.NewNopLogger())
err := LoadBlocksDirIntoCache([]string{wd, t.TempDir()}, c, logger)
require.NoError(t, err)
require.Equal(t, 1, len(c.entries))
key := validDir + ".tar.gz" // cache key must not contain directory prefix
elem, found := c.entries[key]
require.True(t, found)
blockDir := elem.Value.(*Entry).Value
require.Equal(t, filepath.Join(wd, validDir), blockDir.Path)
// check cleaned directories
dirs := make([]string, 0, 6)
_ = filepath.WalkDir(wd, func(path string, dirEntry fs.DirEntry, _ error) error {
if !dirEntry.IsDir() {
return nil
}
dirs = append(dirs, path)
return nil
})
require.Equal(t, []string{
filepath.Join(wd),
filepath.Join(wd, "bloom/"),
filepath.Join(wd, "bloom/table_2/"),
filepath.Join(wd, "bloom/table_2/tenant/"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-abcd"),
}, dirs)
}