Skip to content

Commit

Permalink
xds: xds_cluster_impl_balancer part 1 (#4154)
Browse files Browse the repository at this point in the history
Part of C2P fallback. To support fallback to a DNS cluster.

This PR adds implementation of xds_cluster_impl_balancer, which will be responsible for circuit breaking and rpc dropping.

This PR only added RPC dropping. Circuit breaking will be done in a followup PR, after some necessary refactoring.
  • Loading branch information
menghanl authored Feb 11, 2021
1 parent c9217c7 commit 9f3606c
Show file tree
Hide file tree
Showing 9 changed files with 1,007 additions and 78 deletions.
16 changes: 9 additions & 7 deletions vet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ not grep -Fv '.CredsBundle
.NewAddress
.NewServiceConfig
.Type is deprecated: use Attributes
BuildVersion is deprecated
balancer.ErrTransientFailure
balancer.Picker
extDesc.Filename is deprecated
github.com/golang/protobuf/jsonpb is deprecated
grpc.CallCustomCodec
grpc.Code
grpc.Compressor
Expand All @@ -164,13 +167,7 @@ grpc.WithServiceConfig
grpc.WithTimeout
http.CloseNotifier
info.SecurityVersion
resolver.Backend
resolver.GRPCLB
extDesc.Filename is deprecated
BuildVersion is deprecated
github.com/golang/protobuf/jsonpb is deprecated
proto is deprecated
xxx_messageInfo_
proto.InternalMessageInfo is deprecated
proto.EnumName is deprecated
proto.ErrInternalBadWireType is deprecated
Expand All @@ -184,7 +181,12 @@ proto.RegisterExtension is deprecated
proto.RegisteredExtension is deprecated
proto.RegisteredExtensions is deprecated
proto.RegisterMapType is deprecated
proto.Unmarshaler is deprecated' "${SC_OUT}"
proto.Unmarshaler is deprecated
resolver.Backend
resolver.GRPCLB
Target is deprecated: Use the Target field in the BuildOptions instead.
xxx_messageInfo_
' "${SC_OUT}"

# - special golint on package comments.
lint_package_comment_per_package() {
Expand Down
216 changes: 216 additions & 0 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package clusterimpl

import (
"context"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)

const (
defaultTestTimeout = 1 * time.Second
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
)

var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}

cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
}
)

func init() {
newRandomWRR = testutils.NewTestWRR
}

// TestDrop verifies that the balancer correctly drops the picks, and that
// the drops are reported.
func TestDrop(t *testing.T) {
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()

builder := balancer.Get(clusterImplName)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

const (
dropReason = "test-dropping-category"
dropNumerator = 1
dropDenominator = 2
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
BalancerConfig: &lbConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LRSLoadReportingServerName: newString(testLRSServerName),
DropCategories: []dropCategory{{
Category: dropReason,
RequestsPerMillion: million * dropNumerator / dropDenominator,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
p0 := <-cc.NewPickerCh
for i := 0; i < 10; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != balancer.ErrNoSubConnAvailable {
t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
}
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
const rpcCount = 20
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p1.Pick(balancer.PickInfo{})
// Even RPCs are dropped.
if i%2 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}

// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
const dropCount = rpcCount * dropNumerator / dropDenominator
wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount,
Drops: map[string]uint64{dropReason: dropCount},
}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}

// Send an update with new drop configs.
const (
dropReason2 = "test-dropping-category-2"
dropNumerator2 = 1
dropDenominator2 = 4
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
BalancerConfig: &lbConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LRSLoadReportingServerName: newString(testLRSServerName),
DropCategories: []dropCategory{{
Category: dropReason2,
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}

p2 := <-cc.NewPickerCh
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p2.Pick(balancer.PickInfo{})
// Even RPCs are dropped.
if i%4 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}

const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
wantStatsData1 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount2,
Drops: map[string]uint64{dropReason2: dropCount2},
}}

gotStatsData1 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
}
Loading

0 comments on commit 9f3606c

Please sign in to comment.