Skip to content

Commit

Permalink
Reroute 'ALTER VITESS_MIGRATION ... THROTTLE ...' through topo (#13511)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Aug 1, 2023
1 parent 3328588 commit dcacec7
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 53 deletions.
10 changes: 8 additions & 2 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -501,7 +502,7 @@ func testScheduler(t *testing.T) {
testTableSequentialTimes(t, t1uuid, t2uuid)
})

t.Run("ALTER both tables, elligible for concurrenct", func(t *testing.T) {
t.Run("ALTER both tables, elligible for concurrent", func(t *testing.T) {
// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
Expand Down Expand Up @@ -536,9 +537,11 @@ func testScheduler(t *testing.T) {
})
testTableCompletionTimes(t, t2uuid, t1uuid)
})
t.Run("ALTER both tables, elligible for concurrenct, with throttling", func(t *testing.T) {
t.Run("ALTER both tables, elligible for concurrent, with throttling", func(t *testing.T) {
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
Expand All @@ -555,6 +558,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
})

t.Run("check ready to complete (before)", func(t *testing.T) {
for _, uuid := range []string{t1uuid, t2uuid} {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand Down Expand Up @@ -607,6 +611,8 @@ func testScheduler(t *testing.T) {

testTableCompletionTimes(t, t2uuid, t1uuid)
})
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) {
t1uuid = testRevertMigration(t, createRevertParams(t1uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true))
t2uuid = testRevertMigration(t, createRevertParams(t2uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true))
Expand Down
65 changes: 30 additions & 35 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
ThrottledAppsTimeout = 60 * time.Second
)

// VtgateExecQuery runs a query on VTGate using given query params
func VtgateExecQuery(t *testing.T, vtParams *mysql.ConnParams, query string, expectError string) *sqltypes.Result {
t.Helper()
Expand Down Expand Up @@ -313,16 +316,35 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

found := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
found = true
ctx, cancel := context.WithTimeout(context.Background(), ThrottledAppsTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

appFound := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
appFound = true
}
}
if appFound == expectFind {
// we're all good
return
}

select {
case <-ctx.Done():
assert.Failf(t, "CheckThrottledApps timed out waiting for %v to be in throttled status '%v'", throttlerApp.String(), expectFind)
return
case <-ticker.C:
}
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found)
}

// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp
Expand Down Expand Up @@ -350,33 +372,6 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st
return
}

// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as enabled.
func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) {
jsonPath := "IsEnabled"
url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort)

ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
body := getHTTPBody(url)
val, err := jsonparser.GetBoolean([]byte(body), jsonPath)
require.NoError(t, err)
if val {
return
}
select {
case <-ctx.Done():
t.Error("timeout waiting for tablet's throttler status to be enabled")
return
case <-ticker.C:
}
}
}

func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
Expand Down
26 changes: 12 additions & 14 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,23 @@ func UnthrottleApp(clusterInstance *cluster.LocalProcessCluster, throttlerApp th
return throttleApp(clusterInstance, throttlerApp, false)
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
func WaitUntilTabletsConfirmThrottledApp(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name, expectThrottled bool) {
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, true, ConfigTimeout)
WaitForThrottledApp(t, tablet, throttlerApp, expectThrottled, ConfigTimeout)
}
}
}
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, true)
return res, nil
}

Expand All @@ -231,13 +235,7 @@ func UnthrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *clus
if err != nil {
return res, err
}
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, false, ConfigTimeout)
}
}
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, false)
return res, nil
}

Expand Down
18 changes: 18 additions & 0 deletions go/vt/proto/topodata/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions go/vt/proto/vttime/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) er
panic("implement me")
}

func (t *noopVCursor) ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error {
panic("implement me")
}

func (t *noopVCursor) ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
panic("implement me")
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

Expand Down Expand Up @@ -110,6 +111,8 @@ type (
ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error)
// SetExec takes in k,v pair and use executor to set them in topo metadata.
SetExec(ctx context.Context, name string, value string) error
// ThrottleApp sets a ThrottlerappRule in topo
ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error

// CanUseSetVar returns true if system_settings can use SET_VAR hint.
CanUseSetVar() bool
Expand Down
89 changes: 89 additions & 0 deletions go/vt/vtgate/engine/throttle_app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ Primitive = (*ThrottleApp)(nil)

// ThrottleApp represents the instructions to perform an online schema change via vtctld
type ThrottleApp struct {
Keyspace *vindexes.Keyspace
ThrottledAppRule *topodatapb.ThrottledAppRule

noTxNeeded

noInputs
}

func (v *ThrottleApp) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "ThrottleApp",
Keyspace: v.Keyspace,
Other: map[string]any{
"appName": v.ThrottledAppRule.Name,
"expireAt": v.ThrottledAppRule.ExpiresAt,
"ratio": v.ThrottledAppRule.Ratio,
},
}
}

// RouteType implements the Primitive interface
func (v *ThrottleApp) RouteType() string {
return "ThrottleApp"
}

// GetKeyspaceName implements the Primitive interface
func (v *ThrottleApp) GetKeyspaceName() string {
return v.Keyspace.Name
}

// GetTableName implements the Primitive interface
func (v *ThrottleApp) GetTableName() string {
return ""
}

// TryExecute implements the Primitive interface
func (v *ThrottleApp) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
if err := vcursor.ThrottleApp(ctx, v.ThrottledAppRule); err != nil {
return nil, err
}
return &sqltypes.Result{}, nil
}

// TryStreamExecute implements the Primitive interface
func (v *ThrottleApp) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := v.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(results)
}

// GetFields implements the Primitive interface
func (v *ThrottleApp) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] GetFields is not reachable")
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat
case sqlparser.DDLStatement:
return buildGeneralDDLPlan(ctx, query, stmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
case *sqlparser.AlterMigration:
return buildAlterMigrationPlan(query, vschema, enableOnlineDDL)
return buildAlterMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.RevertMigration:
return buildRevertMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.ShowMigrationLogs:
Expand Down
Loading

0 comments on commit dcacec7

Please sign in to comment.