diff --git a/common/persistence/visibility/store/sql/query_converter.go b/common/persistence/visibility/store/sql/query_converter.go index dba408e0499..76354c6bd74 100644 --- a/common/persistence/visibility/store/sql/query_converter.go +++ b/common/persistence/visibility/store/sql/query_converter.go @@ -51,6 +51,7 @@ type ( queryString string, pageSize int, token *pageToken, + orderByClause string, ) (string, []any) buildCountStmt(namespaceID namespace.ID, queryString string) (string, []any) @@ -138,6 +139,7 @@ func newQueryConverterInternal( func (c *QueryConverter) BuildSelectStmt( pageSize int, nextPageToken []byte, + orderByClause string, ) (*sqlplugin.VisibilitySelectFilter, error) { token, err := deserializePageToken(nextPageToken) if err != nil { @@ -147,11 +149,13 @@ func (c *QueryConverter) BuildSelectStmt( if err != nil { return nil, err } + queryString, queryArgs := c.buildSelectStmt( c.namespaceID, queryString, pageSize, token, + orderByClause, ) return &sqlplugin.VisibilitySelectFilter{Query: queryString, QueryArgs: queryArgs}, nil } @@ -591,6 +595,13 @@ func (c *QueryConverter) convertIsExpr(exprRef *sqlparser.Expr) error { return nil } +func (c *QueryConverter) getDefaultOrderByClause() string { + return fmt.Sprintf("ORDER BY %s DESC, %s DESC, %s", + sqlparser.String(c.getCoalesceCloseTimeExpr()), + searchattribute.GetSqlDbColName(searchattribute.StartTime), + searchattribute.GetSqlDbColName(searchattribute.RunID), + ) +} func isSupportedOperator(supportedOperators []string, operator string) bool { for _, op := range supportedOperators { if operator == op { diff --git a/common/persistence/visibility/store/sql/query_converter_mysql.go b/common/persistence/visibility/store/sql/query_converter_mysql.go index 28bc775edb3..df5cb96740a 100644 --- a/common/persistence/visibility/store/sql/query_converter_mysql.go +++ b/common/persistence/visibility/store/sql/query_converter_mysql.go @@ -214,6 +214,7 @@ func (c *mysqlQueryConverter) buildSelectStmt( queryString string, pageSize int, token *pageToken, + orderByClause string, ) (string, []any) { var whereClauses []string var queryArgs []any @@ -260,15 +261,13 @@ func (c *mysqlQueryConverter) buildSelectStmt( LEFT JOIN custom_search_attributes USING (%s, %s) WHERE %s - ORDER BY %s DESC, %s DESC, %s + %s LIMIT ?`, strings.Join(addPrefix("ev.", sqlplugin.DbFields), ", "), searchattribute.GetSqlDbColName(searchattribute.NamespaceID), searchattribute.GetSqlDbColName(searchattribute.RunID), strings.Join(whereClauses, " AND "), - sqlparser.String(c.getCoalesceCloseTimeExpr()), - searchattribute.GetSqlDbColName(searchattribute.StartTime), - searchattribute.GetSqlDbColName(searchattribute.RunID), + orderByClause, ), queryArgs } diff --git a/common/persistence/visibility/store/sql/query_converter_postgresql.go b/common/persistence/visibility/store/sql/query_converter_postgresql.go index 44933b3c80f..8ac6ddd38dc 100644 --- a/common/persistence/visibility/store/sql/query_converter_postgresql.go +++ b/common/persistence/visibility/store/sql/query_converter_postgresql.go @@ -221,6 +221,7 @@ func (c *pgQueryConverter) buildSelectStmt( queryString string, pageSize int, token *pageToken, + orderByClause string, ) (string, []any) { var whereClauses []string var queryArgs []any @@ -265,13 +266,11 @@ func (c *pgQueryConverter) buildSelectStmt( `SELECT %s FROM executions_visibility WHERE %s - ORDER BY %s DESC, %s DESC, %s + %s LIMIT ?`, strings.Join(sqlplugin.DbFields, ", "), strings.Join(whereClauses, " AND "), - sqlparser.String(c.getCoalesceCloseTimeExpr()), - searchattribute.GetSqlDbColName(searchattribute.StartTime), - searchattribute.GetSqlDbColName(searchattribute.RunID), + orderByClause, ), queryArgs } diff --git a/common/persistence/visibility/store/sql/query_converter_sqlite.go b/common/persistence/visibility/store/sql/query_converter_sqlite.go index 9081e877b3f..c298a6fa19f 100644 --- a/common/persistence/visibility/store/sql/query_converter_sqlite.go +++ b/common/persistence/visibility/store/sql/query_converter_sqlite.go @@ -234,6 +234,7 @@ func (c *sqliteQueryConverter) buildSelectStmt( queryString string, pageSize int, token *pageToken, + orderByClause string, ) (string, []any) { var whereClauses []string var queryArgs []any @@ -278,13 +279,11 @@ func (c *sqliteQueryConverter) buildSelectStmt( `SELECT %s FROM executions_visibility WHERE %s - ORDER BY %s DESC, %s DESC, %s + %s LIMIT ?`, strings.Join(sqlplugin.DbFields, ", "), strings.Join(whereClauses, " AND "), - sqlparser.String(c.getCoalesceCloseTimeExpr()), - searchattribute.GetSqlDbColName(searchattribute.StartTime), - searchattribute.GetSqlDbColName(searchattribute.RunID), + orderByClause, ), queryArgs } diff --git a/common/persistence/visibility/store/sql/visibility_store.go b/common/persistence/visibility/store/sql/visibility_store.go index aba4672aace..00caf8f5667 100644 --- a/common/persistence/visibility/store/sql/visibility_store.go +++ b/common/persistence/visibility/store/sql/visibility_store.go @@ -357,78 +357,22 @@ func (s *VisibilityStore) ListWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*store.InternalListWorkflowExecutionsResponse, error) { - saTypeMap, err := s.searchAttributesProvider.GetSearchAttributes(s.GetIndexName(), false) + selectFilter, err := s.buildSelectStmt(ctx, request, true) if err != nil { return nil, err } - - saMapper, err := s.searchAttributesMapperProvider.GetMapper(request.Namespace) - if err != nil { - return nil, err - } - - converter := NewQueryConverter( - s.GetName(), - request.Namespace, - request.NamespaceID, - saTypeMap, - saMapper, - request.Query, - ) - selectFilter, err := converter.BuildSelectStmt(request.PageSize, request.NextPageToken) - if err != nil { - // Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors). - var converterErr *query.ConverterError - if errors.As(err, &converterErr) { - return nil, converterErr.ToInvalidArgument() - } - return nil, err - } - - rows, err := s.sqlStore.Db.SelectFromVisibility(ctx, *selectFilter) - if err != nil { - return nil, serviceerror.NewUnavailable( - fmt.Sprintf("ListWorkflowExecutions operation failed. Select failed: %v", err)) - } - if len(rows) == 0 { - return &store.InternalListWorkflowExecutionsResponse{}, nil - } - - var infos = make([]*store.InternalWorkflowExecutionInfo, len(rows)) - for i, row := range rows { - infos[i], err = s.rowToInfo(&row, request.Namespace) - if err != nil { - return nil, err - } - } - - var nextPageToken []byte - if len(rows) == request.PageSize { - lastRow := rows[len(rows)-1] - closeTime := maxTime - if lastRow.CloseTime != nil { - closeTime = *lastRow.CloseTime - } - nextPageToken, err = serializePageToken(&pageToken{ - CloseTime: closeTime, - StartTime: lastRow.StartTime, - RunID: lastRow.RunID, - }) - if err != nil { - return nil, err - } - } - return &store.InternalListWorkflowExecutionsResponse{ - Executions: infos, - NextPageToken: nextPageToken, - }, nil + return s.executeSelectStmt(ctx, request, selectFilter) } func (s *VisibilityStore) ScanWorkflowExecutions( - _ context.Context, - _ *manager.ListWorkflowExecutionsRequestV2, + ctx context.Context, + request *manager.ListWorkflowExecutionsRequestV2, ) (*store.InternalListWorkflowExecutionsResponse, error) { - return nil, store.OperationNotSupportedErr + selectFilter, err := s.buildSelectStmt(ctx, request, false) + if err != nil { + return nil, err + } + return s.executeSelectStmt(ctx, request, selectFilter) } func (s *VisibilityStore) CountWorkflowExecutions( @@ -692,3 +636,87 @@ func (s *VisibilityStore) buildQueryStringFromListRequest( return strings.Join(queryTerms, " AND ") } + +func (s *VisibilityStore) buildSelectStmt( + ctx context.Context, + request *manager.ListWorkflowExecutionsRequestV2, + withDefaultOrderBy bool, +) (*sqlplugin.VisibilitySelectFilter, error) { + saTypeMap, err := s.searchAttributesProvider.GetSearchAttributes(s.GetIndexName(), false) + if err != nil { + return nil, err + } + + saMapper, err := s.searchAttributesMapperProvider.GetMapper(request.Namespace) + if err != nil { + return nil, err + } + + converter := NewQueryConverter( + s.GetName(), + request.Namespace, + request.NamespaceID, + saTypeMap, + saMapper, + request.Query, + ) + orderByClause := "" + if withDefaultOrderBy { + orderByClause = converter.getDefaultOrderByClause() + } + selectFilter, err := converter.BuildSelectStmt(request.PageSize, request.NextPageToken, orderByClause) + if err != nil { + // Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors). + var converterErr *query.ConverterError + if errors.As(err, &converterErr) { + return nil, converterErr.ToInvalidArgument() + } + return nil, err + } + + return selectFilter, nil +} + +func (s *VisibilityStore) executeSelectStmt( + ctx context.Context, + request *manager.ListWorkflowExecutionsRequestV2, + selectFilter *sqlplugin.VisibilitySelectFilter, +) (*store.InternalListWorkflowExecutionsResponse, error) { + rows, err := s.sqlStore.Db.SelectFromVisibility(ctx, *selectFilter) + if err != nil { + return nil, serviceerror.NewUnavailable( + fmt.Sprintf("Select failed: %v", err)) + } + if len(rows) == 0 { + return &store.InternalListWorkflowExecutionsResponse{}, nil + } + + var infos = make([]*store.InternalWorkflowExecutionInfo, len(rows)) + for i, row := range rows { + infos[i], err = s.rowToInfo(&row, request.Namespace) + if err != nil { + return nil, err + } + } + + var nextPageToken []byte + if len(rows) == request.PageSize { + lastRow := rows[len(rows)-1] + closeTime := maxTime + if lastRow.CloseTime != nil { + closeTime = *lastRow.CloseTime + } + nextPageToken, err = serializePageToken(&pageToken{ + CloseTime: closeTime, + StartTime: lastRow.StartTime, + RunID: lastRow.RunID, + }) + if err != nil { + return nil, err + } + } + return &store.InternalListWorkflowExecutionsResponse{ + Executions: infos, + NextPageToken: nextPageToken, + }, nil +}