Skip to content

Commit

Permalink
Enable Scan API in SQL visibility store (#4161)
Browse files Browse the repository at this point in the history
* Enable Scan API in SQL visibility store

* Refactor code to add support for Scan API

* Refactor BuildSelectStmt and pass OrderByFlag

* Refactor based on PR comments

---------

Co-authored-by: Rodrigo Zhou <[email protected]>
  • Loading branch information
samanbarghi and rodrigozhou authored Apr 18, 2023
1 parent e792a68 commit af066c3
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 77 deletions.
11 changes: 11 additions & 0 deletions common/persistence/visibility/store/sql/query_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
queryString string,
pageSize int,
token *pageToken,
orderByClause string,
) (string, []any)

buildCountStmt(namespaceID namespace.ID, queryString string) (string, []any)
Expand Down Expand Up @@ -138,6 +139,7 @@ func newQueryConverterInternal(
func (c *QueryConverter) BuildSelectStmt(
pageSize int,
nextPageToken []byte,
orderByClause string,
) (*sqlplugin.VisibilitySelectFilter, error) {
token, err := deserializePageToken(nextPageToken)
if err != nil {
Expand All @@ -147,11 +149,13 @@ func (c *QueryConverter) BuildSelectStmt(
if err != nil {
return nil, err
}

queryString, queryArgs := c.buildSelectStmt(
c.namespaceID,
queryString,
pageSize,
token,
orderByClause,
)
return &sqlplugin.VisibilitySelectFilter{Query: queryString, QueryArgs: queryArgs}, nil
}
Expand Down Expand Up @@ -591,6 +595,13 @@ func (c *QueryConverter) convertIsExpr(exprRef *sqlparser.Expr) error {
return nil
}

func (c *QueryConverter) getDefaultOrderByClause() string {
return fmt.Sprintf("ORDER BY %s DESC, %s DESC, %s",
sqlparser.String(c.getCoalesceCloseTimeExpr()),
searchattribute.GetSqlDbColName(searchattribute.StartTime),
searchattribute.GetSqlDbColName(searchattribute.RunID),
)
}
func isSupportedOperator(supportedOperators []string, operator string) bool {
for _, op := range supportedOperators {
if operator == op {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (c *mysqlQueryConverter) buildSelectStmt(
queryString string,
pageSize int,
token *pageToken,
orderByClause string,
) (string, []any) {
var whereClauses []string
var queryArgs []any
Expand Down Expand Up @@ -260,15 +261,13 @@ func (c *mysqlQueryConverter) buildSelectStmt(
LEFT JOIN custom_search_attributes
USING (%s, %s)
WHERE %s
ORDER BY %s DESC, %s DESC, %s
%s
LIMIT ?`,
strings.Join(addPrefix("ev.", sqlplugin.DbFields), ", "),
searchattribute.GetSqlDbColName(searchattribute.NamespaceID),
searchattribute.GetSqlDbColName(searchattribute.RunID),
strings.Join(whereClauses, " AND "),
sqlparser.String(c.getCoalesceCloseTimeExpr()),
searchattribute.GetSqlDbColName(searchattribute.StartTime),
searchattribute.GetSqlDbColName(searchattribute.RunID),
orderByClause,
), queryArgs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (c *pgQueryConverter) buildSelectStmt(
queryString string,
pageSize int,
token *pageToken,
orderByClause string,
) (string, []any) {
var whereClauses []string
var queryArgs []any
Expand Down Expand Up @@ -265,13 +266,11 @@ func (c *pgQueryConverter) buildSelectStmt(
`SELECT %s
FROM executions_visibility
WHERE %s
ORDER BY %s DESC, %s DESC, %s
%s
LIMIT ?`,
strings.Join(sqlplugin.DbFields, ", "),
strings.Join(whereClauses, " AND "),
sqlparser.String(c.getCoalesceCloseTimeExpr()),
searchattribute.GetSqlDbColName(searchattribute.StartTime),
searchattribute.GetSqlDbColName(searchattribute.RunID),
orderByClause,
), queryArgs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (c *sqliteQueryConverter) buildSelectStmt(
queryString string,
pageSize int,
token *pageToken,
orderByClause string,
) (string, []any) {
var whereClauses []string
var queryArgs []any
Expand Down Expand Up @@ -278,13 +279,11 @@ func (c *sqliteQueryConverter) buildSelectStmt(
`SELECT %s
FROM executions_visibility
WHERE %s
ORDER BY %s DESC, %s DESC, %s
%s
LIMIT ?`,
strings.Join(sqlplugin.DbFields, ", "),
strings.Join(whereClauses, " AND "),
sqlparser.String(c.getCoalesceCloseTimeExpr()),
searchattribute.GetSqlDbColName(searchattribute.StartTime),
searchattribute.GetSqlDbColName(searchattribute.RunID),
orderByClause,
), queryArgs
}

Expand Down
158 changes: 93 additions & 65 deletions common/persistence/visibility/store/sql/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,78 +357,22 @@ func (s *VisibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
saTypeMap, err := s.searchAttributesProvider.GetSearchAttributes(s.GetIndexName(), false)
selectFilter, err := s.buildSelectStmt(ctx, request, true)
if err != nil {
return nil, err
}

saMapper, err := s.searchAttributesMapperProvider.GetMapper(request.Namespace)
if err != nil {
return nil, err
}

converter := NewQueryConverter(
s.GetName(),
request.Namespace,
request.NamespaceID,
saTypeMap,
saMapper,
request.Query,
)
selectFilter, err := converter.BuildSelectStmt(request.PageSize, request.NextPageToken)
if err != nil {
// Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors).
var converterErr *query.ConverterError
if errors.As(err, &converterErr) {
return nil, converterErr.ToInvalidArgument()
}
return nil, err
}

rows, err := s.sqlStore.Db.SelectFromVisibility(ctx, *selectFilter)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf("ListWorkflowExecutions operation failed. Select failed: %v", err))
}
if len(rows) == 0 {
return &store.InternalListWorkflowExecutionsResponse{}, nil
}

var infos = make([]*store.InternalWorkflowExecutionInfo, len(rows))
for i, row := range rows {
infos[i], err = s.rowToInfo(&row, request.Namespace)
if err != nil {
return nil, err
}
}

var nextPageToken []byte
if len(rows) == request.PageSize {
lastRow := rows[len(rows)-1]
closeTime := maxTime
if lastRow.CloseTime != nil {
closeTime = *lastRow.CloseTime
}
nextPageToken, err = serializePageToken(&pageToken{
CloseTime: closeTime,
StartTime: lastRow.StartTime,
RunID: lastRow.RunID,
})
if err != nil {
return nil, err
}
}
return &store.InternalListWorkflowExecutionsResponse{
Executions: infos,
NextPageToken: nextPageToken,
}, nil
return s.executeSelectStmt(ctx, request, selectFilter)
}

func (s *VisibilityStore) ScanWorkflowExecutions(
_ context.Context,
_ *manager.ListWorkflowExecutionsRequestV2,
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
return nil, store.OperationNotSupportedErr
selectFilter, err := s.buildSelectStmt(ctx, request, false)
if err != nil {
return nil, err
}
return s.executeSelectStmt(ctx, request, selectFilter)
}

func (s *VisibilityStore) CountWorkflowExecutions(
Expand Down Expand Up @@ -692,3 +636,87 @@ func (s *VisibilityStore) buildQueryStringFromListRequest(

return strings.Join(queryTerms, " AND ")
}

func (s *VisibilityStore) buildSelectStmt(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
withDefaultOrderBy bool,
) (*sqlplugin.VisibilitySelectFilter, error) {
saTypeMap, err := s.searchAttributesProvider.GetSearchAttributes(s.GetIndexName(), false)
if err != nil {
return nil, err
}

saMapper, err := s.searchAttributesMapperProvider.GetMapper(request.Namespace)
if err != nil {
return nil, err
}

converter := NewQueryConverter(
s.GetName(),
request.Namespace,
request.NamespaceID,
saTypeMap,
saMapper,
request.Query,
)
orderByClause := ""
if withDefaultOrderBy {
orderByClause = converter.getDefaultOrderByClause()
}
selectFilter, err := converter.BuildSelectStmt(request.PageSize, request.NextPageToken, orderByClause)
if err != nil {
// Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors).
var converterErr *query.ConverterError
if errors.As(err, &converterErr) {
return nil, converterErr.ToInvalidArgument()
}
return nil, err
}

return selectFilter, nil
}

func (s *VisibilityStore) executeSelectStmt(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
selectFilter *sqlplugin.VisibilitySelectFilter,
) (*store.InternalListWorkflowExecutionsResponse, error) {
rows, err := s.sqlStore.Db.SelectFromVisibility(ctx, *selectFilter)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf("Select failed: %v", err))
}
if len(rows) == 0 {
return &store.InternalListWorkflowExecutionsResponse{}, nil
}

var infos = make([]*store.InternalWorkflowExecutionInfo, len(rows))
for i, row := range rows {
infos[i], err = s.rowToInfo(&row, request.Namespace)
if err != nil {
return nil, err
}
}

var nextPageToken []byte
if len(rows) == request.PageSize {
lastRow := rows[len(rows)-1]
closeTime := maxTime
if lastRow.CloseTime != nil {
closeTime = *lastRow.CloseTime
}
nextPageToken, err = serializePageToken(&pageToken{
CloseTime: closeTime,
StartTime: lastRow.StartTime,
RunID: lastRow.RunID,
})
if err != nil {
return nil, err
}
}
return &store.InternalListWorkflowExecutionsResponse{
Executions: infos,
NextPageToken: nextPageToken,
}, nil
}

0 comments on commit af066c3

Please sign in to comment.