1
1
package planner
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
5
6
"fmt"
6
7
"io"
@@ -20,6 +21,8 @@ import (
20
21
"google.golang.org/grpc"
21
22
22
23
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
24
+ "github.com/grafana/loki/v3/pkg/chunkenc"
25
+ iter "github.com/grafana/loki/v3/pkg/iter/v2"
23
26
"github.com/grafana/loki/v3/pkg/storage"
24
27
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
25
28
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
@@ -166,11 +169,36 @@ func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
166
169
}
167
170
}
168
171
169
- func genBlock (ref bloomshipper.BlockRef ) bloomshipper.Block {
172
+ func genBlock (ref bloomshipper.BlockRef ) (bloomshipper.Block , error ) {
173
+ indexBuf := bytes .NewBuffer (nil )
174
+ bloomsBuf := bytes .NewBuffer (nil )
175
+ writer := v1 .NewMemoryBlockWriter (indexBuf , bloomsBuf )
176
+ reader := v1 .NewByteReader (indexBuf , bloomsBuf )
177
+
178
+ blockOpts := v1 .NewBlockOptions (chunkenc .EncNone , 4 , 1 , 0 , 0 )
179
+
180
+ builder , err := v1 .NewBlockBuilder (blockOpts , writer )
181
+ if err != nil {
182
+ return bloomshipper.Block {}, err
183
+ }
184
+
185
+ if _ , err = builder .BuildFrom (iter .NewEmptyIter [v1.SeriesWithBlooms ]()); err != nil {
186
+ return bloomshipper.Block {}, err
187
+ }
188
+
189
+ block := v1 .NewBlock (reader , v1 .NewMetrics (nil ))
190
+
191
+ buf := bytes .NewBuffer (nil )
192
+ if err := v1 .TarGz (buf , block .Reader ()); err != nil {
193
+ return bloomshipper.Block {}, err
194
+ }
195
+
196
+ tarReader := bytes .NewReader (buf .Bytes ())
197
+
170
198
return bloomshipper.Block {
171
199
BlockRef : ref ,
172
- Data : & DummyReadSeekCloser { },
173
- }
200
+ Data : bloomshipper. ClosableReadSeekerAdapter { ReadSeeker : tarReader },
201
+ }, nil
174
202
}
175
203
176
204
func Test_blockPlansForGaps (t * testing.T ) {
@@ -612,7 +640,12 @@ func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error
612
640
}
613
641
614
642
for _ , block := range meta .Blocks {
615
- err := bloomClient .PutBlock (context .Background (), genBlock (block ))
643
+ writtenBlock , err := genBlock (block )
644
+ if err != nil {
645
+ return err
646
+ }
647
+
648
+ err = bloomClient .PutBlock (context .Background (), writtenBlock )
616
649
if err != nil {
617
650
return err
618
651
}
@@ -826,6 +859,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
826
859
for _ , tc := range []struct {
827
860
name string
828
861
originalMetas []bloomshipper.Meta
862
+ newMetas []bloomshipper.Meta
829
863
expectedUpToDateMetas []bloomshipper.Meta
830
864
}{
831
865
{
@@ -835,6 +869,8 @@ func Test_deleteOutdatedMetas(t *testing.T) {
835
869
name : "only up to date metas" ,
836
870
originalMetas : []bloomshipper.Meta {
837
871
genMeta (0 , 10 , []int {0 }, []bloomshipper.BlockRef {genBlockRef (0 , 10 )}),
872
+ },
873
+ newMetas : []bloomshipper.Meta {
838
874
genMeta (10 , 20 , []int {0 }, []bloomshipper.BlockRef {genBlockRef (10 , 20 )}),
839
875
},
840
876
expectedUpToDateMetas : []bloomshipper.Meta {
@@ -846,13 +882,52 @@ func Test_deleteOutdatedMetas(t *testing.T) {
846
882
name : "outdated metas" ,
847
883
originalMetas : []bloomshipper.Meta {
848
884
genMeta (0 , 5 , []int {0 }, []bloomshipper.BlockRef {genBlockRef (0 , 5 )}),
849
- genMeta (6 , 10 , []int {0 }, []bloomshipper.BlockRef {genBlockRef (6 , 10 )}),
885
+ },
886
+ newMetas : []bloomshipper.Meta {
850
887
genMeta (0 , 10 , []int {1 }, []bloomshipper.BlockRef {genBlockRef (0 , 10 )}),
851
888
},
852
889
expectedUpToDateMetas : []bloomshipper.Meta {
853
890
genMeta (0 , 10 , []int {1 }, []bloomshipper.BlockRef {genBlockRef (0 , 10 )}),
854
891
},
855
892
},
893
+ {
894
+ name : "new metas reuse blocks from outdated meta" ,
895
+ originalMetas : []bloomshipper.Meta {
896
+ genMeta (0 , 10 , []int {0 }, []bloomshipper.BlockRef { // Outdated
897
+ genBlockRef (0 , 5 ), // Reuse
898
+ genBlockRef (5 , 10 ), // Delete
899
+ }),
900
+ genMeta (10 , 20 , []int {0 }, []bloomshipper.BlockRef { // Outdated
901
+ genBlockRef (10 , 20 ), // Reuse
902
+ }),
903
+ genMeta (20 , 30 , []int {0 }, []bloomshipper.BlockRef { // Up to date
904
+ genBlockRef (20 , 30 ),
905
+ }),
906
+ },
907
+ newMetas : []bloomshipper.Meta {
908
+ genMeta (0 , 5 , []int {1 }, []bloomshipper.BlockRef {
909
+ genBlockRef (0 , 5 ), // Reused block
910
+ }),
911
+ genMeta (5 , 20 , []int {1 }, []bloomshipper.BlockRef {
912
+ genBlockRef (5 , 7 ), // New block
913
+ genBlockRef (7 , 10 ), // New block
914
+ genBlockRef (10 , 20 ), // Reused block
915
+ }),
916
+ },
917
+ expectedUpToDateMetas : []bloomshipper.Meta {
918
+ genMeta (0 , 5 , []int {1 }, []bloomshipper.BlockRef {
919
+ genBlockRef (0 , 5 ),
920
+ }),
921
+ genMeta (5 , 20 , []int {1 }, []bloomshipper.BlockRef {
922
+ genBlockRef (5 , 7 ),
923
+ genBlockRef (7 , 10 ),
924
+ genBlockRef (10 , 20 ),
925
+ }),
926
+ genMeta (20 , 30 , []int {0 }, []bloomshipper.BlockRef {
927
+ genBlockRef (20 , 30 ),
928
+ }),
929
+ },
930
+ },
856
931
} {
857
932
t .Run (tc .name , func (t * testing.T ) {
858
933
logger := log .NewNopLogger ()
@@ -867,9 +942,11 @@ func Test_deleteOutdatedMetas(t *testing.T) {
867
942
bloomClient , err := planner .bloomStore .Client (testDay .ModelTime ())
868
943
require .NoError (t , err )
869
944
870
- // Create original metas and blocks
945
+ // Create original/new metas and blocks
871
946
err = putMetas (bloomClient , tc .originalMetas )
872
947
require .NoError (t , err )
948
+ err = putMetas (bloomClient , tc .newMetas )
949
+ require .NoError (t , err )
873
950
874
951
// Get all metas
875
952
metas , err := planner .bloomStore .FetchMetas (
@@ -882,9 +959,9 @@ func Test_deleteOutdatedMetas(t *testing.T) {
882
959
)
883
960
require .NoError (t , err )
884
961
removeLocFromMetasSources (metas )
885
- require .ElementsMatch (t , tc .originalMetas , metas )
962
+ require .ElementsMatch (t , append ( tc .originalMetas , tc . newMetas ... ) , metas )
886
963
887
- upToDate , err := planner .deleteOutdatedMetasAndBlocks (context .Background (), testTable , "fakeTenant" , tc .originalMetas , phasePlanning )
964
+ upToDate , err := planner .deleteOutdatedMetasAndBlocks (context .Background (), testTable , "fakeTenant" , tc .newMetas , tc . originalMetas , phasePlanning )
888
965
require .NoError (t , err )
889
966
require .ElementsMatch (t , tc .expectedUpToDateMetas , upToDate )
890
967
@@ -900,6 +977,13 @@ func Test_deleteOutdatedMetas(t *testing.T) {
900
977
require .NoError (t , err )
901
978
removeLocFromMetasSources (metas )
902
979
require .ElementsMatch (t , tc .expectedUpToDateMetas , metas )
980
+
981
+ // Fetch all blocks from the metas
982
+ for _ , meta := range metas {
983
+ blocks , err := planner .bloomStore .FetchBlocks (context .Background (), meta .Blocks )
984
+ require .NoError (t , err )
985
+ require .Len (t , blocks , len (meta .Blocks ))
986
+ }
903
987
})
904
988
}
905
989
}
0 commit comments