Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harshil goel/temp fix count #9295

Closed
wants to merge 16 commits into from
31 changes: 26 additions & 5 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,17 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
}
}

if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
}
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, edge.Op)
pk, _ := x.Parse(plist.key)
shouldCountOneUid := !schema.State().IsList(edge.Attr) && !pk.IsReverse() && false
countAfter = countAfterMutation(countBefore, found, edge.Op, shouldCountOneUid)
return countParams{
attr: edge.Attr,
countBefore: countBefore,
Expand Down Expand Up @@ -361,7 +365,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
Facets: t.Facets,
}

cp, err := txn.addReverseMutationHelper(ctx, plist, hasCountIndex, edge)
cp, err := txn.addReverseMutationHelper(ctx, plist, true, edge)
if err != nil {
return err
}
Expand Down Expand Up @@ -475,7 +479,21 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
return nil
}

func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
// Gives the count of the posting after the mutation has finished. Currently we use this to figure after the mutation
// what is the count. For non scalar predicate, we need to use found and the operation that the user did to figure out
// if the new node was inserted or not. However, for single uid predicates this information is not useful. For scalar
// predicate, delete only works if the value was found. Set would just result in 1 alaways.
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op, shouldCountOneUid bool) int {
if shouldCountOneUid {
if op == pb.DirectedEdge_SET {
return 1
} else if op == pb.DirectedEdge_DEL && found {
return 0
} else {
return countBefore
}
}

if !found && op != pb.DirectedEdge_DEL {
return countBefore + 1
} else if found && op == pb.DirectedEdge_DEL {
Expand Down Expand Up @@ -516,7 +534,8 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
var found bool
var err error

delNonListPredicate := !schema.State().IsList(t.Attr) &&
isScalarPredicate := !schema.State().IsList(t.Attr)
delNonListPredicate := isScalarPredicate &&
t.Op == pb.DirectedEdge_DEL && string(t.Value) != x.Star

switch {
Expand Down Expand Up @@ -560,7 +579,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
}

if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, t.Op)
pk, _ := x.Parse(l.key)
shouldCountOneUid := isScalarPredicate && !pk.IsReverse() && false
countAfter = countAfterMutation(countBefore, found, t.Op, shouldCountOneUid)
return val, found, countParams{
attr: t.Attr,
countBefore: countBefore,
Expand Down
38 changes: 29 additions & 9 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@
}

// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
func (mm *MutableLayer) insertPosting(mpost *pb.Posting) {
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
if mm.readTs != 0 {
x.AssertTrue(mpost.StartTs == mm.readTs)
}
Expand All @@ -359,8 +359,30 @@
}

if mpost.Uid != 0 {
// If hasCountIndex, in that case while inserting uids, if there's a delete, we only delete from the
// current entries, we dont' insert the delete posting. If we insert the delete posting, there won't be
// any set posting in the list. This would mess up the count. We can do this for all types, however,
// there might be a performance hit becasue of it.
mm.populateUidMap(mm.currentEntries)
if postIndex, ok := mm.currentUids[mpost.Uid]; ok {
//if hasCountIndex && mpost.Op == Del {
// // If the posting was there before, just remove it from the map, and then remove it
// // from the array.
// post := mm.currentEntries.Postings[postIndex]
// if post.Op == Del {
// // No need to do anything
// mm.currentEntries.Postings[postIndex] = mpost
// return
// }
// res := mm.currentEntries.Postings[:postIndex]
// if postIndex+1 <= len(mm.currentEntries.Postings) {
// mm.currentEntries.Postings = append(res,
// mm.currentEntries.Postings[(postIndex+1):]...)
// }
// mm.currentUids = nil
// mm.currentEntries.Postings = res
// return
//}
mm.currentEntries.Postings[postIndex] = mpost
} else {
mm.currentEntries.Postings = append(mm.currentEntries.Postings, mpost)
Expand All @@ -382,7 +404,7 @@
mm.deleteAllMarker)
}

func (l *List) print() string {
func (l *List) Print() string {
return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print())
}

Expand Down Expand Up @@ -721,7 +743,7 @@
}

// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountIndex bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)

Expand Down Expand Up @@ -752,6 +774,7 @@
// Add the deletions in the existing plist because those postings are not picked
// up by iterating. Not doing so would result in delete operations that are not
// applied when the transaction is committed.
//l.mutationMap.currentEntries = &pb.PostingList{}
for _, post := range l.mutationMap.currentEntries.Postings {
if post.Op == Del && post.Uid != mpost.Uid {
newPlist.Postings = append(newPlist.Postings, post)
Expand All @@ -776,14 +799,11 @@
if err != nil {
return err
}

// Update the mutation map with the new plist. Return here since the code below
// does not apply for predicates of type uid.
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
return nil
}

l.mutationMap.insertPosting(mpost)
l.mutationMap.insertPosting(mpost, hasCountIndex)
return nil
}

Expand Down Expand Up @@ -907,7 +927,7 @@
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
if err != l.updateMutationLayer(mpost, isSingleUidUpdate, pred.GetCount() && (pk.IsData() || pk.IsReverse())) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
hex.EncodeToString(l.key), mpost)
}
Expand Down Expand Up @@ -1123,7 +1143,7 @@
// pitr iterates through immutable postings
err = pitr.seek(l, afterUid, deleteBelowTs)
if err != nil {
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.print())
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.Print())

Check failure on line 1146 in posting/list.go

View check run for this annotation

Trunk.io / Trunk Check

golangci-lint(govet)

[new] printf: non-constant format string in call to github.com/pkg/errors.Wrapf
}

loop:
Expand Down
Loading
Loading