Skip to content

Commit

Permalink
fix(core): fix issues with count index for scalar predicates (#9292)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel authored Jan 30, 2025
1 parent 5dbcd0f commit df56a2f
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 17 deletions.
29 changes: 25 additions & 4 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,17 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
}
}

if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
}
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, edge.Op)
pk, _ := x.Parse(plist.key)
shouldCountOneUid := !schema.State().IsList(edge.Attr) && !pk.IsReverse()
countAfter = countAfterMutation(countBefore, found, edge.Op, shouldCountOneUid)
return countParams{
attr: edge.Attr,
countBefore: countBefore,
Expand Down Expand Up @@ -475,7 +479,21 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
return nil
}

func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
// Gives the count of the posting after the mutation has finished. Currently we use this to figure after the mutation
// what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out
// if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar
// predicate, delete only works if the value was found. Set would just result in 1 alaways.
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, shouldCountOneUid bool) int {
if shouldCountOneUid {
if op == pb.DirectedEdge_SET {
return 1
} else if op == pb.DirectedEdge_DEL && found {
return 0
} else {
return countBefore
}
}

if !found && op != pb.DirectedEdge_DEL {
return countBefore + 1
} else if found && op == pb.DirectedEdge_DEL {
Expand Down Expand Up @@ -516,7 +534,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
var found bool
var err error

delNonListPredicate := !schema.State().IsList(t.Attr) &&
isScalarPredicate := !schema.State().IsList(t.Attr)
delNonListPredicate := isScalarPredicate &&
t.Op == pb.DirectedEdge_DEL && string(t.Value) != x.Star

switch {
Expand Down Expand Up @@ -560,7 +579,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, t.Op)
pk, _ := x.Parse(l.key)
shouldCountOneUid := isScalarPredicate && !pk.IsReverse()
countAfter = countAfterMutation(countBefore, found, t.Op, shouldCountOneUid)
return val, found, countParams{
attr: t.Attr,
countBefore: countBefore,
Expand Down
40 changes: 27 additions & 13 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
}

// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
if mm.readTs != 0 {
x.AssertTrue(mpost.StartTs == mm.readTs)
}
Expand All @@ -359,8 +359,30 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
}

if mpost.Uid != 0 {
// If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the
// current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be
// any set posting in the list. This would mess up the count. We can do this for all types, however,
// there might be a performance hit becasue of it.
mm.populateUidMap(mm.currentEntries)
if postIndex, ok := mm.currentUids[mpost.Uid]; ok {
if hasCountIndex && mpost.Op == Del {
// If the posting was there before, just remove it from the map, and then remove it
// from the array.
post := mm.currentEntries.Postings[postIndex]
if post.Op == Del {
// No need to do anything
mm.currentEntries.Postings[postIndex] = mpost
return
}
res := mm.currentEntries.Postings[:postIndex]
if postIndex+1 <= len(mm.currentEntries.Postings) {
mm.currentEntries.Postings = append(res,
mm.currentEntries.Postings[(postIndex+1):]...)
}
mm.currentUids = nil
mm.currentEntries.Postings = res
return
}
mm.currentEntries.Postings[postIndex] = mpost
} else {
mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost)
Expand Down Expand Up @@ -724,7 +746,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
}

// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountIndex bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)

Expand Down Expand Up @@ -755,12 +777,7 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
// Add the deletions in the existing plist because those postings are not picked
// up by iterating. Not doing so would result in delete operations that are not
// applied when the transaction is committed.
for _, post := range l.mutationMap.currentEntries.Postings {
if post.Op == Del && post.Uid != mpost.Uid {
newPlist.Postings = append(newPlist.Postings, post)
}
}

l.mutationMap.currentEntries = &pb.PostingList{}
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
// Ignore values which have the same uid as they will get replaced
// by the current value.
Expand All @@ -779,14 +796,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
if err != nil {
return err
}

// Update the mutation map with the new plist. Return here since the code below
// does not apply for predicates of type uid.
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
return nil
}

l.mutationMap.insertPosting(mpost)
l.mutationMap.insertPosting(mpost, hasCountIndex)
return nil
}

Expand Down Expand Up @@ -910,7 +924,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && (pk.IsData() || pk.IsReverse())) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
hex.EncodeToString(l.key), mpost)
}
Expand Down
Loading

0 comments on commit df56a2f

Please sign in to comment.