diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 74ae067afdae..fe94fa364433 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -846,6 +846,15 @@ func tsOrNull(micros int64) (tree.Datum, error) {
 	return tree.MakeDTimestamp(ts, time.Microsecond)
 }
 
+const (
+	jobsQSelect      = `SELECT id, status, created, payload, progress, claim_session_id, claim_instance_id`
+	jobsQFrom        = ` FROM system.jobs`
+	jobsBackoffArgs  = `(SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args`
+	jobsStatusFilter = ` WHERE status = $3`
+	jobsQuery        = jobsQSelect + `, last_run, COALESCE(num_runs, 0), ` + jobs.NextRunClause +
+		` as next_run` + jobsQFrom + ", " + jobsBackoffArgs
+)
+
 // TODO(tbg): prefix with kv_.
 var crdbInternalJobsTable = virtualSchemaTable{
 	schema: `
@@ -871,243 +880,256 @@ CREATE TABLE crdb_internal.jobs (
   next_run TIMESTAMP,
   num_runs INT,
   execution_errors STRING[],
-  execution_events JSONB
+  execution_events JSONB,
+  INDEX(status)
 )`,
 	comment: `decoded job metadata from system.jobs (KV scan)`,
-	generator: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) {
-		currentUser := p.SessionData().User()
-		isAdmin, err := p.HasAdminRole(ctx)
-		if err != nil {
-			return nil, nil, err
-		}
+	indexes: []virtualIndex{{
+		populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
+			q := jobsQuery + jobsStatusFilter
+			targetStatus := tree.MustBeDString(unwrappedConstraint)
+			return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetStatus)
+		},
+	}},
+	populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
+		_, err := makeJobsTableRows(ctx, p, addRow, jobsQuery, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay())
+		return err
+	},
+}
 
-		hasControlJob, err := p.HasRoleOption(ctx, roleoption.CONTROLJOB)
-		if err != nil {
-			return nil, nil, err
+// makeJobsTableRows calls addRow for each job. It returns true if addRow was called
+// successfully at least once.
+func makeJobsTableRows(
+	ctx context.Context,
+	p *planner,
+	addRow func(...tree.Datum) error,
+	query string,
+	params ...interface{},
+) (matched bool, err error) {
+	// Beware: we're querying system.jobs as root; we need to be careful to filter
+	// out results that the current user is not able to see.
+	currentUser := p.SessionData().User()
+	isAdmin, err := p.HasAdminRole(ctx)
+	if err != nil {
+		return matched, err
+	}
+
+	hasControlJob, err := p.HasRoleOption(ctx, roleoption.CONTROLJOB)
+	if err != nil {
+		return matched, err
+	}
+
+	it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(
+		ctx, "crdb-internal-jobs-table", p.txn,
+		sessiondata.InternalExecutorOverride{User: username.RootUserName()},
+		query, params...)
+	if err != nil {
+		return matched, err
+	}
+
+	cleanup := func(ctx context.Context) {
+		if err := it.Close(); err != nil {
+			// TODO(yuzefovich): this error should be propagated further up
+			// rather than simply logged. Fix it (#61123).
+			//
+			// Doing that as a return parameter would require changes to
+			// `planNode.Close` signature which is a bit annoying. One other
+			// possible solution is to panic here and catch the error
+			// somewhere.
+			log.Warningf(ctx, "error closing an iterator: %v", err)
 		}
 	}
+	defer cleanup(ctx)
 
-		// Beware: we're querying system.jobs as root; we need to be careful to filter
-		// out results that the current user is not able to see.
-		const (
-			qSelect          = `SELECT id, status, created, payload, progress, claim_session_id, claim_instance_id`
-			qFrom            = ` FROM system.jobs`
-			backoffArgs      = `(SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args`
-			queryWithBackoff = qSelect + `, last_run, COALESCE(num_runs, 0), ` + jobs.NextRunClause + ` as next_run` + qFrom + ", " + backoffArgs
-		)
+	sessionJobs := make([]*jobs.Record, 0, len(p.extendedEvalCtx.SchemaChangeJobRecords))
+	uniqueJobs := make(map[*jobs.Record]struct{})
+	for _, job := range p.extendedEvalCtx.SchemaChangeJobRecords {
+		if _, ok := uniqueJobs[job]; ok {
+			continue
+		}
+		sessionJobs = append(sessionJobs, job)
+		uniqueJobs[job] = struct{}{}
+	}
 
-		it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(
-			ctx, "crdb-internal-jobs-table", p.txn,
-			sessiondata.InternalExecutorOverride{User: username.RootUserName()},
-			queryWithBackoff, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay())
+	// Loop while we need to skip a row.
+	for {
+		ok, err := it.Next(ctx)
 		if err != nil {
-			return nil, nil, err
+			return matched, err
+		}
+		var id, status, created, payloadBytes, progressBytes, sessionIDBytes,
+			instanceID tree.Datum
+		lastRun, nextRun, numRuns := tree.DNull, tree.DNull, tree.DNull
+		if ok {
+			r := it.Cur()
+			id, status, created, payloadBytes, progressBytes, sessionIDBytes, instanceID =
+				r[0], r[1], r[2], r[3], r[4], r[5], r[6]
+			lastRun, numRuns, nextRun = r[7], r[8], r[9]
+		} else if !ok {
+			if len(sessionJobs) == 0 {
+				return matched, nil
+			}
+			job := sessionJobs[len(sessionJobs)-1]
+			sessionJobs = sessionJobs[:len(sessionJobs)-1]
+			// Convert the job into datums; the protobufs are intentionally
+			// kept marshalled.
+			id = tree.NewDInt(tree.DInt(job.JobID))
+			status = tree.NewDString(string(jobs.StatusPending))
+			created = eval.TimestampToInexactDTimestamp(p.txn.ReadTimestamp())
+			progressBytes, payloadBytes, err = getPayloadAndProgressFromJobsRecord(p, job)
+			if err != nil {
+				return matched, err
+			}
+			sessionIDBytes = tree.NewDBytes(tree.DBytes(p.extendedEvalCtx.SessionID.GetBytes()))
+			instanceID = tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
 		}
 
-		cleanup := func(ctx context.Context) {
-			if err := it.Close(); err != nil {
-				// TODO(yuzefovich): this error should be propagated further up
-				// and not simply being logged. Fix it (#61123).
-				//
-				// Doing that as a return parameter would require changes to
-				// `planNode.Close` signature which is a bit annoying. One other
-				// possible solution is to panic here and catch the error
-				// somewhere.
-				log.Warningf(ctx, "error closing an iterator: %v", err)
+		var jobType, description, statement, user, descriptorIDs, started, runningStatus,
+			finished, modified, fractionCompleted, highWaterTimestamp, errorStr, coordinatorID,
+			traceID, executionErrors, executionEvents = tree.DNull, tree.DNull, tree.DNull,
+			tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull,
+			tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull
+
+		// Extract data from the payload.
+		payload, err := jobs.UnmarshalPayload(payloadBytes)
+
+		// We filter out masked rows before we allocate all the
+		// datums, to avoid needless allocation.
+		ownedByAdmin := false
+		var sqlUsername username.SQLUsername
+		if payload != nil {
+			sqlUsername = payload.UsernameProto.Decode()
+			ownedByAdmin, err = p.UserHasAdminRole(ctx, sqlUsername)
+			if err != nil {
+				errorStr = tree.NewDString(fmt.Sprintf("error decoding payload: %v", err))
 			}
 		}
-
-		// We'll reuse this container on each loop.
-		container := make(tree.Datums, 0, 21)
-		sessionJobs := make([]*jobs.Record, 0, len(p.extendedEvalCtx.SchemaChangeJobRecords))
-		uniqueJobs := make(map[*jobs.Record]struct{})
-		for _, job := range p.extendedEvalCtx.SchemaChangeJobRecords {
-			if _, ok := uniqueJobs[job]; ok {
-				continue
+		if sessionID, ok := sessionIDBytes.(*tree.DBytes); ok {
+			if isAlive, err := p.EvalContext().SQLLivenessReader.IsAlive(
+				ctx, sqlliveness.SessionID(*sessionID),
+			); err != nil {
+				// Silently swallow the error from the liveness check.
+			} else if instanceID, ok := instanceID.(*tree.DInt); ok && isAlive {
+				coordinatorID = instanceID
 			}
-			sessionJobs = append(sessionJobs, job)
-			uniqueJobs[job] = struct{}{}
 		}
 
-		return func() (datums tree.Datums, e error) {
-			// Loop while we need to skip a row.
-			for {
-				ok, err := it.Next(ctx)
-				if err != nil {
-					return nil, err
-				}
-				var id, status, created, payloadBytes, progressBytes, sessionIDBytes,
-					instanceID tree.Datum
-				lastRun, nextRun, numRuns := tree.DNull, tree.DNull, tree.DNull
-				if ok {
-					r := it.Cur()
-					id, status, created, payloadBytes, progressBytes, sessionIDBytes, instanceID =
-						r[0], r[1], r[2], r[3], r[4], r[5], r[6]
-					lastRun, numRuns, nextRun = r[7], r[8], r[9]
-				} else if !ok {
-					if len(sessionJobs) == 0 {
-						return nil, nil
-					}
-					job := sessionJobs[len(sessionJobs)-1]
-					sessionJobs = sessionJobs[:len(sessionJobs)-1]
-					// Convert the job into datums, where protobufs will be intentionally,
-					// marshalled.
-					id = tree.NewDInt(tree.DInt(job.JobID))
-					status = tree.NewDString(string(jobs.StatusPending))
-					created = eval.TimestampToInexactDTimestamp(p.txn.ReadTimestamp())
-					progressBytes, payloadBytes, err = getPayloadAndProgressFromJobsRecord(p, job)
-					if err != nil {
-						return nil, err
-					}
-					sessionIDBytes = tree.NewDBytes(tree.DBytes(p.extendedEvalCtx.SessionID.GetBytes()))
-					instanceID = tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
-				}
-				var jobType, description, statement, user, descriptorIDs, started, runningStatus,
-					finished, modified, fractionCompleted, highWaterTimestamp, errorStr, coordinatorID,
-					traceID, executionErrors, executionEvents = tree.DNull, tree.DNull, tree.DNull,
-					tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull,
-					tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull
-
-				// Extract data from the payload.
-				payload, err := jobs.UnmarshalPayload(payloadBytes)
-
-				// We filter out masked rows before we allocate all the
-				// datums. Needless allocate when not necessary.
-				ownedByAdmin := false
-				var sqlUsername username.SQLUsername
-				if payload != nil {
-					sqlUsername = payload.UsernameProto.Decode()
-					ownedByAdmin, err = p.UserHasAdminRole(ctx, sqlUsername)
-					if err != nil {
-						errorStr = tree.NewDString(fmt.Sprintf("error decoding payload: %v", err))
-					}
-				}
-				if sessionID, ok := sessionIDBytes.(*tree.DBytes); ok {
-					if isAlive, err := p.EvalContext().SQLLivenessReader.IsAlive(
-						ctx, sqlliveness.SessionID(*sessionID),
-					); err != nil {
-						// Silently swallow the error for checking for liveness.
-					} else if instanceID, ok := instanceID.(*tree.DInt); ok && isAlive {
-						coordinatorID = instanceID
-					}
-				}
 
+		sameUser := payload != nil && sqlUsername == currentUser
+		// The user can access the row if they meet one of the following conditions:
+		// 1. The user is an admin.
+		// 2. The job is owned by the user.
+		// 3. The user has CONTROLJOB privilege and the job is not owned by
+		// an admin.
+		if canAccess := isAdmin || !ownedByAdmin && hasControlJob || sameUser; !canAccess {
+			continue
+		}
+
+		if err != nil {
+			errorStr = tree.NewDString(fmt.Sprintf("error decoding payload: %v", err))
+		} else {
+			jobType = tree.NewDString(payload.Type().String())
+			description = tree.NewDString(payload.Description)
+			statement = tree.NewDString(strings.Join(payload.Statement, "; "))
+			user = tree.NewDString(sqlUsername.Normalized())
+			descriptorIDsArr := tree.NewDArray(types.Int)
+			for _, descID := range payload.DescriptorIDs {
+				if err := descriptorIDsArr.Append(tree.NewDInt(tree.DInt(int(descID)))); err != nil {
+					return matched, err
+				}
+			}
+			descriptorIDs = descriptorIDsArr
+			started, err = tsOrNull(payload.StartedMicros)
+			if err != nil {
+				return matched, err
+			}
+			finished, err = tsOrNull(payload.FinishedMicros)
+			if err != nil {
+				return matched, err
+			}
+			errorStr = tree.NewDString(payload.Error)
+		}
+
+		// Extract data from the progress field.
+		if progressBytes != tree.DNull {
+			progress, err := jobs.UnmarshalProgress(progressBytes)
+			if err != nil {
+				baseErr := ""
+				if s, ok := errorStr.(*tree.DString); ok {
+					baseErr = string(*s)
+					if baseErr != "" {
+						baseErr += "\n"
+					}
+				}
-
-				sameUser := payload != nil && sqlUsername == currentUser
-				// The user can access the row if the meet one of the conditions:
-				// 1. The user is an admin.
-				// 2. The job is owned by the user.
-				// 3. The user has CONTROLJOB privilege and the job is not owned by
-				// an admin.
-				if canAccess := isAdmin || !ownedByAdmin && hasControlJob || sameUser; !canAccess {
-					continue
+				errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err))
+			} else {
+				// Progress contains either fractionCompleted for traditional jobs,
+				// or the highWaterTimestamp for change feeds.
+				if highwater := progress.GetHighWater(); highwater != nil {
+					highWaterTimestamp = eval.TimestampToDecimalDatum(*highwater)
+				} else {
+					fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
+				}
+				modified, err = tsOrNull(progress.ModifiedMicros)
 				if err != nil {
-					errorStr = tree.NewDString(fmt.Sprintf("error decoding payload: %v", err))
-				} else {
-					jobType = tree.NewDString(payload.Type().String())
-					description = tree.NewDString(payload.Description)
-					statement = tree.NewDString(strings.Join(payload.Statement, "; "))
-					user = tree.NewDString(sqlUsername.Normalized())
-					descriptorIDsArr := tree.NewDArray(types.Int)
-					for _, descID := range payload.DescriptorIDs {
-						if err := descriptorIDsArr.Append(tree.NewDInt(tree.DInt(int(descID)))); err != nil {
-							return nil, err
-						}
-					}
-					descriptorIDs = descriptorIDsArr
-					started, err = tsOrNull(payload.StartedMicros)
-					if err != nil {
-						return nil, err
-					}
-					finished, err = tsOrNull(payload.FinishedMicros)
-					if err != nil {
-						return nil, err
-					}
-					errorStr = tree.NewDString(payload.Error)
+					return matched, err
 				}
 
-				// Extract data from the progress field.
-				if progressBytes != tree.DNull {
-					progress, err := jobs.UnmarshalProgress(progressBytes)
-					if err != nil {
-						baseErr := ""
-						if s, ok := errorStr.(*tree.DString); ok {
-							baseErr = string(*s)
-							if baseErr != "" {
-								baseErr += "\n"
-							}
-						}
-						errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err))
-					} else {
-						// Progress contains either fractionCompleted for traditional jobs,
-						// or the highWaterTimestamp for change feeds.
-						if highwater := progress.GetHighWater(); highwater != nil {
-							highWaterTimestamp = eval.TimestampToDecimalDatum(*highwater)
-						} else {
-							fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
-						}
-						modified, err = tsOrNull(progress.ModifiedMicros)
-						if err != nil {
-							return nil, err
-						}
-
-						if s, ok := status.(*tree.DString); ok {
-							if jobs.Status(*s) == jobs.StatusRunning && len(progress.RunningStatus) > 0 {
-								runningStatus = tree.NewDString(progress.RunningStatus)
-							} else if jobs.Status(*s) == jobs.StatusPaused && payload != nil && payload.PauseReason != "" {
-								errorStr = tree.NewDString(fmt.Sprintf("%s: %s", jobs.PauseRequestExplained, payload.PauseReason))
-							}
-						}
-						traceID = tree.NewDInt(tree.DInt(progress.TraceID))
+				if s, ok := status.(*tree.DString); ok {
+					if jobs.Status(*s) == jobs.StatusRunning && len(progress.RunningStatus) > 0 {
+						runningStatus = tree.NewDString(progress.RunningStatus)
+					} else if jobs.Status(*s) == jobs.StatusPaused && payload != nil && payload.PauseReason != "" {
+						errorStr = tree.NewDString(fmt.Sprintf("%s: %s", jobs.PauseRequestExplained, payload.PauseReason))
 					}
 				}
-				if payload != nil {
-					executionErrors = jobs.FormatRetriableExecutionErrorLogToStringArray(
-						ctx, payload.RetriableExecutionFailureLog,
-					)
-					// It's not clear why we'd ever see an error here,
-					var err error
-					executionEvents, err = jobs.FormatRetriableExecutionErrorLogToJSON(
-						ctx, payload.RetriableExecutionFailureLog,
-					)
-					if err != nil {
-						if errorStr == tree.DNull {
-							errorStr = tree.NewDString(errors.Wrap(err, "failed to marshal execution error log").Error())
-						} else {
-							executionEvents = tree.DNull
-						}
-					}
-				}
-
-				container = container[:0]
-				container = append(container,
-					id,
-					jobType,
-					description,
-					statement,
-					user,
-					descriptorIDs,
-					status,
-					runningStatus,
-					created,
-					started,
-					finished,
-					modified,
-					fractionCompleted,
-					highWaterTimestamp,
-					errorStr,
-					coordinatorID,
-					traceID,
-					lastRun,
-					nextRun,
-					numRuns,
-					executionErrors,
-					executionEvents,
-				)
-				return container, nil
-			}
-		}, cleanup, nil
-	},
+				traceID = tree.NewDInt(tree.DInt(progress.TraceID))
+			}
+		}
+		if payload != nil {
+			executionErrors = jobs.FormatRetriableExecutionErrorLogToStringArray(
+				ctx, payload.RetriableExecutionFailureLog,
+			)
+			// It's not clear why we'd ever see an error here.
+			var err error
+			executionEvents, err = jobs.FormatRetriableExecutionErrorLogToJSON(
+				ctx, payload.RetriableExecutionFailureLog,
+			)
+			if err != nil {
+				if errorStr == tree.DNull {
+					errorStr = tree.NewDString(errors.Wrap(err, "failed to marshal execution error log").Error())
+				} else {
+					executionEvents = tree.DNull
+				}
+			}
+		}
+
+		if err = addRow(
+			id,
+			jobType,
+			description,
+			statement,
+			user,
+			descriptorIDs,
+			status,
+			runningStatus,
+			created,
+			started,
+			finished,
+			modified,
+			fractionCompleted,
+			highWaterTimestamp,
+			errorStr,
+			coordinatorID,
+			traceID,
+			lastRun,
+			nextRun,
+			numRuns,
+			executionErrors,
+			executionEvents,
+		); err != nil {
+			return matched, err
+		}
+		matched = true
+	}
 }
 
 // execStatAvg is a helper for execution stats shown in virtual tables. Returns
diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements
index 24c4b85eed63..270f06d88445 100644
--- a/pkg/sql/logictest/testdata/logic_test/create_statements
+++ b/pkg/sql/logictest/testdata/logic_test/create_statements
@@ -802,7 +802,8 @@ CREATE TABLE crdb_internal.jobs (
   next_run TIMESTAMP NULL,
   num_runs INT8 NULL,
   execution_errors STRING[] NULL,
-  execution_events JSONB NULL
+  execution_events JSONB NULL,
+  INDEX jobs_status_idx (status ASC) STORING (job_id, job_type, description, statement, user_name, descriptor_ids, running_status, created, started, finished, modified, fraction_completed, high_water_timestamp, error, coordinator_id, trace_id, last_run, next_run, num_runs, execution_errors, execution_events)
 ) CREATE TABLE crdb_internal.jobs (
   job_id INT8 NULL,
   job_type STRING NULL,
@@ -825,7 +826,8 @@ CREATE TABLE crdb_internal.jobs (
   next_run TIMESTAMP NULL,
   num_runs INT8 NULL,
   execution_errors STRING[] NULL,
-  execution_events JSONB NULL
+  execution_events JSONB NULL,
+  INDEX jobs_status_idx (status ASC) STORING (job_id, job_type, description, statement, user_name, descriptor_ids, running_status, created, started, finished, modified, fraction_completed, high_water_timestamp, error, coordinator_id, trace_id, last_run, next_run, num_runs, execution_errors, execution_events)
 ) {} {}
 CREATE TABLE crdb_internal.kv_node_liveness (
   node_id INT8 NOT NULL,
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/virtual b/pkg/sql/opt/exec/execbuilder/testdata/virtual
index a070dce4fb3e..a3acb874cb2d 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/virtual
+++ b/pkg/sql/opt/exec/execbuilder/testdata/virtual
@@ -261,3 +261,16 @@ vectorized: true
 • virtual table
   table: tables@tables_parent_id_idx (partial index)
   spans: [/1 - /1]
+
+# Validate that the virtual index on 'status' works.
+# Vectorized execution may be on or off during tests, so exclude it from the output.
+query T
+SELECT info FROM [EXPLAIN SELECT count(*) FROM crdb_internal.jobs WHERE status = 'paused'] WHERE info NOT LIKE 'vectorized%'
+----
+distribution: local
+·
+• group (scalar)
+│
+└── • virtual table
+      table: jobs@jobs_status_idx
+      spans: [/'paused' - /'paused']
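
Usage note: with the virtual index in place, any query that pins status to a constant takes the constrained populate path above (the same plan shape as the 'paused' EXPLAIN test) instead of decoding every row of system.jobs. A hypothetical sketch — the column list is illustrative, though all of these columns exist in the crdb_internal.jobs schema in this diff:

    SELECT job_id, job_type, description, error
    FROM crdb_internal.jobs
    WHERE status = 'running';

An unconstrained query such as SELECT count(*) FROM crdb_internal.jobs still goes through the full populate path and visits every job.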
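For reference, the internal query that makeJobsTableRows issues is assembled from the jobsQuery and jobsStatusFilter constants; with $1/$2 bound to the registry's retry backoff parameters and $3 to the target status, it expands to roughly the following. This is a sketch only: the next-run expression comes from jobs.NextRunClause, whose definition is outside this diff and is left as a placeholder here.

    SELECT id, status, created, payload, progress, claim_session_id, claim_instance_id,
           last_run, COALESCE(num_runs, 0), <jobs.NextRunClause> AS next_run
    FROM system.jobs, (SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args
    WHERE status = $3;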