Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] Increase health check buffer size (#17636) #17639

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// Size of channel buffer for each subscriber
broadcastChannelBufferSize = 2048

// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the
// HTML code required to render the cache of the HealthCheck.
HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
Expand Down Expand Up @@ -624,7 +627,7 @@
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
defer hc.subMu.Unlock()
c := make(chan *TabletHealth, 2)
c := make(chan *TabletHealth, broadcastChannelBufferSize)
hc.subscribers[c] = struct{}{}
return c
}
Expand All @@ -643,6 +646,8 @@
select {
case c <- th:
default:
// If the channel is full, we drop the message.
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))

Check warning on line 650 in go/vt/discovery/healthcheck.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/healthcheck.go#L649-L650

Added lines #L649 - L650 were not covered by tests
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,50 @@ func TestDebugURLFormatting(t *testing.T) {
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
}

// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
// Added in response to https://github.com/vitessio/vitess/issues/17629.
func TestConcurrentUpdates(t *testing.T) {
ctx := utils.LeakCheckContext(t)
var mu sync.Mutex
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

// Subscribe to the healthcheck
// Make the receiver keep track of the updates received.
ch := hc.Subscribe()
totalCount := 0
go func() {
for range ch {
mu.Lock()
totalCount++
mu.Unlock()
// Simulate a somewhat slow consumer.
time.Sleep(100 * time.Millisecond)
}
}()

// Run multiple updates really quickly
// one after the other.
totalUpdates := 10
for i := 0; i < totalUpdates; i++ {
hc.broadcast(&TabletHealth{})
}
// Unsubscribe from the healthcheck
// and verify we process all the updates eventually.
hc.Unsubscribe(ch)
defer close(ch)
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return totalUpdates == totalCount
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
}

func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
connMapMu.Lock()
defer connMapMu.Unlock()
Expand Down
Loading