Skip to content

Commit

Permalink
tenantcostserver: use a per-tenant mutex
Browse files Browse the repository at this point in the history
Use a per-tenant mutex to avoid unnecessary transaction restart and
remove the possibility of out-of-order metric updates.

Release note: None
  • Loading branch information
RaduBerinde committed Jul 31, 2021
1 parent c0f5a3d commit 70aa2c2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type tenantMetrics struct {
totalWriteRequests *aggmetric.Gauge
totalWriteBytes *aggmetric.Gauge
totalSQLPodsCPUSeconds *aggmetric.GaugeFloat64

// Mutex is used to atomically update metrics together with a corresponding
// change to the system table.
mutex *syncutil.Mutex
}

// getTenantMetrics returns the metrics for a tenant.
Expand All @@ -122,6 +126,7 @@ func (m *Metrics) getTenantMetrics(tenantID roachpb.TenantID) tenantMetrics {
totalWriteRequests: m.TotalWriteRequests.AddChild(tid),
totalWriteBytes: m.TotalWriteBytes.AddChild(tid),
totalSQLPodsCPUSeconds: m.TotalSQLPodsCPUSeconds.AddChild(tid),
mutex: &syncutil.Mutex{},
}
m.mu.tenantMetrics[tenantID] = tm
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (s *instance) TokenBucketRequest(
}
}

metrics := s.metrics.getTenantMetrics(tenantID)
// Use a per-tenant mutex to serialize operations to the bucket. The
// transactions will need to be serialized anyway, so this avoids more
// expensive restarts. It also guarantees that the metric updates happen in
// the same order with the system table changes.
metrics.mutex.Lock()
defer metrics.mutex.Unlock()

result := &roachpb.TokenBucketResponse{}
var consumption roachpb.TokenBucketRequest_Consumption
if err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down Expand Up @@ -80,14 +88,11 @@ func (s *instance) TokenBucketRequest(
}

// Report current consumption.
// TODO(radu): there is a possible race here, where two different requests
// update the metrics in opposite order.
m := s.metrics.getTenantMetrics(tenantID)
m.totalRU.Update(consumption.RU)
m.totalReadRequests.Update(int64(consumption.ReadRequests))
m.totalReadBytes.Update(int64(consumption.ReadBytes))
m.totalWriteRequests.Update(int64(consumption.WriteRequests))
m.totalWriteBytes.Update(int64(consumption.WriteBytes))
m.totalSQLPodsCPUSeconds.Update(consumption.SQLPodCPUSeconds)
metrics.totalRU.Update(consumption.RU)
metrics.totalReadRequests.Update(int64(consumption.ReadRequests))
metrics.totalReadBytes.Update(int64(consumption.ReadBytes))
metrics.totalWriteRequests.Update(int64(consumption.WriteRequests))
metrics.totalWriteBytes.Update(int64(consumption.WriteBytes))
metrics.totalSQLPodsCPUSeconds.Update(consumption.SQLPodCPUSeconds)
return result
}

0 comments on commit 70aa2c2

Please sign in to comment.