Merge #95818
95818: rpc: cleanup latencyInfos for closed connection r=koorosh a=koorosh

Previously, `RemoteClockMonitor` tracked the most recent latency measurements for each connected node and never cleaned this information up after the connection was closed. This left stale latency information between nodes in place even after the connection no longer existed. This change introduces an `OnDisconnect` func that is called when the heartbeat fails and the connection is closed.
This is a prerequisite for upcoming changes that need to properly observe network partitions between particular nodes.
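
The cleanup boils down to a per-node reference count: each connection increments the count, each disconnect decrements it, and the node's clock and latency state is dropped only once the count reaches zero. Below is a minimal, self-contained Go sketch of that idea; the names (nodeRegistry, observeLatency, integer node IDs) are hypothetical stand-ins, not the types used in the diff further down.

package main

import (
    "fmt"
    "sync"
    "time"
)

// nodeRegistry is a toy stand-in for RemoteClockMonitor: per-node latency
// state plus a connection count that gates its cleanup.
type nodeRegistry struct {
    mu        sync.Mutex
    latency   map[int]time.Duration
    connCount map[int]uint
}

func newNodeRegistry() *nodeRegistry {
    return &nodeRegistry{
        latency:   make(map[int]time.Duration),
        connCount: make(map[int]uint),
    }
}

func (r *nodeRegistry) onConnect(nodeID int) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.connCount[nodeID]++
}

func (r *nodeRegistry) onDisconnect(nodeID int) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if c, ok := r.connCount[nodeID]; ok && c > 0 {
        r.connCount[nodeID] = c - 1
    }
    // Drop the node's state only once its last connection is gone.
    if r.connCount[nodeID] == 0 {
        delete(r.latency, nodeID)
        delete(r.connCount, nodeID)
    }
}

func (r *nodeRegistry) observeLatency(nodeID int, d time.Duration) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.latency[nodeID] = d
}

func main() {
    r := newNodeRegistry()
    r.onConnect(1)
    r.onConnect(1) // e.g. a DefaultClass and a SystemClass connection to node 1
    r.observeLatency(1, 3*time.Millisecond)

    r.onDisconnect(1)
    fmt.Println(r.latency) // map[1:3ms] -- one connection remains, state is kept

    r.onDisconnect(1)
    fmt.Println(r.latency) // map[] -- last connection closed, state is dropped
}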

Release note: None

Related to [CRDB-21710](https://cockroachlabs.atlassian.net/browse/CRDB-21710)
Resolves #95772

Co-authored-by: Andrii Vorobiov <[email protected]>
craig[bot] and koorosh committed Jan 30, 2023
2 parents 5c38cfb + f8f83dd commit 3e65660
Showing 3 changed files with 127 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/rpc/clock_offset.go
@@ -98,6 +98,7 @@ type RemoteClockMonitor struct {
syncutil.RWMutex
offsets map[roachpb.NodeID]RemoteOffset
latencyInfos map[roachpb.NodeID]*latencyInfo
connCount map[roachpb.NodeID]uint
}

metrics RemoteClockMetrics
@@ -128,6 +129,7 @@ func newRemoteClockMonitor(
}
r.mu.offsets = make(map[roachpb.NodeID]RemoteOffset)
r.mu.latencyInfos = make(map[roachpb.NodeID]*latencyInfo)
r.mu.connCount = make(map[roachpb.NodeID]uint)
if histogramWindowInterval == 0 {
histogramWindowInterval = time.Duration(math.MaxInt64)
}
@@ -172,6 +174,31 @@ func (r *RemoteClockMonitor) AllLatencies() map[roachpb.NodeID]time.Duration {
return result
}

// OnConnect increments the tracked connection count for the provided node.
func (r *RemoteClockMonitor) OnConnect(ctx context.Context, nodeID roachpb.NodeID) {
r.mu.Lock()
defer r.mu.Unlock()
count := r.mu.connCount[nodeID]
count++
r.mu.connCount[nodeID] = count
}

// OnDisconnect removes all information associated with the provided node once no connections to it remain.
func (r *RemoteClockMonitor) OnDisconnect(ctx context.Context, nodeID roachpb.NodeID) {
r.mu.Lock()
defer r.mu.Unlock()
count, ok := r.mu.connCount[nodeID]
if ok && count > 0 {
count--
r.mu.connCount[nodeID] = count
}
if count == 0 {
delete(r.mu.offsets, nodeID)
delete(r.mu.latencyInfos, nodeID)
delete(r.mu.connCount, nodeID)
}
}

// UpdateOffset is a thread-safe way to update the remote clock and latency
// measurements.
//
@@ -206,6 +233,8 @@ func (r *RemoteClockMonitor) UpdateOffset(
if !emptyOffset {
r.mu.offsets[id] = offset
} else {
// Remove the most recent offset because it is outdated and the newly received
// offset is empty, so there is no reason to keep the old value or store the new one.
delete(r.mu.offsets, id)
}
} else if offset.Uncertainty < oldOffset.Uncertainty {
6 changes: 6 additions & 0 deletions pkg/rpc/context.go
@@ -2224,6 +2224,9 @@ func (rpcCtx *Context) runHeartbeat(
} else {
close(conn.initialHeartbeatDone) // unblock any waiters
}
if rpcCtx.RemoteClocks != nil {
rpcCtx.RemoteClocks.OnDisconnect(ctx, conn.remoteNodeID)
}
}()

{
@@ -2234,6 +2237,9 @@
// unusual to hit this case.
return err
}
if rpcCtx.RemoteClocks != nil {
rpcCtx.RemoteClocks.OnConnect(ctx, conn.remoteNodeID)
}
}

// Start heartbeat loop.
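For context, here is a simplified, self-contained sketch of how the hooks sit in the heartbeat lifecycle (hypothetical names; the real wiring is in runHeartbeat above). OnDisconnect is registered in a defer before the initial heartbeat is attempted, so it also runs when that heartbeat fails and OnConnect was never called, which is why OnDisconnect tolerates a zero connection count.

package main

import "fmt"

// monitor is a toy stand-in for RemoteClockMonitor.
type monitor struct{ connCount map[int]uint }

func (m *monitor) OnConnect(nodeID int) { m.connCount[nodeID]++ }

func (m *monitor) OnDisconnect(nodeID int) {
    if c := m.connCount[nodeID]; c > 0 {
        m.connCount[nodeID] = c - 1
    }
    if m.connCount[nodeID] == 0 {
        // The real RemoteClockMonitor also deletes offsets and latencyInfos here.
        delete(m.connCount, nodeID)
    }
}

// runHeartbeat mimics the control flow of the real function: the disconnect
// hook is deferred before the initial heartbeat, and the connect hook fires
// only after the initial heartbeat succeeds.
func runHeartbeat(m *monitor, nodeID int, initialHeartbeatOK bool) error {
    defer m.OnDisconnect(nodeID)
    if !initialHeartbeatOK {
        return fmt.Errorf("initial heartbeat failed")
    }
    m.OnConnect(nodeID)
    // ... heartbeat loop runs here until the connection closes or fails ...
    return nil
}

func main() {
    m := &monitor{connCount: map[int]uint{}}
    _ = runHeartbeat(m, 1, false) // no OnConnect; deferred OnDisconnect is a no-op
    _ = runHeartbeat(m, 1, true)  // OnConnect, then OnDisconnect on exit
    fmt.Println(m.connCount)      // map[]
}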
92 changes: 92 additions & 0 deletions pkg/rpc/context_test.go
@@ -1421,6 +1421,98 @@ func TestFailedOffsetMeasurement(t *testing.T) {
})
}

// TestLatencyInfoCleanupOnClosedConnection verifies that latencyInfo is cleaned up
// for a closed connection to avoid reporting stale information.
func TestLatencyInfoCleanupOnClosedConnection(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

// Shared cluster ID by all RPC peers (this ensures that the peers
// don't talk to servers from unrelated tests by accident).
clusterID := uuid.MakeV4()
ctx := context.Background()

serverClock := timeutil.NewManualTime(timeutil.Unix(0, 20))
maxOffset := time.Duration(0)
serverCtx := newTestContext(clusterID, serverClock, maxOffset, stopper)
const serverNodeID = 1
serverCtx.NodeID.Set(ctx, serverNodeID)
s := newTestServer(t, serverCtx)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: serverClock,
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

// Create a client clock that is behind the server clock.
clientClock := &AdvancingClock{time: timeutil.Unix(0, 10)}
clientMaxOffset := time.Duration(0)
clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper)
// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatInterval = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatTimeout = 1 * time.Millisecond

conn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx)
if err != nil {
t.Fatal(err)
}

anotherConn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, SystemClass).Connect(ctx)
if err != nil {
t.Fatal(err)
}

clientClock.setAdvancementInterval(
maximumPingDurationMult*clientMaxOffset + 1*time.Nanosecond)

testutils.SucceedsSoon(t, func() error {
clientCtx.RemoteClocks.mu.Lock()
defer clientCtx.RemoteClocks.mu.Unlock()

if _, ok := clientCtx.RemoteClocks.mu.latencyInfos[serverNodeID]; !ok {
return errors.Errorf("expected to have latencyInfo for node %d, but found none", serverNodeID)
}
return nil
})

// Close the first connection. This does not count as a network disruption yet, since anotherConn is still open.
err = conn.Close() // nolint:grpcconnclose
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
clientCtx.RemoteClocks.mu.Lock()
defer clientCtx.RemoteClocks.mu.Unlock()
if _, ok := clientCtx.RemoteClocks.mu.latencyInfos[serverNodeID]; !ok {
return errors.Errorf("expected to have latencyInfos, but nothing found")
}
return nil
})

// Close the last connection (anotherConn) to simulate a network disruption.
err = anotherConn.Close() // nolint:grpcconnclose
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
clientCtx.RemoteClocks.mu.Lock()
defer clientCtx.RemoteClocks.mu.Unlock()

if li, ok := clientCtx.RemoteClocks.mu.latencyInfos[serverNodeID]; ok {
return errors.Errorf("expected to have removed latencyInfos, but found: %v", li)
}
return nil
})
}

type AdvancingClock struct {
syncutil.Mutex
time time.Time
