Skip to content

Commit

Permalink
Log Events for Alter Table, Async Schema Change
Browse files Browse the repository at this point in the history
Commit adds three new event log types:
+ "Alter Table", logged for every ALTER TABLE DDL statement.
+ "Finish Schema Change", logged when an asynchronous schema change is
completed.
+ "Reverse Schema Change", logged when an asynchronous schema change encounters
an error (such as a constraint violation during backfill) and is rolled back.

This Commit encountered an interesting, perhaps unintended effect with
`SystemConfigTrigger`: in a "mixed" transaction which updates both system and
non-system keys, the system changes will only be gossiped if the transaction
record is located on the system range. In other words, "mixed" transactions will
only work correctly if the system keys are modified first. Issue cockroachdb#7570 had been
logged to track this unexpected behavior; this commit has also added a number of
comments and a logged error to address this.
  • Loading branch information
Matt Tracy committed Jul 7, 2016
1 parent d3e117a commit bde7d24
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 15 deletions.
7 changes: 7 additions & 0 deletions internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ func (txn *Txn) InternalSetPriority(priority int32) {

// SetSystemConfigTrigger sets the system db trigger to true on this transaction.
// This will impact the EndTransactionRequest.
//
// NOTE: The system db trigger will only execute correctly if the transaction
// record is located on the range that contains the system span. If a
// transaction is created which modifies both system *and* non-system data, it
// should be ensured that the transaction record itself is on the system span.
// This can be done by making sure a system key is the first key touched in the
// transaction.
func (txn *Txn) SetSystemConfigTrigger() {
txn.systemConfigTrigger = true
}
Expand Down
16 changes: 16 additions & 0 deletions sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ func (n *alterTableNode) Start() error {
if err := n.p.writeTableDesc(n.tableDesc); err != nil {
return err
}

// Record this table alteration in the event log.
if err := MakeEventLogger(n.p.leaseMgr).InsertEventRecord(n.p.txn,
EventLogAlterTable,
int32(n.tableDesc.ID),
int32(n.p.evalCtx.NodeID),
struct {
TableName string
Statement string
User string
MutationID uint32
}{n.tableDesc.Name, n.n.String(), n.p.session.User, uint32(mutationID)},
); err != nil {
return err
}

n.p.notifySchemaChange(n.tableDesc.ID, mutationID)

return nil
Expand Down
9 changes: 9 additions & 0 deletions sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ const (
EventLogCreateTable EventLogType = "create_table"
// EventLogDropTable is recorded when a table is dropped.
EventLogDropTable EventLogType = "drop_table"
// EventLogAlterTable is recorded when a table is altered.
EventLogAlterTable EventLogType = "alter_table"
// EventLogReverseSchemaChange is recorded when an in-progress schema change
// encounters a problem and is reversed.
EventLogReverseSchemaChange EventLogType = "reverse_schema_change"
// EventLogFinishSchemaChange is recorded when a previously initiated schema
// change has completed.
EventLogFinishSchemaChange EventLogType = "finish_schema_change"

// EventLogNodeJoin is recorded when a node joins the cluster.
EventLogNodeJoin EventLogType = "node_join"
// EventLogNodeRestart is recorded when an existing node rejoins the cluster
Expand Down
22 changes: 20 additions & 2 deletions sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ var errDidntUpdateDescriptor = errors.New("didn't update the table descriptor")
// not have side effects.
// Returns the updated version of the descriptor.
func (s LeaseStore) Publish(
tableID sqlbase.ID, update func(*sqlbase.TableDescriptor) error,
tableID sqlbase.ID,
update func(*sqlbase.TableDescriptor) error,
logEvent func(*client.Txn) error,
) (*sqlbase.Descriptor, error) {
errLeaseVersionChanged := errors.New("lease version changed")
// Retry while getting errLeaseVersionChanged.
Expand Down Expand Up @@ -334,9 +336,25 @@ func (s LeaseStore) Publish(
}

// Write the updated descriptor.
txn.SetSystemConfigTrigger()
b := txn.NewBatch()
b.Put(descKey, desc)
txn.SetSystemConfigTrigger()
if logEvent != nil {
// If an event log is required for this update, ensure that the
// descriptor change occurs first in the transaction. This is
// necessary to ensure that the System configuration change is
// gossiped. See the documentation for
// transaction.SetSystemConfigTrigger() for more information.
if err := txn.Run(b); err != nil {
return err
}
if err := logEvent(txn); err != nil {
return err
}
return txn.Commit()
}
// More efficient batching can be used if no event log message
// is required.
return txn.CommitInBatch(b)
})

Expand Down
11 changes: 5 additions & 6 deletions sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,9 @@ func (t *leaseTest) mustRelease(
}

func (t *leaseTest) publish(nodeID uint32, descID sqlbase.ID) error {
_, err := t.node(nodeID).Publish(descID,
func(*sqlbase.TableDescriptor) error {
return nil
})
_, err := t.node(nodeID).Publish(descID, func(*sqlbase.TableDescriptor) error {
return nil
}, nil)
return err
}

Expand Down Expand Up @@ -369,7 +368,7 @@ func TestLeaseManagerPublishVersionChanged(testingT *testing.T) {
// a new version.
<-n1update
return nil
})
}, nil)
if err != nil {
panic(err)
}
Expand All @@ -382,7 +381,7 @@ func TestLeaseManagerPublishVersionChanged(testingT *testing.T) {
<-n2start
_, err := n2.Publish(descID, func(*sqlbase.TableDescriptor) error {
return nil
})
}, nil)
if err != nil {
panic(err)
}
Expand Down
36 changes: 31 additions & 5 deletions sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (sc SchemaChanger) exec(
_, err = sc.leaseMgr.Publish(sc.tableID, func(desc *sqlbase.TableDescriptor) error {
desc.Renames = nil
return nil
})
}, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (sc SchemaChanger) exec(
// errors that are resolved by retrying the backfill.
if sqlbase.IsIntegrityConstraintError(err) {
log.Warningf("reversing schema change due to irrecoverable error: %s", err)
if errReverse := sc.reverseMutations(); errReverse != nil {
if errReverse := sc.reverseMutations(err); errReverse != nil {
// Although the backfill did hit an integrity constraint violation
// and made a decision to reverse the mutations,
// reverseMutations() failed. If exec() is called again the entire
Expand Down Expand Up @@ -327,7 +327,7 @@ func (sc *SchemaChanger) MaybeIncrementVersion() (*sqlbase.Descriptor, error) {
desc.UpVersion = false
// Publish() will increment the version.
return nil
})
}, nil)
}

// RunStateMachineBeforeBackfill moves the state machine forward
Expand Down Expand Up @@ -376,7 +376,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill() error {
return errDidntUpdateDescriptor
}
return nil
}); err != nil {
}, nil); err != nil {
return err
}
// wait for the state change to propagate to all leases.
Expand Down Expand Up @@ -427,6 +427,18 @@ func (sc *SchemaChanger) done() (*sqlbase.Descriptor, error) {
// Trim the executed mutations from the descriptor.
desc.Mutations = desc.Mutations[i:]
return nil
}, func(txn *client.Txn) error {
// Log "Finish Schema Change" event. Only the table ID and mutation ID
// are logged; this can be correlated with the DDL statement that
// initiated the change using the mutation id.
return MakeEventLogger(sc.leaseMgr).InsertEventRecord(txn,
EventLogFinishSchemaChange,
int32(sc.tableID),
int32(sc.evalCtx.NodeID),
struct {
MutationID uint32
}{uint32(sc.mutationID)},
)
})
}

Expand Down Expand Up @@ -460,7 +472,7 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(
// mutationID. This is called after hitting an irrecoverable error while
// applying a schema change. If a column being added is reversed and dropped,
// all new indexes referencing the column will also be dropped.
func (sc *SchemaChanger) reverseMutations() error {
func (sc *SchemaChanger) reverseMutations(causingError error) error {
// Reverse the flow of the state machine.
_, err := sc.leaseMgr.Publish(sc.tableID, func(desc *sqlbase.TableDescriptor) error {
// Keep track of the column mutations being reversed so that indexes
Expand Down Expand Up @@ -491,8 +503,22 @@ func (sc *SchemaChanger) reverseMutations() error {
if len(columns) > 0 {
sc.deleteIndexMutationsWithReversedColumns(desc, columns)
}

// Publish() will increment the version.
return nil
}, func(txn *client.Txn) error {
// Log "Reverse Schema Change" event. Only the causing error and the
// mutation ID are logged; this can be correlated with the DDL statement
// that initiated the change using the mutation id.
return MakeEventLogger(sc.leaseMgr).InsertEventRecord(txn,
EventLogReverseSchemaChange,
int32(sc.tableID),
int32(sc.evalCtx.NodeID),
struct {
Error string
MutationID uint32
}{fmt.Sprintf("%+v", causingError), uint32(sc.mutationID)},
)
})
return err
}
Expand Down
2 changes: 1 addition & 1 deletion sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);

// Ensure that sql is using the correct table lease.
if len(cols) != len(expectedCols) {
return errors.Errorf("incorrect columns: %v", cols)
return errors.Errorf("incorrect columns: %v, expected: %v", cols, expectedCols)
}
if cols[0] != expectedCols[0] || cols[1] != expectedCols[1] {
t.Fatalf("incorrect columns: %v", cols)
Expand Down
68 changes: 68 additions & 0 deletions sql/testdata/event_log
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,74 @@ WHERE eventType = 'create_table'
----
0

# Alter the table. Expect "alter_table" and "finish_schema_change" events.
##################

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'alter_table'
----

statement ok
ALTER TABLE test.a ADD val INT

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'alter_table'
----
51 1

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'finish_schema_change'
----
51 0

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'reverse_schema_change'
----

# Verify the contents of the 'Info' field of each log message using a LIKE
# statement.
##################
query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'alter_table'
AND info LIKE '%ALTER TABLE test.a%'
----
51 1

# Add a UNIQUE constraint to the table in a way that will ensure the schema
# change is reversed.
##################

statement ok
INSERT INTO test.a VALUES (1, 1), (2, 1)

statement error pq: duplicate key value \(val\)=\(1\) violates unique constraint \"foo\"
ALTER TABLE test.a ADD CONSTRAINT foo UNIQUE(val)

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'alter_table'
----
51 1
51 1

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'finish_schema_change'
----
51 0
51 0

query II
SELECT targetID, reportingID FROM system.eventlog
WHERE eventType = 'reverse_schema_change'
----
51 0


# Drop both tables + superfluous "IF EXISTS"
##################
Expand Down
15 changes: 14 additions & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,20 @@ func (r *Replica) runCommitTrigger(ctx context.Context, batch engine.Batch, ms *
if ct.GetModifiedSpanTrigger() != nil {
if ct.ModifiedSpanTrigger.SystemConfigSpan {
// Check if we need to gossip the system config.
batch.Defer(r.maybeGossipSystemConfig)
// NOTE: System config gossiping can only execute correctly if
// the transaction record is located on the range that contains
// the system span. If a transaction is created which modifies
// both system *and* non-system data, it should be ensured that
// the transaction record itself is on the system span. This can
// be done by making sure a system key is the first key touched
// in the transaction.
if !r.ContainsKey(keys.SystemConfigSpan.Key) {
log.Errorc(ctx, "System configuration span was modified, but the "+
"modification trigger is executing on a non-system range. Configuration "+
"changes will not be gossiped.")
} else {
batch.Defer(r.maybeGossipSystemConfig)
}
}
}
return nil
Expand Down

0 comments on commit bde7d24

Please sign in to comment.