Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tenantcostserver: use the tenant_usage system table #68115

Merged
merged 3 commits into from
Jul 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-118 set the active cluster version in the format '<major>.<minor>'
version version 21.1-120 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-118</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-120</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
systemschema.DatabaseRoleSettingsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
},
systemschema.TenantUsageTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
}

// GetSystemTablesToIncludeInClusterBackup returns a set of system table names that
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context, stopper *stop.S
case <-ticker.C:
req := roachpb.TokenBucketRequest{
TenantID: c.tenantID.ToUint64(),
// TODO(radu): populate instance ID.
InstanceID: 1,
ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
// Report a dummy 1 RU consumption each time.
RU: 1,
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ go_library(
"configure.go",
"metrics.go",
"server.go",
"system_table.go",
"token_bucket.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/multitenantccl/tenantcostserver/tenanttokenbucket",
"//pkg/kv",
"//pkg/multitenant",
Expand All @@ -21,6 +23,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/util",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/syncutil",
Expand All @@ -39,6 +42,8 @@ go_test(
deps = [
":tenantcostserver",
"//pkg/base",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/multitenantccl/tenantcostserver/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ func (s *instance) ReconfigureTokenBucket(
if active := *row[0].(*tree.DBool); !active {
return errors.Errorf("tenant %q is not active", tenantID)
}
state, err := readTenantUsageState(ctx, s.executor, txn, tenantID)
h := makeSysTableHelper(ctx, s.executor, txn, tenantID)
state, err := h.readTenantState()
if err != nil {
return err
}
state.Seq++
state.Bucket.Reconfigure(
availableRU, refillRate, maxBurstRU, asOf, asOfConsumedRequestUnits,
timeutil.Now(), state.Consumption.RU,
)
if err := updateTenantUsageState(ctx, s.executor, txn, tenantID, state); err != nil {
if err := h.updateTenantState(state); err != nil {
return err
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type tenantMetrics struct {
totalWriteRequests *aggmetric.Gauge
totalWriteBytes *aggmetric.Gauge
totalSQLPodsCPUSeconds *aggmetric.GaugeFloat64

// Mutex is used to atomically update metrics together with a corresponding
// change to the system table.
mutex *syncutil.Mutex
}

// getTenantMetrics returns the metrics for a tenant.
Expand All @@ -122,6 +126,7 @@ func (m *Metrics) getTenantMetrics(tenantID roachpb.TenantID) tenantMetrics {
totalWriteRequests: m.TotalWriteRequests.AddChild(tid),
totalWriteBytes: m.TotalWriteBytes.AddChild(tid),
totalSQLPodsCPUSeconds: m.TotalSQLPodsCPUSeconds.AddChild(tid),
mutex: &syncutil.Mutex{},
}
m.mu.tenantMetrics[tenantID] = tm
}
Expand Down
218 changes: 149 additions & 69 deletions pkg/ccl/multitenantccl/tenantcostserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ package tenantcostserver_test

import (
"context"
gosql "database/sql"
"fmt"
"regexp"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -37,78 +41,154 @@ func TestDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
defer leaktest.AfterTest(t)()

// Set up a server that we use only for the system tables.
ctx := context.Background()
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
r := sqlutils.MakeSQLRunner(db)

tenantUsage := server.NewTenantUsageServer(kvDB, s.InternalExecutor().(*sql.InternalExecutor))
metricsReg := metric.NewRegistry()
metricsReg.AddMetricStruct(tenantUsage.Metrics())
var ts testState
ts.start(t)
defer ts.stop()

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "create-tenant":
if len(d.CmdArgs) != 1 {
d.Fatalf(t, "expected tenant number")
}
r.Exec(t, fmt.Sprintf("SELECT crdb_internal.create_tenant(%s)", d.CmdArgs[0].Key))
return ""

case "token-bucket-request":
if len(d.CmdArgs) != 1 {
d.Fatalf(t, "expected tenant number")
}
tenantID, err := strconv.Atoi(d.CmdArgs[0].Key)
if err != nil {
d.Fatalf(t, "%v", err)
}
var args struct {
Consumption struct {
RU float64 `yaml:"ru"`
ReadReq uint64 `yaml:"read_req"`
ReadBytes uint64 `yaml:"read_bytes"`
WriteReq uint64 `yaml:"write_req"`
WriteBytes uint64 `yaml:"write_bytes"`
SQLPodsCPUUsage float64 `yaml:"sql_pods_cpu_usage"`
}
}
if err := yaml.UnmarshalStrict([]byte(d.Input), &args); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
}
req := roachpb.TokenBucketRequest{
TenantID: uint64(tenantID),
ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
RU: args.Consumption.RU,
ReadRequests: args.Consumption.ReadReq,
ReadBytes: args.Consumption.ReadBytes,
WriteRequests: args.Consumption.WriteReq,
WriteBytes: args.Consumption.WriteBytes,
SQLPodCPUSeconds: args.Consumption.SQLPodsCPUUsage,
},
}
res := tenantUsage.TokenBucketRequest(ctx, roachpb.MakeTenantID(uint64(tenantID)), &req)
if res.Error != (errors.EncodedError{}) {
return fmt.Sprintf("error: %v", errors.DecodeError(context.Background(), res.Error))
}
return ""

case "metrics":
re, err := regexp.Compile(d.Input)
if err != nil {
d.Fatalf(t, "failed to compile pattern: %v", err)
}
str, err := metrictestutils.GetMetricsText(metricsReg, re)
if err != nil {
d.Fatalf(t, "failed to scrape metrics: %v", err)
}
return str

default:
d.Fatalf(t, "unknown command %q", d.Cmd)
return ""
fn, ok := testStateCommands[d.Cmd]
if !ok {
d.Fatalf(t, "unknown command %s", d.Cmd)
}
return fn(&ts, t, d)
})
})
}

// testState bundles everything a single data-driven test file needs: a test
// server (used only for its system tables), handles for talking to it, and the
// tenant usage server under test together with its metrics registry.
//
// The zero value is not usable; call start before issuing commands and stop
// when the test file is done.
type testState struct {
	// Handles returned by serverutils.StartServer.
	s    serverutils.TestServerInterface
	db   *gosql.DB
	kvDB *kv.DB

	// r is a SQL runner wrapping db.
	r *sqlutils.SQLRunner

	// tenantUsage is the server being exercised; its metrics are registered
	// with metricsReg so that tests can scrape them.
	tenantUsage multitenant.TenantUsageServer
	metricsReg  *metric.Registry
}

// start brings up a test server (used only for its system tables), creates a
// tenant usage server on top of it, and registers that server's metrics with a
// fresh registry. All resulting handles are stored in ts.
func (ts *testState) start(t *testing.T) {
	srv, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	ts.s, ts.db, ts.kvDB = srv, sqlDB, kvDB
	ts.r = sqlutils.MakeSQLRunner(sqlDB)

	ie := srv.InternalExecutor().(*sql.InternalExecutor)
	ts.tenantUsage = server.NewTenantUsageServer(kvDB, ie)

	reg := metric.NewRegistry()
	reg.AddMetricStruct(ts.tenantUsage.Metrics())
	ts.metricsReg = reg
}

// stop shuts down the test server created by start by stopping its stopper.
func (ts *testState) stop() {
	stopper := ts.s.Stopper()
	stopper.Stop(context.Background())
}

// testStateCommands maps each data-driven command name to the testState method
// that implements it. Every handler receives the parsed test data and returns
// the text to compare against the expected output.
var testStateCommands = map[string]func(*testState, *testing.T, *datadriven.TestData) string{
	"configure":            (*testState).configure,
	"create-tenant":        (*testState).createTenant,
	"inspect":              (*testState).inspect,
	"metrics":              (*testState).metrics,
	"token-bucket-request": (*testState).tokenBucketRequest,
}

// tenantID extracts the tenant ID from the command's "tenant=<id>" argument.
//
// A fatal test failure is reported through d.Fatalf when the argument is
// missing, does not carry exactly one value, or that value is not an integer
// greater than or equal to 1.
func (ts *testState) tenantID(t *testing.T, d *datadriven.TestData) uint64 {
	for _, cmdArg := range d.CmdArgs {
		if cmdArg.Key != "tenant" {
			continue
		}
		if len(cmdArg.Vals) != 1 {
			d.Fatalf(t, "tenant argument requires a value")
		}
		parsed, err := strconv.Atoi(cmdArg.Vals[0])
		if err != nil || parsed < 1 {
			d.Fatalf(t, "invalid tenant argument")
		}
		return uint64(parsed)
	}
	d.Fatalf(t, "command requires tenant=<id> argument")
	return 0
}

// createTenant implements the "create-tenant" command: it creates the tenant
// named by the tenant=<id> argument via crdb_internal.create_tenant. The
// command produces no output.
func (ts *testState) createTenant(t *testing.T, d *datadriven.TestData) string {
	id := ts.tenantID(t, d)
	stmt := fmt.Sprintf("SELECT crdb_internal.create_tenant(%d)", id)
	ts.r.Exec(t, stmt)
	return ""
}

// tokenBucketRequest implements the "token-bucket-request" command. It parses
// the command input as strict YAML describing the requesting instance and the
// consumption since the last request, sends the resulting TokenBucketRequest
// to the tenant usage server on behalf of the tenant named by the tenant=<id>
// argument, and returns "error: <err>" if the response carries an error (the
// empty string otherwise).
func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) string {
	// Use a single context for both issuing the request and decoding any
	// error it returns.
	ctx := context.Background()
	tenantID := ts.tenantID(t, d)
	var args struct {
		InstanceID  uint32 `yaml:"instance_id"`
		Consumption struct {
			RU              float64 `yaml:"ru"`
			ReadReq         uint64  `yaml:"read_req"`
			ReadBytes       uint64  `yaml:"read_bytes"`
			WriteReq        uint64  `yaml:"write_req"`
			WriteBytes      uint64  `yaml:"write_bytes"`
			SQLPodsCPUUsage float64 `yaml:"sql_pods_cpu_usage"`
		}
	}
	// Strict unmarshalling is used so that unknown fields in the test input
	// are reported as parse errors.
	if err := yaml.UnmarshalStrict([]byte(d.Input), &args); err != nil {
		d.Fatalf(t, "failed to parse request yaml: %v", err)
	}
	req := roachpb.TokenBucketRequest{
		// tenantID is already a uint64; no conversion is needed.
		TenantID:   tenantID,
		InstanceID: args.InstanceID,
		ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
			RU:               args.Consumption.RU,
			ReadRequests:     args.Consumption.ReadReq,
			ReadBytes:        args.Consumption.ReadBytes,
			WriteRequests:    args.Consumption.WriteReq,
			WriteBytes:       args.Consumption.WriteBytes,
			SQLPodCPUSeconds: args.Consumption.SQLPodsCPUUsage,
		},
	}
	res := ts.tenantUsage.TokenBucketRequest(ctx, roachpb.MakeTenantID(tenantID), &req)
	// An empty EncodedError means the request succeeded.
	if res.Error != (errors.EncodedError{}) {
		return fmt.Sprintf("error: %v", errors.DecodeError(ctx, res.Error))
	}
	return ""
}

// metrics implements the "metrics" command. The command input is compiled as a
// regular expression and passed to metrictestutils.GetMetricsText along with
// the test's metrics registry; the resulting text is returned. A pattern that
// fails to compile, or a scrape error, fails the test.
func (ts *testState) metrics(t *testing.T, d *datadriven.TestData) string {
	pattern, compileErr := regexp.Compile(d.Input)
	if compileErr != nil {
		d.Fatalf(t, "failed to compile pattern: %v", compileErr)
	}
	text, scrapeErr := metrictestutils.GetMetricsText(ts.metricsReg, pattern)
	if scrapeErr != nil {
		d.Fatalf(t, "failed to scrape metrics: %v", scrapeErr)
	}
	return text
}

// configure implements the "configure" command. It parses the command input as
// strict YAML holding the token bucket settings and applies them to the tenant
// named by the tenant=<id> argument through
// crdb_internal.update_tenant_resource_limits. The "as of" arguments are
// currently fixed to now() and 0. The command produces no output.
func (ts *testState) configure(t *testing.T, d *datadriven.TestData) string {
	const updateLimitsStmt = `SELECT crdb_internal.update_tenant_resource_limits($1, $2, $3, $4, now(), 0)`

	id := ts.tenantID(t, d)
	var limits struct {
		AvailableRU float64 `yaml:"available_ru"`
		RefillRate  float64 `yaml:"refill_rate"`
		MaxBurstRU  float64 `yaml:"max_burst_ru"`
		// TODO(radu): Add AsOf/AsOfConsumedRU.
	}
	err := yaml.UnmarshalStrict([]byte(d.Input), &limits)
	if err != nil {
		d.Fatalf(t, "failed to parse request yaml: %v", err)
	}
	ts.r.Exec(t, updateLimitsStmt, id, limits.AvailableRU, limits.RefillRate, limits.MaxBurstRU)
	return ""
}

// inspect implements the "inspect" command. It returns the string produced by
// tenantcostserver.InspectTenantMetadata for the tenant named by the
// tenant=<id> argument, using the test server's internal executor and no
// explicit transaction. Any error fails the test.
func (ts *testState) inspect(t *testing.T, d *datadriven.TestData) string {
	rawID := ts.tenantID(t, d)
	ie := ts.s.InternalExecutor().(*sql.InternalExecutor)
	out, err := tenantcostserver.InspectTenantMetadata(
		context.Background(), ie, nil /* txn */, roachpb.MakeTenantID(rawID),
	)
	if err != nil {
		d.Fatalf(t, "error inspecting tenant state: %v", err)
	}
	return out
}
Loading