diff --git a/vet.sh b/vet.sh index b41df6dc8607..605d7a80cfe6 100755 --- a/vet.sh +++ b/vet.sh @@ -141,8 +141,11 @@ not grep -Fv '.CredsBundle .NewAddress .NewServiceConfig .Type is deprecated: use Attributes +BuildVersion is deprecated balancer.ErrTransientFailure balancer.Picker +extDesc.Filename is deprecated +github.com/golang/protobuf/jsonpb is deprecated grpc.CallCustomCodec grpc.Code grpc.Compressor @@ -164,13 +167,7 @@ grpc.WithServiceConfig grpc.WithTimeout http.CloseNotifier info.SecurityVersion -resolver.Backend -resolver.GRPCLB -extDesc.Filename is deprecated -BuildVersion is deprecated -github.com/golang/protobuf/jsonpb is deprecated proto is deprecated -xxx_messageInfo_ proto.InternalMessageInfo is deprecated proto.EnumName is deprecated proto.ErrInternalBadWireType is deprecated @@ -184,7 +181,12 @@ proto.RegisterExtension is deprecated proto.RegisteredExtension is deprecated proto.RegisteredExtensions is deprecated proto.RegisterMapType is deprecated -proto.Unmarshaler is deprecated' "${SC_OUT}" +proto.Unmarshaler is deprecated +resolver.Backend +resolver.GRPCLB +Target is deprecated: Use the Target field in the BuildOptions instead. +xxx_messageInfo_ +' "${SC_OUT}" # - special golint on package comments. lint_package_comment_per_package() { diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go new file mode 100644 index 000000000000..ac2d18821f01 --- /dev/null +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -0,0 +1,216 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterimpl + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/client/load" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" +) + +const ( + defaultTestTimeout = 1 * time.Second + testClusterName = "test-cluster" + testServiceName = "test-eds-service" + testLRSServerName = "test-lrs-name" +) + +var ( + testBackendAddrs = []resolver.Address{ + {Addr: "1.1.1.1:1"}, + } + + cmpOpts = cmp.Options{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(load.Data{}, "ReportInterval"), + } +) + +func init() { + newRandomWRR = testutils.NewTestWRR +} + +// TestDrop verifies that the balancer correctly drops the picks, and that +// the drops are reported. 
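+//
+// The test installs a deterministic WRR (testutils.NewTestWRR, see init()
+// above), so the drop pattern is predictable: a 1/2 drop config drops every
+// other pick, and a 1/4 config drops every 4th pick. It then reads the load
+// store from the fake xds client and checks that drops were recorded under
+// the right category.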
+func TestDrop(t *testing.T) {
+	xdsC := fakeclient.NewClient()
+	oldNewXDSClient := newXDSClient
+	newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
+	defer func() { newXDSClient = oldNewXDSClient }()
+
+	builder := balancer.Get(clusterImplName)
+	cc := testutils.NewTestClientConn(t)
+	b := builder.Build(cc, balancer.BuildOptions{})
+	defer b.Close()
+
+	const (
+		dropReason      = "test-dropping-category"
+		dropNumerator   = 1
+		dropDenominator = 2
+	)
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{
+			Addresses: testBackendAddrs,
+		},
+		BalancerConfig: &lbConfig{
+			Cluster:                    testClusterName,
+			EDSServiceName:             testServiceName,
+			LRSLoadReportingServerName: newString(testLRSServerName),
+			DropCategories: []dropCategory{{
+				Category:           dropReason,
+				RequestsPerMillion: million * dropNumerator / dropDenominator,
+			}},
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name: roundrobin.Name,
+			},
+		},
+	}); err != nil {
+		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	got, err := xdsC.WaitForReportLoad(ctx)
+	if err != nil {
+		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
+	}
+	if got.Server != testLRSServerName {
+		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
+	}
+
+	sc1 := <-cc.NewSubConnCh
+	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	// This should get the connecting picker.
+	p0 := <-cc.NewPickerCh
+	for i := 0; i < 10; i++ {
+		_, err := p0.Pick(balancer.PickInfo{})
+		if err != balancer.ErrNoSubConnAvailable {
+			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
+		}
+	}
+
+	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	// Test pick with one backend.
+	p1 := <-cc.NewPickerCh
+	const rpcCount = 20
+	for i := 0; i < rpcCount; i++ {
+		gotSCSt, err := p1.Pick(balancer.PickInfo{})
+		// Even-indexed RPCs are dropped.
+		if i%2 == 0 {
+			if err == nil || !strings.Contains(err.Error(), "dropped") {
+				t.Fatalf("picker.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+			}
+			continue
+		}
+		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+		}
+		if gotSCSt.Done != nil {
+			gotSCSt.Done(balancer.DoneInfo{})
+		}
+	}
+
+	// Dump load data from the store and compare with expected counts.
+	loadStore := xdsC.LoadStore()
+	if loadStore == nil {
+		t.Fatal("loadStore is nil in xdsClient")
+	}
+	const dropCount = rpcCount * dropNumerator / dropDenominator
+	wantStatsData0 := []*load.Data{{
+		Cluster:    testClusterName,
+		Service:    testServiceName,
+		TotalDrops: dropCount,
+		Drops:      map[string]uint64{dropReason: dropCount},
+	}}
+
+	gotStatsData0 := loadStore.Stats([]string{testClusterName})
+	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
+		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
+	}
+
+	// Send an update with new drop configs.
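+	// The drop categories changed, so the balancer is expected to rebuild its
+	// droppers and push a new picker. With the deterministic test WRR,
+	// dropCount2 below works out to rpcCount/4 = 5 drops.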
+	const (
+		dropReason2      = "test-dropping-category-2"
+		dropNumerator2   = 1
+		dropDenominator2 = 4
+	)
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{
+			Addresses: testBackendAddrs,
+		},
+		BalancerConfig: &lbConfig{
+			Cluster:                    testClusterName,
+			EDSServiceName:             testServiceName,
+			LRSLoadReportingServerName: newString(testLRSServerName),
+			DropCategories: []dropCategory{{
+				Category:           dropReason2,
+				RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
+			}},
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name: roundrobin.Name,
+			},
+		},
+	}); err != nil {
+		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
+	}
+
+	p2 := <-cc.NewPickerCh
+	for i := 0; i < rpcCount; i++ {
+		gotSCSt, err := p2.Pick(balancer.PickInfo{})
+		// Every 4th RPC is dropped.
+		if i%4 == 0 {
+			if err == nil || !strings.Contains(err.Error(), "dropped") {
+				t.Fatalf("picker.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+			}
+			continue
+		}
+		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+		}
+		if gotSCSt.Done != nil {
+			gotSCSt.Done(balancer.DoneInfo{})
+		}
+	}
+
+	const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
+	wantStatsData1 := []*load.Data{{
+		Cluster:    testClusterName,
+		Service:    testServiceName,
+		TotalDrops: dropCount2,
+		Drops:      map[string]uint64{dropReason2: dropCount2},
+	}}
+
+	gotStatsData1 := loadStore.Stats([]string{testClusterName})
+	if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
+		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
+	}
+}
diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
new file mode 100644
index 000000000000..e9201c10bdc9
--- /dev/null
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -0,0 +1,312 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package clusterimpl implements the xds_cluster_impl balancing policy. It
+// handles the cluster features (e.g. circuit_breaking, RPC dropping).
+//
+// Note that it doesn't handle name resolution, which is done by policy
+// xds_cluster_resolver.
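+//
+// Drops are implemented in the picker: a configured fraction of RPCs fails
+// with codes.Unavailable before ever reaching the child policy, and each
+// drop is reported to the load store under its drop category.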
+package clusterimpl
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/internal/buffer"
+	"google.golang.org/grpc/internal/grpclog"
+	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/serviceconfig"
+	"google.golang.org/grpc/xds/internal/balancer/loadstore"
+	xdsclient "google.golang.org/grpc/xds/internal/client"
+	"google.golang.org/grpc/xds/internal/client/load"
+)
+
+const (
+	clusterImplName = "xds_cluster_impl_experimental"
+	// TODO: define defaultRequestCountMax = 1024
+)
+
+func init() {
+	balancer.Register(clusterImplBB{})
+}
+
+var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
+
+type clusterImplBB struct{}
+
+func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	b := &clusterImplBalancer{
+		ClientConn:     cc,
+		bOpts:          bOpts,
+		closed:         grpcsync.NewEvent(),
+		loadWrapper:    loadstore.NewWrapper(),
+		pickerUpdateCh: buffer.NewUnbounded(),
+	}
+	b.logger = prefixLogger(b)
+
+	client, err := newXDSClient()
+	if err != nil {
+		b.logger.Errorf("failed to create xds-client: %v", err)
+		return nil
+	}
+	b.xdsC = client
+	go b.run()
+
+	b.logger.Infof("Created")
+	return b
+}
+
+func (clusterImplBB) Name() string {
+	return clusterImplName
+}
+
+func (clusterImplBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	return parseConfig(c)
+}
+
+// xdsClientInterface contains only the xds_client methods needed by the
+// cluster_impl balancer. It's defined so we can override xdsclient in tests.
+type xdsClientInterface interface {
+	ReportLoad(server string) (*load.Store, func())
+	Close()
+}
+
+type clusterImplBalancer struct {
+	balancer.ClientConn
+	bOpts  balancer.BuildOptions
+	closed *grpcsync.Event
+	logger *grpclog.PrefixLogger
+	xdsC   xdsClientInterface
+
+	config           *lbConfig
+	childLB          balancer.Balancer
+	cancelLoadReport func()
+	clusterName      string
+	edsServiceName   string
+	lrsServerName    string
+	loadWrapper      *loadstore.Wrapper
+
+	// childState/drops/requestCounter can only be accessed in run(). run() is
+	// the only goroutine that sends pickers to the parent ClientConn. All
+	// requests to update the picker need to be sent to pickerUpdateCh.
+	childState balancer.State
+	drops      []*dropper
+	// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
+	pickerUpdateCh *buffer.Unbounded
+}
+
+// updateLoadStore checks the config for load store, and decides whether it
+// needs to restart the load reporting stream.
+func (cib *clusterImplBalancer) updateLoadStore(newConfig *lbConfig) error {
+	var updateLoadClusterAndService bool
+
+	// Loads are reported against the cluster name and the EDS service name;
+	// if either changes, the load reporting fields need to be updated.
+	if cib.clusterName != newConfig.Cluster {
+		updateLoadClusterAndService = true
+		cib.clusterName = newConfig.Cluster
+	}
+	if cib.edsServiceName != newConfig.EDSServiceName {
+		updateLoadClusterAndService = true
+		cib.edsServiceName = newConfig.EDSServiceName
+	}
+	if updateLoadClusterAndService {
+		// This updates the clusterName and serviceName that will be reported
+		// for the loads. The update here is too early; the perfect timing
+		// would be when the picker is updated with the new connection. But
+		// from this balancer's point of view, it's impossible to tell.
+		//
+		// On the other hand, this will almost never happen. Each LRS policy
+		// shouldn't get an updated config; the parent should do a graceful
+		// switch when the clusterName or serviceName is changed.
+		cib.loadWrapper.UpdateClusterAndService(cib.clusterName, cib.edsServiceName)
+	}
+
+	// Check if it's necessary to restart the load reporting stream.
+	var newLRSServerName string
+	if newConfig.LRSLoadReportingServerName != nil {
+		newLRSServerName = *newConfig.LRSLoadReportingServerName
+	}
+	if cib.lrsServerName != newLRSServerName {
+		// LRSLoadReportingServerName is different; load should be reported to
+		// a different server, so restart the stream.
+		cib.lrsServerName = newLRSServerName
+		if cib.cancelLoadReport != nil {
+			cib.cancelLoadReport()
+			cib.cancelLoadReport = nil
+		}
+		var loadStore *load.Store
+		if cib.xdsC != nil {
+			loadStore, cib.cancelLoadReport = cib.xdsC.ReportLoad(cib.lrsServerName)
+		}
+		cib.loadWrapper.UpdateLoadStore(loadStore)
+	}
+
+	return nil
+}
+
+func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	if cib.closed.HasFired() {
+		cib.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
+		return nil
+	}
+
+	newConfig, ok := s.BalancerConfig.(*lbConfig)
+	if !ok {
+		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
+	}
+
+	// Need to check for potential errors at the beginning of this function, so
+	// that on errors, we reject the whole config, instead of applying part of
+	// it.
+	bb := balancer.Get(newConfig.ChildPolicy.Name)
+	if bb == nil {
+		return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
+	}
+
+	// Update load reporting config. This needs to be done before updating the
+	// child policy because the new loadStore needs to be in place first, so
+	// that the next picker created from the child policy's state will pick up
+	// the new loadStore.
+	if err := cib.updateLoadStore(newConfig); err != nil {
+		return err
+	}
+
+	// Compare the new drop config with the old one, and update the picker if
+	// it changed.
+	var updatePicker bool
+	if cib.config == nil || !equalDropCategories(cib.config.DropCategories, newConfig.DropCategories) {
+		cib.drops = make([]*dropper, 0, len(newConfig.DropCategories))
+		for _, c := range newConfig.DropCategories {
+			cib.drops = append(cib.drops, newDropper(c))
+		}
+		updatePicker = true
+	}
+
+	// TODO: compare the cluster name, and update the picker if it changed,
+	// because circuit breaking's stream counter will be different.
+	//
+	// Set `updatePicker` to manually update the picker.
+
+	// TODO: compare the upper bound of the stream count, and update the picker
+	// if it changed. This is also for circuit breaking.
+	//
+	// Set `updatePicker` to manually update the picker.
+
+	if updatePicker {
+		cib.pickerUpdateCh.Put(&dropConfigs{
+			drops: cib.drops,
+		})
+	}
+
+	// If the child policy is a different type, recreate the sub-balancer.
+	if cib.config == nil || cib.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
+		if cib.childLB != nil {
+			cib.childLB.Close()
+		}
+		cib.childLB = bb.Build(cib, cib.bOpts)
+	}
+	cib.config = newConfig
+
+	if cib.childLB == nil {
+		// This is not an expected situation, and should be super rare in
+		// practice.
+		//
+		// When this happens, we have already applied all the other
+		// configurations (drop/circuit breaking), but there's no child policy.
+		// This balancer will be stuck, and we report the error to the parent.
+		return fmt.Errorf("child policy is nil, this means balancer %q's Build() returned nil", newConfig.ChildPolicy.Name)
+	}
+
+	// Addresses and sub-balancer config are sent to sub-balancer.
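+	// The ResolverState is forwarded unmodified; this balancer only wraps the
+	// picker and doesn't touch addresses.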
+	return cib.childLB.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState:  s.ResolverState,
+		BalancerConfig: cib.config.ChildPolicy.Config,
+	})
+}
+
+func (cib *clusterImplBalancer) ResolverError(err error) {
+	if cib.closed.HasFired() {
+		cib.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
+		return
+	}
+
+	if cib.childLB != nil {
+		cib.childLB.ResolverError(err)
+	}
+}
+
+func (cib *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
+	if cib.closed.HasFired() {
+		cib.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
+		return
+	}
+
+	if cib.childLB != nil {
+		cib.childLB.UpdateSubConnState(sc, s)
+	}
+}
+
+func (cib *clusterImplBalancer) Close() {
+	if cib.childLB != nil {
+		cib.childLB.Close()
+		cib.childLB = nil
+	}
+	cib.xdsC.Close()
+	cib.closed.Fire()
+	cib.logger.Infof("Shutdown")
+}
+
+// Override methods to accept updates from the child LB.
+
+func (cib *clusterImplBalancer) UpdateState(state balancer.State) {
+	// Instead of updating the parent ClientConn inline, send the state to
+	// run().
+	cib.pickerUpdateCh.Put(state)
+}
+
+type dropConfigs struct {
+	drops []*dropper
+}
+
+func (cib *clusterImplBalancer) run() {
+	for {
+		select {
+		case update := <-cib.pickerUpdateCh.Get():
+			// Tell the buffer that the item returned by Get() has been
+			// consumed.
+			cib.pickerUpdateCh.Load()
+			switch u := update.(type) {
+			case balancer.State:
+				cib.childState = u
+				cib.ClientConn.UpdateState(balancer.State{
+					ConnectivityState: cib.childState.ConnectivityState,
+					Picker:            newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
+				})
+			case *dropConfigs:
+				cib.drops = u.drops
+				// cib.requestCounter = u.requestCounter
+				if cib.childState.Picker != nil {
+					cib.ClientConn.UpdateState(balancer.State{
+						ConnectivityState: cib.childState.ConnectivityState,
+						Picker:            newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
+					})
+				}
+			}
+		case <-cib.closed.Done():
+			return
+		}
+	}
+}
diff --git a/xds/internal/balancer/clusterimpl/config.go b/xds/internal/balancer/clusterimpl/config.go
new file mode 100644
index 000000000000..548ab34bce4d
--- /dev/null
+++ b/xds/internal/balancer/clusterimpl/config.go
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package clusterimpl
+
+import (
+	"encoding/json"
+
+	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	"google.golang.org/grpc/serviceconfig"
+)
+
+type dropCategory struct {
+	Category           string
+	RequestsPerMillion uint32
+}
+
+// lbConfig is the balancer config for the cluster_impl balancer.
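+//
+// An example of the JSON form, abridged from the testJSONConfig fixture in
+// config_test.go:
+//
+//	{
+//	  "cluster": "test_cluster",
+//	  "edsServiceName": "test-eds",
+//	  "lrsLoadReportingServerName": "lrs_server",
+//	  "maxConcurrentRequests": 123,
+//	  "dropCategories": [
+//	    {"category": "drop-1", "requestsPerMillion": 314}
+//	  ],
+//	  "childPolicy": [{"round_robin": {}}]
+//	}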
+type lbConfig struct { + serviceconfig.LoadBalancingConfig + + Cluster string + EDSServiceName string + LRSLoadReportingServerName *string + MaxConcurrentRequests *uint32 + DropCategories []dropCategory + ChildPolicy *internalserviceconfig.BalancerConfig +} + +func parseConfig(c json.RawMessage) (*lbConfig, error) { + var cfg lbConfig + if err := json.Unmarshal(c, &cfg); err != nil { + return nil, err + } + return &cfg, nil +} + +func equalDropCategories(a, b []dropCategory) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/xds/internal/balancer/clusterimpl/config_test.go b/xds/internal/balancer/clusterimpl/config_test.go new file mode 100644 index 000000000000..89696981e2a0 --- /dev/null +++ b/xds/internal/balancer/clusterimpl/config_test.go @@ -0,0 +1,144 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterimpl + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" + _ "google.golang.org/grpc/balancer/roundrobin" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + _ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" +) + +const ( + testJSONConfig = `{ + "cluster": "test_cluster", + "edsServiceName": "test-eds", + "lrsLoadReportingServerName": "lrs_server", + "maxConcurrentRequests": 123, + "dropCategories": [ + { + "category": "drop-1", + "requestsPerMillion": 314 + }, + { + "category": "drop-2", + "requestsPerMillion": 159 + } + ], + "childPolicy": [ + { + "weighted_target_experimental": { + "targets": { + "wt-child-1": { + "weight": 75, + "childPolicy":[{"round_robin":{}}] + }, + "wt-child-2": { + "weight": 25, + "childPolicy":[{"round_robin":{}}] + } + } + } + } + ] +}` + + wtName = "weighted_target_experimental" +) + +var ( + wtConfigParser = balancer.Get(wtName).(balancer.ConfigParser) + wtConfigJSON = `{ + "targets": { + "wt-child-1": { + "weight": 75, + "childPolicy":[{"round_robin":{}}] + }, + "wt-child-2": { + "weight": 25, + "childPolicy":[{"round_robin":{}}] + } + } +}` + + wtConfig, _ = wtConfigParser.ParseConfig([]byte(wtConfigJSON)) +) + +func TestParseConfig(t *testing.T) { + tests := []struct { + name string + js string + want *lbConfig + wantErr bool + }{ + { + name: "empty json", + js: "", + want: nil, + wantErr: true, + }, + { + name: "bad json", + js: "{", + want: nil, + wantErr: true, + }, + { + name: "OK", + js: testJSONConfig, + want: &lbConfig{ + Cluster: "test_cluster", + EDSServiceName: "test-eds", + LRSLoadReportingServerName: newString("lrs_server"), + MaxConcurrentRequests: newUint32(123), + DropCategories: []dropCategory{ + {Category: "drop-1", RequestsPerMillion: 314}, + {Category: "drop-2", RequestsPerMillion: 159}, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: wtName, + Config: wtConfig, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, 
err := parseConfig([]byte(tt.js))
+			if (err != nil) != tt.wantErr {
+				t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
+			}
+			if !cmp.Equal(got, tt.want) {
+				t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
+			}
+		})
+	}
+}
+
+func newString(s string) *string {
+	return &s
+}
+
+func newUint32(i uint32) *uint32 {
+	return &i
+}
diff --git a/xds/internal/balancer/clusterimpl/logging.go b/xds/internal/balancer/clusterimpl/logging.go
new file mode 100644
index 000000000000..3bbd1b0d7837
--- /dev/null
+++ b/xds/internal/balancer/clusterimpl/logging.go
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package clusterimpl
+
+import (
+	"fmt"
+
+	"google.golang.org/grpc/grpclog"
+	internalgrpclog "google.golang.org/grpc/internal/grpclog"
+)
+
+const prefix = "[xds-cluster-impl-lb %p] "
+
+var logger = grpclog.Component("xds")
+
+func prefixLogger(p *clusterImplBalancer) *internalgrpclog.PrefixLogger {
+	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
+}
diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go
new file mode 100644
index 000000000000..05e9f89786fd
--- /dev/null
+++ b/xds/internal/balancer/clusterimpl/picker.go
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package clusterimpl
+
+import (
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/internal/wrr"
+	"google.golang.org/grpc/status"
+	"google.golang.org/grpc/xds/internal/client/load"
+)
+
+var newRandomWRR = wrr.NewRandom
+
+const million = 1000000
+
+type dropper struct {
+	category string
+	w        wrr.WRR
+}
+
+// gcd returns the greatest common divisor of a and b, computed via the
+// Euclidean algorithm.
+func gcd(a, b uint32) uint32 {
+	for b != 0 {
+		t := b
+		b = a % b
+		a = t
+	}
+	return a
+}
+
+func newDropper(c dropCategory) *dropper {
+	w := newRandomWRR()
+	gcdv := gcd(c.RequestsPerMillion, million)
+	// Return true for RequestsPerMillion out of every million picks, false
+	// for the rest. The weights are reduced by their GCD, e.g. a config of
+	// 250000 per million becomes Add(true, 1) and Add(false, 3).
+	w.Add(true, int64(c.RequestsPerMillion/gcdv))
+	w.Add(false, int64((million-c.RequestsPerMillion)/gcdv))
+
+	return &dropper{
+		category: c.Category,
+		w:        w,
+	}
+}
+
+func (d *dropper) drop() bool {
+	return d.w.Next().(bool)
+}
+
+// loadReporter wraps the methods from the loadStore that are used here.
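+// Only CallDropped is needed by the drop picker; load.PerClusterReporter
+// (which loadstore.Wrapper implements) is a superset, so either satisfies
+// this interface.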
+type loadReporter interface {
+	CallDropped(category string)
+}
+
+type dropPicker struct {
+	drops     []*dropper
+	s         balancer.State
+	loadStore loadReporter
+	// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
+}
+
+func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
+	return &dropPicker{
+		drops:     drops,
+		s:         s,
+		loadStore: loadStore,
+	}
+}
+
+func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+	// Don't drop unless the inner picker is READY. Similar to
+	// https://github.com/grpc/grpc-go/issues/2622.
+	if d.s.ConnectivityState != connectivity.Ready {
+		return d.s.Picker.Pick(info)
+	}
+
+	for _, dp := range d.drops {
+		if dp.drop() {
+			if d.loadStore != nil {
+				d.loadStore.CallDropped(dp.category)
+			}
+			return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
+		}
+	}
+	// TODO: support circuit breaking, check if d.maxRequestCount >=
+	// d.counter.StartRequestWithMax().
+	return d.s.Picker.Pick(info)
+}
diff --git a/xds/internal/balancer/loadstore/load_store_wrapper.go b/xds/internal/balancer/loadstore/load_store_wrapper.go
new file mode 100644
index 000000000000..88fa344118cc
--- /dev/null
+++ b/xds/internal/balancer/loadstore/load_store_wrapper.go
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package loadstore contains a load store wrapper shared by the balancers.
+package loadstore
+
+import (
+	"sync"
+
+	"google.golang.org/grpc/xds/internal/client/load"
+)
+
+// NewWrapper creates a Wrapper.
+func NewWrapper() *Wrapper {
+	return &Wrapper{}
+}
+
+// Wrapper wraps a load store with cluster and edsService.
+//
+// Its store and cluster/edsService can be updated separately, and it updates
+// its internal perCluster store so that new stats are added to the correct
+// perCluster.
+//
+// Note that this struct is a temporary workaround before we implement graceful
+// switch for EDS. Any update to the clusterName and serviceName is too early;
+// the perfect timing is when the picker is updated with the new connection.
+// This early update could cause picks for the old SubConn to be reported to
+// the new services.
+//
+// When the graceful switch in EDS is done, there should be no need for this
+// struct. The policies that record/report load shouldn't need to handle
+// updates of lrsServerName/cluster/edsService. Their parent should do a
+// graceful switch of the whole tree when any of them changes.
+type Wrapper struct {
+	mu         sync.RWMutex
+	cluster    string
+	edsService string
+	// store and perCluster are initialized as nil. They are only set by the
+	// balancer when LRS is enabled. Before that, all functions to record loads
+	// are no-ops.
+	store      *load.Store
+	perCluster load.PerClusterReporter
+}
+
+// UpdateClusterAndService updates the cluster name and eds service for this
+// wrapper.
If any one of them is changed from before, the perCluster store in +// this wrapper will also be updated. +func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) { + lsw.mu.Lock() + defer lsw.mu.Unlock() + if cluster == lsw.cluster && edsService == lsw.edsService { + return + } + lsw.cluster = cluster + lsw.edsService = edsService + lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) +} + +// UpdateLoadStore updates the load store for this wrapper. If it is changed +// from before, the perCluster store in this wrapper will also be updated. +func (lsw *Wrapper) UpdateLoadStore(store *load.Store) { + lsw.mu.Lock() + defer lsw.mu.Unlock() + if store == lsw.store { + return + } + lsw.store = store + lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) +} + +// CallStarted records a call started in the store. +func (lsw *Wrapper) CallStarted(locality string) { + lsw.mu.RLock() + defer lsw.mu.RUnlock() + if lsw.perCluster != nil { + lsw.perCluster.CallStarted(locality) + } +} + +// CallFinished records a call finished in the store. +func (lsw *Wrapper) CallFinished(locality string, err error) { + lsw.mu.RLock() + defer lsw.mu.RUnlock() + if lsw.perCluster != nil { + lsw.perCluster.CallFinished(locality, err) + } +} + +// CallServerLoad records the server load in the store. +func (lsw *Wrapper) CallServerLoad(locality, name string, val float64) { + lsw.mu.RLock() + defer lsw.mu.RUnlock() + if lsw.perCluster != nil { + lsw.perCluster.CallServerLoad(locality, name, val) + } +} + +// CallDropped records a call dropped in the store. +func (lsw *Wrapper) CallDropped(category string) { + lsw.mu.RLock() + defer lsw.mu.RUnlock() + if lsw.perCluster != nil { + lsw.perCluster.CallDropped(category) + } +} diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index d60355afd25e..ab9ee7109db1 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -22,11 +22,11 @@ package lrs import ( "encoding/json" "fmt" - "sync" "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/loadstore" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" ) @@ -162,72 +162,6 @@ type xdsClientInterface interface { Close() } -type loadStoreWrapper struct { - mu sync.RWMutex - cluster string - edsService string - // Both store and perCluster will be nil if load reporting is disabled (EDS - // response doesn't have LRS server name). Note that methods on Store and - // perCluster all handle nil, so there's no need to check nil before calling - // them. 
- store *load.Store - perCluster load.PerClusterReporter -} - -func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) { - lsw.mu.Lock() - defer lsw.mu.Unlock() - if cluster == lsw.cluster && edsService == lsw.edsService { - return - } - lsw.cluster = cluster - lsw.edsService = edsService - lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) -} - -func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) { - lsw.mu.Lock() - defer lsw.mu.Unlock() - if store == lsw.store { - return - } - lsw.store = store - lsw.perCluster = nil - lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) -} - -func (lsw *loadStoreWrapper) CallStarted(locality string) { - lsw.mu.RLock() - defer lsw.mu.RUnlock() - if lsw.perCluster != nil { - lsw.perCluster.CallStarted(locality) - } -} - -func (lsw *loadStoreWrapper) CallFinished(locality string, err error) { - lsw.mu.RLock() - defer lsw.mu.RUnlock() - if lsw.perCluster != nil { - lsw.perCluster.CallFinished(locality, err) - } -} - -func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) { - lsw.mu.RLock() - defer lsw.mu.RUnlock() - if lsw.perCluster != nil { - lsw.perCluster.CallServerLoad(locality, name, val) - } -} - -func (lsw *loadStoreWrapper) CallDropped(category string) { - lsw.mu.RLock() - defer lsw.mu.RUnlock() - if lsw.perCluster != nil { - lsw.perCluster.CallDropped(category) - } -} - type xdsClientWrapper struct { c xdsClientInterface cancelLoadReport func() @@ -236,13 +170,13 @@ type xdsClientWrapper struct { lrsServerName string // loadWrapper is a wrapper with loadOriginal, with clusterName and // edsServiceName. It's used children to report loads. - loadWrapper *loadStoreWrapper + loadWrapper *loadstore.Wrapper } func newXDSClientWrapper(c xdsClientInterface) *xdsClientWrapper { return &xdsClientWrapper{ c: c, - loadWrapper: &loadStoreWrapper{}, + loadWrapper: loadstore.NewWrapper(), } } @@ -274,7 +208,7 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig) error { // On the other hand, this will almost never happen. Each LRS policy // shouldn't get updated config. The parent should do a graceful switch when // the clusterName or serviceName is changed. - w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName) + w.loadWrapper.UpdateClusterAndService(w.clusterName, w.edsServiceName) } if w.lrsServerName != newConfig.LrsLoadReportingServerName { @@ -293,7 +227,7 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig) error { if w.c != nil { loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName) } - w.loadWrapper.updateLoadStore(loadStore) + w.loadWrapper.UpdateLoadStore(loadStore) } return nil