From d21430f4deaac926a16e7f1d9a61b64abbfd7c0b Mon Sep 17 00:00:00 2001 From: SenLin <38672652+formuzi@users.noreply.github.com> Date: Mon, 28 Jan 2019 18:01:27 -0800 Subject: [PATCH] support keyword and key/value search pagination (#70) * support keyword and key/value search pagination 1. query?keyword=test&first=1&offset=1 2. query?objtype=pod&first=1&offset=4 "first" default value is 1000 and cannot exceed maximum value 10000 * change grpc client maximum receive size from default 4M to 20M * revert back entity service unit tests --- service/apis/entity_service.go | 14 ++- service/apis/entity_service_test.go | 30 ++--- service/apis/qsl_service.go | 31 ++--- service/apis/qsl_service_test.go | 2 +- service/apis/query_service.go | 167 ++++++++++++++++++--------- service/db/dgclient.go | 6 +- service/resources/server_resource.go | 10 +- service/server.go | 2 +- service/util/consts.go | 3 + 9 files changed, 166 insertions(+), 99 deletions(-) diff --git a/service/apis/entity_service.go b/service/apis/entity_service.go index 1637759..849ee4a 100644 --- a/service/apis/entity_service.go +++ b/service/apis/entity_service.go @@ -57,14 +57,14 @@ func (s EntityService) DeleteEntity(uuid string) error { // DeleteEntityByResourceID remove object by given resourceid func (s EntityService) DeleteEntityByResourceID(meta string, rid string) error { - qm := map[string][]string{util.ResourceID: {rid}, util.ObjType: {meta}} + qm := map[string][]string{util.ResourceID: {rid}, util.ObjType: {meta}, util.Print: {util.ResourceID}} queryService := NewQueryService(s.dbclient) node, err := queryService.GetQueryResult(qm) if err != nil { log.Error(err) return err } - if len(node[util.Objects].([]interface{})) > 0 { + if node[util.Count].(float64) > 0 { // got existing object id for _, obj := range node[util.Objects].([]interface{}) { err = s.dbclient.DeleteEntity(obj.(map[string]interface{})[util.UID].(string)) @@ -147,14 +147,16 @@ func (s EntityService) CreateEntity(meta string, data map[string]interface{}) (m if mutex.TryLock(data[util.ResourceID]) { defer mutex.Unlock(data[util.ResourceID]) // check if entity already exist - qm := map[string][]string{util.ResourceID: {data[util.ResourceID].(string)}, util.ObjType: {meta}} + qm := map[string][]string{util.ResourceID: {data[util.ResourceID].(string)}, + util.ObjType: {meta}, + util.Print: {util.ResourceVersion}} queryService := NewQueryService(s.dbclient) node, err := queryService.GetQueryResult(qm) if err != nil { log.Error(err) return nil, err } - if len(node[util.Objects].([]interface{})) > 0 { + if node[util.Count].(float64) > 0 { // got existing object id uid := node[util.Objects].([]interface{})[0].(map[string]interface{})[util.UID].(string) data[util.UID] = uid @@ -292,7 +294,7 @@ func buildDataMap(k8sObj interface{}, relData interface{}, relType string, clust // get uid from relationship object, if object not present, create it func (s EntityService) getUIDFromRelData(data map[string]interface{}, objType string) (*string, error) { // query by ResourceID to get uid - qm := map[string][]string{util.ResourceID: {data[util.ResourceID].(string)}, util.ObjType: {objType}} + qm := map[string][]string{util.ResourceID: {data[util.ResourceID].(string)}, util.ObjType: {objType}, util.Print: {util.ResourceID}} queryService := NewQueryService(s.dbclient) node, err := queryService.GetQueryResult(qm) if err != nil { @@ -300,7 +302,7 @@ func (s EntityService) getUIDFromRelData(data map[string]interface{}, objType st return nil, err } var uid string - if len(node[util.Objects].([]interface{})) > 0 { + if node[util.Count].(float64) > 0 { // got existing object id uid = node[util.Objects].([]interface{})[0].(map[string]interface{})[util.UID].(string) } else { diff --git a/service/apis/entity_service_test.go b/service/apis/entity_service_test.go index ea3bdd2..fe414d1 100644 --- a/service/apis/entity_service_test.go +++ b/service/apis/entity_service_test.go @@ -17,7 +17,7 @@ func TestCreateEntity(t *testing.T) { s := NewEntityService(dc) // create node node := map[string]interface{}{ - "objtype": "K8sNode", + "objtype": "k8snode", "name": "node02", "labels": "testingnode02", } @@ -25,14 +25,14 @@ func TestCreateEntity(t *testing.T) { dc.CreateSchema(db.Schema{Predicate: "name", Type: "string", Index: true, Tokenizer: []string{"term"}}) dc.CreateSchema(db.Schema{Predicate: "resourceid", Type: "string", Index: true, Tokenizer: []string{"term"}}) dc.CreateSchema(db.Schema{Predicate: "objtype", Type: "string", Index: true, Tokenizer: []string{"term"}}) - nids, _ := s.CreateEntity("K8sNode", node) + nids, _ := s.CreateEntity("k8snode", node) var nid string for _, v := range nids { nid = v break } defer s.DeleteEntity(nid) - n, _ := s.GetEntity("K8sNode", nid) + n, _ := s.GetEntity("k8snode", nid) o := n["objects"].([]interface{})[0].(map[string]interface{}) if val, ok := o["labels"]; ok { assert.Equal(t, val, "testingnode02", "node label not equals to testnode02") @@ -46,13 +46,13 @@ func TestDeleteEntityByRid(t *testing.T) { s := NewEntityService(dc) // create node node := map[string]interface{}{ - "objtype": "K8sNode", + "objtype": "k8snode", "name": "node02", "labels": "testingnode02", "resourceid": "noderid", } - s.CreateEntity("K8sNode", node) - err := s.DeleteEntityByResourceID("K8sNode", "noderid") + s.CreateEntity("k8snode", node) + err := s.DeleteEntityByResourceID("k8snode", "noderid") assert.Nil(t, err) dc.Close() } @@ -444,8 +444,10 @@ func TestMultiCreateEntity(t *testing.T) { dc := db.NewDGClient("127.0.0.1:9080") q := NewQueryService(dc) defer dc.Close() + dc.CreateSchema(db.Schema{Predicate: "name", Type: "string", Index: true, Tokenizer: []string{"term"}}) + dc.CreateSchema(db.Schema{Predicate: "resourceid", Type: "string", Index: true, Tokenizer: []string{"term"}}) + dc.CreateSchema(db.Schema{Predicate: "objtype", Type: "string", Index: true, Tokenizer: []string{"term"}}) s := NewEntityService(dc) - var wg sync.WaitGroup rest := make(chan map[string]string) wg.Add(100) @@ -453,12 +455,12 @@ func TestMultiCreateEntity(t *testing.T) { go func(version string) { defer wg.Done() node := map[string]interface{}{ - "objtype": "K8sNode", + "objtype": "k8snode", "name": "multinode", "label": version, "resourceid": "cluster:ns:multinode", } - nids, _ := s.CreateEntity("K8sNode", node) + nids, _ := s.CreateEntity("k8snode", node) rest <- nids }(strconv.Itoa(i)) } @@ -467,13 +469,13 @@ func TestMultiCreateEntity(t *testing.T) { go func(version string) { defer wg.Done() node := map[string]interface{}{ - "objtype": "K8sNode", + "objtype": "k8snode", "name": "multinode2", "label": version, "resourceid": "cluster:ns:multinode2", "resourceversion": version, } - nids, _ := s.CreateEntity("K8sNode", node) + nids, _ := s.CreateEntity("k8snode", node) rest <- nids }(strconv.Itoa(i)) } @@ -484,10 +486,10 @@ func TestMultiCreateEntity(t *testing.T) { } }() wg.Wait() - qm := map[string][]string{"resourceid": {"cluster:ns:multinode"}, "objtype": {"K8sNode"}} + qm := map[string][]string{"resourceid": {"cluster:ns:multinode"}, "objtype": {"k8snode"}} n, _ := q.GetQueryResult(qm) o := n["objects"].([]interface{}) assert.Equal(t, 1, len(o), "only one object expect to be created with same resourceid") - s.DeleteEntityByResourceID("K8sNode", "cluster:ns:multinode") - s.DeleteEntityByResourceID("K8sNode", "cluster:ns:multinode2") + s.DeleteEntityByResourceID("k8snode", "cluster:ns:multinode") + s.DeleteEntityByResourceID("k8snode", "cluster:ns:multinode2") } diff --git a/service/apis/qsl_service.go b/service/apis/qsl_service.go index e49e9d8..60c9be3 100644 --- a/service/apis/qsl_service.go +++ b/service/apis/qsl_service.go @@ -11,6 +11,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/intuit/katlas/service/db" + "github.com/intuit/katlas/service/util" "strconv" ) @@ -23,7 +24,6 @@ var filterRegex = `\@([a-zA-Z0-9\(\)]*)([\!\<\>\=]*)(\"?[a-zA-Z0-9\-\.\|\&\:_]*\ // QSLService service for QSL type QSLService struct { DBclient db.IDGClient - metaSvc *MetaService } // MaximumLimit define pagination limit @@ -51,7 +51,8 @@ func IsStar(s string) bool { // GetMetadata - get a list of the fields supoorted for this object type func (qa *QSLService) GetMetadata(objtype string) ([]MetadataField, error) { - metafieldslist, err := qa.metaSvc.GetMetadataFields(objtype) + m := NewMetaService(qa.DBclient) + metafieldslist, err := m.GetMetadataFields(objtype) if err != nil { log.Error(err) return []MetadataField{}, errors.New("Failed to connect to dgraph to get metadata") @@ -64,8 +65,8 @@ func (qa *QSLService) GetMetadata(objtype string) ([]MetadataField, error) { } // NewQSLService creates an instance of a QSLService -func NewQSLService(host db.IDGClient, m *MetaService) *QSLService { - return &QSLService{host, m} +func NewQSLService(host db.IDGClient) *QSLService { + return &QSLService{host} } // CreateFiltersQuery translates the filters part of the qsl string to dgraph @@ -95,16 +96,19 @@ func CreateFiltersQuery(filterlist string) (string, string, string, error) { for _, item := range splitlist { splitval := strings.Split(item, "=") - if splitval[0] == "first" || splitval[0] == "offset" { + switch splitval[0] { + case util.First, util.Offset: paginate += "," + splitval[0] + ": " + splitval[1] - val, err := strconv.Atoi(splitval[1]) - if err != nil || val > MaximumLimit { - return "", "", "", fmt.Errorf("pagination format error or exceeding maxiumum limit %d", MaximumLimit) - } - } else { + default: return "", "", "", errors.New("Invalid pagination filters in " + filterlist) } - + val, err := strconv.Atoi(splitval[1]) + if err != nil { + return "", "", "", errors.New("Pagination format error " + filterlist) + } + if splitval[0] == util.First && val > MaximumLimit { + return "", "", "", fmt.Errorf("pagination exceeding maxiumum limit %d", MaximumLimit) + } } // get rid of the first comma paginate = paginate[0:] @@ -472,14 +476,13 @@ func (qa *QSLService) getRelationName(objType string, parent string) (string, er if !found { // if not, see if we can find the relation from the parent to this object - metafieldslist2, err := qa.metaSvc.GetMetadataFields(parent) + m := NewMetaService(qa.DBclient) + metafieldslist2, err := m.GetMetadataFields(parent) if err != nil { log.Error(err) return "", errors.New("Failed to connect to dgraph to get metadata") } log.Debugf("couldn't find relation for %s->%s,", parent, objType) - log.Debugf("metadata fields for %s: %#v", parent, metafieldslist) - for _, item := range metafieldslist2 { if item.FieldType == "relationship" { log.Debugf("2 found relationship for %s-%s->%s", parent, item.FieldName, item.RefDataType) diff --git a/service/apis/qsl_service_test.go b/service/apis/qsl_service_test.go index cd45b8f..a82d0aa 100644 --- a/service/apis/qsl_service_test.go +++ b/service/apis/qsl_service_test.go @@ -445,7 +445,7 @@ func TestCreateDgraphQuery(t *testing.T) { dc := db.NewDGClient(dgraphHost) defer dc.Close() metaSvc := NewMetaService(dc) - qslSvc := NewQSLService(dc, metaSvc) + qslSvc := NewQSLService(dc) // Initialize metadata meta, err := ioutil.ReadFile("../data/meta.json") diff --git a/service/apis/query_service.go b/service/apis/query_service.go index 2f727ac..3da37c6 100644 --- a/service/apis/query_service.go +++ b/service/apis/query_service.go @@ -5,8 +5,10 @@ import ( "strconv" "strings" + "bytes" log "github.com/Sirupsen/logrus" "github.com/intuit/katlas/service/db" + "github.com/intuit/katlas/service/util" ) //QueryParamKeyword ...param for keyword query @@ -29,100 +31,159 @@ func NewQueryService(dc db.IDGClient) *QueryService { //GetQueryResult ...Api to get Query Results func (s QueryService) GetQueryResult(queryMap map[string][]string) (map[string]interface{}, error) { - var q string var err error - - val, ok := queryMap[QueryParamKeyword] - if ok { + // default limit is 1000 + first, offset := 1000, 0 + // limit should be number and less than 10000 + if val, ok := queryMap[util.First]; ok { + first, err = strconv.Atoi(val[0]) + if err != nil || first > MaximumLimit { + return nil, fmt.Errorf("pagination format error or exceeding maxiumum limit %d", MaximumLimit) + } + } + // offset should be number + if val, ok := queryMap[util.Offset]; ok { + offset, err = strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + } + // keyword search + if val, ok := queryMap[QueryParamKeyword]; ok { if val[0] == "" { err := fmt.Errorf("Value not specified for Query Param [%s]", QueryParamKeyword) return nil, err } - q, err = s.getQueryResultByKeyword(val[0]) + // generate queries include count query + q, cntQry, err := s.getQueryResultByKeyword(val[0], first, offset) + if err != nil { + log.Debug(err) + return nil, err + } + // execute query to get result + ret, err := s.dbclient.GetQueryResult(cntQry) if err != nil { log.Debug(err) return nil, err } - } else { - if len(queryMap) == 0 { - err := fmt.Errorf("Query Params not specified") + total := GetTotalCnt(ret) + ret, err = s.dbclient.GetQueryResult(q) + if err != nil { + log.Debug(err) return nil, err } - q = getQueryResultByKeyValue(queryMap) + ret[util.Count] = total + return ret, nil + } + // key value query + if len(queryMap) == 0 { + err := fmt.Errorf("Query Params not specified") + return nil, err + } + q, cntQry := getQueryResultByKeyValue(queryMap, first, offset) + ret, err := s.dbclient.GetQueryResult(cntQry) + if err != nil { + log.Debug(err) + return nil, err + } + total := GetTotalCnt(ret) + ret, err = s.dbclient.GetQueryResult(q) + if err != nil { + log.Debug(err) + return nil, err } + ret[util.Count] = total + return ret, nil +} - return s.dbclient.GetQueryResult(q) +// GetTotalCnt find count from returned value +func GetTotalCnt(data map[string]interface{}) float64 { + var total float64 + for _, obj := range data[util.Objects].([]interface{}) { + val, ok := obj.(map[string]interface{})[util.Count] + if ok { + total = val.(float64) + } + } + return total } // Keyword query http:///v1/query?keyword=pod -func (s QueryService) getQueryResultByKeyword(keyword string) (string, error) { +func (s QueryService) getQueryResultByKeyword(keyword string, first, offset int) (string, string, error) { smds, err := s.dbclient.GetSchemaFromCache(db.LruCache) if err != nil { log.Debug(err) - return "", err + return "", "", err } + // generate query as following + //{ + // A as var(func: regexp(name, /test/i)) {} + // B as var(func: regexp(labels, /test/i)) {} + // me(func: uid(A,B), first:1000,offset:0) { + // expand(_all_) { + // expand(_all_) + //}}} cnt := 0 - var qr string - qr = "{" - + statements := []string{"{"} for _, schemanode := range smds { - log.Debugf("Predicate: %v Type: %v tokenizer: %v\n", schemanode.Predicate, schemanode.Type, schemanode.Tokenizer) - if schemanode.Type == "string" && schemanode.Index == true && len(schemanode.Tokenizer) > 0 { for _, tokenizer := range schemanode.Tokenizer { tk := tokenizer if tk == "trigram" { - log.Debugf("Found ***** Predicate: %v Type: %v tokenizer: %v\n", schemanode.Predicate, schemanode.Type, schemanode.Tokenizer) - - filter := "obj" + strconv.Itoa(cnt) + "(func:regexp(" + schemanode.Predicate + ",/" + keyword + "/i)) {" - qr = qr + filter + ` - uid - expand(_all_) { - uid - expand(_all_) - } - } - ` + filter := "obj" + strconv.Itoa(cnt) + " as var(func:regexp(" + schemanode.Predicate + ",/" + keyword + "/i)) {}" + statements = append(statements, filter) + cnt++ } - } } - cnt++ } - qr = qr + "}" - log.Debugf("Query string is =%v\n", qr) - return qr, nil + var buf bytes.Buffer + for i := 0; i < cnt; i++ { + buf.WriteString("obj") + buf.WriteString(strconv.Itoa(i)) + if i < cnt-1 { + buf.WriteString(",") + } + } + + cntOnlyStatements := make([]string, len(statements)) + copy(cntOnlyStatements, statements) + cntTemplate := `objects(func: uid(%s)) { %s }` + cntQuery := fmt.Sprintf(cntTemplate, buf.String(), "count(uid)") + cntOnlyStatements = append(cntOnlyStatements, cntQuery) + cntOnlyStatements = append(cntOnlyStatements, "}") + template := `objects(func: uid(%s), first:%d,offset:%d) { %s }` + query := fmt.Sprintf(template, buf.String(), first, offset, "expand(_all_) { uid expand(_all_) }") + statements = append(statements, query) + statements = append(statements, "}") + return strings.Join(statements, "\n"), strings.Join(cntOnlyStatements, "\n"), nil } // Key-Value query http:///v1/query?name=pod01&objtype=Pod -func getQueryResultByKeyValue(queryMap map[string][]string) string { - +func getQueryResultByKeyValue(queryMap map[string][]string, first, offset int) (string, string) { //Only indexed fields can be filtered on //Time must be in correct format "2018-10-18 14:36:32 -0700 PDT" qps := []string{} var funcStr, filterStr string - for k, v := range queryMap { - qp := "eq(" + k + ",\"" + v[0] + "\")" - qps = append(qps, qp) + if k != util.First && k != util.Offset && k != util.Print { + qp := "eq(" + k + ",\"" + v[0] + "\")" + qps = append(qps, qp) + } } - - funcStr = "(func:" + qps[0] + ") " + print := "expand(_all_)" + if p, ok := queryMap[util.Print]; ok { + if "*" != p[0] { + print = p[0] + } + } + funcStr = fmt.Sprintf("(func:%s, first:%d, offset:%d) ", qps[0], first, offset) + cntStr := fmt.Sprintf("(func:%s)", qps[0]) filters := qps[1:] if len(filters) > 0 { filterStr = "@filter(" + strings.Join(filters, " AND ") + ")" } - - q := ` - { - objects` + funcStr + filterStr + ` { - uid - expand(_all_) { - uid - expand(_all_) - } - } - } - ` - return q + q := fmt.Sprintf(`{objects %s %s {uid %s { uid %s }}}`, funcStr, filterStr, print, print) + cntQry := fmt.Sprintf(`{objects %s %s {count(uid)}}`, cntStr, filterStr) + return q, cntQry } diff --git a/service/db/dgclient.go b/service/db/dgclient.go index f5c44b3..7e76875 100644 --- a/service/db/dgclient.go +++ b/service/db/dgclient.go @@ -76,7 +76,10 @@ type IDGClient interface { func NewDGClient(dgraphHost string) *DGClient { // Dial a gRPC connection. log.Infof("Connecting to dgraph [%s]", dgraphHost) - conn, err := grpc.Dial(dgraphHost, grpc.WithInsecure()) + conn, err := grpc.Dial(dgraphHost, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(20*1024*1024)), + grpc.WithInsecure()) + if err != nil { log.Fatal(err) } @@ -327,7 +330,6 @@ func (s DGClient) GetSchemaFromDB() ([]*api.SchemaNode, error) { log.Errorf("Query [%v] Error [%v]\n", q, err) return nil, err } - log.Infof("Query result: [%s]", resp.Schema) smn := resp.Schema return smn, nil } diff --git a/service/resources/server_resource.go b/service/resources/server_resource.go index 73f747a..f5aa2c8 100644 --- a/service/resources/server_resource.go +++ b/service/resources/server_resource.go @@ -175,7 +175,7 @@ func (s ServerResource) QueryHandler(w http.ResponseWriter, r *http.Request) { obj, err := s.QuerySvc.GetQueryResult(queryMap) if err != nil { - http.Error(w, "Service Error", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") @@ -684,13 +684,7 @@ func (s *ServerResource) QSLHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - var total float64 - for _, res := range response[util.Objects].([]interface{}) { - val, ok := res.(map[string]interface{})[util.Count] - if ok { - total = val.(float64) - } - } + total := apis.GetTotalCnt(response) // get query with pagination query, err = s.QSLSvc.CreateDgraphQuery(vars[util.Query], false) diff --git a/service/server.go b/service/server.go index 48dbb1e..32f73d5 100644 --- a/service/server.go +++ b/service/server.go @@ -39,7 +39,7 @@ func serve() { metaSvc := apis.NewMetaService(dc) entitySvc := apis.NewEntityService(dc) querySvc := apis.NewQueryService(dc) - qslSvc := apis.NewQSLService(dc, metaSvc) + qslSvc := apis.NewQSLService(dc) res := resources.ServerResource{EntitySvc: entitySvc, QuerySvc: querySvc, MetaSvc: metaSvc, QSLSvc: qslSvc} router.HandleFunc("/v1/entity/{metadata}/{uid}", res.EntityGetHandler).Methods("GET") // TODO: wire up more resource APIs here diff --git a/service/util/consts.go b/service/util/consts.go index 3a6eced..0fe8032 100644 --- a/service/util/consts.go +++ b/service/util/consts.go @@ -55,4 +55,7 @@ const ( Query = "query" StartTime = "starttime" Count = "count" + First = "first" + Offset = "offset" + Print = "print" )