Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Fix count index for scalar predicates #9292

Merged
merged 18 commits into from
Jan 30, 2025
29 changes: 25 additions & 4 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,17 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
}
}

if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
}
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, edge.Op)
pk, _ := x.Parse(plist.key)
shouldCountOneUid := !schema.State().IsList(edge.Attr) && !pk.IsReverse()
countAfter = countAfterMutation(countBefore, found, edge.Op, shouldCountOneUid)
return countParams{
attr: edge.Attr,
countBefore: countBefore,
Expand Down Expand Up @@ -475,7 +479,21 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
return nil
}

func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
// Gives the count of the posting after the mutation has finished. Currently we use this to figure after the mutation
// what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out
// if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar
// predicate, delete only works if the value was found. Set would just result in 1 alaways.
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, shouldCountOneUid bool) int {
if shouldCountOneUid {
if op == pb.DirectedEdge_SET {
return 1
} else if op == pb.DirectedEdge_DEL && found {
return 0
} else {
return countBefore
}
}

if !found && op != pb.DirectedEdge_DEL {
return countBefore + 1
} else if found && op == pb.DirectedEdge_DEL {
Expand Down Expand Up @@ -516,7 +534,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
var found bool
var err error

delNonListPredicate := !schema.State().IsList(t.Attr) &&
isScalarPredicate := !schema.State().IsList(t.Attr)
delNonListPredicate := isScalarPredicate &&
t.Op == pb.DirectedEdge_DEL && string(t.Value) != x.Star

switch {
Expand Down Expand Up @@ -560,7 +579,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, t.Op)
pk, _ := x.Parse(l.key)
shouldCountOneUid := isScalarPredicate && !pk.IsReverse()
countAfter = countAfterMutation(countBefore, found, t.Op, shouldCountOneUid)
return val, found, countParams{
attr: t.Attr,
countBefore: countBefore,
Expand Down
40 changes: 27 additions & 13 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
}

// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
if mm.readTs != 0 {
x.AssertTrue(mpost.StartTs == mm.readTs)
}
Expand All @@ -359,8 +359,30 @@ func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
}

if mpost.Uid != 0 {
// If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the
// current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be
// any set posting in the list. This would mess up the count. We can do this for all types, however,
// there might be a performance hit becasue of it.
mm.populateUidMap(mm.currentEntries)
if postIndex, ok := mm.currentUids[mpost.Uid]; ok {
if hasCountIndex && mpost.Op == Del {
// If the posting was there before, just remove it from the map, and then remove it
// from the array.
post := mm.currentEntries.Postings[postIndex]
if post.Op == Del {
// No need to do anything
mm.currentEntries.Postings[postIndex] = mpost
return
}
res := mm.currentEntries.Postings[:postIndex]
if postIndex+1 <= len(mm.currentEntries.Postings) {
mm.currentEntries.Postings = append(res,
mm.currentEntries.Postings[(postIndex+1):]...)
}
mm.currentUids = nil
mm.currentEntries.Postings = res
return
}
mm.currentEntries.Postings[postIndex] = mpost
} else {
mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost)
Expand Down Expand Up @@ -721,7 +743,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
}

// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountIndex bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)

Expand Down Expand Up @@ -752,12 +774,7 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
// Add the deletions in the existing plist because those postings are not picked
// up by iterating. Not doing so would result in delete operations that are not
// applied when the transaction is committed.
for _, post := range l.mutationMap.currentEntries.Postings {
if post.Op == Del && post.Uid != mpost.Uid {
newPlist.Postings = append(newPlist.Postings, post)
}
}

l.mutationMap.currentEntries = &pb.PostingList{}
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
// Ignore values which have the same uid as they will get replaced
// by the current value.
Expand All @@ -776,14 +793,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) erro
if err != nil {
return err
}

// Update the mutation map with the new plist. Return here since the code below
// does not apply for predicates of type uid.
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
return nil
}

l.mutationMap.insertPosting(mpost)
l.mutationMap.insertPosting(mpost, hasCountIndex)
return nil
}

Expand Down Expand Up @@ -907,7 +921,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && (pk.IsData() || pk.IsReverse())) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
hex.EncodeToString(l.key), mpost)
}
Expand Down
Loading