Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/schemachanger: reorder args on Build, clone nodes, minor renaming #66837

Merged
merged 4 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
626 changes: 313 additions & 313 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ message NewSchemaChangeDetails {

// NewSchemaChangeProgress is the persisted progress for the new schema change job.
message NewSchemaChangeProgress {
repeated cockroach.sql.schemachanger.scpb.State states = 1;
repeated cockroach.sql.schemachanger.scpb.Status states = 1;
}


Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2699,7 +2699,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
// mutate descriptors prior to committing a SQL transaction.
func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
scs := &ex.extraTxnState.schemaChangerState
if len(scs.nodes) == 0 {
if len(scs.state) == 0 {
return nil
}
executor := scexec.NewExecutor(
Expand All @@ -2710,29 +2710,29 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
after, err := runNewSchemaChanger(
ctx,
scplan.PreCommitPhase,
ex.extraTxnState.schemaChangerState.nodes,
ex.extraTxnState.schemaChangerState.state,
executor,
scs.stmts,
)
if err != nil {
return err
}
scs.nodes = after
targetSlice := make([]*scpb.Target, len(scs.nodes))
states := make([]scpb.State, len(scs.nodes))
scs.state = after
targetSlice := make([]*scpb.Target, len(scs.state))
states := make([]scpb.Status, len(scs.state))
// TODO(ajwerner): It may be better in the future to have the builder be
// responsible for determining this set of descriptors. As of the time of
// writing, the descriptors to be "locked," descriptors that need schema
// change jobs, and descriptors with schema change mutations all coincide. But
// there are future schema changes to be implemented in the new schema changer
// (e.g., RENAME TABLE) for which this may no longer be true.
descIDSet := catalog.MakeDescriptorIDSet()
for i := range scs.nodes {
targetSlice[i] = scs.nodes[i].Target
states[i] = scs.nodes[i].State
for i := range scs.state {
targetSlice[i] = scs.state[i].Target
states[i] = scs.state[i].Status
// Depending on the element type either a single descriptor ID
// will exist or multiple (i.e. foreign keys).
if id := scpb.GetDescID(scs.nodes[i].Element()); id != descpb.InvalidID {
if id := scpb.GetDescID(scs.state[i].Element()); id != descpb.InvalidID {
descIDSet.Add(id)
}
}
Expand Down Expand Up @@ -2765,18 +2765,18 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
func runNewSchemaChanger(
ctx context.Context,
phase scplan.Phase,
nodes []*scpb.Node,
state scpb.State,
executor *scexec.Executor,
stmts []string,
) (after []*scpb.Node, _ error) {
sc, err := scplan.MakePlan(nodes, scplan.Params{
) (after scpb.State, _ error) {
sc, err := scplan.MakePlan(state, scplan.Params{
ExecutionPhase: phase,
// TODO(ajwerner): Populate the set of new descriptors
})
if err != nil {
return nil, err
}
after = nodes
after = state
for _, s := range sc.Stages {
if err := executor.ExecuteOps(ctx, s.Ops,
scexec.TestingKnobMetadata{
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *planner) SchemaChange(ctx context.Context, stmt tree.Statement) (planNo
Descs: p.Descriptors(),
AuthAccessor: p,
}
outputNodes, err := scbuild.Build(ctx, stmt, buildDeps, p.extendedEvalCtx.SchemaChangerState.nodes)
outputNodes, err := scbuild.Build(ctx, buildDeps, p.extendedEvalCtx.SchemaChangerState.state, stmt)
if scbuild.HasNotImplemented(err) && mode == sessiondata.UseNewSchemaChangerOn {
return nil, false, nil
}
Expand Down Expand Up @@ -114,9 +114,7 @@ type schemaChangePlanNode struct {
// plannedState contains the set of states produced by the builder combining
// the nodes that existed preceding the current statement with the output of
// the built current statement.
//
// TODO(ajwerner): Give this a better name.
plannedState []*scpb.Node
plannedState scpb.State
}

func (s *schemaChangePlanNode) startExec(params runParams) error {
Expand All @@ -131,7 +129,7 @@ func (s *schemaChangePlanNode) startExec(params runParams) error {
if err != nil {
return err
}
scs.nodes = after
scs.state = after
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// SchemaChangerState is state associated with the new schema changer.
type SchemaChangerState struct {
mode sessiondata.NewSchemaChangerMode
nodes []*scpb.Node
state scpb.State
// stmts contains the SQL statements involved in the schema change. This is
// the bare minimum of statement information we need for testing, but in the
// future we may want sql.Statement or something.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scbuild/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/sql/sqltelemetry",
"//pkg/sql/types",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/protoutil",
"//pkg/util/sequence",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
Expand Down
48 changes: 26 additions & 22 deletions pkg/sql/schemachanger/scbuild/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -61,9 +62,9 @@ type Dependencies struct {
type buildContext struct {
Dependencies

// outputNodes contains the internal state when building targets for an individual
// output contains the internal state when building targets for an individual
// statement.
outputNodes []*scpb.Node
output scpb.State
}

type notImplementedError struct {
Expand Down Expand Up @@ -110,28 +111,31 @@ func (e *ConcurrentSchemaChangeError) DescriptorID() descpb.ID {
return e.descID
}

// Build builds targets and transforms the provided schema change nodes
// accordingly, given a statement.
// Build constructs a new set state from an initial state and a statement.
func Build(
ctx context.Context, n tree.Statement, dependencies Dependencies, initialNodes []*scpb.Node,
) (outputNodes []*scpb.Node, err error) {
ctx context.Context, dependencies Dependencies, initial scpb.State, n tree.Statement,
) (built scpb.State, err error) {
buildContext := &buildContext{
Dependencies: dependencies,
outputNodes: initialNodes,
output: cloneState(initial),
}
return buildContext.build(ctx, n)
}

func cloneState(state scpb.State) scpb.State {
clone := make(scpb.State, len(state))
for i, n := range state {
clone[i] = &scpb.Node{
Target: protoutil.Clone(n.Target).(*scpb.Target),
Status: n.Status,
}
}
return clone
}

// build builds targets and transforms the provided schema change nodes
// accordingly, given a statement.
//
// TODO(ajwerner): Clarify whether the nodes will be mutated. Potentially just
// clone them defensively here. Similarly, close the statement as some schema
// changes mutate the AST. It's best if this method had a clear contract that
// it did not mutate its arguments.
func (b *buildContext) build(
ctx context.Context, n tree.Statement,
) (outputNodes []*scpb.Node, err error) {
func (b *buildContext) build(ctx context.Context, n tree.Statement) (output scpb.State, err error) {
defer func() {
if recErr := recover(); recErr != nil {
if errObj, ok := recErr.(error); ok {
Expand Down Expand Up @@ -159,7 +163,7 @@ func (b *buildContext) build(
default:
return nil, &notImplementedError{n: n}
}
return b.outputNodes, nil
return b.output, nil
}

// checkIfNodeExists checks if an existing node is already there,
Expand All @@ -169,7 +173,7 @@ func (b *buildContext) checkIfNodeExists(
) (exists bool, index int) {
// Check if any existing node matches the new node we are
// trying to add.
for idx, node := range b.outputNodes {
for idx, node := range b.output {
if scpb.EqualElements(node.Element(), elem) {
return true, idx
}
Expand All @@ -178,22 +182,22 @@ func (b *buildContext) checkIfNodeExists(
}

func (b *buildContext) addNode(dir scpb.Target_Direction, elem scpb.Element) {
var s scpb.State
var s scpb.Status
switch dir {
case scpb.Target_ADD:
s = scpb.State_ABSENT
s = scpb.Status_ABSENT
case scpb.Target_DROP:
s = scpb.State_PUBLIC
s = scpb.Status_PUBLIC
default:
panic(errors.Errorf("unknown direction %s", dir))
}

if exists, _ := b.checkIfNodeExists(dir, elem); exists {
panic(errors.Errorf("attempted to add duplicate element %s", elem))
}
b.outputNodes = append(b.outputNodes, &scpb.Node{
b.output = append(b.output, &scpb.Node{
Target: scpb.NewTarget(dir, elem),
State: s,
Status: s,
})
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/schemachanger/scbuild/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func TestBuilderAlterTable(t *testing.T) {

stmts, err := parser.Parse(d.Input)
require.NoError(t, err)
var outputNodes []*scpb.Node
var outputNodes scpb.State
for i := range stmts {
outputNodes, err = scbuild.Build(ctx, stmts[i].AST, *deps, outputNodes)
outputNodes, err = scbuild.Build(ctx, *deps, outputNodes, stmts[i].AST)
require.NoError(t, err)
}

Expand All @@ -112,7 +112,7 @@ func TestBuilderAlterTable(t *testing.T) {
stmt := stmts[0]
alter, ok := stmt.AST.(*tree.AlterTable)
require.Truef(t, ok, "not an ALTER TABLE statement: %s", stmt.SQL)
_, err = scbuild.Build(ctx, alter, *deps, nil)
_, err = scbuild.Build(ctx, *deps, nil, alter)
require.Truef(t, scbuild.HasNotImplemented(err), "expected unimplemented, got %v", err)
return ""

Expand All @@ -138,8 +138,8 @@ func indentText(input string, tab string) string {
return result.String()
}

// marshalNodes marshals a []*scpb.Node to YAML.
func marshalNodes(t *testing.T, nodes []*scpb.Node) string {
// marshalNodes marshals a scpb.State to YAML.
func marshalNodes(t *testing.T, nodes scpb.State) string {
var sortedEntries []string
for _, node := range nodes {
var buf bytes.Buffer
Expand All @@ -152,7 +152,7 @@ func marshalNodes(t *testing.T, nodes []*scpb.Node) string {
entry.WriteString(" ")
scpb.FormatAttributes(node.Element(), &entry)
entry.WriteString("\n")
entry.WriteString(indentText(fmt.Sprintf("state: %s\n", node.State.String()), " "))
entry.WriteString(indentText(fmt.Sprintf("state: %s\n", node.Status.String()), " "))
entry.WriteString(indentText("details:\n", " "))
out, err := yaml.Marshal(target)
require.NoError(t, err)
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/schemachanger/scbuild/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (b *buildContext) validateColumnName(
}
panic(sqlerrors.NewColumnAlreadyExistsError(string(d.Name), table.GetName()))
}
for _, n := range b.outputNodes {
for _, n := range b.output {
switch t := n.Element().(type) {
case *scpb.Column:
if t.TableID != table.GetID() || t.Column.Name != string(d.Name) {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (b *buildContext) findOrAddColumnFamily(
// TODO(ajwerner): Decide what to do if the only column in a family of this
// name is being dropped and then if there is or isn't a create directive.
nextFamilyID := table.GetNextFamilyID()
for _, n := range b.outputNodes {
for _, n := range b.output {
switch col := n.Element().(type) {
case *scpb.Column:
if col.TableID != table.GetID() {
Expand Down Expand Up @@ -293,7 +293,7 @@ func (b *buildContext) alterTableDropColumn(
panic(err)
}
// Check whether the column is being dropped.
for _, n := range b.outputNodes {
for _, n := range b.output {
switch col := n.Element().(type) {
case *scpb.Column:
if col.TableID != table.GetID() ||
Expand Down Expand Up @@ -423,9 +423,9 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForAddColumn(
) (idxID descpb.IndexID) {
// Check whether a target to add a PK already exists. If so, update its
// storing columns.
for i, n := range b.outputNodes {
for i, n := range b.output {
if t, ok := n.Element().(*scpb.PrimaryIndex); ok &&
b.outputNodes[i].Target.Direction == scpb.Target_ADD &&
b.output[i].Target.Direction == scpb.Target_ADD &&
t.TableID == table.GetID() {
t.Index.StoreColumnIDs = append(t.Index.StoreColumnIDs, colID)
t.Index.StoreColumnNames = append(t.Index.StoreColumnNames, colName)
Expand Down Expand Up @@ -475,7 +475,7 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForDropColumn(
) (idxID descpb.IndexID) {
// Check whether a target to add a PK already exists. If so, update its
// storing columns.
for _, n := range b.outputNodes {
for _, n := range b.output {
if t, ok := n.Element().(*scpb.PrimaryIndex); ok &&
n.Target.Direction == scpb.Target_ADD &&
t.TableID == table.GetID() {
Expand Down Expand Up @@ -542,7 +542,7 @@ func (b *buildContext) nextColumnID(table catalog.TableDescriptor) descpb.Column
nextColID := table.GetNextColumnID()
var maxColID descpb.ColumnID

for _, n := range b.outputNodes {
for _, n := range b.output {
if n.Target.Direction != scpb.Target_ADD || scpb.GetDescID(n.Element()) != table.GetID() {
continue
}
Expand All @@ -561,7 +561,7 @@ func (b *buildContext) nextColumnID(table catalog.TableDescriptor) descpb.Column
func (b *buildContext) nextIndexID(table catalog.TableDescriptor) descpb.IndexID {
nextMaxID := table.GetNextIndexID()
var maxIdxID descpb.IndexID
for _, n := range b.outputNodes {
for _, n := range b.output {
if n.Target.Direction != scpb.Target_ADD || scpb.GetDescID(n.Element()) != table.GetID() {
continue
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/schemachanger/scbuild/testdata/drop_database
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ DROP DATABASE db1 CASCADE
tableId: 57
usesSequenceIDs:
- 54
- DROP RelationDependedOnBy:{DescID: 54, DepID: 57}
- DROP RelationDependedOnBy:{DescID: 54, ReferencedDescID: 57}
state: PUBLIC
details:
dependedOn: 57
tableId: 54
- DROP RelationDependedOnBy:{DescID: 55, DepID: 56}
- DROP RelationDependedOnBy:{DescID: 55, ReferencedDescID: 56}
state: PUBLIC
details:
dependedOn: 56
Expand Down Expand Up @@ -141,12 +141,12 @@ DROP DATABASE db1 CASCADE
state: PUBLIC
details:
typeId: 63
- DROP TypeReference:{DescID: 64, DepID: 62}
- DROP TypeReference:{DescID: 64, ReferencedDescID: 62}
state: PUBLIC
details:
descriptorId: 64
typeId: 62
- DROP TypeReference:{DescID: 64, DepID: 63}
- DROP TypeReference:{DescID: 64, ReferencedDescID: 63}
state: PUBLIC
details:
descriptorId: 64
Expand Down
Loading