From 30e117d8274978590351d4dc13ddca7089ff7b40 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 10 Nov 2021 12:17:02 +0100 Subject: [PATCH 01/10] kvserver: remove accidental early return in postDestroyRaftMuLocked Since this early return has been present since at least 19.1, the tenant limiter release path was never triggered. Release note: None --- pkg/kv/kvserver/replica_destroy.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index c784a9e9c8d2..05b9d5d917a1 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -113,7 +113,9 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // call to postDestroyRaftMuLocked will currently leave the files around // forever. if r.raftMu.sideloaded != nil { - return r.raftMu.sideloaded.Clear(ctx) + if err := r.raftMu.sideloaded.Clear(ctx); err != nil { + return err + } } // Release the reference to this tenant in metrics, we know the tenant ID is From 84e3fdf8e7f9a4e7ec942e28cc17d2d4802cf74a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 5 Nov 2021 00:54:06 +0100 Subject: [PATCH 02/10] kvserver: avoid use-after-release for tenant metrics Give tenants an explicit token that they hold on to to account for their tenant metrics reference. This gives us a convenient place to assert that references are not double-freed or used after free, which in fact hey were in multiple locations (all fixed in this commit). Release note: None --- pkg/kv/kvserver/metrics.go | 56 +++++++++++++------ pkg/kv/kvserver/metrics_test.go | 6 +- pkg/kv/kvserver/replica.go | 11 +++- pkg/kv/kvserver/replica_application_result.go | 13 +++-- .../replica_application_state_machine.go | 3 +- pkg/kv/kvserver/replica_destroy.go | 9 ++- pkg/kv/kvserver/replica_init.go | 2 +- pkg/kv/kvserver/replica_raftstorage.go | 14 ++--- pkg/kv/kvserver/store.go | 6 +- pkg/kv/kvserver/store_merge.go | 8 +-- pkg/kv/kvserver/store_remove_replica.go | 4 +- pkg/kv/kvserver/store_split.go | 6 +- 12 files changed, 89 insertions(+), 49 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index c0c19df7c772..8aafc4d2c478 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -12,6 +12,8 @@ package kvserver import ( "context" + "runtime/debug" + "sync/atomic" "time" "unsafe" @@ -1439,6 +1441,18 @@ type StoreMetrics struct { ClosedTimestampMaxBehindNanos *metric.Gauge } +type tenantMetricsRef struct { + _tenantID roachpb.TenantID + _state int32 // atomic; 0=usable 1=poisoned + stack []byte +} + +func (ref *tenantMetricsRef) assert(ctx context.Context) { + if atomic.LoadInt32(&ref._state) != 0 { + log.Fatalf(ctx, "tenantMetricsRef already finalized in:\n%s", ref.stack) + } +} + // TenantsStorageMetrics are metrics which are aggregated over all tenants // present on the server. The struct maintains child metrics used by each // tenant to track their individual values. The struct expects that children @@ -1473,7 +1487,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {} // method are reference counted with decrements occurring in the corresponding // releaseTenant call. This method must be called prior to adding or subtracting // MVCC stats. 
-func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { +func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef { // incRef increments the reference count if it is not already zero indicating // that the struct has already been destroyed. incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) { @@ -1490,7 +1504,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { if mPtr, ok := sm.tenants.Load(key); ok { m := (*tenantStorageMetrics)(mPtr) if alreadyDestroyed := incRef(m); !alreadyDestroyed { - return + return &tenantMetricsRef{ + _tenantID: tenantID, + } } // Somebody else concurrently took the reference count to zero, go back // around. Because of the locking in releaseTenant, we know that we'll @@ -1522,7 +1538,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { m.SysCount = sm.SysCount.AddChild(tenantIDStr) m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr) m.mu.Unlock() - return + return &tenantMetricsRef{ + _tenantID: tenantID, + } } } } @@ -1530,13 +1548,17 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { // releaseTenant releases the reference to the metrics for this tenant which was // acquired with acquireTenant. It will fatally log if no entry exists for this // tenant. -func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roachpb.TenantID) { - m := sm.getTenant(ctx, tenantID) +func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) { + m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release + if atomic.SwapInt32(&ref._state, 1) != 0 { + log.Fatalf(ctx, "metrics ref released twice") + } + ref.stack = debug.Stack() m.mu.Lock() defer m.mu.Unlock() m.mu.refCount-- if m.mu.refCount < 0 { - log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", tenantID, m.mu.refCount) + log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount) } else if m.mu.refCount > 0 { return } @@ -1558,18 +1580,19 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roa m.SysBytes.Destroy() m.SysCount.Destroy() m.AbortSpanBytes.Destroy() - sm.tenants.Delete(int64(tenantID.ToUint64())) + sm.tenants.Delete(int64(ref._tenantID.ToUint64())) } // getTenant is a helper method used to retrieve the metrics for a tenant. The // call will log fatally if no such tenant has been previously acquired. func (sm *TenantsStorageMetrics) getTenant( - ctx context.Context, tenantID roachpb.TenantID, + ctx context.Context, ref *tenantMetricsRef, ) *tenantStorageMetrics { - key := int64(tenantID.ToUint64()) + ref.assert(ctx) + key := int64(ref._tenantID.ToUint64()) mPtr, ok := sm.tenants.Load(key) if !ok { - log.Fatalf(ctx, "no metrics exist for tenant %v", tenantID) + log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID) } return (*tenantStorageMetrics)(mPtr) } @@ -1843,9 +1866,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // single snapshot of these gauges in the registry might mix the values of two // subsequent updates. 
func (sm *TenantsStorageMetrics) incMVCCGauges( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { - tm := sm.getTenant(ctx, tenantID) + ref.assert(ctx) + tm := sm.getTenant(ctx, ref) tm.LiveBytes.Inc(delta.LiveBytes) tm.KeyBytes.Inc(delta.KeyBytes) tm.ValBytes.Inc(delta.ValBytes) @@ -1863,17 +1887,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges( } func (sm *TenantsStorageMetrics) addMVCCStats( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { - sm.incMVCCGauges(ctx, tenantID, delta) + sm.incMVCCGauges(ctx, ref, delta) } func (sm *TenantsStorageMetrics) subtractMVCCStats( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { var neg enginepb.MVCCStats neg.Subtract(delta) - sm.incMVCCGauges(ctx, tenantID, neg) + sm.incMVCCGauges(ctx, ref, neg) } func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { diff --git a/pkg/kv/kvserver/metrics_test.go b/pkg/kv/kvserver/metrics_test.go index c6f76c91cc58..c6b58b07993a 100644 --- a/pkg/kv/kvserver/metrics_test.go +++ b/pkg/kv/kvserver/metrics_test.go @@ -54,13 +54,13 @@ func TestTenantsStorageMetricsConcurrency(t *testing.T) { tid := tenantIDs[rand.Intn(tenants)] time.Sleep(randDuration()) - sm.acquireTenant(tid) + ref := sm.acquireTenant(tid) time.Sleep(randDuration()) - sm.incMVCCGauges(ctx, tid, enginepb.MVCCStats{}) + sm.incMVCCGauges(ctx, ref, enginepb.MVCCStats{}) time.Sleep(randDuration()) - sm.releaseTenant(ctx, tid) + sm.releaseTenant(ctx, ref) } } var wg sync.WaitGroup diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 34df3b3fd18a..3bc7c2c086c2 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -267,9 +267,18 @@ type Replica struct { concMgr concurrency.Manager // tenantLimiter rate limits requests on a per-tenant basis and accumulates - // metrics about it. + // metrics about it. This is determined by the start key of the Replica, + // once initialized. tenantLimiter tenantrate.Limiter + // tenantMetricsRef is a metrics reference indicating the tenant under + // which to track the range's contributions. This is determined by the + // start key of the Replica, once initialized. + // Its purpose is to help track down missing/extraneous release operations + // that would not be apparent or easy to resolve when refcounting at the store + // level only. + tenantMetricsRef *tenantMetricsRef + // sideTransportClosedTimestamp encapsulates state related to the closed // timestamp's information about the range. Note that the // sideTransportClosedTimestamp does not incorporate the closed timestamp diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 33e1a3e33789..5512c3116af0 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -349,6 +349,13 @@ func (r *Replica) handleChangeReplicasResult( log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng) } + if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{ + // We destroyed the data when the batch committed so don't destroy it again. 
+ DestroyData: false, + }); err != nil { + log.Fatalf(ctx, "failed to remove replica: %v", err) + } + // NB: postDestroyRaftMuLocked requires that the batch which removed the data // be durably synced to disk, which we have. // See replicaAppBatch.ApplyToStateMachine(). @@ -356,12 +363,6 @@ func (r *Replica) handleChangeReplicasResult( log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err) } - if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{ - // We destroyed the data when the batch committed so don't destroy it again. - DestroyData: false, - }); err != nil { - log.Fatalf(ctx, "failed to remove replica: %v", err) - } return true } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 225425da3945..96968eb5acd4 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -886,7 +886,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { needsSplitBySize := r.needsSplitBySizeRLocked() needsMergeBySize := r.needsMergeBySizeRLocked() needsTruncationByLogSize := r.needsRaftLogTruncationLocked() - tenantID := r.mu.tenantID r.mu.Unlock() if closedTimestampUpdated { r.handleClosedTimestampUpdateRaftMuLocked(ctx, b.state.RaftClosedTimestamp) @@ -895,7 +894,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Record the stats delta in the StoreMetrics. deltaStats := *b.state.Stats deltaStats.Subtract(prevStats) - r.store.metrics.addMVCCStats(ctx, tenantID, deltaStats) + r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats) // Record the write activity, passing a 0 nodeID because replica.writeStats // intentionally doesn't track the origin of the writes. diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 05b9d5d917a1..cdc32adb3e20 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -112,6 +112,11 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // directories belonging to replicas which aren't present. A crash before a // call to postDestroyRaftMuLocked will currently leave the files around // forever. + // + // TODO(tbg): coming back in 2021, the above should be outdated. The ReplicaID + // is set on creation and never changes over the lifetime of a Replica. Also, + // the replica is always contained in its descriptor. So this code below should + // be removable. if r.raftMu.sideloaded != nil { if err := r.raftMu.sideloaded.Clear(ctx); err != nil { return err @@ -120,8 +125,8 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // Release the reference to this tenant in metrics, we know the tenant ID is // valid if the replica is initialized. - if tenantID, ok := r.TenantID(); ok { - r.store.metrics.releaseTenant(ctx, tenantID) + if r.tenantMetricsRef != nil { + r.store.metrics.releaseTenant(ctx, r.tenantMetricsRef) } // Unhook the tenant rate limiter if we have one. 
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 09e2ef64a73b..7b007cf52644 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -339,7 +339,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R "replica %v: %v", r, err) } r.mu.tenantID = tenantID - r.store.metrics.acquireTenant(tenantID) + r.tenantMetricsRef = r.store.metrics.acquireTenant(tenantID) if tenantID != roachpb.SystemTenantID { r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(ctx, tenantID, r.store.stopper.ShouldQuiesce()) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ff436a769149..eafaab968b54 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -949,8 +949,8 @@ func (r *Replica) applySnapshot( r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = 0 // Update the store stats for the data in the snapshot. - r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats) - r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats) + r.store.metrics.subtractMVCCStats(ctx, r.tenantMetricsRef, *r.mu.state.Stats) + r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, *state.Stats) lastKnownLease := r.mu.state.Lease // Update the rest of the Raft state. Changes to r.mu.state.Desc must be // managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled @@ -1135,11 +1135,6 @@ func (r *Replica) clearSubsumedReplicaInMemoryData( ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, ) error { for _, sr := range subsumedRepls { - // We removed sr's data when we committed the batch. Finish subsumption by - // updating the in-memory bookkeping. - if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { - return err - } // We already hold sr's raftMu, so we must call removeReplicaImpl directly. // Note that it's safe to update the store's metadata for sr's removal // separately from updating the store's metadata for r's new descriptor @@ -1153,6 +1148,11 @@ func (r *Replica) clearSubsumedReplicaInMemoryData( }); err != nil { return err } + // We removed sr's data when we committed the batch. Finish subsumption by + // updating the in-memory bookkeping. + if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { + return err + } } return nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 5d90512eb330..707e5e436870 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1562,8 +1562,10 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Add this range and its stats to our counter. s.metrics.ReplicaCount.Inc(1) - if tenantID, ok := rep.TenantID(); ok { - s.metrics.addMVCCStats(ctx, tenantID, rep.GetMVCCStats()) + if _, ok := rep.TenantID(); ok { + // TODO(tbg): why the check? We're definitely an initialized range so + // we have a tenantID. 
+ s.metrics.addMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats()) } else { return errors.AssertionFailedf("found newly constructed replica"+ " for range %d at generation %d with an invalid tenant ID in store %d", diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go index 22ad0ae8cef1..e96737888fb6 100644 --- a/pkg/kv/kvserver/store_merge.go +++ b/pkg/kv/kvserver/store_merge.go @@ -46,10 +46,6 @@ func (s *Store) MergeRange( leftRepl.raftMu.AssertHeld() rightRepl.raftMu.AssertHeld() - if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil { - return err - } - // Note that we were called (indirectly) from raft processing so we must // call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking // on the right-hand replica's raftMu. @@ -61,6 +57,10 @@ func (s *Store) MergeRange( return errors.Errorf("cannot remove range: %s", err) } + if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil { + return err + } + if leftRepl.leaseholderStats != nil { leftRepl.leaseholderStats.resetRequestCounts() } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index 8f47c805c840..f5d47ffa8a0a 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -69,7 +69,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( // destroy status. var desc *roachpb.RangeDescriptor var replicaID roachpb.ReplicaID - var tenantID roachpb.TenantID { rep.readOnlyCmdMu.Lock() rep.mu.Lock() @@ -112,7 +111,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()), destroyReasonRemoved) replicaID = rep.mu.replicaID - tenantID = rep.mu.tenantID rep.mu.Unlock() rep.readOnlyCmdMu.Unlock() } @@ -144,7 +142,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( // Destroy, but this configuration helps avoid races in stat verification // tests. - s.metrics.subtractMVCCStats(ctx, tenantID, rep.GetMVCCStats()) + s.metrics.subtractMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats()) s.metrics.ReplicaCount.Dec(1) s.mu.Unlock() diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 810965fae7a4..2294ca8ecd90 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -153,8 +153,10 @@ func splitPostApply( // Update store stats with difference in stats before and after split. if rightReplOrNil != nil { - if tenantID, ok := rightReplOrNil.TenantID(); ok { - rightReplOrNil.store.metrics.addMVCCStats(ctx, tenantID, deltaMS) + if _, ok := rightReplOrNil.TenantID(); ok { + // TODO(tbg): why this check to get here? Is this really checking if the RHS + // is already initialized? But isn't it always, at this point? + rightReplOrNil.store.metrics.addMVCCStats(ctx, rightReplOrNil.tenantMetricsRef, deltaMS) } else { log.Fatalf(ctx, "%s: found replica which is RHS of a split "+ "without a valid tenant ID", rightReplOrNil) From 7e70d0ad8d647b6972ebe05b9b848abf0664cc10 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 10 Nov 2021 14:46:14 +0100 Subject: [PATCH 03/10] kvserver: add TODO about tenant IDs > math.MaxInt64 These are effectively unsupported since using them would generate undefined behavior, at least in this one map that keys by int64 without shifting the value range around (officially, tenantIDs are uint64s). 
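For concreteness, this is how the conversion behaves for an ID above math.MaxInt64 (a standalone sketch in plain Go; the ID value is made up): the value wraps to a negative int64 key, and converting back recovers the original uint64.

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // A hypothetical tenant ID above math.MaxInt64.
        id := uint64(math.MaxInt64) + 42

        key := int64(id)    // wraps to a negative number when used as a map key
        back := uint64(key) // converts back to the original value

        fmt.Println(key, back == id) // -9223372036854775767 true
    }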
Release note: None --- pkg/kv/kvserver/metrics.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 8aafc4d2c478..9eb53c3f608e 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1475,6 +1475,10 @@ type TenantsStorageMetrics struct { AbortSpanBytes *aggmetric.AggGauge // This struct is invisible to the metric package. + // + // TODO(tbg): seems bad that this uses int64 keys but tenantIDs can + // span uint64. Bad things will happen if we ever use tenantIDs in + // excess of math.MaxInt64. tenants syncutil.IntMap // map[roachpb.TenantID]*tenantStorageMetrics } From c0a8a638f019d3ef07772ea496573459bed1223c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 10 Nov 2021 14:51:46 +0100 Subject: [PATCH 04/10] kvserver: test refcounting of tenants through split and merge Release note: None --- pkg/kv/kvserver/replica_command.go | 10 +++ pkg/kv/kvserver/replica_test.go | 113 +++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 1c160add4dd2..2a1a0ef465bc 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -794,6 +794,11 @@ func waitForApplication( replicas []roachpb.ReplicaDescriptor, leaseIndex uint64, ) error { + if dialer == nil && len(replicas) == 1 { + // This early return supports unit tests (testContext{}) that also + // want to perform merges. + return nil + } return contextutil.RunWithTimeout(ctx, "wait for application", 5*time.Second, func(ctx context.Context) error { g := ctxgroup.WithContext(ctx) for _, repl := range replicas { @@ -825,6 +830,11 @@ func waitForReplicasInit( rangeID roachpb.RangeID, replicas []roachpb.ReplicaDescriptor, ) error { + if dialer == nil && len(replicas) == 1 { + // This early return supports unit tests (testContext{}) that also + // want to perform merges. + return nil + } return contextutil.RunWithTimeout(ctx, "wait for replicas init", 5*time.Second, func(ctx context.Context) error { g := ctxgroup.WithContext(ctx) for _, repl := range replicas { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 16696639f69e..2fd97dbf0035 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "testing" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/exit" @@ -13057,3 +13058,115 @@ func TestRangeInfoReturned(t *testing.T) { }) } } + +func tenantsWithMetrics(m *StoreMetrics) map[roachpb.TenantID]struct{} { + metricsTenants := map[roachpb.TenantID]struct{}{} + m.tenants.Range(func(tenID int64, _ unsafe.Pointer) bool { + metricsTenants[roachpb.MakeTenantID(uint64(tenID))] = struct{}{} + return true // more + }) + return metricsTenants +} + +func isSystemTenantRepl(t *testing.T, repl *Replica) { + t.Log(repl) + // Even system tenant has a metrics ref. + require.NotNil(t, repl.tenantMetricsRef) + // System tenant has no rate limiter. + require.Nil(t, repl.tenantLimiter) + tenID, ok := repl.TenantID() + require.True(t, ok) // repl is initialized + require.Equal(t, roachpb.SystemTenantID, tenID) + // Even though the system tenant is not populated with a rate limiter and + // there is no refcounting for the system tenant, there is a system rate + // limiter (which would exist and be used for some requests even if the store + // had no replica for the system tenant). 
+ require.NotNil(t, repl.store.tenantRateLimiters.GetTenant(context.Background(), tenID, nil /* closer */)) +} + +// TestStoreTenantMetricsAndRateLimiterRefcount verifies that the refcounting +// for replicas owned by tenants works: a tenant metrics or tenant rate limiter +// reference is only retained as long as a replica owned by that tenant exists +// on the store. This does not exhaustively test all of the ways in which a +// Replica can be destroyed (but uses a Merge). There are always-on assertions +// (see calls to tenantMetricsRef.assert) that fire on use-after-release, but +// if we leaked references on the non-merge code paths, this would not be +// obviously caught in testing. +func TestStoreTenantMetricsAndRateLimiterRefcount(t *testing.T) { + defer leaktest.AfterTest(t)() + + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + tc := testContext{} + tc.Start(t, stopper) + + // Initially all replicas are system tenant replicas. + tc.store.VisitReplicas(func(repl *Replica) (wantMore bool) { + isSystemTenantRepl(t, repl) + return true // wantMore + }) + + // The metrics only know the system tenant. + require.Equal(t, + map[roachpb.TenantID]struct{}{roachpb.SystemTenantID: {}}, + tenantsWithMetrics(tc.store.metrics), + ) + + // A range for tenant 123 appears via a split. + ten123 := roachpb.MakeTenantID(123) + splitKey := keys.MustAddr(keys.MakeSQLCodec(ten123).TenantPrefix()) + leftRepl := tc.store.LookupReplica(splitKey) + require.NotNil(t, leftRepl) + splitTestRange(tc.store, splitKey, t) + tenRepl := tc.store.LookupReplica(splitKey) + require.NotNil(t, tenRepl) + require.NotNil(t, tenRepl.tenantMetricsRef) + require.NotNil(t, tenRepl.tenantLimiter) + + // The store metrics correspondingly track the system tenant and tenant 123 + // and the rate limiter registry has an entry for it as well. + require.Equal(t, + map[roachpb.TenantID]struct{}{ + roachpb.SystemTenantID: {}, + ten123: {}, + }, + tenantsWithMetrics(tc.store.metrics), + ) + tenLimiter := tenRepl.tenantLimiter + secondLimiter := tenRepl.store.tenantRateLimiters.GetTenant(context.Background(), ten123, nil /* closer */) + tenRepl.store.tenantRateLimiters.Release(secondLimiter) + require.Equal(t, + tenLimiter, + secondLimiter, + ) + + // The sole range owned by tenant 123 gets merged away again. + _, pErr := leftRepl.AdminMerge(context.Background(), roachpb.AdminMergeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: leftRepl.Desc().StartKey.AsRawKey(), + }, + }, "testing") + require.Nil(t, pErr) + + // The store metrics no longer track tenant 123. + require.Equal(t, + map[roachpb.TenantID]struct{}{ + roachpb.SystemTenantID: {}, + }, + tenantsWithMetrics(tc.store.metrics), + ) + // The rate limiter is similarly gone. We can't test this directly + // but we can check that the limiter we had has been released, which + // we can tell from a panic with an assertion failure if we release + // again. + func() { + defer func() { + r := recover() + err, ok := r.(error) + if !ok || !errors.HasAssertionFailure(err) { + t.Errorf("unxpected recover() after double-Release: %+v", r) + } + }() + tc.store.tenantRateLimiters.Release(tenLimiter) + }() +} From d1e3fc18fd3a0eca8c4f87f3d3b2e182484bca62 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 17 Nov 2021 15:27:55 +0100 Subject: [PATCH 05/10] kvserver: brush up tenant refcounting This adds some finishing touches that came out of PR review. 
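In a nutshell, the token pattern being polished here is a release-once handle with a use-after-release assertion. A minimal standalone sketch (standard library only; it omits the mutex that the real code wraps around the saved stack, and the names below are not the ones in the tree):

    package main

    import (
        "log"
        "runtime/debug"
        "sync/atomic"
    )

    // token may be used any number of times but released exactly once; any use
    // after release (or a double release) reports the stack captured at release.
    type token struct {
        state int32  // atomic; 0 = usable, 1 = released
        stack []byte // stack of the releasing caller, for diagnostics
    }

    func (t *token) assert() {
        if atomic.LoadInt32(&t.state) != 0 {
            log.Fatalf("token already released at:\n%s", t.stack)
        }
    }

    func (t *token) release() {
        if atomic.SwapInt32(&t.state, 1) != 0 {
            t.assert() // fatals, pointing at the earlier release
        }
        t.stack = debug.Stack()
    }

    func main() {
        tok := &token{}
        tok.assert()  // ok: still usable
        tok.release() // ok: first (and only) release
        tok.assert()  // fatals and prints the stack captured in release()
    }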
Release note: None --- pkg/kv/kvserver/metrics.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 9eb53c3f608e..ebab735db162 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1442,14 +1442,25 @@ type StoreMetrics struct { } type tenantMetricsRef struct { + // All fields are internal. Don't access them. + _tenantID roachpb.TenantID _state int32 // atomic; 0=usable 1=poisoned - stack []byte + + // _stack helps diagnose use-after-release when it occurs. + // This field is populated in releaseTenant and printed + // in assertions on failure. + _stack struct { + syncutil.Mutex + string + } } func (ref *tenantMetricsRef) assert(ctx context.Context) { if atomic.LoadInt32(&ref._state) != 0 { - log.Fatalf(ctx, "tenantMetricsRef already finalized in:\n%s", ref.stack) + ref._stack.Lock() + defer ref._stack.Unlock() + log.FatalfDepth(ctx, 1, "tenantMetricsRef already finalized in:\n%s", ref._stack.string) } } @@ -1476,10 +1487,11 @@ type TenantsStorageMetrics struct { // This struct is invisible to the metric package. // - // TODO(tbg): seems bad that this uses int64 keys but tenantIDs can - // span uint64. Bad things will happen if we ever use tenantIDs in - // excess of math.MaxInt64. - tenants syncutil.IntMap // map[roachpb.TenantID]*tenantStorageMetrics + // NB: note that the int64 conversion in this map is lossless, so + // everything will work with tenantsIDs in excess of math.MaxInt64 + // except that should one ever look at this map through a debugger + // the int64->uint64 conversion has to be done manually. + tenants syncutil.IntMap // map[int64(roachpb.TenantID)]*tenantStorageMetrics } var _ metric.Struct = (*TenantsStorageMetrics)(nil) @@ -1555,9 +1567,12 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenan func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) { m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release if atomic.SwapInt32(&ref._state, 1) != 0 { - log.Fatalf(ctx, "metrics ref released twice") + ref.assert(ctx) // this will fatal + return // unreachable } - ref.stack = debug.Stack() + ref._stack.Lock() + ref._stack.string = string(debug.Stack()) + ref._stack.Unlock() m.mu.Lock() defer m.mu.Unlock() m.mu.refCount-- From 7f589808850ae6d3f52ff015d29e0a5a6cc00bda Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 18 Nov 2021 11:51:53 +0100 Subject: [PATCH 06/10] deps: bump logtags Needed to be able to do `AddTags(..., FromContext(ctx))` when the ctx might not have tags yet. 
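To make the difference concrete (a standalone sketch; the tag names are invented and the exact rendering of the tag buffers is not important here): `WithTags` replaces the context's tag set, while `AddTags` merges the given tags into whatever the context already carries.

    package main

    import (
        "context"
        "fmt"

        "github.com/cockroachdb/logtags"
    )

    func main() {
        // serverCtx carries the server's ambient tags, callerCtx the caller's.
        serverCtx := logtags.AddTag(context.Background(), "n", 1)
        callerCtx := logtags.AddTag(context.Background(), "client", "10.0.0.1")

        // WithTags: the resulting context carries only the caller's tags.
        replaced := logtags.WithTags(serverCtx, logtags.FromContext(callerCtx))
        // AddTags: the resulting context keeps "n" and gains "client".
        combined := logtags.AddTags(serverCtx, logtags.FromContext(callerCtx))

        fmt.Println(logtags.FromContext(replaced))
        fmt.Println(logtags.FromContext(combined))
    }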
Release note: None --- DEPS.bzl | 6 +++--- go.mod | 2 +- go.sum | 3 ++- vendor | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 41b642a5de45..192b89ab3c5b 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1198,10 +1198,10 @@ def go_deps(): name = "com_github_cockroachdb_logtags", build_file_proto_mode = "disable_global", importpath = "github.com/cockroachdb/logtags", - sha256 = "e0ff78268deed42414d58c55115e2a7db8d6b76f4165c02d8ba40d6cd32495a1", - strip_prefix = "github.com/cockroachdb/logtags@v0.0.0-20190617123548-eb05cc24525f", + sha256 = "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74", + strip_prefix = "github.com/cockroachdb/logtags@v0.0.0-20211118104740-dabe8e521a4f", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20190617123548-eb05cc24525f.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 19ed876760f3..cc54689b66b1 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/cockroachdb/errors v1.8.5 github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.13.0 - github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f + github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd diff --git a/go.sum b/go.sum index 20a94ab9adf1..9e740a70363f 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,9 @@ github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 h1:Yq github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55/go.mod h1:QqVqNIiRhLqJXif5C9wbM4JydBhrAF2WDMxkv5xkyxQ= github.com/cockroachdb/gostdlib v1.13.0 h1:TzSEPYgkKDNei3gbLc0rrHu4iHyBp7/+NxPOFmcXGaw= github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqid9LAzWz/l5OgA= -github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY= github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= +github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74= +github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e h1:FrERdkPlRj+v7fc+PGpey3GUiDGuTR5CsmLCA54YJ8I= github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e/go.mod h1:pMxsKyCewnV3xPaFvvT9NfwvDTcIx2Xqg0qL5Gq0SjM= github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af h1:NY+UDVTyU+Y2wKr0ocnBSbXGYTHEGwnQ9ukP+qg7xfY= diff --git a/vendor b/vendor index 526352f336aa..1b9ad708ac24 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 526352f336aa6eeb87953094936c42ac8168807f +Subproject commit 1b9ad708ac24e1f35d450ee3a06caf8156156de9 From 4c00d7498c3c73acb2e3437419d9c31c096a2073 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 16 Nov 2021 21:23:34 +0100 Subject: [PATCH 07/10] jobs, lease: properly use `logtags.AddTags` In my previous changes to these areas, I mistakenly used `WithTags`, where we really wanted `AddTags`. 
This patch fixes that. Release note: None --- pkg/jobs/registry.go | 7 ++++++- pkg/sql/catalog/lease/descriptor_state.go | 4 +++- pkg/sql/catalog/lease/lease.go | 12 +++++++++--- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 3313212c9b36..4df6abf7c720 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -250,7 +250,12 @@ func (r *Registry) ID() base.SQLInstanceID { // cancel func. func (r *Registry) makeCtx() (context.Context, func()) { ctx := r.ac.AnnotateCtx(context.Background()) - ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx)) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + // TODO(knz): This may not be necessary if the AmbientContext had + // all the tags already. + // See: https://github.com/cockroachdb/cockroach/issues/72815 + ctx = logtags.AddTags(ctx, logtags.FromContext(r.serverCtx)) return context.WithCancel(ctx) } diff --git a/pkg/sql/catalog/lease/descriptor_state.go b/pkg/sql/catalog/lease/descriptor_state.go index 75112018c413..59d50e1aece2 100644 --- a/pkg/sql/catalog/lease/descriptor_state.go +++ b/pkg/sql/catalog/lease/descriptor_state.go @@ -287,7 +287,9 @@ func (t *descriptorState) maybeQueueLeaseRenewal( // Start the renewal. When it finishes, it will reset t.renewalInProgress. newCtx := m.ambientCtx.AnnotateCtx(context.Background()) - newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + newCtx = logtags.AddTags(newCtx, logtags.FromContext(ctx)) return t.stopper.RunAsyncTask(newCtx, "lease renewal", func(ctx context.Context) { t.startLeaseRenewal(ctx, m, id, name) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 2798542fcb4b..810409d5ef28 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -473,7 +473,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // because of its use of `singleflight.Group`. See issue #41780 for how this has // happened. baseCtx := m.ambientCtx.AnnotateCtx(context.Background()) - baseCtx = logtags.WithTags(baseCtx, logtags.FromContext(ctx)) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + baseCtx = logtags.AddTags(baseCtx, logtags.FromContext(ctx)) newCtx, cancel := m.stopper.WithCancelOnQuiesce(baseCtx) defer cancel() if m.isDraining() { @@ -526,7 +528,9 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) { // Release to the store asynchronously, without the descriptorState lock. newCtx := m.ambientCtx.AnnotateCtx(context.Background()) - newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + newCtx = logtags.AddTags(newCtx, logtags.FromContext(ctx)) if err := m.stopper.RunAsyncTask( newCtx, "sql.descriptorState: releasing descriptor lease", func(ctx context.Context) { @@ -1227,7 +1231,9 @@ func (m *Manager) DeleteOrphanedLeases(ctx context.Context, timeThreshold int64) // Run as async worker to prevent blocking the main server Start method. // Exit after releasing all the orphaned leases. newCtx := m.ambientCtx.AnnotateCtx(context.Background()) - newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. 
+ newCtx = logtags.AddTags(newCtx, logtags.FromContext(ctx)) _ = m.stopper.RunAsyncTask(newCtx, "del-orphaned-leases", func(ctx context.Context) { // This could have been implemented using DELETE WHERE, but DELETE WHERE // doesn't implement AS OF SYSTEM TIME. From 9b1cd586b93a3011933815b65c6e2ff35e46bf55 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 16 Nov 2021 21:53:14 +0100 Subject: [PATCH 08/10] sqlliveness: hook up to tracing and server tags Release note: None --- pkg/server/server_sql.go | 1 + pkg/sql/sqlliveness/slprovider/BUILD.bazel | 1 + pkg/sql/sqlliveness/slprovider/slprovider.go | 4 +++- pkg/sql/sqlliveness/slstorage/slstorage.go | 14 ++++++++++---- pkg/sql/sqlliveness/slstorage/slstorage_test.go | 10 +++++++--- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c1e7b95bbe70..8fdf4f36d151 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -361,6 +361,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs) cfg.sqlLivenessProvider = slprovider.New( + cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) cfg.sqlInstanceProvider = instanceprovider.New( diff --git a/pkg/sql/sqlliveness/slprovider/BUILD.bazel b/pkg/sql/sqlliveness/slprovider/BUILD.bazel index f107a6f89e12..f8e2f222b238 100644 --- a/pkg/sql/sqlliveness/slprovider/BUILD.bazel +++ b/pkg/sql/sqlliveness/slprovider/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/sql/sqlliveness/slinstance", "//pkg/sql/sqlliveness/slstorage", "//pkg/util/hlc", + "//pkg/util/log", "//pkg/util/metric", "//pkg/util/stop", ], diff --git a/pkg/sql/sqlliveness/slprovider/slprovider.go b/pkg/sql/sqlliveness/slprovider/slprovider.go index 428e6d692c26..f8e54f7e4d1d 100644 --- a/pkg/sql/sqlliveness/slprovider/slprovider.go +++ b/pkg/sql/sqlliveness/slprovider/slprovider.go @@ -22,12 +22,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" ) // New constructs a new Provider. func New( + ambientCtx log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, db *kv.DB, @@ -35,7 +37,7 @@ func New( settings *cluster.Settings, testingKnobs *sqlliveness.TestingKnobs, ) sqlliveness.Provider { - storage := slstorage.NewStorage(stopper, clock, db, codec, settings) + storage := slstorage.NewStorage(ambientCtx, stopper, clock, db, codec, settings) instance := slinstance.NewSQLInstance(stopper, clock, storage, settings, testingKnobs) return &provider{ Storage: storage, diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index eea5692d126b..f574c1df1d0e 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -75,6 +75,8 @@ var CacheSize = settings.RegisterIntSetting( // Storage implements sqlliveness.Storage. type Storage struct { + log.AmbientContext + settings *cluster.Settings stopper *stop.Stopper clock *hlc.Clock @@ -103,6 +105,7 @@ type Storage struct { // NewTestingStorage constructs a new storage with control for the database // in which the `sqlliveness` table should exist. 
func NewTestingStorage( + ambientCtx log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, db *kv.DB, @@ -112,6 +115,8 @@ func NewTestingStorage( newTimer func() timeutil.TimerI, ) *Storage { s := &Storage{ + AmbientContext: ambientCtx, + settings: settings, stopper: stopper, clock: clock, @@ -140,13 +145,14 @@ func NewTestingStorage( // NewStorage creates a new storage struct. func NewStorage( + ambientCtx log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, db *kv.DB, codec keys.SQLCodec, settings *cluster.Settings, ) *Storage { - return NewTestingStorage(stopper, clock, db, codec, settings, keys.SqllivenessID, + return NewTestingStorage(ambientCtx, stopper, clock, db, codec, settings, keys.SqllivenessID, timeutil.DefaultTimeSource{}.NewTimer) } @@ -222,9 +228,9 @@ func (s *Storage) isAlive( // of the first context cancels other callers to the `acquireNodeLease()` method, // because of its use of `singleflight.Group`. See issue #41780 for how this has // happened. - newCtx, cancel := s.stopper.WithCancelOnQuiesce( - logtags.WithTags(context.Background(), logtags.FromContext(ctx)), - ) + bgCtx := s.AnnotateCtx(context.Background()) + bgCtx = logtags.AddTags(bgCtx, logtags.FromContext(ctx)) + newCtx, cancel := s.stopper.WithCancelOnQuiesce(bgCtx) defer cancel() // store the result underneath the singleflight to avoid the need diff --git a/pkg/sql/sqlliveness/slstorage/slstorage_test.go b/pkg/sql/sqlliveness/slstorage/slstorage_test.go index 4930fc5f2472..911bc73d2877 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage_test.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage_test.go @@ -72,7 +72,8 @@ func TestStorage(t *testing.T) { }, base.DefaultMaxClockOffset) settings := cluster.MakeTestingClusterSettings() stopper := stop.NewStopper() - storage := slstorage.NewTestingStorage(stopper, clock, kvDB, keys.SystemSQLCodec, settings, + var ambientCtx log.AmbientContext + storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, tableID, timeSource.NewTimer) return clock, timeSource, settings, stopper, storage } @@ -330,7 +331,8 @@ func TestConcurrentAccessesAndEvictions(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) slstorage.CacheSize.Override(ctx, &settings.SV, 10) - storage := slstorage.NewTestingStorage(stopper, clock, kvDB, keys.SystemSQLCodec, settings, + var ambientCtx log.AmbientContext + storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, tableID, timeSource.NewTimer) storage.Start(ctx) @@ -494,7 +496,8 @@ func TestConcurrentAccessSynchronization(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) slstorage.CacheSize.Override(ctx, &settings.SV, 10) - storage := slstorage.NewTestingStorage(stopper, clock, kvDB, keys.SystemSQLCodec, settings, + var ambientCtx log.AmbientContext + storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, tableID, timeSource.NewTimer) storage.Start(ctx) @@ -686,6 +689,7 @@ func TestDeleteMidUpdateFails(t *testing.T) { tableID := getTableID(t, tdb, dbName, "sqlliveness") storage := slstorage.NewTestingStorage( + s.DB().AmbientContext, s.Stopper(), s.Clock(), kvDB, keys.SystemSQLCodec, s.ClusterSettings(), tableID, timeutil.DefaultTimeSource{}.NewTimer, ) From a4b025dc68ed3b1a1738a5691009d36305491c62 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 16 Nov 2021 21:58:30 +0100 Subject: [PATCH 09/10] sql: hook up conn executor close to tracing Release 
note: None --- pkg/sql/conn_executor.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f45aef6b06eb..da3cebb3aee4 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -976,7 +976,10 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{}) panic(panicErr) } // Closing is not cancelable. - closeCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + closeCtx := ex.server.cfg.AmbientCtx.AnnotateCtx(context.Background()) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + closeCtx = logtags.AddTags(closeCtx, logtags.FromContext(ctx)) ex.close(closeCtx, normalClose) } From bc269bddd1109875e43ade6cc51f6e1a6b502a04 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 16 Nov 2021 21:58:52 +0100 Subject: [PATCH 10/10] kvserver: hook up learner rollback task to tracing Release note: None --- pkg/kv/kvserver/replica_command.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 1c160add4dd2..194435a1a87b 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1798,7 +1798,10 @@ func (r *Replica) tryRollbackRaftLearner( }) return err } - rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + rollbackCtx := r.AnnotateCtx(context.Background()) + // AddTags and not WithTags, so that we combine the tags with those + // filled by AnnotateCtx. + rollbackCtx = logtags.AddTags(rollbackCtx, logtags.FromContext(ctx)) if err := contextutil.RunWithTimeout( rollbackCtx, "learner rollback", rollbackTimeout, rollbackFn, ); err != nil {