From 31d5cbbf1b7386d80478a7c46aecd5b7d4ecfa3d Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 28 Jan 2025 04:28:50 +0530 Subject: [PATCH 01/16] fix count --- posting/index.go | 24 ++++++++-- posting/list.go | 4 +- worker/sort_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 6 deletions(-) diff --git a/posting/index.go b/posting/index.go index 739b45898a1..40c66fecdc6 100644 --- a/posting/index.go +++ b/posting/index.go @@ -268,7 +268,8 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, } } if hasCountIndex { - countAfter = countAfterMutation(countBefore, found, edge.Op) + isScalarPredicate := !schema.State().IsList(edge.Attr) + countAfter = countAfterMutation(countBefore, found, edge.Op, isScalarPredicate) return countParams{ attr: edge.Attr, countBefore: countBefore, @@ -475,7 +476,21 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error { return nil } -func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int { +// Gives the count of the posting after the mutation has finished. Currently we use this to figure after the mutation +// what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out +// if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar +// predicate, delete only works if the value was found. Set would just result in 1 alaways. +func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, isScalarPredicate bool) int { + if isScalarPredicate { + if op == pb.DirectedEdge_SET { + return 1 + } else if op == pb.DirectedEdge_DEL && found { + return 0 + } else { + return countBefore + } + } + if !found && op != pb.DirectedEdge_DEL { return countBefore + 1 } else if found && op == pb.DirectedEdge_DEL { @@ -516,7 +531,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo var found bool var err error - delNonListPredicate := !schema.State().IsList(t.Attr) && + isScalarPredicate := !schema.State().IsList(t.Attr) + delNonListPredicate := isScalarPredicate && t.Op == pb.DirectedEdge_DEL && string(t.Value) != x.Star switch { @@ -560,7 +576,7 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo } if hasCountIndex { - countAfter = countAfterMutation(countBefore, found, t.Op) + countAfter = countAfterMutation(countBefore, found, t.Op, isScalarPredicate) return val, found, countParams{ attr: t.Attr, countBefore: countBefore, diff --git a/posting/list.go b/posting/list.go index d61b6f20966..44e50be32ca 100644 --- a/posting/list.go +++ b/posting/list.go @@ -382,7 +382,7 @@ func (mm *MutableLayer) print() string { mm.deleteAllMarker) } -func (l *List) print() string { +func (l *List) Print() string { return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print()) } @@ -1123,7 +1123,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e // pitr iterates through immutable postings err = pitr.seek(l, afterUid, deleteBelowTs) if err != nil { - return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.print()) + return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.Print()) } loop: diff --git a/worker/sort_test.go b/worker/sort_test.go index 0c67f2a0689..52b08921843 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -66,6 +66,112 @@ func writePostingListToDisk(kvs []*bpb.KV) error { return writer.Flush() } +func TestScalarPredicateIntCount(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("scalarPredicateCount: string @count ."), 1) + require.NoError(t, err) + + ctx := context.Background() + attr := x.GalaxyAttr("scalarPredicateCount") + + runM := func(startTs, commitTs uint64, edge *pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + x.Check(runMutation(ctx, edge, txn)) + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + runM(5, 7, &pb.DirectedEdge{ + Value: []byte("a"), + ValueType: pb.Posting_STRING, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }) + + key := x.CountKey(attr, 1, false) + rollup(t, key, ps, 8) + + runM(9, 11, &pb.DirectedEdge{ + Value: []byte("a"), + ValueType: pb.Posting_STRING, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_DEL, + }) + + txn := posting.Oracle().RegisterStartTs(20) + l, err := txn.Get(key) + l.RLock() + require.Equal(t, 0, l.GetLength(20)) + l.RUnlock() +} + +func TestScalarPredicateCount(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("scalarPredicateCount: uid @count ."), 1) + require.NoError(t, err) + + ctx := context.Background() + attr := x.GalaxyAttr("scalarPredicateCount") + + runM := func(startTs, commitTs uint64, edge *pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + x.Check(runMutation(ctx, edge, txn)) + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + runM(5, 7, &pb.DirectedEdge{ + ValueId: 2, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }) + + key := x.CountKey(attr, 1, false) + rollup(t, key, ps, 8) + + runM(9, 11, &pb.DirectedEdge{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }) + + txn := posting.Oracle().RegisterStartTs(15) + l, err := txn.Get(key) + l.RLock() + require.Equal(t, 1, l.GetLength(15)) + l.RUnlock() +} + func TestSingleUid(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err) From 67401df9173b1f0a283b2162706953d3b0e970f7 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 28 Jan 2025 20:37:26 +0530 Subject: [PATCH 02/16] added something --- posting/list.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/posting/list.go b/posting/list.go index 44e50be32ca..808a4e17a34 100644 --- a/posting/list.go +++ b/posting/list.go @@ -752,12 +752,7 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro // Add the deletions in the existing plist because those postings are not picked // up by iterating. Not doing so would result in delete operations that are not // applied when the transaction is committed. - for _, post := range l.mutationMap.currentEntries.Postings { - if post.Op == Del && post.Uid != mpost.Uid { - newPlist.Postings = append(newPlist.Postings, post) - } - } - + l.mutationMap.currentEntries = &pb.PostingList{} err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error { // Ignore values which have the same uid as they will get replaced // by the current value. @@ -776,9 +771,6 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro if err != nil { return err } - - // Update the mutation map with the new plist. Return here since the code below - // does not apply for predicates of type uid. l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist) return nil } From eefaffb942178f49634ae766ff9a88a6ee97e608 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 04:26:50 +0530 Subject: [PATCH 03/16] added something --- posting/index.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/posting/index.go b/posting/index.go index 40c66fecdc6..9368dff9504 100644 --- a/posting/index.go +++ b/posting/index.go @@ -253,22 +253,27 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) { countBefore, countAfter := 0, 0 found := false + var currPost *pb.Posting + isScalarPredicate := !schema.State().IsList(edge.Attr) plist.Lock() defer plist.Unlock() if hasCountIndex { - countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId) + countBefore, found, currPost = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId) if countBefore < 0 { return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count") } } - if !(hasCountIndex && !shouldAddCountEdge(found, edge)) { + + if isScalarPredicate && edge.Op == pb.DirectedEdge_DEL && currPost != nil && currPost.StartTs == txn.StartTs { + plist.mutationMap.setCurrentEntries(txn.StartTs, &pb.PostingList{}) + } else if !(hasCountIndex && !shouldAddCountEdge(found, edge)) { if err := plist.addMutationInternal(ctx, txn, edge); err != nil { return emptyCountParams, err } } + if hasCountIndex { - isScalarPredicate := !schema.State().IsList(edge.Attr) countAfter = countAfterMutation(countBefore, found, edge.Op, isScalarPredicate) return countParams{ attr: edge.Attr, From 378c3f202775c7affef09007e01a9cf79a90d9a5 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 04:58:24 +0530 Subject: [PATCH 04/16] update fix --- posting/index.go | 7 ++----- posting/list.go | 30 ++++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/posting/index.go b/posting/index.go index 9368dff9504..09859709422 100644 --- a/posting/index.go +++ b/posting/index.go @@ -253,21 +253,18 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) { countBefore, countAfter := 0, 0 found := false - var currPost *pb.Posting isScalarPredicate := !schema.State().IsList(edge.Attr) plist.Lock() defer plist.Unlock() if hasCountIndex { - countBefore, found, currPost = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId) + countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId) if countBefore < 0 { return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count") } } - if isScalarPredicate && edge.Op == pb.DirectedEdge_DEL && currPost != nil && currPost.StartTs == txn.StartTs { - plist.mutationMap.setCurrentEntries(txn.StartTs, &pb.PostingList{}) - } else if !(hasCountIndex && !shouldAddCountEdge(found, edge)) { + if !(hasCountIndex && !shouldAddCountEdge(found, edge)) { if err := plist.addMutationInternal(ctx, txn, edge); err != nil { return emptyCountParams, err } diff --git a/posting/list.go b/posting/list.go index 808a4e17a34..11e973a293d 100644 --- a/posting/list.go +++ b/posting/list.go @@ -345,7 +345,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) { } // insertPosting inserts a new posting in the mutable layers. It updates the currentUids map. -func (mm *MutableLayer) insertPosting(mpost *pb.Posting) { +func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { if mm.readTs != 0 { x.AssertTrue(mpost.StartTs == mm.readTs) } @@ -360,6 +360,28 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting) { if mpost.Uid != 0 { mm.populateUidMap(mm.currentEntries) + // If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the + // current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be + // any set posting in the list. This would mess up the count. We can do this for all types, however, + // there might be a performance hit becasue of it. + if hasCountIndex && mpost.Op == Del { + postIndex, ok := mm.currentUids[mpost.Uid] + if !ok { + return + } + + // If the posting was there before, just remove it from the map, and then remove it from the + // array. + delete(mm.currentUids, mpost.Uid) + res := mm.currentEntries.Postings[:postIndex] + if postIndex+1 <= len(mm.currentEntries.Postings) { + mm.currentEntries.Postings = append(res, + mm.currentEntries.Postings[(postIndex+1):]...) + } + mm.currentEntries.Postings = res + return + } + if postIndex, ok := mm.currentUids[mpost.Uid]; ok { mm.currentEntries.Postings[postIndex] = mpost } else { @@ -721,7 +743,7 @@ func hasDeleteAll(mpost *pb.Posting) bool { } // Ensure that you either abort the uncommitted postings or commit them before calling me. -func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error { +func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountIndex bool) error { l.AssertLock() x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr) @@ -775,7 +797,7 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro return nil } - l.mutationMap.insertPosting(mpost) + l.mutationMap.insertPosting(mpost, hasCountIndex) return nil } @@ -899,7 +921,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF - if err != l.updateMutationLayer(mpost, isSingleUidUpdate) { + if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount()) { return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v", hex.EncodeToString(l.key), mpost) } From 3e740801319f7c5a50033ba9d09ffc81d82e9a1b Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 05:07:40 +0530 Subject: [PATCH 05/16] fixed stuff --- posting/list.go | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/posting/list.go b/posting/list.go index 11e973a293d..28716cd23b7 100644 --- a/posting/list.go +++ b/posting/list.go @@ -359,30 +359,24 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { } if mpost.Uid != 0 { - mm.populateUidMap(mm.currentEntries) // If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the // current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be // any set posting in the list. This would mess up the count. We can do this for all types, however, // there might be a performance hit becasue of it. - if hasCountIndex && mpost.Op == Del { - postIndex, ok := mm.currentUids[mpost.Uid] - if !ok { + mm.populateUidMap(mm.currentEntries) + if postIndex, ok := mm.currentUids[mpost.Uid]; ok { + if hasCountIndex && mpost.Op == Del { + // If the posting was there before, just remove it from the map, and then remove it + // from the array. + delete(mm.currentUids, mpost.Uid) + res := mm.currentEntries.Postings[:postIndex] + if postIndex+1 <= len(mm.currentEntries.Postings) { + mm.currentEntries.Postings = append(res, + mm.currentEntries.Postings[(postIndex+1):]...) + } + mm.currentEntries.Postings = res return } - - // If the posting was there before, just remove it from the map, and then remove it from the - // array. - delete(mm.currentUids, mpost.Uid) - res := mm.currentEntries.Postings[:postIndex] - if postIndex+1 <= len(mm.currentEntries.Postings) { - mm.currentEntries.Postings = append(res, - mm.currentEntries.Postings[(postIndex+1):]...) - } - mm.currentEntries.Postings = res - return - } - - if postIndex, ok := mm.currentUids[mpost.Uid]; ok { mm.currentEntries.Postings[postIndex] = mpost } else { mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost) From 07111338ef3fa98be6fc1d07f38b9431748bedb7 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 05:19:01 +0530 Subject: [PATCH 06/16] fixed a test --- worker/sort_test.go | 79 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/worker/sort_test.go b/worker/sort_test.go index 52b08921843..0c5d8c13090 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -66,6 +66,85 @@ func writePostingListToDisk(kvs []*bpb.KV) error { return writer.Flush() } +func TestScalarPredicateRevCount(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("scalarPredicateCount: uid @reverse @count ."), 1) + require.NoError(t, err) + + ctx := context.Background() + attr := x.GalaxyAttr("scalarPredicateCount") + + runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + for _, edge := range edges { + x.Check(runMutation(ctx, edge, txn)) + } + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + runM(9, 11, []*pb.DirectedEdge{{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }, { + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_DEL, + }}) + + txn := posting.Oracle().RegisterStartTs(13) + key := x.CountKey(attr, 1, false) + l, err := txn.Get(key) + l.RLock() + require.Equal(t, 0, l.GetLength(13)) + l.RUnlock() + + runM(15, 17, []*pb.DirectedEdge{{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }}) + + txn = posting.Oracle().RegisterStartTs(18) + l, err = txn.Get(key) + l.RLock() + require.Equal(t, 1, l.GetLength(18)) + l.RUnlock() + + runM(18, 19, []*pb.DirectedEdge{{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_DEL, + }}) + + txn = posting.Oracle().RegisterStartTs(20) + l, err = txn.Get(key) + l.RLock() + require.Equal(t, 0, l.GetLength(20)) + l.RUnlock() +} + func TestScalarPredicateIntCount(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err) From 901fe4ce591e4bd00ac823a4a22ec317caf6b1a1 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 06:36:18 +0530 Subject: [PATCH 07/16] fixed test --- worker/sort_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/worker/sort_test.go b/worker/sort_test.go index 0c5d8c13090..f363efb1851 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -77,11 +77,11 @@ func TestScalarPredicateRevCount(t *testing.T) { pstore = ps posting.Init(ps, 0, false) Init(ps) - err = schema.ParseBytes([]byte("scalarPredicateCount: uid @reverse @count ."), 1) + err = schema.ParseBytes([]byte("scalarPredicateCount2: uid @reverse @count ."), 1) require.NoError(t, err) ctx := context.Background() - attr := x.GalaxyAttr("scalarPredicateCount") + attr := x.GalaxyAttr("scalarPredicateCount2") runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) @@ -156,11 +156,11 @@ func TestScalarPredicateIntCount(t *testing.T) { pstore = ps posting.Init(ps, 0, false) Init(ps) - err = schema.ParseBytes([]byte("scalarPredicateCount: string @count ."), 1) + err = schema.ParseBytes([]byte("scalarPredicateCount1: string @count ."), 1) require.NoError(t, err) ctx := context.Background() - attr := x.GalaxyAttr("scalarPredicateCount") + attr := x.GalaxyAttr("scalarPredicateCount1") runM := func(startTs, commitTs uint64, edge *pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) From 4d9d9afa445064b6d8645d59b2bbb8c51bbeb1b2 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 06:49:34 +0530 Subject: [PATCH 08/16] fixed posting --- posting/list.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/posting/list.go b/posting/list.go index 28716cd23b7..4c1e0c1d9a6 100644 --- a/posting/list.go +++ b/posting/list.go @@ -369,6 +369,11 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { // If the posting was there before, just remove it from the map, and then remove it // from the array. delete(mm.currentUids, mpost.Uid) + post := mm.currentEntries.Postings[postIndex] + if post.Op == Del { + // No need to do anything + return + } res := mm.currentEntries.Postings[:postIndex] if postIndex+1 <= len(mm.currentEntries.Postings) { mm.currentEntries.Postings = append(res, From 2353b89b4e16a06f8fd80b5ebd4b1820e06b7072 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 07:01:23 +0530 Subject: [PATCH 09/16] fixed something --- posting/list.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index 4c1e0c1d9a6..dd572ed84c7 100644 --- a/posting/list.go +++ b/posting/list.go @@ -368,14 +368,18 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { if hasCountIndex && mpost.Op == Del { // If the posting was there before, just remove it from the map, and then remove it // from the array. - delete(mm.currentUids, mpost.Uid) post := mm.currentEntries.Postings[postIndex] if post.Op == Del { // No need to do anything + mm.currentEntries.Postings[postIndex] = mpost return } + delete(mm.currentUids, mpost.Uid) res := mm.currentEntries.Postings[:postIndex] if postIndex+1 <= len(mm.currentEntries.Postings) { + for i, pos := range mm.currentEntries.Postings[(postIndex + 1):] { + mm.currentUids[pos.Uid] = i - 1 + } mm.currentEntries.Postings = append(res, mm.currentEntries.Postings[(postIndex+1):]...) } From 68cfd5f74e3594661a84769d50e0eda2dc640f42 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 07:03:20 +0530 Subject: [PATCH 10/16] fixed something --- posting/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index dd572ed84c7..1a3c995bf02 100644 --- a/posting/list.go +++ b/posting/list.go @@ -378,7 +378,7 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { res := mm.currentEntries.Postings[:postIndex] if postIndex+1 <= len(mm.currentEntries.Postings) { for i, pos := range mm.currentEntries.Postings[(postIndex + 1):] { - mm.currentUids[pos.Uid] = i - 1 + mm.currentUids[pos.Uid] = i + postIndex } mm.currentEntries.Postings = append(res, mm.currentEntries.Postings[(postIndex+1):]...) From 6e5507a81aa1f2295135114933fa15caa8794063 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 07:06:38 +0530 Subject: [PATCH 11/16] temp --- posting/list.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/posting/list.go b/posting/list.go index 1a3c995bf02..5c91c853a17 100644 --- a/posting/list.go +++ b/posting/list.go @@ -374,14 +374,13 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { mm.currentEntries.Postings[postIndex] = mpost return } - delete(mm.currentUids, mpost.Uid) res := mm.currentEntries.Postings[:postIndex] if postIndex+1 <= len(mm.currentEntries.Postings) { - for i, pos := range mm.currentEntries.Postings[(postIndex + 1):] { - mm.currentUids[pos.Uid] = i + postIndex - } mm.currentEntries.Postings = append(res, mm.currentEntries.Postings[(postIndex+1):]...) + mm.currentUids = nil + } else { + delete(mm.currentUids, mpost.Uid) } mm.currentEntries.Postings = res return From fe45995eae64be1599fbfdc03fbf5613d45d103e Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 07:09:14 +0530 Subject: [PATCH 12/16] fixed live load issue --- posting/list.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/posting/list.go b/posting/list.go index 5c91c853a17..e9ccaf984a0 100644 --- a/posting/list.go +++ b/posting/list.go @@ -378,10 +378,8 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { if postIndex+1 <= len(mm.currentEntries.Postings) { mm.currentEntries.Postings = append(res, mm.currentEntries.Postings[(postIndex+1):]...) - mm.currentUids = nil - } else { - delete(mm.currentUids, mpost.Uid) } + mm.currentUids = nil mm.currentEntries.Postings = res return } From 0f16d381c6e855d535c4a9840fba113b83d24b0b Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 09:52:08 +0530 Subject: [PATCH 13/16] addes somethign --- posting/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index e9ccaf984a0..cc4a7e88418 100644 --- a/posting/list.go +++ b/posting/list.go @@ -921,7 +921,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF - if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount()) { + if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && pk.IsData()) { return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v", hex.EncodeToString(l.key), mpost) } From d15410ee3f5bb368dccadad855bb72adf63ee430 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 29 Jan 2025 12:53:22 +0530 Subject: [PATCH 14/16] fixed another test --- posting/index.go | 2 +- posting/list.go | 2 +- worker/sort_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/posting/index.go b/posting/index.go index 09859709422..2a8e215dca9 100644 --- a/posting/index.go +++ b/posting/index.go @@ -364,7 +364,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd Facets: t.Facets, } - cp, err := txn.addReverseMutationHelper(ctx, plist, hasCountIndex, edge) + cp, err := txn.addReverseMutationHelper(ctx, plist, true, edge) if err != nil { return err } diff --git a/posting/list.go b/posting/list.go index cc4a7e88418..ca6f627913b 100644 --- a/posting/list.go +++ b/posting/list.go @@ -921,7 +921,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF - if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && pk.IsData()) { + if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && (pk.IsData() || pk.IsReverse())) { return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v", hex.EncodeToString(l.key), mpost) } diff --git a/worker/sort_test.go b/worker/sort_test.go index f363efb1851..5f6ac8f861f 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -66,6 +66,65 @@ func writePostingListToDisk(kvs []*bpb.KV) error { return writer.Flush() } +func TestMultipleTxnListCount(t *testing.T) { + dir, err := os.MkdirTemp("", "storetest_") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + x.Check(err) + pstore = ps + posting.Init(ps, 0, false) + Init(ps) + err = schema.ParseBytes([]byte("scalarPredicateCount3: [uid] @count ."), 1) + require.NoError(t, err) + + ctx := context.Background() + attr := x.GalaxyAttr("scalarPredicateCount3") + + runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + for _, edge := range edges { + x.Check(runMutation(ctx, edge, txn)) + } + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + runM(9, 11, []*pb.DirectedEdge{{ + ValueId: 3, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }, { + ValueId: 2, + ValueType: pb.Posting_UID, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + }}) + + txn := posting.Oracle().RegisterStartTs(13) + key := x.CountKey(attr, 1, false) + l, err := txn.Get(key) + require.Nil(t, err) + uids, err := l.Uids(posting.ListOptions{ReadTs: 13}) + require.Nil(t, err) + require.Equal(t, 0, len(uids.Uids)) + + key = x.CountKey(attr, 2, false) + l, err = txn.Get(key) + require.Nil(t, err) + uids, err = l.Uids(posting.ListOptions{ReadTs: 13}) + require.Nil(t, err) + require.Equal(t, 1, len(uids.Uids)) +} + func TestScalarPredicateRevCount(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err) @@ -110,7 +169,7 @@ func TestScalarPredicateRevCount(t *testing.T) { }}) txn := posting.Oracle().RegisterStartTs(13) - key := x.CountKey(attr, 1, false) + key := x.DataKey(attr, 1) l, err := txn.Get(key) l.RLock() require.Equal(t, 0, l.GetLength(13)) From 2f9f302559cdec9d3d1e7142f2dff39daac13221 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 30 Jan 2025 01:11:08 +0530 Subject: [PATCH 15/16] fixed another test --- posting/index.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/posting/index.go b/posting/index.go index 2a8e215dca9..8f698d257e4 100644 --- a/posting/index.go +++ b/posting/index.go @@ -253,7 +253,8 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) { countBefore, countAfter := 0, 0 found := false - isScalarPredicate := !schema.State().IsList(edge.Attr) + pk, _ := x.Parse(plist.key) + isScalarPredicate := !schema.State().IsList(edge.Attr) && !pk.IsReverse() plist.Lock() defer plist.Unlock() From a2f573d1618295851124c0fc88f4ed293dc8b715 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 30 Jan 2025 04:12:52 +0530 Subject: [PATCH 16/16] temp --- posting/index.go | 14 ++++++++------ posting/list.go | 44 +++++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/posting/index.go b/posting/index.go index 8f698d257e4..562083d6546 100644 --- a/posting/index.go +++ b/posting/index.go @@ -253,8 +253,6 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) { countBefore, countAfter := 0, 0 found := false - pk, _ := x.Parse(plist.key) - isScalarPredicate := !schema.State().IsList(edge.Attr) && !pk.IsReverse() plist.Lock() defer plist.Unlock() @@ -272,7 +270,9 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, } if hasCountIndex { - countAfter = countAfterMutation(countBefore, found, edge.Op, isScalarPredicate) + pk, _ := x.Parse(plist.key) + shouldCountOneUid := !schema.State().IsList(edge.Attr) && !pk.IsReverse() && false + countAfter = countAfterMutation(countBefore, found, edge.Op, shouldCountOneUid) return countParams{ attr: edge.Attr, countBefore: countBefore, @@ -483,8 +483,8 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error { // what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out // if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar // predicate, delete only works if the value was found. Set would just result in 1 alaways. -func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, isScalarPredicate bool) int { - if isScalarPredicate { +func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, shouldCountOneUid bool) int { + if shouldCountOneUid { if op == pb.DirectedEdge_SET { return 1 } else if op == pb.DirectedEdge_DEL && found { @@ -579,7 +579,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo } if hasCountIndex { - countAfter = countAfterMutation(countBefore, found, t.Op, isScalarPredicate) + pk, _ := x.Parse(l.key) + shouldCountOneUid := isScalarPredicate && !pk.IsReverse() && false + countAfter = countAfterMutation(countBefore, found, t.Op, shouldCountOneUid) return val, found, countParams{ attr: t.Attr, countBefore: countBefore, diff --git a/posting/list.go b/posting/list.go index ca6f627913b..6f940d5210c 100644 --- a/posting/list.go +++ b/posting/list.go @@ -365,24 +365,24 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { // there might be a performance hit becasue of it. mm.populateUidMap(mm.currentEntries) if postIndex, ok := mm.currentUids[mpost.Uid]; ok { - if hasCountIndex && mpost.Op == Del { - // If the posting was there before, just remove it from the map, and then remove it - // from the array. - post := mm.currentEntries.Postings[postIndex] - if post.Op == Del { - // No need to do anything - mm.currentEntries.Postings[postIndex] = mpost - return - } - res := mm.currentEntries.Postings[:postIndex] - if postIndex+1 <= len(mm.currentEntries.Postings) { - mm.currentEntries.Postings = append(res, - mm.currentEntries.Postings[(postIndex+1):]...) - } - mm.currentUids = nil - mm.currentEntries.Postings = res - return - } + //if hasCountIndex && mpost.Op == Del { + // // If the posting was there before, just remove it from the map, and then remove it + // // from the array. + // post := mm.currentEntries.Postings[postIndex] + // if post.Op == Del { + // // No need to do anything + // mm.currentEntries.Postings[postIndex] = mpost + // return + // } + // res := mm.currentEntries.Postings[:postIndex] + // if postIndex+1 <= len(mm.currentEntries.Postings) { + // mm.currentEntries.Postings = append(res, + // mm.currentEntries.Postings[(postIndex+1):]...) + // } + // mm.currentUids = nil + // mm.currentEntries.Postings = res + // return + //} mm.currentEntries.Postings[postIndex] = mpost } else { mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost) @@ -774,7 +774,13 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI // Add the deletions in the existing plist because those postings are not picked // up by iterating. Not doing so would result in delete operations that are not // applied when the transaction is committed. - l.mutationMap.currentEntries = &pb.PostingList{} + //l.mutationMap.currentEntries = &pb.PostingList{} + for _, post := range l.mutationMap.currentEntries.Postings { + if post.Op == Del && post.Uid != mpost.Uid { + newPlist.Postings = append(newPlist.Postings, post) + } + } + err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error { // Ignore values which have the same uid as they will get replaced // by the current value.