-
Notifications
You must be signed in to change notification settings - Fork 4.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New grpclb implementation #1558
Conversation
94b3cd9
to
e5af9ad
Compare
balancer/balancer.go
Outdated
@@ -128,6 +128,10 @@ type PickOptions struct{} | |||
type DoneInfo struct { | |||
// Err is the rpc error the RPC finished with. It could be nil. | |||
Err error | |||
// BytesSent indicates if any byte has been sent to the server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*"bytes have"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
call.go
Outdated
@@ -251,13 +251,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |||
stream, err = t.NewStream(ctx, callHdr) | |||
if err != nil { | |||
if done != nil { | |||
doneInfo := balancer.DoneInfo{Err: err} | |||
if _, ok := err.(transport.ConnectionError); ok { | |||
// If error is connection error, transport was sending data on wire, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually not true (any longer?).
NewStream only returns errors before it attempts to write to the network. I.e. this block should be deleted (and is by #1597).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping? Bytes not sent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased on top of the retry PR. This was removed.
call.go
Outdated
done(balancer.DoneInfo{Err: err}) | ||
doneInfo := balancer.DoneInfo{ | ||
Err: err, | ||
BytesSent: stream.BytesSent(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, if there is no error from NewStream, we always attempted to write bytes to the wire. So this should be "true". (And below.)
grpclb_picker.go
Outdated
NumCallsFinishedKnownReceived int64 | ||
} | ||
|
||
// toClientStats converts rpcStats to lbpb.ClientStats, and clear rpcStats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*clearS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb_picker.go
Outdated
|
||
mu sync.Mutex | ||
serverList []*lbpb.Server | ||
nextSL int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is nextSL and nextSC? Please document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
if l == nil { | ||
// regeneratePicker takes a snapshot of the balancer, and generate a picker from | ||
// it. The picker | ||
// - always return ErrTransientFailure if the balancer is in TransientFailure, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*returnS
and doES below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { | ||
timer := time.NewTimer(fallbackTimeout) | ||
defer func() { | ||
if !timer.Stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, timer
is a local, so you don't even have to stop it if you don't want. You definitely don't have to check its return value and drain its channel. That's only if you want to reuse it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
// Has iterated all the possible address but none is connected. | ||
break | ||
} | ||
var ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var remoteBalancerAddrs, backendAddrs []resolver.Address
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
b.mu.Unlock() | ||
if lb.ccRemoteLB == nil { | ||
if len(remoteBalancerAddrs) <= 0 { | ||
grpclog.Fatalf("grpclb: no remote balancer address is available, should never happen") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we return an error instead anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
We don't check this returned error, though...
} | ||
if b.w != nil { | ||
b.w.Close() | ||
close(lb.doneCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is still racy - is that OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close() is guaranteed to be called from one goroutine (never in parallel).
So this should be OK.
3347ebc
to
23c3be9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. PTAL.
balancer/balancer.go
Outdated
@@ -128,6 +128,10 @@ type PickOptions struct{} | |||
type DoneInfo struct { | |||
// Err is the rpc error the RPC finished with. It could be nil. | |||
Err error | |||
// BytesSent indicates if any byte has been sent to the server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
} | ||
// NewLBBuilder creates a builder for grpclb. | ||
func NewLBBuilder() balancer.Builder { | ||
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given | ||
// fallbackTimeout. | ||
// | ||
// Only call this function when a non-default fallback timeout is needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can also be used to override the registered grpclb builder with a new fallback timeout.
Updated the comment.
grpclb.go
Outdated
} | ||
|
||
func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) { | ||
if l == nil { | ||
// regeneratePicker takes a snapshot of the balancer, and generate a picker from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb.go
Outdated
if l == nil { | ||
// regeneratePicker takes a snapshot of the balancer, and generate a picker from | ||
// it. The picker | ||
// - always return ErrTransientFailure if the balancer is in TransientFailure, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb_remote_balancer.go
Outdated
continue | ||
} | ||
if initResp.LoadBalancerDelegate != "" { | ||
grpclog.Fatalf("TODO: Delegation is not supported yet.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to Errorf
and continue.
grpclb_remote_balancer.go
Outdated
} | ||
|
||
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { | ||
var ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb_remote_balancer.go
Outdated
grpclog.Fatalf("TODO: Delegation is not supported yet.") | ||
} | ||
|
||
// streamDone will be closed by the readServerList goroutine when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, right, context.
There's no need to create a new context here.
SendLoadReport()
should block on the stream's context.
grpclb_remote_balancer.go
Outdated
// time.Ticker forever. | ||
streamDone := make(chan struct{}) | ||
|
||
var wg sync.WaitGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
grpclb_remote_balancer.go
Outdated
dopts = append(dopts, WithInsecure()) | ||
} | ||
if lb.opt.Dialer != nil { | ||
// WithDialer takes a different type of function, so we instead use a special DialOption here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Added a unexported withContextDialer()
23c3be9
to
3b7168a
Compare
3b7168a
to
25da9c2
Compare
call.go
Outdated
@@ -251,13 +251,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |||
stream, err = t.NewStream(ctx, callHdr) | |||
if err != nil { | |||
if done != nil { | |||
doneInfo := balancer.DoneInfo{Err: err} | |||
if _, ok := err.(transport.ConnectionError); ok { | |||
// If error is connection error, transport was sending data on wire, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping? Bytes not sent.
grpclb.go
Outdated
} | ||
// NewLBBuilder creates a builder for grpclb. | ||
func NewLBBuilder() balancer.Builder { | ||
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not done?
grpclb.go
Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given | ||
// fallbackTimeout. | ||
// | ||
// Only call this function when a non-default fallback timeout is needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's a fallback timeout? OK, I think I know what it means, but can you explain in the comment?
grpclb.go
Outdated
// The ClientConn to talk to the remote balancer. | ||
ccRemoteLB *ClientConn | ||
|
||
mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this guard?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything following until the blank line...
I move the code around and added a comment.
grpclb.go
Outdated
} | ||
|
||
func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) { | ||
if l == nil { | ||
// regeneratePicker takes a snapshot of the balancer, and generate a picker from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not done? Maybe your latest updates aren't pushed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. PTAL.
The "Bytes not sent" was about the byteSent
boolean variable you removed in your retry change.
I rebased this PR on the retry commit, so it's fixed now.
grpclb.go
Outdated
} | ||
|
||
func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) { | ||
if l == nil { | ||
// regeneratePicker takes a snapshot of the balancer, and generate a picker from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done again... And also the following one.
grpclb.go
Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given | ||
// fallbackTimeout. | ||
// | ||
// Only call this function when a non-default fallback timeout is needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
grpclb.go
Outdated
} | ||
// NewLBBuilder creates a builder for grpclb. | ||
func NewLBBuilder() balancer.Builder { | ||
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done again...
grpclb.go
Outdated
// The ClientConn to talk to the remote balancer. | ||
ccRemoteLB *ClientConn | ||
|
||
mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything following until the blank line...
I move the code around and added a comment.
split lbPicker into lbPicker and errPicker grpclb_remote_balancer.go go style and supress error logs add rrpicker
479269c
to
9161131
Compare
grpclb_remote_balancer.go
Outdated
for { | ||
select { | ||
case <-lb.doneCh: | ||
return | ||
default: | ||
if remoteBalancerErr != nil { | ||
grpclog.Error(remoteBalancerErr) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remoteBalancerErr = nil
?
9877aa1
to
15d0543
Compare
Also add new feature: