Skip to content

Commit

Permalink
tenantcostserver: use the tenant_usage system table
Browse files Browse the repository at this point in the history
This change implements most of the interaction with the tenant_usage
system table, with the exception of dead-instance detection and
clean-up.

We currently tolerate an empty table, but it would be cleaner to
initialize the tenant state (the instance_id=0 row) at tenant creation
time (and to implement a migration). I will explore this in a future
change, when we add configurable defaults for the refill rate and
similar settings.

Release note: None
  • Loading branch information
RaduBerinde committed Jul 27, 2021
1 parent fb79801 commit f4f0be3
Show file tree
Hide file tree
Showing 11 changed files with 1,489 additions and 651 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context, stopper *stop.S
select {
case <-ticker.C:
req := roachpb.TokenBucketRequest{
// TODO(radu): populate instance ID.
InstanceID: 1,
ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
// Report a dummy 1 RU consumption each time.
RU: 1,
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"configure.go",
"metrics.go",
"server.go",
"system_table.go",
"token_bucket.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver",
Expand All @@ -21,6 +22,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/util",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/syncutil",
Expand All @@ -39,6 +41,8 @@ go_test(
deps = [
":tenantcostserver",
"//pkg/base",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/multitenantccl/tenantcostserver/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ func (s *instance) ReconfigureTokenBucket(
if active := *row[0].(*tree.DBool); !active {
return errors.Errorf("tenant %q is not active", tenantID)
}
state, err := readTenantUsageState(ctx, s.executor, txn, tenantID)
h := makeSysTableHelper(ctx, s.executor, txn, tenantID)
state, err := h.readTenantState()
if err != nil {
return err
}
state.Seq++
state.Bucket.Reconfigure(
availableRU, refillRate, maxBurstRU, asOf, asOfConsumedRequestUnits,
timeutil.Now(), state.Consumption.RU,
)
if err := updateTenantUsageState(ctx, s.executor, txn, tenantID, state); err != nil {
if err := h.updateTenantState(state); err != nil {
return err
}
return nil
Expand Down
215 changes: 147 additions & 68 deletions pkg/ccl/multitenantccl/tenantcostserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ package tenantcostserver_test

import (
"context"
gosql "database/sql"
"fmt"
"regexp"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -36,77 +40,152 @@ func TestDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
defer leaktest.AfterTest(t)()

// Set up a server that we use only for the system tables.
ctx := context.Background()
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
r := sqlutils.MakeSQLRunner(db)

tenantUsage := server.NewTenantUsageServer(kvDB, s.InternalExecutor().(*sql.InternalExecutor))
metricsReg := metric.NewRegistry()
metricsReg.AddMetricStruct(tenantUsage.Metrics())
var ts testState
ts.start(t)
defer ts.stop()

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "create-tenant":
if len(d.CmdArgs) != 1 {
d.Fatalf(t, "expected tenant number")
}
r.Exec(t, fmt.Sprintf("SELECT crdb_internal.create_tenant(%s)", d.CmdArgs[0].Key))
return ""

case "token-bucket-request":
if len(d.CmdArgs) != 1 {
d.Fatalf(t, "expected tenant number")
}
tenantID, err := strconv.Atoi(d.CmdArgs[0].Key)
if err != nil {
d.Fatalf(t, "%v", err)
}
var args struct {
Consumption struct {
RU float64 `yaml:"ru"`
ReadReq uint64 `yaml:"read_req"`
ReadBytes uint64 `yaml:"read_bytes"`
WriteReq uint64 `yaml:"write_req"`
WriteBytes uint64 `yaml:"write_bytes"`
SQLPodsCPUUsage float64 `yaml:"sql_pods_cpu_usage"`
}
}
if err := yaml.UnmarshalStrict([]byte(d.Input), &args); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
}
req := roachpb.TokenBucketRequest{
ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
RU: args.Consumption.RU,
ReadRequests: args.Consumption.ReadReq,
ReadBytes: args.Consumption.ReadBytes,
WriteRequests: args.Consumption.WriteReq,
WriteBytes: args.Consumption.WriteBytes,
SQLPodCPUSeconds: args.Consumption.SQLPodsCPUUsage,
},
}
_, err = tenantUsage.TokenBucketRequest(ctx, roachpb.MakeTenantID(uint64(tenantID)), &req)
if err != nil {
return fmt.Sprintf("error: %v", err)
}
return ""

case "metrics":
re, err := regexp.Compile(d.Input)
if err != nil {
d.Fatalf(t, "failed to compile pattern: %v", err)
}
str, err := metrictestutils.GetMetricsText(metricsReg, re)
if err != nil {
d.Fatalf(t, "failed to scrape metrics: %v", err)
}
return str

default:
d.Fatalf(t, "unknown command %q", d.Cmd)
return ""
fn, ok := testStateCommands[d.Cmd]
if !ok {
d.Fatalf(t, "unknown command %s", d.Cmd)
}
return fn(&ts, t, d)
})
})
}

// testState bundles the test server and the handles derived from it that the
// data-driven commands operate on. It is populated by start and torn down by
// stop.
type testState struct {
	// s is the test server brought up by start.
	s serverutils.TestServerInterface

	// db and kvDB are the SQL and KV handles returned when starting s.
	db   *gosql.DB
	kvDB *kv.DB

	// r executes SQL statements against db.
	r *sqlutils.SQLRunner

	// tenantUsage is the tenant usage server the commands exercise.
	tenantUsage multitenant.TenantUsageServer

	// metricsReg is the registry that tenantUsage's metrics are added to.
	metricsReg *metric.Registry
}

// start brings up a test server (used only for its system tables), attaches a
// SQL runner to it, creates the tenant usage server on top of it, and adds
// that server's metrics to a fresh registry.
func (ts *testState) start(t *testing.T) {
	srv, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	ts.s, ts.db, ts.kvDB = srv, sqlDB, kvDB
	ts.r = sqlutils.MakeSQLRunner(sqlDB)

	ie := srv.InternalExecutor().(*sql.InternalExecutor)
	ts.tenantUsage = server.NewTenantUsageServer(kvDB, ie)

	reg := metric.NewRegistry()
	reg.AddMetricStruct(ts.tenantUsage.Metrics())
	ts.metricsReg = reg
}

// stop shuts the test server down by stopping its stopper.
func (ts *testState) stop() {
	ctx := context.Background()
	ts.s.Stopper().Stop(ctx)
}

// testStateCommands maps each data-driven command name to the testState method
// that implements it. Every handler receives the shared test state, the
// current test, and the parsed directive, and returns the text to compare
// against the expected output.
var testStateCommands = map[string]func(ts *testState, t *testing.T, d *datadriven.TestData) string{
	"configure":            (*testState).configure,
	"create-tenant":        (*testState).createTenant,
	"inspect":              (*testState).inspect,
	"metrics":              (*testState).metrics,
	"token-bucket-request": (*testState).tokenBucketRequest,
}

// tenantID extracts the tenant ID from the directive's tenant=<id> argument.
// The test is failed if the argument is absent, does not carry exactly one
// value, or that value does not parse as an integer that is at least 1.
func (ts *testState) tenantID(t *testing.T, d *datadriven.TestData) uint64 {
	for i := range d.CmdArgs {
		a := d.CmdArgs[i]
		if a.Key != "tenant" {
			// Not the argument we are looking for.
			continue
		}
		if len(a.Vals) != 1 {
			d.Fatalf(t, "tenant argument requires a value")
		}
		n, err := strconv.Atoi(a.Vals[0])
		if err != nil || n < 1 {
			d.Fatalf(t, "invalid tenant argument")
		}
		return uint64(n)
	}
	d.Fatalf(t, "command requires tenant=<id> argument")
	return 0
}

// createTenant implements the "create-tenant" command: it creates the tenant
// named by the tenant=<id> argument through crdb_internal.create_tenant and
// produces no output.
func (ts *testState) createTenant(t *testing.T, d *datadriven.TestData) string {
	id := ts.tenantID(t, d)
	stmt := fmt.Sprintf("SELECT crdb_internal.create_tenant(%d)", id)
	ts.r.Exec(t, stmt)
	return ""
}

// tokenBucketRequest implements the "token-bucket-request" command. The
// directive input is strict YAML carrying the instance ID and the consumption
// since the last request; it is converted into a roachpb.TokenBucketRequest
// and sent to the tenant usage server on behalf of the tenant=<id> tenant.
//
// It returns "error: <err>" if the request fails and the empty string
// otherwise.
func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) string {
	id := ts.tenantID(t, d)

	// in mirrors the YAML layout accepted by the command.
	var in struct {
		InstanceID  uint32 `yaml:"instance_id"`
		Consumption struct {
			RU              float64 `yaml:"ru"`
			ReadReq         uint64  `yaml:"read_req"`
			ReadBytes       uint64  `yaml:"read_bytes"`
			WriteReq        uint64  `yaml:"write_req"`
			WriteBytes      uint64  `yaml:"write_bytes"`
			SQLPodsCPUUsage float64 `yaml:"sql_pods_cpu_usage"`
		}
	}
	if err := yaml.UnmarshalStrict([]byte(d.Input), &in); err != nil {
		d.Fatalf(t, "failed to parse request yaml: %v", err)
	}

	consumed := roachpb.TokenBucketRequest_Consumption{
		RU:               in.Consumption.RU,
		ReadRequests:     in.Consumption.ReadReq,
		ReadBytes:        in.Consumption.ReadBytes,
		WriteRequests:    in.Consumption.WriteReq,
		WriteBytes:       in.Consumption.WriteBytes,
		SQLPodCPUSeconds: in.Consumption.SQLPodsCPUUsage,
	}
	req := roachpb.TokenBucketRequest{
		InstanceID:                  in.InstanceID,
		ConsumptionSinceLastRequest: consumed,
	}

	// The response itself is not part of the command's output.
	_, err := ts.tenantUsage.TokenBucketRequest(
		context.Background(), roachpb.MakeTenantID(id), &req,
	)
	if err != nil {
		return fmt.Sprintf("error: %v", err)
	}
	return ""
}

// metrics implements the "metrics" command: the directive input is compiled as
// a regular expression and passed, together with the test's metrics registry,
// to metrictestutils.GetMetricsText, whose text result is returned.
func (ts *testState) metrics(t *testing.T, d *datadriven.TestData) string {
	pattern, compileErr := regexp.Compile(d.Input)
	if compileErr != nil {
		d.Fatalf(t, "failed to compile pattern: %v", compileErr)
	}

	text, scrapeErr := metrictestutils.GetMetricsText(ts.metricsReg, pattern)
	if scrapeErr != nil {
		d.Fatalf(t, "failed to scrape metrics: %v", scrapeErr)
	}
	return text
}

// configure implements the "configure" command. The directive input is strict
// YAML with the available RUs, refill rate and maximum burst RUs, which are
// applied to the tenant=<id> tenant via
// crdb_internal.update_tenant_resource_limits. The as-of time is always now()
// and the as-of consumed RUs are always 0. The command produces no output.
func (ts *testState) configure(t *testing.T, d *datadriven.TestData) string {
	const stmt = `SELECT crdb_internal.update_tenant_resource_limits($1, $2, $3, $4, now(), 0)`

	id := ts.tenantID(t, d)

	var limits struct {
		AvailableRU float64 `yaml:"available_ru"`
		RefillRate  float64 `yaml:"refill_rate"`
		MaxBurstRU  float64 `yaml:"max_burst_ru"`
		// TODO(radu): Add AsOf/AsOfConsumedRU.
	}
	if err := yaml.UnmarshalStrict([]byte(d.Input), &limits); err != nil {
		d.Fatalf(t, "failed to parse request yaml: %v", err)
	}

	ts.r.Exec(t, stmt, id, limits.AvailableRU, limits.RefillRate, limits.MaxBurstRU)
	return ""
}

// inspect implements the "inspect" command: it returns the string produced by
// tenantcostserver.InspectTenantMetadata for the tenant=<id> tenant, using the
// test server's internal executor and no transaction. The test is failed if
// the inspection returns an error.
func (ts *testState) inspect(t *testing.T, d *datadriven.TestData) string {
	id := ts.tenantID(t, d)

	ie := ts.s.InternalExecutor().(*sql.InternalExecutor)
	tenant := roachpb.MakeTenantID(id)

	out, err := tenantcostserver.InspectTenantMetadata(
		context.Background(), ie, nil /* txn */, tenant,
	)
	if err != nil {
		d.Fatalf(t, "error inspecting tenant state: %v", err)
	}
	return out
}
Loading

0 comments on commit f4f0be3

Please sign in to comment.