Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/schemachanger/scplan: allow plan to move to NonRevertible earlier #84430

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 29 additions & 50 deletions pkg/ccl/schemachangerccl/testdata/end_to_end/create_index
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ notified job registry to adopt jobs: [1]
begin transaction #2
commit transaction #2
begin transaction #3
## PostCommitPhase stage 1 of 7 with 3 MutationType ops
## PostCommitPhase stage 1 of 6 with 3 MutationType ops
upsert descriptor #104
...
- ABSENT
Expand Down Expand Up @@ -314,14 +314,14 @@ upsert descriptor #104
unexposedParentSchemaId: 101
- version: "2"
+ version: "3"
update progress of schema change job #1: "PostCommitPhase stage 2 of 7 with 1 BackfillType op pending"
update progress of schema change job #1: "PostCommitPhase stage 2 of 6 with 1 BackfillType op pending"
commit transaction #3
begin transaction #4
## PostCommitPhase stage 2 of 7 with 1 BackfillType op
## PostCommitPhase stage 2 of 6 with 1 BackfillType op
backfill indexes [2] from index #1 in table #104
commit transaction #4
begin transaction #5
## PostCommitPhase stage 3 of 7 with 3 MutationType ops
## PostCommitPhase stage 3 of 6 with 3 MutationType ops
upsert descriptor #104
...
- PUBLIC
Expand Down Expand Up @@ -350,10 +350,10 @@ upsert descriptor #104
unexposedParentSchemaId: 101
- version: "3"
+ version: "4"
update progress of schema change job #1: "PostCommitPhase stage 4 of 7 with 1 MutationType op pending"
update progress of schema change job #1: "PostCommitPhase stage 4 of 6 with 1 MutationType op pending"
commit transaction #5
begin transaction #6
## PostCommitPhase stage 4 of 7 with 3 MutationType ops
## PostCommitPhase stage 4 of 6 with 3 MutationType ops
upsert descriptor #104
...
- PUBLIC
Expand Down Expand Up @@ -382,29 +382,35 @@ upsert descriptor #104
unexposedParentSchemaId: 101
- version: "4"
+ version: "5"
update progress of schema change job #1: "PostCommitPhase stage 5 of 7 with 1 BackfillType op pending"
update progress of schema change job #1: "PostCommitPhase stage 5 of 6 with 1 BackfillType op pending"
commit transaction #6
begin transaction #7
## PostCommitPhase stage 5 of 7 with 1 BackfillType op
## PostCommitPhase stage 5 of 6 with 1 BackfillType op
merge temporary indexes [3] into backfilled indexes [2] in table #104
commit transaction #7
begin transaction #8
## PostCommitPhase stage 6 of 7 with 1 ValidationType op
## PostCommitPhase stage 6 of 6 with 1 ValidationType op
validate forward indexes [2] in table #104
commit transaction #8
begin transaction #9
## PostCommitPhase stage 7 of 7 with 4 MutationType ops
## PostCommitNonRevertiblePhase stage 1 of 2 with 5 MutationType ops
upsert descriptor #104
...
- PUBLIC
- PUBLIC
- - MERGE_ONLY
- - ABSENT
- PUBLIC
- - WRITE_ONLY
- PUBLIC
- PUBLIC
+ - TRANSIENT_DELETE_ONLY
- PUBLIC
- PUBLIC
+ - PUBLIC
+ - PUBLIC
- WRITE_ONLY
- PUBLIC
jobId: "1"
relevantStatements:
...
BY LIST (id) (PARTITION p1 VALUES IN (1))
statementTag: CREATE INDEX
Expand Down Expand Up @@ -448,7 +454,8 @@ upsert descriptor #104
+ version: 4
+ modificationTime: {}
mutations:
- direction: ADD
- - direction: ADD
+ - direction: DROP
index:
- constraintId: 2
- createdExplicitly: true
Expand Down Expand Up @@ -485,35 +492,6 @@ upsert descriptor #104
- index:
constraintId: 3
createdExplicitly: true
...
time: {}
unexposedParentSchemaId: 101
- version: "5"
+ version: "6"
update progress of schema change job #1: "PostCommitNonRevertiblePhase stage 1 of 2 with 1 MutationType op pending"
set schema change job #1 to non-cancellable
commit transaction #9
begin transaction #10
## PostCommitNonRevertiblePhase stage 1 of 2 with 3 MutationType ops
upsert descriptor #104
...
- PUBLIC
- PUBLIC
- - WRITE_ONLY
+ - TRANSIENT_DELETE_ONLY
- PUBLIC
- PUBLIC
...
- money
version: 4
- modificationTime:
- wallTime: "1640995200000000009"
+ modificationTime: {}
mutations:
- - direction: ADD
+ - direction: DROP
index:
constraintId: 3
...
version: 4
mutationId: 1
Expand All @@ -524,11 +502,12 @@ upsert descriptor #104
...
time: {}
unexposedParentSchemaId: 101
- version: "6"
+ version: "7"
- version: "5"
+ version: "6"
update progress of schema change job #1: "PostCommitNonRevertiblePhase stage 2 of 2 with 2 MutationType ops pending"
commit transaction #10
begin transaction #11
set schema change job #1 to non-cancellable
commit transaction #9
begin transaction #10
## PostCommitNonRevertiblePhase stage 2 of 2 with 4 MutationType ops
upsert descriptor #104
...
Expand Down Expand Up @@ -694,7 +673,7 @@ upsert descriptor #104
- money
version: 4
- modificationTime:
- wallTime: "1640995200000000010"
- wallTime: "1640995200000000009"
- mutations:
- - direction: DROP
- index:
Expand Down Expand Up @@ -737,12 +716,12 @@ upsert descriptor #104
...
time: {}
unexposedParentSchemaId: 101
- version: "7"
+ version: "8"
- version: "6"
+ version: "7"
write *eventpb.FinishSchemaChange to event log for descriptor 104
create job #2 (non-cancelable: true): "GC for "
descriptor IDs: [104]
update progress of schema change job #1: "all stages completed"
commit transaction #11
commit transaction #10
notified job registry to adopt jobs: [2]
# end PostCommitPhase
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/new_schema_changer
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ EXPLAIN (DDL, VERBOSE) ALTER TABLE foo ADD COLUMN j INT
│ │ oid: 20
│ │ width: 64
│ │
│ └── • scop.AddColumnToIndex
│ └── • AddColumnToIndex
│ ColumnID: 2
│ IndexID: 1
│ Kind: 2
Expand Down Expand Up @@ -174,6 +174,7 @@ EXPLAIN (DDL, VERBOSE) ALTER TABLE foo ADD COLUMN j INT
JobID: 1
RunningStatus: all stages completed


statement ok
ALTER TABLE foo ADD COLUMN j INT

Expand Down Expand Up @@ -1969,8 +1970,8 @@ Schema change plan for DROP INDEX ‹test›.public.‹parent›@‹idx› CASCA
│ ├── RemoveAllTableComments {"TableID":266}
│ ├── MakeDroppedIndexDeleteOnly {"IndexID":2,"TableID":265}
│ ├── DrainDescriptorName {"Namespace":{"DatabaseID":104,"DescriptorID":266,"Name":"child","SchemaID":105}}
│ ├── scop.RemoveColumnFromIndex {"ColumnID":2,"IndexID":2,"TableID":265}
│ ├── scop.RemoveColumnFromIndex {"ColumnID":1,"IndexID":2,"Kind":1,"TableID":265}
│ ├── RemoveColumnFromIndex {"ColumnID":2,"IndexID":2,"TableID":265}
│ ├── RemoveColumnFromIndex {"ColumnID":1,"IndexID":2,"Kind":1,"TableID":265}
│ ├── SetJobStateOnDescriptor {"DescriptorID":265}
│ ├── SetJobStateOnDescriptor {"DescriptorID":266}
│ └── UpdateSchemaChangerJob {"IsNonCancelable":true,"RunningStatus":"PostCommitNonRev..."}
Expand All @@ -1994,7 +1995,6 @@ Schema change plan for DROP INDEX ‹test›.public.‹parent›@‹idx› CASCA
├── RemoveJobStateFromDescriptor {"DescriptorID":266}
└── UpdateSchemaChangerJob {"IsNonCancelable":true,"RunningStatus":"all stages compl..."}


statement ok
SET use_declarative_schema_changer = 'on'

Expand Down
53 changes: 42 additions & 11 deletions pkg/sql/schemachanger/scplan/internal/opgen/op_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,19 @@ func makeTargetsWithElementMap(cs scpb.CurrentState) targetsWithElementMap {
// given an element value.
type opsFunc func(element scpb.Element, md *targetsWithElementMap) []scop.Op

func makeOpsFunc(el scpb.Element, fns []interface{}) (opsFunc, error) {
func makeOpsFunc(el scpb.Element, fns []interface{}) (opsFunc, scop.Type, error) {
var opType scop.Type
var funcValues []reflect.Value
for _, fn := range fns {
if err := checkOpFunc(el, fn); err != nil {
return nil, err
typ, err := checkOpFunc(el, fn)
if err != nil {
return nil, 0, err
}
if len(funcValues) > 0 && typ != opType {
return nil, 0, errors.Errorf("conflicting operation types for %T: %s != %s",
el, opType, typ)
}
opType = typ
funcValues = append(funcValues, reflect.ValueOf(fn))
}
return func(element scpb.Element, md *targetsWithElementMap) []scop.Op {
Expand All @@ -124,31 +131,55 @@ func makeOpsFunc(el scpb.Element, fns []interface{}) (opsFunc, error) {
}
}
return ret
}, nil
}, opType, nil
}

var opType = reflect.TypeOf((*scop.Op)(nil)).Elem()
var (
opInterfaceType = reflect.TypeOf((*scop.Op)(nil)).Elem()
mutationOpInterfaceType = reflect.TypeOf((*scop.MutationOp)(nil)).Elem()
validationOpInterfaceType = reflect.TypeOf((*scop.ValidationOp)(nil)).Elem()
backfillOpInterfaceType = reflect.TypeOf((*scop.BackfillOp)(nil)).Elem()
)

func checkOpFunc(el scpb.Element, fn interface{}) error {
func checkOpFunc(el scpb.Element, fn interface{}) (opType scop.Type, _ error) {
fnV := reflect.ValueOf(fn)
fnT := fnV.Type()
if fnT.Kind() != reflect.Func {
return errors.Errorf(
return 0, errors.Errorf(
"%v is a %s, expected %s", fnT, fnT.Kind(), reflect.Func,
)
}
elType := reflect.TypeOf(el)
if !(fnT.NumIn() == 1 && fnT.In(0) == elType) &&
!(fnT.NumIn() == 2 && fnT.In(0) == elType &&
fnT.In(1) == reflect.TypeOf((*targetsWithElementMap)(nil))) {
return errors.Errorf(
return 0, errors.Errorf(
"expected %v to be a func with one argument of type %s", fnT, elType,
)
}
if fnT.NumOut() != 1 || !fnT.Out(0).Implements(opType) {
returnTypeError := func() error {
return errors.Errorf(
"expected %v to be a func with one return value of type %s", fnT, opType,
"expected %v to be a func with one return value of a "+
"pointer type which implements %s", fnT, opType,
)
}
return nil
if fnT.NumOut() != 1 {
return 0, returnTypeError()
}
out := fnT.Out(0)
if out.Kind() != reflect.Ptr || !out.Implements(opInterfaceType) {
return 0, returnTypeError()
}
switch {
case out.Implements(mutationOpInterfaceType):
opType = scop.MutationType
case out.Implements(validationOpInterfaceType):
opType = scop.ValidationType
case out.Implements(backfillOpInterfaceType):
opType = scop.BackfillType
default:
return 0, errors.AssertionFailedf("%s implemented %s but does not conform to any known type",
out, opInterfaceType)
}
return opType, nil
}
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *registry) buildGraph(cs scpb.CurrentState) (_ *scgraph.Graph, err error
ops = e.ops(e.n.Element(), &md)
}
if err := g.AddOpEdges(
e.n.Target, e.from, e.to, e.revertible, e.minPhase, ops...,
e.n.Target, e.from, e.to, e.revertible, e.canFail, e.minPhase, ops...,
); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func init() {
scpb.Status_ABSENT,
equiv(scpb.Status_DROPPED),
to(scpb.Status_OFFLINE,
emit(func(this *scpb.AliasType) scop.Op {
emit(func(this *scpb.AliasType) *scop.NotImplemented {
return notImplemented(this)
}),
),
to(scpb.Status_PUBLIC,
emit(func(this *scpb.AliasType) scop.Op {
emit(func(this *scpb.AliasType) *scop.MarkDescriptorAsPublic {
return &scop.MarkDescriptorAsPublic{
DescID: this.TypeID,
}
Expand All @@ -36,7 +36,7 @@ func init() {
toAbsent(
scpb.Status_PUBLIC,
to(scpb.Status_OFFLINE,
emit(func(this *scpb.AliasType, md *targetsWithElementMap) scop.Op {
emit(func(this *scpb.AliasType, md *targetsWithElementMap) *scop.MarkDescriptorAsOffline {
return &scop.MarkDescriptorAsOffline{
DescID: this.TypeID,
Reason: statementForDropJob(this, md).Statement,
Expand All @@ -45,17 +45,17 @@ func init() {
),
to(scpb.Status_DROPPED,
revertible(false),
emit(func(this *scpb.AliasType) scop.Op {
emit(func(this *scpb.AliasType) *scop.MarkDescriptorAsDropped {
return &scop.MarkDescriptorAsDropped{
DescID: this.TypeID,
}
}),
),
to(scpb.Status_ABSENT,
emit(func(this *scpb.AliasType, md *targetsWithElementMap) scop.Op {
emit(func(this *scpb.AliasType, md *targetsWithElementMap) *scop.LogEvent {
return newLogEventOp(this, md)
}),
emit(func(this *scpb.AliasType) scop.Op {
emit(func(this *scpb.AliasType) *scop.DeleteDescriptor {
return &scop.DeleteDescriptor{
DescriptorID: this.TypeID,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func init() {
toPublic(
scpb.Status_ABSENT,
to(scpb.Status_PUBLIC,
emit(func(this *scpb.CheckConstraint) scop.Op {
emit(func(this *scpb.CheckConstraint) *scop.NotImplemented {
return notImplemented(this)
}),
),
Expand All @@ -30,13 +30,13 @@ func init() {
to(scpb.Status_ABSENT,
// TODO(postamar): remove revertibility constraint when possible
revertible(false),
emit(func(this *scpb.CheckConstraint) scop.Op {
emit(func(this *scpb.CheckConstraint) *scop.RemoveCheckConstraint {
return &scop.RemoveCheckConstraint{
TableID: this.TableID,
ConstraintID: this.ConstraintID,
}
}),
emit(func(this *scpb.CheckConstraint) scop.Op {
emit(func(this *scpb.CheckConstraint) *scop.UpdateTableBackReferencesInTypes {
if len(this.UsesTypeIDs) == 0 {
return nil
}
Expand All @@ -45,7 +45,7 @@ func init() {
BackReferencedTableID: this.TableID,
}
}),
emit(func(this *scpb.CheckConstraint) scop.Op {
emit(func(this *scpb.CheckConstraint) *scop.UpdateBackReferencesInSequences {
if len(this.UsesSequenceIDs) == 0 {
return nil
}
Expand Down
Loading