release-22.1: ccl/sqlproxyccl: invoke rebalancing logic during RUNNING pod events #81790

Merged 2 commits on May 25, 2022
171 changes: 117 additions & 54 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -38,6 +38,12 @@ const (
// DRAINING state before the proxy starts moving connections away from it.
minDrainPeriod = 1 * time.Minute

// defaultRebalanceDelay is the minimum amount of time that must elapse
// between rebalance operations. This was deliberately chosen to be half of
// rebalanceInterval, and is mainly used to rate limit effects due to events
// from the pod watcher.
defaultRebalanceDelay = 15 * time.Second

// rebalancePercentDeviation defines the percentage threshold that the
// current number of assignments can deviate away from the mean. Having a
// 15% "deadzone" reduces frequent transfers especially when load is
@@ -50,15 +56,15 @@ const (
// NOTE: This must be between 0 and 1 inclusive.
rebalancePercentDeviation = 0.15

// rebalanceRate defines the rate of rebalancing assignments across SQL
// pods. This rate applies to both RUNNING and DRAINING pods. For example,
// consider the case where the rate is 0.50; if we have decided that we need
// to move 15 assignments away from a particular pod, only 7 pods will be
// moved at a time.
// defaultRebalanceRate defines the rate of rebalancing assignments across
// SQL pods. This rate applies to both RUNNING and DRAINING pods. For
// example, consider the case where the rate is 0.50; if we have decided
// that we need to move 15 assignments away from a particular pod, only 8
// assignments will be moved at a time (the count is rounded up).
//
// NOTE: This must be between 0 and 1 inclusive. 0 means no rebalancing
// will occur.
rebalanceRate = 0.50
defaultRebalanceRate = 0.50

// defaultMaxConcurrentRebalances represents the maximum number of
// concurrent rebalance requests that are being processed. This effectively
@@ -78,6 +84,8 @@ type balancerOptions struct {
maxConcurrentRebalances int
noRebalanceLoop bool
timeSource timeutil.TimeSource
rebalanceRate float32
rebalanceDelay time.Duration
}

// Option defines an option that can be passed to NewBalancer in order to
@@ -109,6 +117,25 @@ func TimeSource(ts timeutil.TimeSource) Option {
}
}

// RebalanceRate defines the rate of rebalancing across pods. This must be
// between 0 and 1 inclusive. 0 means no rebalancing will occur.
func RebalanceRate(rate float32) Option {
return func(opts *balancerOptions) {
opts.rebalanceRate = rate
}
}

// RebalanceDelay specifies the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. This delay has the effect of throttling
// RebalanceTenant calls to avoid constantly moving connections around.
//
// RebalanceDelay defaults to defaultRebalanceDelay. Use -1 to never throttle.
func RebalanceDelay(delay time.Duration) Option {
return func(opts *balancerOptions) {
opts.rebalanceDelay = delay
}
}

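Taken together, the new options slot into NewBalancer alongside the existing ones. A minimal sketch, assuming the ctx, stopper, metrics, and directoryCache fixtures that the tests below set up; the 0.25 rate and 30-second delay are illustrative values, not the defaults:

```go
// Illustrative only: a balancer that moves at most 25% of a tenant's
// candidate assignments per pass and rebalances a given tenant at most
// once every 30 seconds.
b, err := NewBalancer(
	ctx, stopper, metrics, directoryCache,
	RebalanceRate(0.25),            // move 25% of candidate assignments per pass
	RebalanceDelay(30*time.Second), // at most one rebalance per tenant every 30s
)
if err != nil {
	log.Fatalf(ctx, "creating balancer: %v", err)
}
_ = b // b.RebalanceTenant can now be invoked by the pod watcher.
```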
// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
@@ -140,6 +167,22 @@ type Balancer struct {
// timeutil.DefaultTimeSource. Override with the TimeSource() option when
// calling NewBalancer.
timeSource timeutil.TimeSource

// rebalanceRate represents the rate of rebalancing connections.
rebalanceRate float32

// rebalanceDelay is the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. Defaults to defaultRebalanceDelay.
rebalanceDelay time.Duration

// lastRebalance tracks the last time each tenant was rebalanced. This is used
// to rate limit the number of rebalances per tenant. Synchronization is
// needed since rebalance operations can be triggered by the rebalance loop,
// or the pod watcher.
lastRebalance struct {
syncutil.Mutex
tenants map[roachpb.TenantID]time.Time
}
}

// NewBalancer constructs a new Balancer instance that is responsible for
@@ -152,16 +195,15 @@ func NewBalancer(
opts ...Option,
) (*Balancer, error) {
// Handle options.
options := &balancerOptions{}
options := &balancerOptions{
maxConcurrentRebalances: defaultMaxConcurrentRebalances,
timeSource: timeutil.DefaultTimeSource{},
rebalanceRate: defaultRebalanceRate,
rebalanceDelay: defaultRebalanceDelay,
}
for _, opt := range opts {
opt(options)
}
if options.maxConcurrentRebalances == 0 {
options.maxConcurrentRebalances = defaultMaxConcurrentRebalances
}
if options.timeSource == nil {
options.timeSource = timeutil.DefaultTimeSource{}
}

// Ensure that ctx gets cancelled on stopper's quiescing.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
@@ -178,7 +220,11 @@ func NewBalancer(
queue: q,
processSem: semaphore.New(options.maxConcurrentRebalances),
timeSource: options.timeSource,
rebalanceRate: options.rebalanceRate,
rebalanceDelay: options.rebalanceDelay,
}
b.lastRebalance.tenants = make(map[roachpb.TenantID]time.Time)

b.connTracker, err = NewConnTracker(ctx, b.stopper, b.timeSource)
if err != nil {
return nil, err
@@ -199,6 +245,46 @@ func NewBalancer(
return b, nil
}

// RebalanceTenant rebalances connections to the given tenant. If no RUNNING
// pod exists for the given tenant, or the tenant has been recently rebalanced,
// this is a no-op.
func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
// If rebalanced recently, no-op.
if !b.canRebalanceTenant(tenantID) {
return
}

tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
log.Errorf(ctx, "could not rebalance tenant %s: %v", tenantID, err.Error())
return
}

// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, having
// no RUNNING pods would happen if we're scaling down from 1 to 0, in which
// case we can't transfer connections anywhere. Practically, we will never
// scale a tenant from 1 to 0 while there are still active connections, so
// this case should not occur.
if !hasRunningPod {
return
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}

// SelectTenantPod selects a tenant pod from the given list based on a weighted
// CPU load algorithm. It is expected that all pods within the list belong to
// the same tenant. If no pods are available, this returns ErrNoAvailablePods.
@@ -299,58 +385,35 @@ func (b *Balancer) rebalanceLoop(ctx context.Context) {
}
}

// canRebalanceTenant returns true if it has been at least `rebalanceDelay`
// since the last time the given tenant was rebalanced, or false otherwise.
func (b *Balancer) canRebalanceTenant(tenantID roachpb.TenantID) bool {
b.lastRebalance.Lock()
defer b.lastRebalance.Unlock()

now := b.timeSource.Now()
if now.Sub(b.lastRebalance.tenants[tenantID]) < b.rebalanceDelay {
return false
}
b.lastRebalance.tenants[tenantID] = now
return true
}

// rebalance attempts to rebalance connections for all tenants within the proxy.
//
// TODO(jaylim-crl): Update this to support rebalancing a single tenant. That
// way, the pod watcher could call this to rebalance a single tenant. We may
// also want to rate limit the number of rebalances per tenant for requests
// coming from the pod watcher.
func (b *Balancer) rebalance(ctx context.Context) {
// getTenantIDs ensures that tenants will have at least one connection.
tenantIDs := b.connTracker.getTenantIDs()

for _, tenantID := range tenantIDs {
tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
// This case shouldn't really occur unless there's a bug in the
// directory server (e.g. deleted pod events, but the pod is still
// alive).
log.Errorf(ctx, "could not lookup pods for tenant %s: %v", tenantID, err.Error())
continue
}

// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, this
// case would happen if we're scaling down from 1 to 0, which in that
// case, we can't transfer connections anywhere. Practically, we will
// never scale a tenant from 1 to 0 if there are still active
// connections, so this case should not occur.
if !hasRunningPod {
continue
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
b.RebalanceTenant(ctx, tenantID)
}
}

// rebalancePartition rebalances the given assignments partition.
func (b *Balancer) rebalancePartition(
pods map[string]*tenant.Pod, assignments []*ServerAssignment,
) {
// Nothing to do here.
if len(pods) == 0 || len(assignments) == 0 {
// Nothing to do here if there are no assignments, or only one pod.
if len(pods) <= 1 || len(assignments) == 0 {
return
}

@@ -371,7 +434,7 @@ func (b *Balancer) rebalancePartition(
//
// NOTE: Elements in the list may be shuffled around once this method returns.
func (b *Balancer) enqueueRebalanceRequests(list []*ServerAssignment) {
toMoveCount := int(math.Ceil(float64(len(list)) * float64(rebalanceRate)))
toMoveCount := int(math.Ceil(float64(len(list)) * float64(b.rebalanceRate)))
partition, _ := partitionNRandom(list, toMoveCount)
for _, a := range partition {
b.queue.enqueue(&rebalanceRequest{
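The caller that reacts to RUNNING pod events lives outside this file, so it is not part of the diff above. A rough sketch of the intended wiring, under the assumption that the pod watcher delivers updates on a channel; the watchPodEvents function and the events channel are hypothetical, while RebalanceTenant, tenant.Pod, tenant.RUNNING, and roachpb.MakeTenantID come from the code in this PR:

```go
// Hypothetical consumer of pod-watcher updates: whenever a pod is reported
// as RUNNING, ask the balancer to rebalance that tenant. The per-tenant
// rebalanceDelay inside the balancer keeps this from thrashing connections
// even if RUNNING events arrive in quick succession.
func watchPodEvents(ctx context.Context, b *Balancer, events <-chan *tenant.Pod) {
	for {
		select {
		case <-ctx.Done():
			return
		case pod := <-events:
			if pod.State == tenant.RUNNING {
				b.RebalanceTenant(ctx, roachpb.MakeTenantID(pod.TenantID))
			}
		}
	}
}
```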
104 changes: 104 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
@@ -365,6 +365,7 @@ func TestRebalancer_rebalance(t *testing.T) {
directoryCache,
NoRebalanceLoop(),
TimeSource(timeSource),
RebalanceDelay(-1),
)
require.NoError(t, err)

@@ -715,6 +716,109 @@ }
}
}

func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// Use a custom time source for testing.
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
timeSource := timeutil.NewManualTime(t0)

metrics := NewMetrics()
directoryCache := newTestDirectoryCache()

b, err := NewBalancer(
ctx,
stopper,
metrics,
directoryCache,
NoRebalanceLoop(),
TimeSource(timeSource),
)
require.NoError(t, err)

tenantID := roachpb.MakeTenantID(10)
pods := []*tenant.Pod{
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:80", State: tenant.DRAINING},
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:81", State: tenant.RUNNING},
}
for _, pod := range pods {
require.True(t, directoryCache.upsertPod(pod))
}

// Create 100 active connections, all to the draining pod.
const numConns = 100
var mu syncutil.Mutex
assignments := make([]*ServerAssignment, numConns)
makeTestConnHandle := func(idx int) *testConnHandle {
var handle *testConnHandle
handle = &testConnHandle{
onTransferConnection: func() error {
mu.Lock()
defer mu.Unlock()
assignments[idx].Close()
assignments[idx] = NewServerAssignment(
tenantID, b.connTracker, handle, pods[1].Addr,
)
return nil
},
}
return handle
}
var handles []ConnectionHandle
for i := 0; i < numConns; i++ {
handle := makeTestConnHandle(i)
handles = append(handles, handle)
assignments[i] = NewServerAssignment(
tenantID, b.connTracker, handle, pods[0].Addr,
)
}

waitFor := func(numTransfers int) {
testutils.SucceedsSoon(t, func() error {
count := 0
for i := 0; i < 100; i++ {
count += handles[i].(*testConnHandle).transferConnectionCount()
}
if count != numTransfers {
return errors.Newf("require %d, but got %v", numTransfers, count)
}
return nil
})
}

// Attempt the rebalance, and wait until 50 connections have been moved
// (i.e. 100 * defaultRebalanceRate).
b.RebalanceTenant(ctx, tenantID)
waitFor(50)

// Run the rebalance again.
b.RebalanceTenant(ctx, tenantID)

// Queue should be empty, and no additional connections should be moved.
b.queue.mu.Lock()
queueLen := b.queue.queue.Len()
b.queue.mu.Unlock()
require.Equal(t, 0, queueLen)
waitFor(50)

// Advance time, rebalance, and wait until 75 (i.e. 50 + 25) connections
// get moved.
timeSource.Advance(defaultRebalanceDelay)
b.RebalanceTenant(ctx, tenantID)
waitFor(75)

// Advance time, rebalance, and wait until 88 (i.e. 75 + 13) connections
// get moved.
timeSource.Advance(defaultRebalanceDelay)
b.RebalanceTenant(ctx, tenantID)
waitFor(88)
}
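The transfer counts the test waits for follow from defaultRebalanceRate (0.50) and the math.Ceil in enqueueRebalanceRequests: the first pass moves ceil(100 × 0.50) = 50 of the 100 connections off the DRAINING pod, the next pass moves ceil(50 × 0.50) = 25 more (75 total), and the third moves ceil(25 × 0.50) = 13 more (88 total), with each pass gated by defaultRebalanceDelay.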

func TestEnqueueRebalanceRequests(t *testing.T) {
defer leaktest.AfterTest(t)()
