diff --git a/go.mod b/go.mod index b2507fc46fa..779e9546fe2 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/net v0.0.0-20190311183353-d8887717615a golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 - google.golang.org/grpc v1.22.1 + google.golang.org/grpc v1.22.1-0.20190805101010-a2bdfb40ff25 gopkg.in/cheggaaa/pb.v1 v1.0.25 gopkg.in/yaml.v2 v2.2.2 sigs.k8s.io/yaml v1.1.0 diff --git a/go.sum b/go.sum index ba00e25c092..81cf4f99e1c 100644 --- a/go.sum +++ b/go.sum @@ -177,8 +177,8 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= -google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.22.1-0.20190805101010-a2bdfb40ff25 h1:lS/LGci7282xXbzMwFpHD7RKjsfKUK3KYwk34RYtlK0= +google.golang.org/grpc v1.22.1-0.20190805101010-a2bdfb40ff25/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go index a78e702baee..a8eb0f47609 100644 --- a/vendor/google.golang.org/grpc/balancer.go +++ b/vendor/google.golang.org/grpc/balancer.go @@ -43,7 +43,7 @@ type Address struct { // BalancerConfig specifies the configurations for Balancer. // -// Deprecated: please use package balancer. +// Deprecated: please use package balancer. May be removed in a future 1.x release. type BalancerConfig struct { // DialCreds is the transport credential the Balancer implementation can // use to dial to a remote load balancer server. The Balancer implementations @@ -57,7 +57,7 @@ type BalancerConfig struct { // BalancerGetOptions configures a Get call. // -// Deprecated: please use package balancer. +// Deprecated: please use package balancer. May be removed in a future 1.x release. type BalancerGetOptions struct { // BlockingWait specifies whether Get should block when there is no // connected address. @@ -66,7 +66,7 @@ type BalancerGetOptions struct { // Balancer chooses network addresses for RPCs. // -// Deprecated: please use package balancer. +// Deprecated: please use package balancer. May be removed in a future 1.x release. type Balancer interface { // Start does the initialization work to bootstrap a Balancer. For example, // this function may start the name resolution and watch the updates. It will @@ -120,7 +120,7 @@ type Balancer interface { // RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch // the name resolution updates and updates the addresses available correspondingly. // -// Deprecated: please use package balancer/roundrobin. +// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release. func RoundRobin(r naming.Resolver) Balancer { return &roundRobin{r: r} } diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index e587d8d11a7..1af88f0a3f1 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -73,7 +73,9 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) { // TODO: handle s.ResolverState.Err (log if not nil) once implemented. // TODO: handle s.ResolverState.ServiceConfig? - grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s) + if grpclog.V(2) { + grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s) + } // addrsSet is the set converted from addrs, it's used for quick lookup of an address. addrsSet := make(map[resolver.Address]struct{}) for _, a := range s.ResolverState.Addresses { @@ -127,10 +129,14 @@ func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectiv func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { s := state.ConnectivityState - grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) + if grpclog.V(2) { + grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) + } oldS, ok := b.scStates[sc] if !ok { - grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) + if grpclog.V(2) { + grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) + } return } b.scStates[sc] = s diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go index 7bc6621a5ee..8df4095ca95 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go @@ -183,7 +183,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) { if ccb.cc.curBalancerName != grpclbName { // Filter any grpclb addresses since we don't have the grpclb balancer. - s := ccs.ResolverState + s := &ccs.ResolverState for i := 0; i < len(s.Addresses); { if s.Addresses[i].Type == resolver.GRPCLB { copy(s.Addresses[i:], s.Addresses[i+1:]) diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 84e31a26756..a7643df7d29 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -38,7 +38,6 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" @@ -1061,8 +1060,8 @@ func (ac *addrConn) resetTransport() { ac.mu.Lock() if ac.state == connectivity.Shutdown { - newTr.Close() ac.mu.Unlock() + newTr.Close() return } ac.curAddr = addr @@ -1077,20 +1076,16 @@ func (ac *addrConn) resetTransport() { // we restart from the top of the addr list. <-reconnect.Done() hcancel() - - // Need to reconnect after a READY, the addrConn enters - // TRANSIENT_FAILURE. + // restart connecting - the top of the loop will set state to + // CONNECTING. This is against the current connectivity semantics doc, + // however it allows for graceful behavior for RPCs not yet dispatched + // - unfortunate timing would otherwise lead to the RPC failing even + // though the TRANSIENT_FAILURE state (called for by the doc) would be + // instantaneous. // - // This will set addrConn to TRANSIENT_FAILURE for a very short period - // of time, and turns CONNECTING. It seems reasonable to skip this, but - // READY-CONNECTING is not a valid transition. - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return - } - ac.updateConnectivityState(connectivity.TransientFailure) - ac.mu.Unlock() + // Ideally we should transition to Idle here and block until there is + // RPC activity that leads to the balancer requesting a reconnect of + // the associated SubConn. } } @@ -1147,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne Authority: ac.cc.authority, } + once := sync.Once{} onGoAway := func(r transport.GoAwayReason) { ac.mu.Lock() ac.adjustParams(r) + once.Do(func() { + if ac.state == connectivity.Ready { + // Prevent this SubConn from being used for new RPCs by setting its + // state to Connecting. + // + // TODO: this should be Idle when grpc-go properly supports it. + ac.updateConnectivityState(connectivity.Connecting) + } + }) ac.mu.Unlock() reconnect.Fire() } onClose := func() { + ac.mu.Lock() + once.Do(func() { + if ac.state == connectivity.Ready { + // Prevent this SubConn from being used for new RPCs by setting its + // state to Connecting. + // + // TODO: this should be Idle when grpc-go properly supports it. + ac.updateConnectivityState(connectivity.Connecting) + } + }) + ac.mu.Unlock() close(onCloseCalled) reconnect.Fire() } @@ -1176,20 +1192,18 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne return nil, nil, err } - if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn { - select { - case <-time.After(connectDeadline.Sub(time.Now())): - // We didn't get the preface in time. - newTr.Close() - grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) - return nil, nil, errors.New("timed out waiting for server handshake") - case <-prefaceReceived: - // We got the preface - huzzah! things are good. - case <-onCloseCalled: - // The transport has already closed - noop. - return nil, nil, errors.New("connection closed") - // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. - } + select { + case <-time.After(connectDeadline.Sub(time.Now())): + // We didn't get the preface in time. + newTr.Close() + grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) + return nil, nil, errors.New("timed out waiting for server handshake") + case <-prefaceReceived: + // We got the preface - huzzah! things are good. + case <-onCloseCalled: + // The transport has already closed - noop. + return nil, nil, errors.New("connection closed") + // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. } return newTr, reconnect, nil } diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 69c003159d4..e8f34d0d6ea 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -60,7 +60,6 @@ type dialOptions struct { balancerBuilder balancer.Builder // This is to support grpclb. resolverBuilder resolver.Builder - reqHandshake envconfig.RequireHandshakeSetting channelzParentID int64 disableServiceConfig bool disableRetry bool @@ -100,17 +99,6 @@ func newFuncDialOption(f func(*dialOptions)) *funcDialOption { } } -// WithWaitForHandshake blocks until the initial settings frame is received from -// the server before assigning RPCs to the connection. -// -// Deprecated: this is the default behavior, and this option will be removed -// after the 1.18 release. -func WithWaitForHandshake() DialOption { - return newFuncDialOption(func(o *dialOptions) { - o.reqHandshake = envconfig.RequireHandshakeOn - }) -} - // WithWriteBufferSize determines how much data can be batched before doing a // write on the wire. The corresponding memory allocation for this buffer will // be twice the size to keep syscalls low. The default value for this buffer is @@ -156,7 +144,8 @@ func WithInitialConnWindowSize(s int32) DialOption { // WithMaxMsgSize returns a DialOption which sets the maximum message size the // client can receive. // -// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. +// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. Will +// be supported throughout 1.x. func WithMaxMsgSize(s int) DialOption { return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) } @@ -172,7 +161,8 @@ func WithDefaultCallOptions(cos ...CallOption) DialOption { // WithCodec returns a DialOption which sets a codec for message marshaling and // unmarshaling. // -// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. +// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. Will be +// supported throughout 1.x. func WithCodec(c Codec) DialOption { return WithDefaultCallOptions(CallCustomCodec(c)) } @@ -181,7 +171,7 @@ func WithCodec(c Codec) DialOption { // message compression. It has lower priority than the compressor set by the // UseCompressor CallOption. // -// Deprecated: use UseCompressor instead. +// Deprecated: use UseCompressor instead. Will be supported throughout 1.x. func WithCompressor(cp Compressor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.cp = cp @@ -196,7 +186,8 @@ func WithCompressor(cp Compressor) DialOption { // message. If no compressor is registered for the encoding, an Unimplemented // status error will be returned. // -// Deprecated: use encoding.RegisterCompressor instead. +// Deprecated: use encoding.RegisterCompressor instead. Will be supported +// throughout 1.x. func WithDecompressor(dc Decompressor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.dc = dc @@ -207,7 +198,7 @@ func WithDecompressor(dc Decompressor) DialOption { // Name resolver will be ignored if this DialOption is specified. // // Deprecated: use the new balancer APIs in balancer package and -// WithBalancerName. +// WithBalancerName. Will be removed in a future 1.x release. func WithBalancer(b Balancer) DialOption { return newFuncDialOption(func(o *dialOptions) { o.balancerBuilder = &balancerWrapperBuilder{ @@ -223,7 +214,8 @@ func WithBalancer(b Balancer) DialOption { // The balancer cannot be overridden by balancer option specified by service // config. // -// This is an EXPERIMENTAL API. +// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig +// instead. Will be removed in a future 1.x release. func WithBalancerName(balancerName string) DialOption { builder := balancer.Get(balancerName) if builder == nil { @@ -244,9 +236,10 @@ func withResolverBuilder(b resolver.Builder) DialOption { // WithServiceConfig returns a DialOption which has a channel to read the // service configuration. // -// Deprecated: service config should be received through name resolver, as -// specified here. -// https://github.com/grpc/grpc/blob/master/doc/service_config.md +// Deprecated: service config should be received through name resolver or via +// WithDefaultServiceConfig, as specified at +// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be +// removed in a future 1.x release. func WithServiceConfig(c <-chan ServiceConfig) DialOption { return newFuncDialOption(func(o *dialOptions) { o.scChan = c @@ -329,7 +322,8 @@ func WithCredentialsBundle(b credentials.Bundle) DialOption { // WithTimeout returns a DialOption that configures a timeout for dialing a // ClientConn initially. This is valid if and only if WithBlock() is present. // -// Deprecated: use DialContext and context.WithTimeout instead. +// Deprecated: use DialContext and context.WithTimeout instead. Will be +// supported throughout 1.x. func WithTimeout(d time.Duration) DialOption { return newFuncDialOption(func(o *dialOptions) { o.timeout = d @@ -356,7 +350,8 @@ func init() { // is returned by f, gRPC checks the error's Temporary() method to decide if it // should try to reconnect to the network address. // -// Deprecated: use WithContextDialer instead +// Deprecated: use WithContextDialer instead. Will be supported throughout +// 1.x. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return WithContextDialer( func(ctx context.Context, addr string) (net.Conn, error) { @@ -480,8 +475,10 @@ func WithDisableServiceConfig() DialOption { // WithDefaultServiceConfig returns a DialOption that configures the default // service config, which will be used in cases where: -// 1. WithDisableServiceConfig is called. -// 2. Resolver does not return service config or if the resolver gets and invalid config. +// +// 1. WithDisableServiceConfig is also used. +// 2. Resolver does not return a service config or if the resolver returns an +// invalid service config. // // This API is EXPERIMENTAL. func WithDefaultServiceConfig(s string) DialOption { @@ -537,7 +534,6 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption { func defaultDialOptions() dialOptions { return dialOptions{ disableRetry: !envconfig.Retry, - reqHandshake: envconfig.RequireHandshake, healthCheckFunc: internal.HealthCheckFunc, copts: transport.ConnectOptions{ WriteBufferSize: defaultWriteBufSize, diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 11be7cd08c5..3ee8740f1f9 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -25,40 +25,11 @@ import ( ) const ( - prefix = "GRPC_GO_" - retryStr = prefix + "RETRY" - requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE" -) - -// RequireHandshakeSetting describes the settings for handshaking. -type RequireHandshakeSetting int - -const ( - // RequireHandshakeOn indicates to wait for handshake before considering a - // connection ready/successful. - RequireHandshakeOn RequireHandshakeSetting = iota - // RequireHandshakeOff indicates to not wait for handshake before - // considering a connection ready/successful. - RequireHandshakeOff + prefix = "GRPC_GO_" + retryStr = prefix + "RETRY" ) var ( // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on". Retry = strings.EqualFold(os.Getenv(retryStr), "on") - // RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE - // environment variable. - // - // Will be removed after the 1.18 release. - RequireHandshake = RequireHandshakeOn ) - -func init() { - switch strings.ToLower(os.Getenv(requireHandshakeStr)) { - case "on": - fallthrough - default: - RequireHandshake = RequireHandshakeOn - case "off": - RequireHandshake = RequireHandshakeOff - } -} diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index c96178d7403..8bb56d2dd60 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -493,6 +493,9 @@ func (t *http2Client) createAudience(callHdr *CallHdr) string { } func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { + if len(t.perRPCCreds) == 0 { + return nil, nil + } authData := map[string]string{} for _, c := range t.perRPCCreds { data, err := c.GetRequestMetadata(ctx, audience) @@ -513,7 +516,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s } func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { - callAuthData := map[string]string{} + var callAuthData map[string]string // Check if credentials.PerRPCCredentials were provided via call options. // Note: if these credentials are provided both via dial options and call // options, then both sets of credentials will be applied. @@ -525,6 +528,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call if err != nil { return nil, status.Errorf(codes.Internal, "transport: %v", err) } + callAuthData = make(map[string]string, len(data)) for k, v := range data { // Capital header names are illegal in HTTP/2 k = strings.ToLower(k) @@ -556,7 +560,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { close(s.headerChan) } - } hdr := &headerFrame{ hf: headerFields, @@ -769,6 +772,9 @@ func (t *http2Client) Close() error { t.mu.Unlock() return nil } + // Call t.onClose before setting the state to closing to prevent the client + // from attempting to create new streams ASAP. + t.onClose() t.state = closing streams := t.activeStreams t.activeStreams = nil @@ -789,7 +795,6 @@ func (t *http2Client) Close() error { } t.statsHandler.HandleConn(t.ctx, connEnd) } - t.onClose() return err } @@ -978,9 +983,9 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { statusCode = codes.Unknown } if statusCode == codes.Canceled { - // Our deadline was already exceeded, and that was likely the cause of - // this cancelation. Alter the status code accordingly. - if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) { + if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) { + // Our deadline was already exceeded, and that was likely the cause + // of this cancelation. Alter the status code accordingly. statusCode = codes.DeadlineExceeded } } @@ -1085,11 +1090,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { default: t.setGoAwayReason(f) close(t.goAway) - t.state = draining t.controlBuf.put(&incomingGoAway{}) - - // This has to be a new goroutine because we're still using the current goroutine to read in the transport. + // Notify the clientconn about the GOAWAY before we set the state to + // draining, to allow the client to stop attempting to create streams + // before disallowing new streams on this connection. t.onGoAway(t.goAwayReason) + t.state = draining } // All streams with IDs greater than the GoAwayId // and smaller than the previous GoAway ID should be killed. @@ -1326,6 +1332,7 @@ func (t *http2Client) keepalive() { timer.Reset(t.kp.Time) continue } + infof("transport: closing client transport due to idleness.") t.Close() return case <-t.ctx.Done(): diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 150b73e4659..beeb2ed33a6 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -766,6 +766,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { return nil } +func (t *http2Server) setResetPingStrikes() { + atomic.StoreUint32(&t.resetPingStrikes, 1) +} + func (t *http2Server) writeHeaderLocked(s *Stream) error { // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields // first and create a slice of that exact size. @@ -780,9 +784,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { streamID: s.id, hf: headerFields, endStream: false, - onWrite: func() { - atomic.StoreUint32(&t.resetPingStrikes, 1) - }, + onWrite: t.setResetPingStrikes, }) if !success { if err != nil { @@ -842,9 +844,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { streamID: s.id, hf: headerFields, endStream: true, - onWrite: func() { - atomic.StoreUint32(&t.resetPingStrikes, 1) - }, + onWrite: t.setResetPingStrikes, } s.hdrMu.Unlock() success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) @@ -896,12 +896,10 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e hdr = append(hdr, data[:emptyLen]...) data = data[emptyLen:] df := &dataFrame{ - streamID: s.id, - h: hdr, - d: data, - onEachWrite: func() { - atomic.StoreUint32(&t.resetPingStrikes, 1) - }, + streamID: s.id, + h: hdr, + d: data, + onEachWrite: t.setResetPingStrikes, } if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { select { @@ -967,6 +965,7 @@ func (t *http2Server) keepalive() { select { case <-maxAge.C: // Close the connection after grace period. + infof("transport: closing server transport due to maximum connection age.") t.Close() // Resetting the timer so that the clean-up doesn't deadlock. maxAge.Reset(infinity) @@ -980,6 +979,7 @@ func (t *http2Server) keepalive() { continue } if pingSent { + infof("transport: closing server transport due to idleness.") t.Close() // Resetting the timer so that the clean-up doesn't deadlock. keepalive.Reset(infinity) diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index d1e38aad778..ed05b02ed96 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -51,14 +51,18 @@ type pickfirstBalancer struct { func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { if err != nil { - grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err) + if grpclog.V(2) { + grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err) + } return } if b.sc == nil { b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) if err != nil { //TODO(yuxuanli): why not change the cc state to Idle? - grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) + if grpclog.V(2) { + grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) + } return } b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc}) @@ -70,9 +74,13 @@ func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err er } func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { - grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s) + if grpclog.V(2) { + grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s) + } if b.sc != sc { - grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized") + if grpclog.V(2) { + grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized") + } return } if s == connectivity.Shutdown { diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 617289e2e36..f064b73e555 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -42,6 +42,7 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -56,6 +57,8 @@ const ( defaultServerMaxSendMessageSize = math.MaxInt32 ) +var statusOK = status.New(codes.OK, "") + type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) // MethodDesc represents an RPC service's method specification. @@ -97,10 +100,8 @@ type Server struct { m map[string]*service // service name -> service info events trace.EventLog - quit chan struct{} - done chan struct{} - quitOnce sync.Once - doneOnce sync.Once + quit *grpcsync.Event + done *grpcsync.Event channelzRemoveOnce sync.Once serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop @@ -388,8 +389,8 @@ func NewServer(opt ...ServerOption) *Server { opts: opts, conns: make(map[transport.ServerTransport]bool), m: make(map[string]*service), - quit: make(chan struct{}), - done: make(chan struct{}), + quit: grpcsync.NewEvent(), + done: grpcsync.NewEvent(), czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) @@ -556,11 +557,9 @@ func (s *Server) Serve(lis net.Listener) error { s.serveWG.Add(1) defer func() { s.serveWG.Done() - select { - // Stop or GracefulStop called; block until done and return nil. - case <-s.quit: - <-s.done - default: + if s.quit.HasFired() { + // Stop or GracefulStop called; block until done and return nil. + <-s.done.Done() } }() @@ -603,7 +602,7 @@ func (s *Server) Serve(lis net.Listener) error { timer := time.NewTimer(tempDelay) select { case <-timer.C: - case <-s.quit: + case <-s.quit.Done(): timer.Stop() return nil } @@ -613,10 +612,8 @@ func (s *Server) Serve(lis net.Listener) error { s.printf("done serving; Accept = %v", err) s.mu.Unlock() - select { - case <-s.quit: + if s.quit.HasFired() { return nil - default: } return err } @@ -637,6 +634,10 @@ func (s *Server) Serve(lis net.Listener) error { // handleRawConn forks a goroutine to handle a just-accepted connection that // has not had any I/O performed on it yet. func (s *Server) handleRawConn(rawConn net.Conn) { + if s.quit.HasFired() { + rawConn.Close() + return + } rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) conn, authInfo, err := s.useTransportAuthenticator(rawConn) if err != nil { @@ -653,14 +654,6 @@ func (s *Server) handleRawConn(rawConn net.Conn) { return } - s.mu.Lock() - if s.conns == nil { - s.mu.Unlock() - conn.Close() - return - } - s.mu.Unlock() - // Finish handshaking (HTTP2) st := s.newHTTP2Transport(conn, authInfo) if st == nil { @@ -768,6 +761,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. // If tracing is not enabled, it returns nil. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { + if !EnableTracing { + return nil + } tr, ok := trace.FromContext(stream.Context()) if !ok { return nil @@ -1078,7 +1074,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // TODO: Should we be logging if writing status failed here, like above? // Should the logging be in WriteStatus? Should we ignore the WriteStatus // error or allow the stats handler to see it? - err = t.WriteStatus(stream, status.New(codes.OK, "")) + err = t.WriteStatus(stream, statusOK) if binlog != nil { binlog.Log(&binarylog.ServerTrailer{ Trailer: stream.Trailer(), @@ -1236,7 +1232,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.trInfo.tr.LazyLog(stringer("OK"), false) ss.mu.Unlock() } - err = t.WriteStatus(ss.s, status.New(codes.OK, "")) + err = t.WriteStatus(ss.s, statusOK) if ss.binlog != nil { ss.binlog.Log(&binarylog.ServerTrailer{ Trailer: ss.s.Trailer(), @@ -1353,15 +1349,11 @@ func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream // pending RPCs on the client side will get notified by connection // errors. func (s *Server) Stop() { - s.quitOnce.Do(func() { - close(s.quit) - }) + s.quit.Fire() defer func() { s.serveWG.Wait() - s.doneOnce.Do(func() { - close(s.done) - }) + s.done.Fire() }() s.channelzRemoveOnce.Do(func() { @@ -1398,15 +1390,8 @@ func (s *Server) Stop() { // accepting new connections and RPCs and blocks until all the pending RPCs are // finished. func (s *Server) GracefulStop() { - s.quitOnce.Do(func() { - close(s.quit) - }) - - defer func() { - s.doneOnce.Do(func() { - close(s.done) - }) - }() + s.quit.Fire() + defer s.done.Fire() s.channelzRemoveOnce.Do(func() { if channelz.IsOn() { diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go index 641c45c6fed..a1348e9b16b 100644 --- a/vendor/google.golang.org/grpc/status/status.go +++ b/vendor/google.golang.org/grpc/status/status.go @@ -58,6 +58,17 @@ func (se *statusError) GRPCStatus() *Status { return &Status{s: (*spb.Status)(se)} } +// Is implements future error.Is functionality. +// A statusError is equivalent if the code and message are identical. +func (se *statusError) Is(target error) bool { + tse, ok := target.(*statusError) + if !ok { + return false + } + + return proto.Equal((*spb.Status)(se), (*spb.Status)(tse)) +} + // Status represents an RPC status code, message, and details. It is immutable // and should be created with New, Newf, or FromProto. type Status struct { @@ -132,7 +143,7 @@ func FromProto(s *spb.Status) *Status { // Status is returned with codes.Unknown and the original error message. func FromError(err error) (s *Status, ok bool) { if err == nil { - return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true + return nil, true } if se, ok := err.(interface { GRPCStatus() *Status @@ -206,7 +217,7 @@ func Code(err error) codes.Code { func FromContextError(err error) *Status { switch err { case nil: - return New(codes.OK, "") + return nil case context.DeadlineExceeded: return New(codes.DeadlineExceeded, err.Error()) case context.Canceled: diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index db14c3225d1..a148bb804d2 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -328,7 +328,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error { - cs.attempt = &csAttempt{ + newAttempt := &csAttempt{ cs: cs, dc: cs.cc.dopts.dc, statsHandler: sh, @@ -345,8 +345,9 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) er if trInfo != nil { trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) } - cs.attempt.t = t - cs.attempt.done = done + newAttempt.t = t + newAttempt.done = done + cs.attempt = newAttempt return nil } @@ -395,11 +396,18 @@ type clientStream struct { serverHeaderBinlogged bool mu sync.Mutex - firstAttempt bool // if true, transparent retry is valid - numRetries int // exclusive of transparent retry attempt(s) - numRetriesSincePushback int // retries since pushback; to reset backoff - finished bool // TODO: replace with atomic cmpxchg or sync.Once? - attempt *csAttempt // the active client stream attempt + firstAttempt bool // if true, transparent retry is valid + numRetries int // exclusive of transparent retry attempt(s) + numRetriesSincePushback int // retries since pushback; to reset backoff + finished bool // TODO: replace with atomic cmpxchg or sync.Once? + // attempt is the active client stream attempt. + // The only place where it is written is the newAttemptLocked method and this method never writes nil. + // So, attempt can be nil only inside newClientStream function when clientStream is first created. + // One of the first things done after clientStream's creation, is to call newAttemptLocked which either + // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked, + // then newClientStream calls finish on the clientStream and returns. So, finish method is the only + // place where we need to check if the attempt is nil. + attempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. committed bool // active attempt committed for retry? buffer []func(a *csAttempt) error // operations to replay on retry @@ -457,8 +465,8 @@ func (cs *clientStream) shouldRetry(err error) error { if cs.attempt.s != nil { <-cs.attempt.s.Done() } - if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { - // First attempt, wait-for-ready, stream unprocessed: transparently retry. + if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { + // First attempt, stream unprocessed: transparently retry. cs.firstAttempt = false return nil } @@ -805,11 +813,11 @@ func (cs *clientStream) finish(err error) { } if cs.attempt != nil { cs.attempt.finish(err) - } - // after functions all rely upon having a stream. - if cs.attempt.s != nil { - for _, o := range cs.opts { - o.after(cs.callInfo) + // after functions all rely upon having a stream. + if cs.attempt.s != nil { + for _, o := range cs.opts { + o.after(cs.callInfo) + } } } cs.cancel() diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 70b739fbad5..aed5f62219d 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.22.1" +const Version = "1.23.0-dev"