Skip to content

Commit

Permalink
[release-19.0] Increase health check buffer size (#17636) (#17639)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
Signed-off-by: deepthi <[email protected]>
Co-authored-by: Manan Gupta <[email protected]>
Co-authored-by: deepthi <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2025
1 parent 107cab6 commit f4563b3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
7 changes: 6 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ var (
// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// Size of channel buffer for each subscriber
broadcastChannelBufferSize = 2048

// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck - Cache` title to create the
// HTML code required to render the cache of the HealthCheck.
HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
Expand Down Expand Up @@ -624,7 +627,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
// Subscribe registers a new subscriber with the healthcheck and returns the
// channel that was created for it.
//
// The channel is buffered with broadcastChannelBufferSize entries and is
// recorded in hc.subscribers while hc.subMu is held, so registration is safe
// against concurrent access to the subscriber set.
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
	hc.subMu.Lock()
	defer hc.subMu.Unlock()
	// Declare the channel exactly once, sized by the package-level setting
	// rather than a hard-coded capacity.
	c := make(chan *TabletHealth, broadcastChannelBufferSize)
	hc.subscribers[c] = struct{}{}
	return c
}
Expand All @@ -643,6 +646,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
select {
case c <- th:
default:
// If the channel is full, we drop the message.
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,50 @@ func TestDebugURLFormatting(t *testing.T) {
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
}

// TestConcurrentUpdates checks that a rapid burst of broadcasts from the
// HealthCheck implementation is delivered in full to a subscriber, even when
// that subscriber drains its channel slowly.
// Added in response to https://github.com/vitessio/vitess/issues/17629.
func TestConcurrentUpdates(t *testing.T) {
	ctx := utils.LeakCheckContext(t)
	// Start from clean error counters.
	hcErrorCounters.ResetAll()
	ts := memorytopo.NewServer(ctx, "cell")
	defer ts.Close()
	hc := createTestHc(ctx, ts)
	// Shut the healthcheck down when the test returns.
	defer hc.Close()

	// received counts the updates consumed by the subscriber goroutine;
	// countMu guards it because the test goroutine reads it concurrently.
	var (
		countMu  sync.Mutex
		received int
	)

	// Subscribe and start a deliberately slow consumer.
	updates := hc.Subscribe()
	go func() {
		for range updates {
			countMu.Lock()
			received++
			countMu.Unlock()
			// Pause after each update so the sender outpaces the receiver.
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Fire all the updates back-to-back, without waiting for the consumer.
	sent := 10
	for n := 0; n < sent; n++ {
		hc.broadcast(&TabletHealth{})
	}

	// Unsubscribe, and close the channel on return so the consumer's range
	// loop terminates.
	hc.Unsubscribe(updates)
	defer close(updates)

	// Every update that was sent must eventually be observed.
	allReceived := func() bool {
		countMu.Lock()
		defer countMu.Unlock()
		return received == sent
	}
	require.Eventuallyf(t, allReceived, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
}

func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
connMapMu.Lock()
defer connMapMu.Unlock()
Expand Down

0 comments on commit f4563b3

Please sign in to comment.