Skip to content

Commit 37681ff

Browse files
authored
feat(datastore): Support aggregation query in transaction (#8439)
* feat(datastore): Support aggregation query in transaction * feat(datastore): Refactoring integration test * feat(datastore): Integration tests for sum and average
1 parent a9fff18 commit 37681ff

File tree

3 files changed

+51
-12
lines changed

3 files changed

+51
-12
lines changed

datastore/integration_test.go

+51-6
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,15 @@ func TestIntegration_AggregationQueries(t *testing.T) {
722722
for i := range keys {
723723
keys[i] = IncompleteKey("SQChild", parent)
724724
}
725-
keys, err := client.PutMulti(ctx, keys, children)
725+
726+
// Create transaction with read before creating entities
727+
readTime := time.Now()
728+
txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
729+
if err != nil {
730+
t.Fatalf("client.NewTransaction: %v", err)
731+
}
732+
733+
keys, err = client.PutMulti(ctx, keys, children)
726734
if err != nil {
727735
t.Fatalf("client.PutMulti: %v", err)
728736
}
@@ -733,13 +741,22 @@ func TestIntegration_AggregationQueries(t *testing.T) {
733741
}
734742
}()
735743

744+
// Create transaction with read after creating entities
745+
readTime = time.Now()
746+
txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
747+
if err != nil {
748+
t.Fatalf("client.NewTransaction: %v", err)
749+
}
750+
736751
testCases := []struct {
737-
desc string
738-
aggQuery *AggregationQuery
739-
wantFailure bool
740-
wantErrMsg string
741-
wantAggResult AggregationResult
752+
desc string
753+
aggQuery *AggregationQuery
754+
transactionOpts []TransactionOption
755+
wantFailure bool
756+
wantErrMsg string
757+
wantAggResult AggregationResult
742758
}{
759+
743760
{
744761
desc: "Count Failure - Missing index",
745762
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T>=", now).
@@ -757,6 +774,34 @@ func TestIntegration_AggregationQueries(t *testing.T) {
757774
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}},
758775
},
759776
},
777+
{
778+
desc: "Aggregations in transaction before creating entities",
779+
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
780+
Transaction(txBeforeCreate).
781+
NewAggregationQuery().
782+
WithCount("count").
783+
WithSum("I", "sum").
784+
WithAvg("I", "avg"),
785+
wantAggResult: map[string]interface{}{
786+
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
787+
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
788+
"avg": &pb.Value{ValueType: &pb.Value_NullValue{NullValue: structpb.NullValue_NULL_VALUE}},
789+
},
790+
},
791+
{
792+
desc: "Aggregations in transaction after creating entities",
793+
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
794+
Transaction(txAfterCreate).
795+
NewAggregationQuery().
796+
WithCount("count").
797+
WithSum("I", "sum").
798+
WithAvg("I", "avg"),
799+
wantAggResult: map[string]interface{}{
800+
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}},
801+
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}},
802+
"avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 3.5}},
803+
},
804+
},
760805
{
761806
desc: "Multiple aggregations",
762807
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).

datastore/query.go

-1
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,6 @@ func DecodeCursor(s string) (Cursor, error) {
10261026
// NewAggregationQuery returns an AggregationQuery with this query as its
10271027
// base query.
10281028
func (q *Query) NewAggregationQuery() *AggregationQuery {
1029-
q.eventual = true
10301029
return &AggregationQuery{
10311030
query: q,
10321031
aggregationQueries: make([]*pb.AggregationQuery_Aggregation, 0),

datastore/query_test.go

-5
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,6 @@ func fakeRunAggregationQuery(req *pb.RunAggregationQueryRequest) (*pb.RunAggrega
126126
},
127127
},
128128
},
129-
ReadOptions: &pb.ReadOptions{
130-
ConsistencyType: &pb.ReadOptions_ReadConsistency_{
131-
ReadConsistency: pb.ReadOptions_EVENTUAL,
132-
},
133-
},
134129
}
135130
if !proto.Equal(req, expectedIn) {
136131
return nil, fmt.Errorf("unsupported argument: got %v want %v", req, expectedIn)

0 commit comments

Comments
 (0)