Skip to content

Commit

Permalink
Add TSODispatchingStats
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 8, 2023
1 parent f193fe4 commit 7aaed93
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 12 deletions.
6 changes: 6 additions & 0 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type TSODispatcher struct {
tsoProxyHandleDuration prometheus.Histogram
tsoProxyBatchSize prometheus.Histogram

*TSODispatchingStats

ctx context.Context
// dispatchChs is used to dispatch different TSO requests to the corresponding forwarding TSO channels.
dispatchChs sync.Map // Store as map[string]chan Request (forwardedHost -> dispatch channel)
Expand All @@ -53,6 +55,7 @@ func NewTSODispatcher(
ctx: ctx,
tsoProxyHandleDuration: tsoProxyHandleDuration,
tsoProxyBatchSize: tsoProxyBatchSize,
TSODispatchingStats: &TSODispatchingStats{},
}
return tsoDispatcher
}
Expand Down Expand Up @@ -117,13 +120,16 @@ func (s *TSODispatcher) startDispatchLoop(
pendingTSOReqCount := 0

log.Info("start the dispatch loop", zap.String("forwarded-host", forwardedHost))
s.EnterDispatcher()

defer func() {
log.Info("exiting from the dispatch loop. cleaning up the pending requests",
zap.String("forwarded-host", forwardedHost))
if forwardStream != nil {
forwardStream.closeSend()
}
s.cleanup(forwardedHost, forwardErr, pendingRequests[:pendingTSOReqCount])
s.LeaveDispatcher()
log.Info("the dispatch loop exited", zap.String("forwarded-host", forwardedHost))
}()

Expand Down
123 changes: 123 additions & 0 deletions pkg/utils/tsoutil/tso_dispatching_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsoutil

import (
"sync"
"sync/atomic"
)

// TSODispatchingStats records the statistics of TSO dispatching.
type TSODispatchingStats struct {
// Count the number of TSO streaming routines.
streamingRoutinesLock sync.RWMutex
aliveTSOStreamingRoutines int64
peakTSOStreamingRoutines int64

// Count the number of dispatchers.
dispatcherCountLock sync.RWMutex
aliveDispatcherCount int64
peakDispatcherCount int64

dispatcherExitCount atomic.Int64
}

// NewTSODispatchingStats creates a TSODispatchingStats.
func NewTSODispatchingStats(
aliveTSOStreamingRoutine int64,
peakTSOStreamingRoutines int64,
aliveDispatcherCount int64,
peakDispatcherCount int64,
dispatcherExitCount int64,
) *TSODispatchingStats {
stats := &TSODispatchingStats{
aliveTSOStreamingRoutines: aliveTSOStreamingRoutine,
peakTSOStreamingRoutines: peakTSOStreamingRoutines,

aliveDispatcherCount: aliveDispatcherCount,
peakDispatcherCount: peakDispatcherCount,
}
stats.dispatcherExitCount.Store(dispatcherExitCount)
return stats
}

// GetAliveTSOStreamingRoutines returns the current value.
func (s *TSODispatchingStats) GetAliveTSOStreamingRoutines() int64 {
s.streamingRoutinesLock.RLock()
defer s.streamingRoutinesLock.RUnlock()
return s.aliveTSOStreamingRoutines
}

// GetPeakTSOStreamingRoutines returns the current value of peakTSOStreamingRoutines.
func (s *TSODispatchingStats) GetPeakTSOStreamingRoutines() int64 {
s.streamingRoutinesLock.RLock()
defer s.streamingRoutinesLock.RUnlock()
return s.peakTSOStreamingRoutines
}

// GetAliveDispatcherCount returns the current value of aliveDispatcherCount.
func (s *TSODispatchingStats) GetAliveDispatcherCount() int64 {
s.dispatcherCountLock.RLock()
defer s.dispatcherCountLock.RUnlock()
return s.aliveDispatcherCount
}

// GetPeakDispatcherCount returns the current value of peakDispatcherCount.
func (s *TSODispatchingStats) GetPeakDispatcherCount() int64 {
s.dispatcherCountLock.RLock()
defer s.dispatcherCountLock.RUnlock()
return s.peakDispatcherCount
}

// GetDispatcherExitCount returns the current value of dispatcherExitCount.
func (s *TSODispatchingStats) GetDispatcherExitCount() int64 {
return s.dispatcherExitCount.Load()
}

// EnterTSOStreamingRoutine is called when entering into a TSO streaming routine.
func (s *TSODispatchingStats) EnterTSOStreamingRoutine() {
s.streamingRoutinesLock.Lock()
defer s.streamingRoutinesLock.Unlock()
s.aliveDispatcherCount++
if s.aliveDispatcherCount > s.peakDispatcherCount {
s.peakDispatcherCount = s.aliveDispatcherCount
}
}

// LeaveTSOStreamingRoutine is called when a TSO streaming routine exits.
func (s *TSODispatchingStats) LeaveTSOStreamingRoutine() {
s.streamingRoutinesLock.Lock()
defer s.streamingRoutinesLock.Unlock()
s.aliveDispatcherCount--
}

// EnterDispatcher is called when entering into a dispatcher.
func (s *TSODispatchingStats) EnterDispatcher() {
s.dispatcherCountLock.Lock()
defer s.dispatcherCountLock.Unlock()
s.aliveDispatcherCount++
if s.aliveDispatcherCount > s.peakDispatcherCount {
s.peakDispatcherCount = s.aliveDispatcherCount
}
}

// LeaveDispatcher is called when a dispatcher exits.
func (s *TSODispatchingStats) LeaveDispatcher() {
s.dispatcherCountLock.Lock()
s.aliveDispatcherCount--
s.dispatcherCountLock.Unlock()

s.dispatcherExitCount.Add(1)
}
5 changes: 5 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {

// forwardTSO forwards the incoming TSO requests to the TSO microservice.
func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
if s.IsAPIServiceMode() {
s.tsoDispatcher.EnterTSOStreamingRoutine()
defer s.tsoDispatcher.LeaveTSOStreamingRoutine()
}

streamCtx := stream.Context()
responseCh := make(chan *pdpb.TsoResponse, 1)

Expand Down
16 changes: 4 additions & 12 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *tsoProxyTestSuite) TearDownSuite() {

// TestTSOProxyBasic tests the TSO Proxy's basic function to forward TSO requests to TSO microservice.
func (s *tsoProxyTestSuite) TestTSOProxyBasic() {
s.verifyTSOProxy(s.streams, 100)
s.verifyTSOProxy(s.streams, 1000)
}

func (s *tsoProxyTestSuite) cleanupGRPCStreams(
Expand All @@ -117,7 +117,7 @@ func (s *tsoProxyTestSuite) verifyTSOProxy(
for i := 0; i < requestsPerClient; i++ {
reqs[i] = &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()},
Count: uint32(i) + 1, // Make sure the count is not zero.
Count: uint32(i) + 1, // Make sure the count is positive.
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func benchmarkTSOProxyNClients(clientCount int, sameContext bool, b *testing.B)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

grpcClientConns, streams, cancelFuns := createTSOStreams(re, ctx, suite.backendEndpoints, clientCount, sameContext)
grpcClientConns, streams, cancelFuncs := createTSOStreams(re, ctx, suite.backendEndpoints, clientCount, sameContext)

// Benchmark TSO proxy
b.ResetTimer()
Expand All @@ -293,15 +293,7 @@ func benchmarkTSOProxyNClients(clientCount int, sameContext bool, b *testing.B)
}
b.StopTimer()

for _, stream := range streams {
stream.CloseSend()
}
for _, conn := range grpcClientConns {
conn.Close()
}
for _, cancelFun := range cancelFuns {
cancelFun()
}
suite.cleanupGRPCStreams(grpcClientConns, streams, cancelFuncs)

suite.TearDownSuite()
}

0 comments on commit 7aaed93

Please sign in to comment.