Skip to content

Commit

Permalink
Merge #39041
Browse files Browse the repository at this point in the history
39041: vendor: upgrade grpc from 1.13.0 to 1.21.2 r=ajwerner a=ajwerner

This PR upgrades gRPC from 1.13.0 to 1.21.2. The primary motivation for this
upgrade is to eliminate the disconnections caused by
grpc/grpc-go#1882. These failures manifest themselves
as the following set of errors:

```
ajwerner-test-0001> I190722 22:15:01.203008 12054 vendor/github.com/cockroachdb/circuitbreaker/circuitbreaker.go:322  [n1] circuitbreaker: rpc [::]:26257 [n2] tripped: failed to check for ready connection to n2 at ajwerner-test-0002:26257: connection not ready: TRANSIENT_FAILURE
```
These errors then lead to tripped breakers and general badness. I suspect that there
are several other good bug fixes in here, including some purported leaks and
correctness fixes on shutdown.

I have verified that with this upgrade I no longer see connections break in
overload scenarios which reliably reproduced the situation in the above log.

This commit removes one condition from grpcutil.IsClosedConnection which should
be subsumed by the status check above. The `transport` subpackage has been gone
for many releases.

This does not upgrade to the current release 1.22.0 because the maintainer
mentions that it contains a bug
(grpc/grpc-go#2663 (comment)).

Release note (bug fix): Upgrade grpc library to fix connection state management
bug.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jul 30, 2019
2 parents 69f874d + 03aac64 commit 395bbbf
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 45 deletions.
16 changes: 11 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ ignored = [
name = "google.golang.org/genproto"
branch = "master"

[[constraint]]
name = "google.golang.org/grpc"
version = "=v1.21.2"

[prune]
go-tests = true
unused-packages = true
Expand Down
5 changes: 3 additions & 2 deletions pkg/cli/interactive_tests/netcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@

while True:
c = client_socket.recv(1)
sys.stdout.write("%c" % c)
sys.stdout.flush()
if c:
sys.stdout.write("%c" % c)
sys.stdout.flush()
4 changes: 2 additions & 2 deletions pkg/cli/interactive_tests/test_error_hints.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ eexpect "ready"
set spawn_id $client_spawn_id
send "$argv quit --insecure\r"
eexpect "insecure\r\n"
# In the first shell, stop the server.
# Wait to see an HTTP/2.0 header on the fake server, then stop the server.
set spawn_id $shell_spawn_id
eexpect "connected"
eexpect ":26257"
eexpect "PRI * HTTP/2.0"
interrupt
eexpect ":/# "
# Check that cockroach quit becomes suitably confused.
Expand Down
9 changes: 3 additions & 6 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,23 +682,21 @@ func init() {
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
ctx context.Context
syncutil.Mutex
dialed bool
closed bool
redialChan chan struct{}
}

func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
func (ood *onlyOnceDialer) dial(ctx context.Context, addr string) (net.Conn, error) {
ood.Lock()
defer ood.Unlock()
if !ood.dialed {
ood.dialed = true
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.DialContext(ood.ctx, "tcp", addr)
return dialer.DialContext(ctx, "tcp", addr)
} else if !ood.closed {
ood.closed = true
close(ood.redialChan)
Expand Down Expand Up @@ -726,10 +724,9 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{
grpc.WithInitialConnWindowSize(initialConnWindowSize))

dialer := onlyOnceDialer{
ctx: ctx.masterCtx,
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))
dialOpts = append(dialOpts, grpc.WithContextDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
Expand Down
75 changes: 51 additions & 24 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
// its response stream even if it doesn't get any new requests.
func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("Takes too long given https://github.com/grpc/grpc-go/pull/2642")

sc := log.Scope(t)
defer sc.Close(t)
Expand Down Expand Up @@ -1041,8 +1042,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// PartitionableConns. We'll partition the first opened connection.
dialerCh := make(chan *testutils.PartitionableConn, 1)
clientCtx.AddTestingDialOpts(
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
grpc.WithContextDialer(
func(_ context.Context, addr string) (net.Conn, error) {
if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) {
// If we allow gRPC to open a 2nd transport connection, then our RPCs
// might succeed if they're sent on that one. In the spirit of a
Expand All @@ -1052,7 +1053,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
return nil, errors.Errorf("No more connections for you. We're partitioned.")
}

conn, err := net.DialTimeout("tcp", addr, timeout)
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1116,10 +1117,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// Now partition either client->server, server->client, or both, and attempt
// to perform an RPC. We expect it to fail once the grpc keepalive fails to
// get a response from the server.

transportConn := <-dialerCh
defer transportConn.Finish()

if c.partitionC2S {
log.Infof(ctx, "partition C2S")
transportConn.PartitionC2S()
Expand All @@ -1129,38 +1128,65 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
transportConn.PartitionS2C()
}

// We want to start a goroutine that keeps trying to send requests and reports
// the error from the send call. In cases where there are no keep-alives this
// request may get blocked if flow control blocks it.
errChan := make(chan error)
sendCtx, cancel := context.WithCancel(ctx)
r := retry.StartWithCtx(sendCtx, retry.Options{
InitialBackoff: 10 * time.Millisecond,
MaxBackoff: 500 * time.Millisecond,
})
defer cancel()
go func() {
for r.Next() {
err := heartbeatClient.Send(&request)
isClosed := err != nil && grpcutil.IsClosedConnection(err)
log.Infof(ctx, "heartbeat Send got error %+v (closed=%v)", err, isClosed)
select {
case errChan <- err:
case <-sendCtx.Done():
return
}
if isClosed {
return
}
}
}()
// Check whether the connection eventually closes. We may need to
// adjust this duration if the test gets flaky.
const retryDur = 3 * time.Second
errNotClosed := errors.New("conn not closed")
closedErr := retry.ForDuration(retryDur, func() error {
err := heartbeatClient.Send(&request)
if err == nil {
log.Infof(ctx, "expected send error, got no error")
return errNotClosed
}
if !grpcutil.IsClosedConnection(err) {
newErr := fmt.Errorf("expected closed connection error, found %v", err)
log.Infof(ctx, "%+v", newErr)
return newErr
// This unfortunately massive amount of time is required due to gRPC's
// minimum timeout of 10s and the below issue whereby keepalives are sent
// at half the expected rate.
// https://github.com/grpc/grpc-go/issues/2638
const timeoutDur = 21 * time.Second
timeout := time.After(timeoutDur)
// sendErr will hold the last error we saw from an attempt to send a
// heartbeat. Initialize it with a dummy error which will fail the test if
// it is not overwritten.
sendErr := fmt.Errorf("not a real error")
for done := false; !done; {
select {
case <-timeout:
cancel()
done = true
case sendErr = <-errChan:
}
return nil
})
}
if c.expClose {
if closedErr != nil {
newErr := fmt.Errorf("expected closed connection, found %v", closedErr)
if sendErr == nil || !grpcutil.IsClosedConnection(sendErr) {
newErr := fmt.Errorf("expected closed connection, found %v", sendErr)
log.Infof(ctx, "%+v", newErr)
return newErr
}
} else {
if closedErr != errNotClosed {
newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr)
if sendErr != nil {
newErr := fmt.Errorf("expected unclosed connection, found %v", sendErr)
log.Infof(ctx, "%+v", newErr)
return newErr
}
}

log.Infof(ctx, "test done")
// If the DialOptions we passed to gRPC didn't prevent it from opening new
// connections, then next RPCs would succeed since gRPC reconnects the
// transport (and that would succeed here since we've only partitioned one
Expand All @@ -1169,6 +1195,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// the (application-level) heartbeats performed by rpc.Context, but the
// behavior of our heartbeats in the face of transport failures is
// sufficiently tested in TestHeartbeatHealthTransport.
log.Infof(ctx, "test done")
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/rpc/stats_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -168,6 +169,8 @@ func TestStatsHandlerWithHeartbeats(t *testing.T) {
if s, c := serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(); s == 0 || c == 0 || s > c {
return fmt.Errorf("expected server.outgoing < client.incoming; got %d, %d", s, c)
}
log.Infof(context.TODO(), "server incoming = %v, server outgoing = %v, client incoming = %v, client outgoing = %v",
serverVal.(*Stats).Incoming(), serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(), clientVal.(*Stats).Outgoing())
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (s *Server) Start(ctx context.Context) error {
}
conn, err := grpc.DialContext(ctx, s.cfg.AdvertiseAddr, append(
dialOpts,
grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return c2, nil
}),
)...)
Expand Down
4 changes: 0 additions & 4 deletions pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)

// ErrCannotReuseClientConn is returned when a failed connection is
Expand Down Expand Up @@ -64,9 +63,6 @@ func IsClosedConnection(err error) bool {
strings.Contains(err.Error(), "node unavailable") {
return true
}
if streamErr, ok := err.(transport.StreamError); ok && streamErr.Code == codes.Canceled {
return true
}
return netutil.IsClosedConnection(err)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/util/grpcutil/grpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (hs healthServer) Check(
return nil, errors.New("no one should see this")
}

// Watch is an unimplemented stub: it ignores both arguments and panics with
// "not implemented" if it is ever called.
// NOTE(review): presumably it exists only so that healthServer keeps
// satisfying the health service interface after the gRPC upgrade — confirm.
func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error {
panic("not implemented")
}

func TestRequestDidNotStart(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
2 changes: 1 addition & 1 deletion vendor
Submodule vendor updated 92 files
+28 −13 google.golang.org/grpc/.travis.yml
+41 −17 google.golang.org/grpc/CONTRIBUTING.md
+36 −24 google.golang.org/grpc/Makefile
+88 −12 google.golang.org/grpc/README.md
+1 −1 google.golang.org/grpc/backoff.go
+1 −26 google.golang.org/grpc/balancer.go
+82 −10 google.golang.org/grpc/balancer/balancer.go
+22 −52 google.golang.org/grpc/balancer/base/balancer.go
+12 −0 google.golang.org/grpc/balancer/base/base.go
+10 −6 google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+35 −20 google.golang.org/grpc/balancer_conn_wrappers.go
+44 −38 google.golang.org/grpc/balancer_v1_wrapper.go
+900 −0 google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
+9 −28 google.golang.org/grpc/call.go
+676 −855 google.golang.org/grpc/clientconn.go
+2 −1 google.golang.org/grpc/codes/codes.go
+3 −2 google.golang.org/grpc/connectivity/connectivity.go
+125 −7 google.golang.org/grpc/credentials/credentials.go
+0 −60 google.golang.org/grpc/credentials/credentials_util_go17.go
+0 −57 google.golang.org/grpc/credentials/credentials_util_pre_go17.go
+61 −0 google.golang.org/grpc/credentials/internal/syscallconn.go
+10 −9 google.golang.org/grpc/credentials/internal/syscallconn_appengine.go
+8 −16 google.golang.org/grpc/credentials/tls13.go
+558 −0 google.golang.org/grpc/dialoptions.go
+3 −3 google.golang.org/grpc/encoding/encoding.go
+20 −0 google.golang.org/grpc/go.mod
+35 −0 google.golang.org/grpc/go.sum
+0 −70 google.golang.org/grpc/go16.go
+0 −71 google.golang.org/grpc/go17.go
+1 −1 google.golang.org/grpc/grpclog/grpclog.go
+122 −22 google.golang.org/grpc/health/grpc_health_v1/health.pb.go
+6 −0 google.golang.org/grpc/install_gae.sh
+1 −1 google.golang.org/grpc/interceptor.go
+46 −0 google.golang.org/grpc/internal/balancerload/load.go
+167 −0 google.golang.org/grpc/internal/binarylog/binarylog.go
+42 −0 google.golang.org/grpc/internal/binarylog/binarylog_testutil.go
+210 −0 google.golang.org/grpc/internal/binarylog/env_config.go
+423 −0 google.golang.org/grpc/internal/binarylog/method_logger.go
+33 −0 google.golang.org/grpc/internal/binarylog/regenerate.sh
+162 −0 google.golang.org/grpc/internal/binarylog/sink.go
+17 −18 google.golang.org/grpc/internal/binarylog/util.go
+193 −39 google.golang.org/grpc/internal/channelz/funcs.go
+301 −17 google.golang.org/grpc/internal/channelz/types.go
+53 −0 google.golang.org/grpc/internal/channelz/types_linux.go
+44 −0 google.golang.org/grpc/internal/channelz/types_nonlinux.go
+16 −14 google.golang.org/grpc/internal/channelz/util_linux.go
+7 −9 google.golang.org/grpc/internal/channelz/util_nonlinux.go
+64 −0 google.golang.org/grpc/internal/envconfig/envconfig.go
+61 −0 google.golang.org/grpc/internal/grpcsync/event.go
+28 −10 google.golang.org/grpc/internal/internal.go
+114 −0 google.golang.org/grpc/internal/syscall/syscall_linux.go
+73 −0 google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
+4 −3 google.golang.org/grpc/internal/transport/bdp_estimator.go
+74 −18 google.golang.org/grpc/internal/transport/controlbuf.go
+49 −0 google.golang.org/grpc/internal/transport/defaults.go
+0 −24 google.golang.org/grpc/internal/transport/flowcontrol.go
+17 −38 google.golang.org/grpc/internal/transport/handler_server.go
+230 −130 google.golang.org/grpc/internal/transport/http2_client.go
+205 −133 google.golang.org/grpc/internal/transport/http2_server.go
+145 −61 google.golang.org/grpc/internal/transport/http_util.go
+0 −6 google.golang.org/grpc/internal/transport/log.go
+136 −81 google.golang.org/grpc/internal/transport/transport.go
+42 −22 google.golang.org/grpc/keepalive/keepalive.go
+1 −2 google.golang.org/grpc/metadata/metadata.go
+7 −4 google.golang.org/grpc/naming/dns_resolver.go
+0 −34 google.golang.org/grpc/naming/go17.go
+1 −1 google.golang.org/grpc/naming/naming.go
+1 −1 google.golang.org/grpc/peer/peer.go
+27 −166 google.golang.org/grpc/picker_wrapper.go
+3 −1 google.golang.org/grpc/pickfirst.go
+64 −0 google.golang.org/grpc/preloader.go
+37 −15 google.golang.org/grpc/proxy.go
+115 −39 google.golang.org/grpc/resolver/dns/dns_resolver.go
+0 −35 google.golang.org/grpc/resolver/dns/go17.go
+1 −1 google.golang.org/grpc/resolver/passthrough/passthrough.go
+36 −2 google.golang.org/grpc/resolver/resolver.go
+59 −52 google.golang.org/grpc/resolver_conn_wrapper.go
+170 −48 google.golang.org/grpc/rpc_util.go
+291 −252 google.golang.org/grpc/server.go
+157 −17 google.golang.org/grpc/service_config.go
+1 −2 google.golang.org/grpc/stats/handlers.go
+5 −1 google.golang.org/grpc/stats/stats.go
+0 −44 google.golang.org/grpc/status/go17.go
+23 −2 google.golang.org/grpc/status/status.go
+0 −97 google.golang.org/grpc/stickiness_linkedmap.go
+988 −254 google.golang.org/grpc/stream.go
+1 −1 google.golang.org/grpc/tap/tap.go
+13 −0 google.golang.org/grpc/trace.go
+0 −51 google.golang.org/grpc/transport/go16.go
+0 −52 google.golang.org/grpc/transport/go17.go
+1 −1 google.golang.org/grpc/version.go
+84 −49 google.golang.org/grpc/vet.sh

0 comments on commit 395bbbf

Please sign in to comment.