Skip to content

Commit

Permalink
rpc: add new TestGRPCDeadlinePropagation test
Browse files Browse the repository at this point in the history
TestGRPCDeadlinePropagation is a smoketest for gRPC deadline propagation.
When RPC clients issue requests with deadlines/timeouts attached to their
context, they are guaranteed that not only will remote RPCs respect this
deadline/timeout if it is reached, but that the remote RPC will be aware of
the timeout throughout its lifetime. In other words, deadlines/timeouts are
communicated upfront by the client, not only after they have been reached.

gRPC implements this through its "grpc-timeout" header field, which is
attached to the header (first) frame of unary and streaming calls.

For more, see https://grpc.io/docs/what-is-grpc/core-concepts/#deadlines,
which says:
> gRPC allows clients to specify how long they are willing to wait for an RPC
> to complete before the RPC is terminated with a DEADLINE_EXCEEDED error. On
> the server side, the server can query to see if a particular RPC has timed
> out, or how much time is left to complete the RPC.
  • Loading branch information
nvanbenschoten committed Dec 23, 2021
1 parent b261b61 commit e951206
Showing 1 changed file with 89 additions and 6 deletions.
95 changes: 89 additions & 6 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,89 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
return nil
}

// TestGRPCDeadlinePropagation is a smoketest for gRPC deadline propagation.
// When RPC clients issue requests with deadlines/timeouts attached to their
// context, they are guaranteed that not only will remote RPCs respect this
// deadline/timeout if it is reached, but that the remote RPC will be aware of
// the timeout throughout its lifetime. In other words, deadlines/timeouts are
// communicated upfront by the client, not only after they have been reached.
//
// gRPC implements this through its "grpc-timeout" header field, which is
// attached to the header (first) frame of unary and streaming calls.
//
// For more, see https://grpc.io/docs/what-is-grpc/core-concepts/#deadlines,
// which says:
// > gRPC allows clients to specify how long they are willing to wait for an RPC
// > to complete before the RPC is terminated with a DEADLINE_EXCEEDED error. On
// > the server side, the server can query to see if a particular RPC has timed
// > out, or how much time is left to complete the RPC.
func TestGRPCDeadlinePropagation(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clusterID := uuid.MakeV4()
clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond)

// Construct the server context.
const serverNodeID = 1
serverCtx := newTestContext(clusterID, clock, stopper)
serverCtx.NodeID.Set(ctx, serverNodeID)

// Register an UnknownServiceHandler that expects a BatchRequest and sends
// a BatchResponse. Record the context deadline of the request in the handler.
var serverDeadline time.Time
s := newTestServer(t, serverCtx, grpc.UnknownServiceHandler(
func(srv interface{}, stream grpc.ServerStream) error {
serverDeadline, _ = stream.Context().Deadline()
var ba roachpb.BatchRequest
if err := stream.RecvMsg(&ba); err != nil {
return err
}
return stream.SendMsg(&roachpb.BatchResponse{})
},
))
RegisterHeartbeatServer(s, serverCtx.NewHeartbeatService())

// Begin listening.
ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
require.Nil(t, err)
remoteAddr := ln.Addr().String()

// Construct the client context.
clientCtx := newTestContext(clusterID, clock, stopper)
defConn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx)
require.Nil(t, err)

// Issue an RPC with a deadline far in the future.
clientDeadline := timeutil.Now().Add(4 * time.Hour)
ctxWithDeadline, cancel := context.WithDeadline(ctx, clientDeadline)
defer cancel()

desc := grpc.StreamDesc{
StreamName: "foo",
ClientStreams: true,
}
const method = "/cockroach.rpc.Testing/Foo"
cs, err := defConn.NewStream(ctxWithDeadline, &desc, method)
require.Nil(t, err)
require.Nil(t, cs.SendMsg(&roachpb.BatchRequest{}))
var br roachpb.BatchResponse
require.Nil(t, cs.RecvMsg(&br))
require.Nil(t, cs.CloseSend())

// The server should have heard about the deadline, and it should be nearly
// identical to the client-side deadline. The values aren't exactly the same
// because the deadline (a fixed point in time) passes through a timeout (a
// duration of time) over the wire. However, we can assert that the client
// deadline is always earlier than the server deadline, but by no more than a
// small margin (relative to the 4-hour timeout).
require.NotZero(t, serverDeadline)
require.True(t, clientDeadline.Before(serverDeadline))
require.True(t, serverDeadline.Before(clientDeadline.Add(1*time.Minute)))
}

func TestClusterIDMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -1597,8 +1680,8 @@ func TestGRPCDialClass(t *testing.T) {
remoteAddr := ln.Addr().String()
clientCtx := newTestContext(serverCtx.ClusterID.Get(), clock, stopper)

def1 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
sys1 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
def1 := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass)
sys1 := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, SystemClass)
require.False(t, sys1 == def1,
"expected connections dialed with different classes to the same target to differ")
defConn1, err := def1.Connect(context.Background())
Expand All @@ -1607,10 +1690,10 @@ func TestGRPCDialClass(t *testing.T) {
require.Nil(t, err, "expected successful connection")
require.False(t, sysConn1 == defConn1, "expected connections dialed with "+
"different classes to the sametarget to have separate underlying gRPC connections")
def2 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
def2 := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass)
require.True(t, def1 == def2, "expected connections dialed with the same "+
"class to the same target to be the same")
sys2 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
sys2 := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, SystemClass)
require.True(t, sys1 == sys2, "expected connections dialed with the same "+
"class to the same target to be the same")
for _, c := range []*Connection{def2, sys2} {
Expand Down Expand Up @@ -1714,9 +1797,9 @@ func TestTestingKnobs(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
require.Nil(t, err)
remoteAddr := ln.Addr().String()
sysConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass).Connect(context.Background())
sysConn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, SystemClass).Connect(context.Background())
require.Nil(t, err)
defConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass).Connect(context.Background())
defConn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())
require.Nil(t, err)
const unaryMethod = "/cockroach.rpc.Testing/Foo"
const streamMethod = "/cockroach.rpc.Testing/Bar"
Expand Down

0 comments on commit e951206

Please sign in to comment.