xds: xds_cluster_impl_balancer part 1 #4154
Conversation
Force-pushed from 3315489 to 5214205
// Package utils contains utility structs shared by the balancers.
package utils
Generic package names such as util/common/helper are better avoided. See go/go-utility-packages. I would suggest a package name of `loadstore` and a type name of `Wrapper`.
Done
	return
}
lsw.store = store
lsw.perCluster = nil
Why is this assignment to `nil` required?
Deleted
edsService string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
The comment about `nil` is a little confusing, because we are checking for `nil` in the `LoadStoreWrapper.Call*()` methods.
The comment is no longer true. Updated.
}

// CallStarted records a call started in the store.
func (lsw *LoadStoreWrapper) CallStarted(locality string) {
Is it possible that these `Call*` methods are called concurrently? If not, is the RWMutex really worth it? I would assume the `UpdateLoadStore()` method is called really infrequently.
Yes, they can. They are called by pickers, and each pick can record load.
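For context, a minimal sketch of the pattern under discussion: per-pick load recording takes the read lock, while the rare store swap takes the write lock. The `reporter` interface here is a stand-in for the real per-cluster store type, not the exact gRPC-internal API.

package loadstore

import "sync"

// reporter is a stand-in for the real per-cluster load reporting type.
type reporter interface {
	CallStarted(locality string)
}

// Wrapper guards the underlying reporter with an RWMutex so that many
// pickers can record load concurrently.
type Wrapper struct {
	mu         sync.RWMutex
	perCluster reporter
}

// CallStarted is called by pickers on every pick, potentially from many
// goroutines at once, so it only takes the read lock.
func (w *Wrapper) CallStarted(locality string) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.perCluster != nil {
		w.perCluster.CallStarted(locality)
	}
}

// UpdateLoadStore swaps the reporter. It runs infrequently (on config
// updates), so taking the write lock here is cheap overall.
func (w *Wrapper) UpdateLoadStore(r reporter) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.perCluster = r
}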
)

/*
message XdsClusterImplLoadBalancingPolicyConfig {
Since we now have a dependency on the new protobuf library (`google.golang.org/protobuf`), would it make more sense to directly use the proto messages (from the grpc-proto repo) corresponding to the service config, and convert from JSON directly into them?

I'm also wondering if this approach would simplify things in LB policies which generate service config for their children. Today we have strong coupling among these, because the parent directly fills in the struct corresponding to the service config (and this struct is defined in the child policy).
We cannot just use the proto message, because of the extra check we do on the sub-balancer policies (`internalserviceconfig.BalancerConfig`).

Parent balancers shouldn't use the child's policy struct directly (we do have some today, but we should change those), because the only validation we do is usually in `parseConfig`. Bypassing the JSON unmarshal also bypasses the validation, which can cause problems later.
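To illustrate the point about validation, a rough sketch of how a `parseConfig` in this style validates while unmarshaling; the exact fields of `lbConfig` are assumptions for illustration, not the PR's final shape:

import (
	"encoding/json"
	"fmt"

	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/serviceconfig"
)

type lbConfig struct {
	serviceconfig.LoadBalancingConfig
	Cluster     string                                `json:"cluster,omitempty"`
	ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func parseConfig(c json.RawMessage) (*lbConfig, error) {
	var cfg lbConfig
	// Unmarshaling the childPolicy into BalancerConfig also checks that the
	// named child balancer is registered and that its own config parses —
	// the extra validation a direct proto conversion would skip.
	if err := json.Unmarshal(c, &cfg); err != nil {
		return nil, err
	}
	if cfg.Cluster == "" {
		return nil, fmt.Errorf("required field cluster is empty")
	}
	return &cfg, nil
}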
Would it be better to simply link to the serviceConfig proto message in the grpc-proto repo rather than inlining it here in this comment? I'm only concerned about keeping this up-to-date if something changes in the actual proto.
It's not in the proto. I deleted the comments.
	return cib.cc.NewSubConn(addresses, options)
}

func (cib *clusterImplBalancer) RemoveSubConn(conn balancer.SubConn) {
I'm not entirely convinced that it would be better to embed `balancer.ClientConn` in the `clusterImplBalancer` type, but the one advantage would be that when we change the balancer API (which we are planning to do by adding `UpdateAddresses()` to `balancer.ClientConn`), we wouldn't need any changes here.
Changed to embed.
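Roughly what the embedding buys, in a sketch (the field set is illustrative): methods the balancer doesn't intercept are inherited, so interface additions like `UpdateAddresses()` need no changes here.

import "google.golang.org/grpc/balancer"

type clusterImplBalancer struct {
	// Embedded, so NewSubConn, RemoveSubConn, etc. (and any methods added
	// to the interface later) are forwarded automatically.
	balancer.ClientConn

	childLB balancer.Balancer
}

// Only methods that need interception are overridden, e.g. to wrap the
// picker before delegating to the embedded ClientConn.
func (cib *clusterImplBalancer) UpdateState(s balancer.State) {
	// ... wrap s.Picker with drop/load-reporting logic here ...
	cib.ClientConn.UpdateState(s)
}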
	restartLoadReport = true
	cib.lrsServerName = newLRSServerName
}
if restartLoadReport {
Could we get rid of the `restartLoadReport` var and do the underlying functionality under `if cib.lrsServerName != newLRSServerName { ... }`?
That's a good idea!
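The suggested simplification, schematically (using the variable names from the quoted diff):

if cib.lrsServerName != newLRSServerName {
	// LRS server name changed; (re)start the load reporting stream.
	cib.lrsServerName = newLRSServerName
	// ... stop the old stream and start a new one against the new server ...
}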
Force-pushed from 5214205 to c6fe038
Thanks for the review. All done. PTAL.
// Package clusterimpl implements the xds_cluster_impl balancing policy. It
// handles the features in cluster except name resolution. Features include
Not sure I understand your comment. Are you suggesting we don't mention the features?

Anyway, I modified the comment to make it explicit that name resolution is done in xds_cluster_resolver.
type clusterImplBB struct{}

func (wt *clusterImplBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
Done
// Compare new drop config. And update picker if it's changed.
var updatePicker bool
if cib.config == nil || !reflect.DeepEqual(cib.config.DropCategories, newConfig.DropCategories) {
Done. This is not comparing the whole config; it's just comparing the drop categories. Added a function for that.
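A sketch of what such a dedicated comparison can look like; the `DropConfig` shape here is an assumption for illustration:

type DropConfig struct {
	Category           string
	RequestsPerMillion uint32
}

// equalDropCategories reports whether two drop configs match element-wise,
// avoiding reflect.DeepEqual over the whole balancer config.
func equalDropCategories(a, b []DropConfig) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}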
drops     []*dropper
s         balancer.State
loadStore loadReporter
counter   *client.ServiceRequestsCounter
The counter map is global. It's not in the client, but it is in the client package (though I kind of don't like that now; I may change it in the next PR). This is a pointer to the global counter for this cluster.

Also, all circuit breaking code has been removed; it will be added in the next PR.
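For context, a hedged sketch of the pattern described — a package-global map of per-cluster counters, with each balancer holding a pointer to its cluster's entry. Names here are illustrative; the real type lives in the xds client package.

import "sync"

// ServiceRequestsCounter tracks in-flight requests for one cluster.
type ServiceRequestsCounter struct {
	numRequests uint32 // manipulated atomically by callers
}

var (
	countersMu sync.Mutex
	// counters is package-global, keyed by cluster name.
	counters = make(map[string]*ServiceRequestsCounter)
)

// getServiceRequestsCounter returns the shared counter for a cluster,
// creating it on first use. Balancers keep the returned pointer.
func getServiceRequestsCounter(cluster string) *ServiceRequestsCounter {
	countersMu.Lock()
	defer countersMu.Unlock()
	c, ok := counters[cluster]
	if !ok {
		c = &ServiceRequestsCounter{}
		counters[cluster] = c
	}
	return c
}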
Force-pushed from 271a479 to b2fc9e1
Looks mostly good to me. Mostly minor nits in this round.
type clusterImplBB struct{}

func (clusterImplBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
Is it completely safe to ignore these `BuildOptions` here? Maybe some child policy down the tree needs it?
No, we should keep it. Added a field.
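The fix, sketched: store the options so they can be forwarded when building child policies (the field name is illustrative):

type clusterImplBalancer struct {
	bOpts balancer.BuildOptions
	// ...
}

func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
	// Keep bOpts so it can be forwarded to child policy builders later.
	return &clusterImplBalancer{bOpts: bOpts}
}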
b.logger = prefixLogger(b)
b.logger.Infof("Created")

client, err := newXDSClient()
Nit: it might be better to create the `xdsClient` as the first thing in this function, for two reasons:
- We can assign to `xdsC` as part of the struct literal initialization.
- If the `xdsClient` creation fails, we wouldn't see the log line emitted by `b.logger.Infof("Created")`.
We do it in this order for the `logger.Errorf` below, and `logger` needs `b` to print the pointer.
Got it. Then maybe we should move the line `b.logger.Infof("Created")` to after this `err != nil` check, so that we are sure it is not printed when the xds client creation fails.
Done
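The resulting order, sketched with the helpers from the quoted diff (`prefixLogger`, `newXDSClient`); the error message text is illustrative. The logger is built first since it needs `b` for the prefix, but "Created" only prints after client creation succeeds:

b.logger = prefixLogger(b)
client, err := newXDSClient()
if err != nil {
	b.logger.Errorf("failed to create xds client: %v", err)
	return nil
}
b.xdsC = client
b.logger.Infof("Created")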
closed *grpcsync.Event

logger *grpclog.PrefixLogger
xdsC   xdsClientInterface

config  *lbConfig
childLB balancer.Balancer

Nit: seems a little too sparse. Maybe get rid of some of the newlines here?
Newlines removed.
	cib.edsServiceName = newConfig.EDSServiceName
}
if updateLoadClusterAndService {
	// This updates the clusterName and serviceName that will reported for the
s/will reported/will be reported/
Done
)

// NewLoadStoreWrapper creates a Wrapper.
func NewLoadStoreWrapper() *Wrapper {
s/NewLoadStoreWrapper/NewWrapper/ ?
go/go-style/decisions#repetitive-with-package
Done
sds := loadStore.Stats([]string{testClusterName})
if len(sds) == 0 {
	t.Fatalf("loads for cluster %v not found in store", testClusterName)
}
sd := sds[0]
if sd.Cluster != testClusterName || sd.Service != testServiceName {
	t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
const dropCount = rpcCount * dropNumerator / dropDenominator
if diff := cmp.Diff(sd.Drops, map[string]uint64{dropReason: dropCount}); diff != "" {
	t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
}
Can we directly instantiate a `wantStats` variable and replace all these individual checks with the following? (Note `cmp.Diff` returns a string diff, not a bool.)

wantStats := []*Data{ ... }
gotStats := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStats, wantStats); diff != "" {
	t.Fatalf(...)
}

go/go-style/decisions#compare-full-structures
Good call. Done
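Filled out under assumed names (a `load.Data` stats type, mirroring the constants used in this test), the check can look like:

wantStats := []*load.Data{{
	Cluster: testClusterName,
	Service: testServiceName,
	Drops:   map[string]uint64{dropReason: rpcCount * dropNumerator / dropDenominator},
}}
gotStats := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStats, wantStats); diff != "" {
	t.Fatalf("unexpected stats, diff (-got, +want): %s", diff)
}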
	}
}

sds2 := loadStore.Stats([]string{testClusterName})
Same here. Prefer full struct comparisons.
Done
Thanks for the review. PTAL.
Force-pushed from b684f2e to 1626a79
Force-pushed from 12fd642 to 3a50f6f
Part of C2P fallback, to support fallback to a DNS cluster.
This PR adds the implementation of xds_cluster_impl_balancer, which will be responsible for circuit breaking and RPC dropping.
This PR only adds RPC dropping; circuit breaking will be done in a followup PR, after some necessary refactoring.
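For readers skimming the change, a minimal sketch of how category-based RPC dropping in a picker can work. All names here (`dropper`, `loadReporter`, the picker fields) are illustrative stand-ins, not the PR's exact code.

package clusterimpl

import (
	"math/rand"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// dropper drops a numerator/denominator fraction of RPCs for one category.
// denominator is assumed to be > 0.
type dropper struct {
	category    string
	numerator   int64
	denominator int64
}

func (d *dropper) drop() bool {
	// Non-cryptographic randomness is fine for sampling drops.
	return rand.Int63n(d.denominator) < d.numerator
}

// loadReporter records dropped calls; a stand-in for the load store.
type loadReporter interface {
	CallDropped(category string)
}

// picker wraps the child picker and applies the drop config before
// delegating the pick.
type picker struct {
	drops     []*dropper
	childPick balancer.Picker
	loadStore loadReporter
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	for _, d := range p.drops {
		if d.drop() {
			if p.loadStore != nil {
				p.loadStore.CallDropped(d.category)
			}
			return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC dropped by category %q", d.category)
		}
	}
	return p.childPick.Pick(info)
}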