Skip to content

Commit

Permalink
server: add TokenBucket connector API
Browse files Browse the repository at this point in the history
This change adds the TokenBucket API proposed in the RFC (cockroachdb#66436), a
stub implementation and client for it, and the corresponding KV
connector interface.

The client and server-side code lives in
ccl/multitenantccl/tenantcostclient and tenantcostserver.

Release note: None
  • Loading branch information
RaduBerinde committed Jul 21, 2021
1 parent 639cb02 commit 304a9e6
Show file tree
Hide file tree
Showing 34 changed files with 1,763 additions and 546 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
/pkg/ccl/kvccl/ @cockroachdb/kv-noreview
/pkg/ccl/logictestccl/ @cockroachdb/sql-queries-noreview
/pkg/ccl/multiregionccl/ @cockroachdb/sql-queries-noreview
/pkg/ccl/multitenantccl/ @cockroachdb/unowned
/pkg/ccl/oidcccl/ @cockroachdb/sql-queries
/pkg/ccl/partitionccl/ @cockroachdb/sql-schema
/pkg/ccl/serverccl/ @cockroachdb/server-prs
Expand Down Expand Up @@ -175,6 +176,7 @@
/pkg/jobs/ @cockroachdb/sql-schema
/pkg/keys/ @cockroachdb/kv
/pkg/migration/ @cockroachdb/kv
/pkg/multitenant @cockroachdb/unowned
/pkg/release/ @cockroachdb/dev-inf
/pkg/roachpb/ @cockroachdb/kv
/pkg/rpc/ @cockroachdb/server-prs
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/ccl/importccl",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multitenantccl",
"//pkg/ccl/oidcccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/oidcccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
Expand Down
44 changes: 38 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ func init() {
}

// Connector mediates the communication of cluster-wide state to sandboxed
// SQL-only tenant processes through a restricted interface. A Connector is
// seeded with a set of one or more network addresses that reference existing KV
// nodes in the cluster (or a load-balancer which fans out to some/all KV
// nodes). On startup, it establishes contact with one of these nodes to learn
// about the topology of the cluster and bootstrap the rest of SQL <-> KV
// network communication.
// SQL-only tenant processes through a restricted interface.
//
// A Connector is instantiated inside a tenant's SQL process and is seeded with
// a set of one or more network addresses that reference existing KV nodes in
// the host cluster (or a load-balancer which fans out to some/all KV nodes). On
// startup, it establishes contact with one of these nodes to learn about the
// topology of the cluster and bootstrap the rest of SQL <-> KV network
// communication.
//
// The Connector communicates with the host cluster through the roachpb.Internal
// API.
//
// See below for the Connector's roles.
type Connector struct {
Expand Down Expand Up @@ -378,6 +383,33 @@ func (c *Connector) FirstRange() (*roachpb.RangeDescriptor, error) {
return nil, status.Error(codes.Unauthenticated, "kvtenant.Proxy does not have access to FirstRange")
}

// TokenBucket implements the kvtenant.TokenBucketProvider interface.
func (c *Connector) TokenBucket(
ctx context.Context, in *roachpb.TokenBucketRequest,
) (*roachpb.TokenBucketResponse, error) {
// Proxy token bucket requests through the Internal service.
ctx = c.AnnotateCtx(ctx)
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
resp, err := client.TokenBucket(ctx, in)
if err != nil {
log.Warningf(ctx, "error issuing TokenBucket RPC: %v", err)
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return nil, err
}
// Soft RPC error. Drop client and retry.
c.tryForgetClient(ctx, client)
continue
}
return resp, nil
}
return nil, ctx.Err()
}

// getClient returns the singleton InternalClient if one is currently active. If
// not, the method attempts to dial one of the configured addresses. The method
// blocks until either a connection is successfully established or the provided
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

func (m *mockServer) Join(
func (*mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

func (*mockServer) TokenBucket(
ctx context.Context, in *roachpb.TokenBucketRequest,
) (*roachpb.TokenBucketResponse, error) {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/multitenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "multitenantccl",
srcs = ["doc.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/multitenantccl/tenantcostclient",
"//pkg/ccl/multitenantccl/tenantcostserver",
],
)
15 changes: 15 additions & 0 deletions pkg/ccl/multitenantccl/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package multitenantccl

import (
// Imports for the CCL init hooks.
_ "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostclient"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver"
)
17 changes: 17 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "tenantcostclient",
srcs = ["tenant_side.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostclient",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvclient/kvtenant",
"//pkg/multitenant",
"//pkg/roachpb:with-mocks",
"//pkg/server",
"//pkg/util/log",
"//pkg/util/stop",
"@com_github_cockroachdb_errors//:errors",
],
)
80 changes: 80 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenantcostclient

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

// NewTenantSideCostController creates an object which implements the
// server.TenantSideCostController interface.
func NewTenantSideCostController(
ctx context.Context, tenantID roachpb.TenantID, provider kvtenant.TokenBucketProvider,
) (multitenant.TenantSideCostController, error) {
if tenantID == roachpb.SystemTenantID {
return nil, errors.AssertionFailedf("cost controller can't be used for system tenant")
}
return &tenantSideCostController{
tenantID: tenantID,
provider: provider,
}, nil
}

func init() {
server.NewTenantSideCostController = NewTenantSideCostController
}

type tenantSideCostController struct {
tenantID roachpb.TenantID
provider kvtenant.TokenBucketProvider
}

var _ multitenant.TenantSideCostController = (*tenantSideCostController)(nil)

// Start is part of multitenant.TenantSideCostController.
func (c *tenantSideCostController) Start(ctx context.Context, stopper *stop.Stopper) error {
return stopper.RunAsyncTask(ctx, "cost-controller", func(ctx context.Context) {
c.mainLoop(ctx, stopper)
})
}

func (c *tenantSideCostController) mainLoop(ctx context.Context, stopper *stop.Stopper) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
req := roachpb.TokenBucketRequest{
ConsumptionSinceLastRequest: roachpb.TokenBucketRequest_Consumption{
// Report a dummy 1 RU consumption each time.
RU: 1,
SQLPodCPUSeconds: 1,
},
}
_, err := c.provider.TokenBucket(ctx, &req)
if err != nil {
log.Warningf(ctx, "TokenBucket error: %v", err)
}

case <-stopper.ShouldQuiesce():
// TODO(radu): send one last request to update consumption.
return
}
}
}
21 changes: 21 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "tenantcostserver",
srcs = [
"server.go",
"token_bucket.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/multitenantccl/tenantcostserver/tenanttokenbucket",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb:with-mocks",
"//pkg/server",
"//pkg/sql",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
38 changes: 38 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenantcostserver

import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
)

type instance struct {
db *kv.DB
executor *sql.InternalExecutor
}

func newInstance(db *kv.DB, executor *sql.InternalExecutor) *instance {
return &instance{
db: db,
executor: executor,
}
}

var _ multitenant.TenantUsageServer = (*instance)(nil)

func init() {
server.NewTenantUsageServer = func(
db *kv.DB, executor *sql.InternalExecutor,
) multitenant.TenantUsageServer {
return newInstance(db, executor)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "tenanttokenbucket",
srcs = ["tenant_token_bucket.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver/tenanttokenbucket",
visibility = ["//visibility:public"],
deps = ["//pkg/roachpb:with-mocks"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

// Package tenanttokenbucket implements the tenant token bucket server-side
// algorithm described in the distributed token bucket RFC. It has minimal
// dependencies and is meant to be testable on its own.
package tenanttokenbucket

import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// State of the distributed token bucket.
type State struct {
// RUBurstLimit is the burst limit in RUs.
RUBurstLimit float64
// RURefillRate is the refill rate in RUs/second.
RURefillRate float64

// RUCurrent is the available (burst) RUs.
RUCurrent float64

// CurrentShareSum is the sum of the last reported share value for
// each active SQL pod for the tenant.
CurrentShareSum float64
}

// Request processes a request for more tokens and updates the State
// accordingly.
func (s *State) Request(
req *roachpb.TokenBucketRequest, now time.Time,
) roachpb.TokenBucketResponse {
var res roachpb.TokenBucketResponse
// TODO(radu): fill in response.
return res
}

// TODO(radu): add Reconfigure API.
Loading

0 comments on commit 304a9e6

Please sign in to comment.