From bde7d24a36a6acd256289a2b9d8a3e7bdebb49fe Mon Sep 17 00:00:00 2001 From: Matt Tracy Date: Thu, 30 Jun 2016 18:44:01 -0400 Subject: [PATCH] Log Events for Alter Table, Async Schema Change Commit adds three new event log types: + "Alter Table", logged for every ALTER TABLE DDL statement. + "Finish Schema Change", logged when an asynchronous schema change is completed. + "Reverse Schema Change", logged when an asynchronous schema change encounters an error (such as a constraint violation during backfill) and is rolled back. This Commit encountered an interesting, perhaps unintended effect with `SystemConfigTrigger`: in a "mixed" transaction which updates both system and non-system keys, the system changes will only be gossiped if the transaction record is located on the system range. In other words, "mixed" transactions will only work correctly if the system keys are modified first. Issue #7570 had been logged to track this unexpected behavior; this commit has also added a number of comments and a logged error to address this. --- internal/client/txn.go | 7 ++++ sql/alter_table.go | 16 +++++++++ sql/event_log.go | 9 +++++ sql/lease.go | 22 ++++++++++-- sql/lease_test.go | 11 +++--- sql/schema_changer.go | 36 +++++++++++++++++--- sql/schema_changer_test.go | 2 +- sql/testdata/event_log | 68 ++++++++++++++++++++++++++++++++++++++ storage/replica_command.go | 15 ++++++++- 9 files changed, 171 insertions(+), 15 deletions(-) diff --git a/internal/client/txn.go b/internal/client/txn.go index df9fe699dfea..3d52709975fb 100644 --- a/internal/client/txn.go +++ b/internal/client/txn.go @@ -187,6 +187,13 @@ func (txn *Txn) InternalSetPriority(priority int32) { // SetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTransactionRequest. +// +// NOTE: The system db trigger will only execute correctly if the transaction +// record is located on the range that contains the system span. If a +// transaction is created which modifies both system *and* non-system data, it +// should be ensured that the transaction record itself is on the system span. +// This can be done by making sure a system key is the first key touched in the +// transaction. func (txn *Txn) SetSystemConfigTrigger() { txn.systemConfigTrigger = true } diff --git a/sql/alter_table.go b/sql/alter_table.go index f3d036ecb8eb..39cbe4401418 100644 --- a/sql/alter_table.go +++ b/sql/alter_table.go @@ -242,6 +242,22 @@ func (n *alterTableNode) Start() error { if err := n.p.writeTableDesc(n.tableDesc); err != nil { return err } + + // Record this table alteration in the event log. + if err := MakeEventLogger(n.p.leaseMgr).InsertEventRecord(n.p.txn, + EventLogAlterTable, + int32(n.tableDesc.ID), + int32(n.p.evalCtx.NodeID), + struct { + TableName string + Statement string + User string + MutationID uint32 + }{n.tableDesc.Name, n.n.String(), n.p.session.User, uint32(mutationID)}, + ); err != nil { + return err + } + n.p.notifySchemaChange(n.tableDesc.ID, mutationID) return nil diff --git a/sql/event_log.go b/sql/event_log.go index 188c3ec921bf..dd618839282d 100644 --- a/sql/event_log.go +++ b/sql/event_log.go @@ -40,6 +40,15 @@ const ( EventLogCreateTable EventLogType = "create_table" // EventLogDropTable is recorded when a table is dropped. EventLogDropTable EventLogType = "drop_table" + // EventLogAlterTable is recorded when a table is altered. + EventLogAlterTable EventLogType = "alter_table" + // EventLogReverseSchemaChange is recorded when an in-progress schema change + // encounters a problem and is reversed. + EventLogReverseSchemaChange EventLogType = "reverse_schema_change" + // EventLogFinishSchemaChange is recorded when a previously initiated schema + // change has completed. + EventLogFinishSchemaChange EventLogType = "finish_schema_change" + // EventLogNodeJoin is recorded when a node joins the cluster. EventLogNodeJoin EventLogType = "node_join" // EventLogNodeRestart is recorded when an existing node rejoins the cluster diff --git a/sql/lease.go b/sql/lease.go index ac3582704b66..654a8335decc 100644 --- a/sql/lease.go +++ b/sql/lease.go @@ -280,7 +280,9 @@ var errDidntUpdateDescriptor = errors.New("didn't update the table descriptor") // not have side effects. // Returns the updated version of the descriptor. func (s LeaseStore) Publish( - tableID sqlbase.ID, update func(*sqlbase.TableDescriptor) error, + tableID sqlbase.ID, + update func(*sqlbase.TableDescriptor) error, + logEvent func(*client.Txn) error, ) (*sqlbase.Descriptor, error) { errLeaseVersionChanged := errors.New("lease version changed") // Retry while getting errLeaseVersionChanged. @@ -334,9 +336,25 @@ func (s LeaseStore) Publish( } // Write the updated descriptor. + txn.SetSystemConfigTrigger() b := txn.NewBatch() b.Put(descKey, desc) - txn.SetSystemConfigTrigger() + if logEvent != nil { + // If an event log is required for this update, ensure that the + // descriptor change occurs first in the transaction. This is + // necessary to ensure that the System configuration change is + // gossiped. See the documentation for + // transaction.SetSystemConfigTrigger() for more information. + if err := txn.Run(b); err != nil { + return err + } + if err := logEvent(txn); err != nil { + return err + } + return txn.Commit() + } + // More efficient batching can be used if no event log message + // is required. return txn.CommitInBatch(b) }) diff --git a/sql/lease_test.go b/sql/lease_test.go index f12384ddbfc9..b00100476687 100644 --- a/sql/lease_test.go +++ b/sql/lease_test.go @@ -148,10 +148,9 @@ func (t *leaseTest) mustRelease( } func (t *leaseTest) publish(nodeID uint32, descID sqlbase.ID) error { - _, err := t.node(nodeID).Publish(descID, - func(*sqlbase.TableDescriptor) error { - return nil - }) + _, err := t.node(nodeID).Publish(descID, func(*sqlbase.TableDescriptor) error { + return nil + }, nil) return err } @@ -369,7 +368,7 @@ func TestLeaseManagerPublishVersionChanged(testingT *testing.T) { // a new version. <-n1update return nil - }) + }, nil) if err != nil { panic(err) } @@ -382,7 +381,7 @@ func TestLeaseManagerPublishVersionChanged(testingT *testing.T) { <-n2start _, err := n2.Publish(descID, func(*sqlbase.TableDescriptor) error { return nil - }) + }, nil) if err != nil { panic(err) } diff --git a/sql/schema_changer.go b/sql/schema_changer.go index 83aedf744c8b..a42f76d59f47 100644 --- a/sql/schema_changer.go +++ b/sql/schema_changer.go @@ -259,7 +259,7 @@ func (sc SchemaChanger) exec( _, err = sc.leaseMgr.Publish(sc.tableID, func(desc *sqlbase.TableDescriptor) error { desc.Renames = nil return nil - }) + }, nil) if err != nil { return err } @@ -290,7 +290,7 @@ func (sc SchemaChanger) exec( // errors that are resolved by retrying the backfill. if sqlbase.IsIntegrityConstraintError(err) { log.Warningf("reversing schema change due to irrecoverable error: %s", err) - if errReverse := sc.reverseMutations(); errReverse != nil { + if errReverse := sc.reverseMutations(err); errReverse != nil { // Although the backfill did hit an integrity constraint violation // and made a decision to reverse the mutations, // reverseMutations() failed. If exec() is called again the entire @@ -327,7 +327,7 @@ func (sc *SchemaChanger) MaybeIncrementVersion() (*sqlbase.Descriptor, error) { desc.UpVersion = false // Publish() will increment the version. return nil - }) + }, nil) } // RunStateMachineBeforeBackfill moves the state machine forward @@ -376,7 +376,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill() error { return errDidntUpdateDescriptor } return nil - }); err != nil { + }, nil); err != nil { return err } // wait for the state change to propagate to all leases. @@ -427,6 +427,18 @@ func (sc *SchemaChanger) done() (*sqlbase.Descriptor, error) { // Trim the executed mutations from the descriptor. desc.Mutations = desc.Mutations[i:] return nil + }, func(txn *client.Txn) error { + // Log "Finish Schema Change" event. Only the table ID and mutation ID + // are logged; this can be correlated with the DDL statement that + // initiated the change using the mutation id. + return MakeEventLogger(sc.leaseMgr).InsertEventRecord(txn, + EventLogFinishSchemaChange, + int32(sc.tableID), + int32(sc.evalCtx.NodeID), + struct { + MutationID uint32 + }{uint32(sc.mutationID)}, + ) }) } @@ -460,7 +472,7 @@ func (sc *SchemaChanger) runStateMachineAndBackfill( // mutationID. This is called after hitting an irrecoverable error while // applying a schema change. If a column being added is reversed and dropped, // all new indexes referencing the column will also be dropped. -func (sc *SchemaChanger) reverseMutations() error { +func (sc *SchemaChanger) reverseMutations(causingError error) error { // Reverse the flow of the state machine. _, err := sc.leaseMgr.Publish(sc.tableID, func(desc *sqlbase.TableDescriptor) error { // Keep track of the column mutations being reversed so that indexes @@ -491,8 +503,22 @@ func (sc *SchemaChanger) reverseMutations() error { if len(columns) > 0 { sc.deleteIndexMutationsWithReversedColumns(desc, columns) } + // Publish() will increment the version. return nil + }, func(txn *client.Txn) error { + // Log "Reverse Schema Change" event. Only the causing error and the + // mutation ID are logged; this can be correlated with the DDL statement + // that initiated the change using the mutation id. + return MakeEventLogger(sc.leaseMgr).InsertEventRecord(txn, + EventLogReverseSchemaChange, + int32(sc.tableID), + int32(sc.evalCtx.NodeID), + struct { + Error string + MutationID uint32 + }{fmt.Sprintf("%+v", causingError), uint32(sc.mutationID)}, + ) }) return err } diff --git a/sql/schema_changer_test.go b/sql/schema_changer_test.go index 34d5990778cc..cc0692f0406b 100644 --- a/sql/schema_changer_test.go +++ b/sql/schema_changer_test.go @@ -989,7 +989,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); // Ensure that sql is using the correct table lease. if len(cols) != len(expectedCols) { - return errors.Errorf("incorrect columns: %v", cols) + return errors.Errorf("incorrect columns: %v, expected: %v", cols, expectedCols) } if cols[0] != expectedCols[0] || cols[1] != expectedCols[1] { t.Fatalf("incorrect columns: %v", cols) diff --git a/sql/testdata/event_log b/sql/testdata/event_log index d1f2a29373bf..a386fb6ca3fe 100644 --- a/sql/testdata/event_log +++ b/sql/testdata/event_log @@ -57,6 +57,74 @@ WHERE eventType = 'create_table' ---- 0 +# Alter the table. Expect "alter_table" and "finish_schema_change" events. +################## + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'alter_table' +---- + +statement ok +ALTER TABLE test.a ADD val INT + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'alter_table' +---- +51 1 + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'finish_schema_change' +---- +51 0 + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'reverse_schema_change' +---- + +# Verify the contents of the 'Info' field of each log message using a LIKE +# statement. +################## +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'alter_table' + AND info LIKE '%ALTER TABLE test.a%' +---- +51 1 + +# Add a UNIQUE constraint to the table in a way that will ensure the schema +# change is reversed. +################## + +statement ok +INSERT INTO test.a VALUES (1, 1), (2, 1) + +statement error pq: duplicate key value \(val\)=\(1\) violates unique constraint \"foo\" +ALTER TABLE test.a ADD CONSTRAINT foo UNIQUE(val) + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'alter_table' +---- +51 1 +51 1 + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'finish_schema_change' +---- +51 0 +51 0 + +query II +SELECT targetID, reportingID FROM system.eventlog +WHERE eventType = 'reverse_schema_change' +---- +51 0 + # Drop both tables + superfluous "IF EXISTS" ################## diff --git a/storage/replica_command.go b/storage/replica_command.go index 086ba110b601..ca5df9ba2a0a 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -789,7 +789,20 @@ func (r *Replica) runCommitTrigger(ctx context.Context, batch engine.Batch, ms * if ct.GetModifiedSpanTrigger() != nil { if ct.ModifiedSpanTrigger.SystemConfigSpan { // Check if we need to gossip the system config. - batch.Defer(r.maybeGossipSystemConfig) + // NOTE: System config gossiping can only execute correctly if + // the transaction record is located on the range that contains + // the system span. If a transaction is created which modifies + // both system *and* non-system data, it should be ensured that + // the transaction record itself is on the system span. This can + // be done by making sure a system key is the first key touched + // in the transaction. + if !r.ContainsKey(keys.SystemConfigSpan.Key) { + log.Errorc(ctx, "System configuration span was modified, but the "+ + "modification trigger is executing on a non-system range. Configuration "+ + "changes will not be gossiped.") + } else { + batch.Defer(r.maybeGossipSystemConfig) + } } } return nil