Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced visibility for SQLite #3895

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/sqlite/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,7 @@ func buildDSNAttr(cfg *config.SQL) (url.Values, error) {
// assume pragma
parameters.Add("_pragma", fmt.Sprintf("%s=%s", key, value))
}
// set time format
parameters.Add("_time_format", "sqlite")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line to make the SQLite driver module to format datetime using a supported SQLite format for datetime. Previously, without this setting, it was using time.String() to write datetime to the table.

return parameters, nil
}
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/sqlite/typeconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *converter) ToSQLiteDateTime(t time.Time) time.Time {
if t.IsZero() {
return minSQLiteDateTime
}
return t.UTC()
return t.UTC().Truncate(time.Microsecond)
}

// FromSQLiteDateTime converts SQLite datetime and returns go time
Expand Down
311 changes: 106 additions & 205 deletions common/persistence/sql/sqlplugin/sqlite/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,248 +29,104 @@ package sqlite
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"

"go.temporal.io/server/common/persistence/sql/sqlplugin"
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
`ON CONFLICT (namespace_id, run_id) DO NOTHING`
var (
keywordListSeparator = "♡"

templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` +
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `

// RunID condition is needed for correct pagination
templateConditions = ` AND namespace_id = ?
AND start_time >= ?
AND start_time <= ?
AND ((run_id > ? and start_time = ?) OR (start_time < ?))
ORDER BY start_time DESC, run_id
LIMIT ?`

templateConditionsClosedWorkflows = ` AND namespace_id = ?
AND close_time >= ?
AND close_time <= ?
AND ((run_id > ? and close_time = ?) OR (close_time < ?))
ORDER BY close_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, history_length
FROM executions_visibility WHERE status != 1 `

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows
templateInsertWorkflowExecution = fmt.Sprintf(
`INSERT INTO executions_visibility (%s)
VALUES (%s)
ON CONFLICT (namespace_id, run_id) DO NOTHING`,
strings.Join(sqlplugin.DbFields, ", "),
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
)

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length, task_queue
FROM executions_visibility
WHERE namespace_id = ? AND status != 1
AND run_id = ?`
templateUpsertWorkflowExecution = fmt.Sprintf(
`INSERT INTO executions_visibility (%s)
VALUES (%s)
%s`,
strings.Join(sqlplugin.DbFields, ", "),
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
buildOnDuplicateKeyUpdate(sqlplugin.DbFields...),
)

templateGetWorkflowExecution = `
SELECT
workflow_id,
run_id,
start_time,
execution_time,
memo,
encoding,
close_time,
workflow_type_name,
status,
history_length,
task_queue
FROM executions_visibility
WHERE namespace_id = ? AND run_id = ?`
templateDeleteWorkflowExecution = `
DELETE FROM executions_visibility
WHERE namespace_id = :namespace_id AND run_id = :run_id`

templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE namespace_id = ? AND run_id = ?"
templateGetWorkflowExecution = fmt.Sprintf(
`SELECT %s FROM executions_visibility
WHERE namespace_id = :namespace_id AND run_id = :run_id`,
strings.Join(sqlplugin.DbFields, ", "),
)
)

var errCloseParams = errors.New("missing one of {closeTime, historyLength} params")
func buildOnDuplicateKeyUpdate(fields ...string) string {
items := make([]string, len(fields))
for i, field := range fields {
items[i] = fmt.Sprintf("%s = excluded.%s", field, field)
}
return fmt.Sprintf(
"ON CONFLICT (namespace_id, run_id) DO UPDATE SET %s",
strings.Join(items, ", "),
)
}

// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
// its left as such and no update will be made
func (mdb *db) InsertIntoVisibility(
ctx context.Context,
row *sqlplugin.VisibilityRow,
) (sql.Result, error) {
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
return mdb.conn.ExecContext(ctx,
templateCreateWorkflowExecutionStarted,
row.NamespaceID,
row.WorkflowID,
row.RunID,
row.StartTime,
row.ExecutionTime,
row.WorkflowTypeName,
row.Status,
row.Memo,
row.Encoding,
row.TaskQueue,
)
finalRow := mdb.prepareRowForDB(row)
return mdb.conn.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
func (mdb *db) ReplaceIntoVisibility(
ctx context.Context,
row *sqlplugin.VisibilityRow,
) (sql.Result, error) {
switch {
case row.CloseTime != nil && row.HistoryLength != nil:
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
closeTime := mdb.converter.ToSQLiteDateTime(*row.CloseTime)
return mdb.conn.ExecContext(ctx,
templateCreateWorkflowExecutionClosed,
row.NamespaceID,
row.WorkflowID,
row.RunID,
row.StartTime,
row.ExecutionTime,
row.WorkflowTypeName,
closeTime,
row.Status,
*row.HistoryLength,
row.Memo,
row.Encoding,
row.TaskQueue,
)
default:
return nil, errCloseParams
}
finalRow := mdb.prepareRowForDB(row)
return mdb.conn.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow)
}

// DeleteFromVisibility deletes a row from visibility table if it exist
func (mdb *db) DeleteFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilityDeleteFilter,
) (sql.Result, error) {
return mdb.conn.ExecContext(ctx,
templateDeleteWorkflowExecution,
filter.NamespaceID,
filter.RunID,
)
return mdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution, filter)
}

// SelectFromVisibility reads one or more rows from visibility table
func (mdb *db) SelectFromVisibility(
ctx context.Context,
filter sqlplugin.VisibilitySelectFilter,
) ([]sqlplugin.VisibilityRow, error) {
var err error
var rows []sqlplugin.VisibilityRow
if filter.MinTime != nil {
*filter.MinTime = mdb.converter.ToSQLiteDateTime(*filter.MinTime)
}
if filter.MaxTime != nil {
*filter.MaxTime = mdb.converter.ToSQLiteDateTime(*filter.MaxTime)
}
// If filter.Status == 0 (UNSPECIFIED) then only closed workflows will be returned (all excluding 1 (RUNNING)).
switch {
case filter.MinTime == nil && filter.RunID != nil && filter.Status != 1:
var row sqlplugin.VisibilityRow
err = mdb.conn.GetContext(ctx,
&row,
templateGetClosedWorkflowExecution,
filter.NamespaceID,
*filter.RunID,
)
if err == nil {
rows = append(rows, row)
if len(filter.Query) == 0 {
// backward compatibility for existing tests
err := sqlplugin.GenerateSelectQuery(&filter, mdb.converter.ToSQLiteDateTime)
if err != nil {
return nil, err
}
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.WorkflowID != nil && filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutionsByID
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutionsByID
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
*filter.WorkflowID,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.WorkflowTypeName != nil && filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutionsByType
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutionsByType
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
*filter.WorkflowTypeName,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.RunID != nil && filter.PageSize != nil &&
filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = mdb.conn.SelectContext(ctx,
&rows,
templateGetClosedWorkflowExecutionsByStatus,
filter.Status,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
case filter.MinTime != nil && filter.MaxTime != nil &&
filter.RunID != nil && filter.PageSize != nil:
qry := templateGetOpenWorkflowExecutions
if filter.Status != 1 {
qry = templateGetClosedWorkflowExecutions
}
err = mdb.conn.SelectContext(ctx,
&rows,
qry,
filter.NamespaceID,
*filter.MinTime,
*filter.MaxTime,
*filter.RunID,
*filter.MaxTime,
*filter.MaxTime,
*filter.PageSize,
)
default:
return nil, fmt.Errorf("invalid query filter")
}

var rows []sqlplugin.VisibilityRow
err := mdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...)
if err != nil {
return nil, err
}
for i := range rows {
mdb.processRowFromDB(&rows[i])
err = mdb.processRowFromDB(&rows[i])
if err != nil {
return nil, err
}
}
return rows, nil
}
Expand All @@ -281,24 +137,69 @@ func (mdb *db) GetFromVisibility(
filter sqlplugin.VisibilityGetFilter,
) (*sqlplugin.VisibilityRow, error) {
var row sqlplugin.VisibilityRow
err := mdb.conn.GetContext(ctx,
&row,
templateGetWorkflowExecution,
filter.NamespaceID,
filter.RunID,
)
stmt, err := mdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution)
if err != nil {
return nil, err
}
err = stmt.GetContext(ctx, &row, filter)
if err != nil {
return nil, err
}
err = mdb.processRowFromDB(&row)
if err != nil {
return nil, err
}
mdb.processRowFromDB(&row)
return &row, nil
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
func (mdb *db) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
if row == nil {
return nil
}
finalRow := *row
finalRow.StartTime = mdb.converter.ToSQLiteDateTime(finalRow.StartTime)
finalRow.ExecutionTime = mdb.converter.ToSQLiteDateTime(finalRow.ExecutionTime)
if finalRow.CloseTime != nil {
*finalRow.CloseTime = mdb.converter.ToSQLiteDateTime(*finalRow.CloseTime)
}
if finalRow.SearchAttributes != nil {
finalSearchAttributes := sqlplugin.VisibilitySearchAttributes{}
for name, value := range *finalRow.SearchAttributes {
switch v := value.(type) {
case []string:
finalSearchAttributes[name] = strings.Join(v, keywordListSeparator)
default:
finalSearchAttributes[name] = v
}
}
finalRow.SearchAttributes = &finalSearchAttributes
}
return &finalRow
}

func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) error {
if row == nil {
return nil
}
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
if row.CloseTime != nil {
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
row.CloseTime = &closeTime
}
if row.SearchAttributes != nil {
for saName, saValue := range *row.SearchAttributes {
switch typedSaValue := saValue.(type) {
case string:
if strings.Index(typedSaValue, keywordListSeparator) >= 0 {
// If the string contains the keywordListSeparator, then we need to split it
// into a list of keywords.
(*row.SearchAttributes)[saName] = strings.Split(typedSaValue, keywordListSeparator)
}
default:
// no-op
}
}
}
return nil
}
Loading