From 04481d8fbe069435d5a77dcd29a0e90433f41d2b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 8 Aug 2022 13:13:33 +0200 Subject: [PATCH 1/7] kvserver: also block LEARNER snaps to paused followers We checked whether the snapshot recipient was paused only in the raft snapshot queue path. By pushing the check down into `sendSnapshot`, it is now hit by any snapshot attempt, which includes the replicate queue and store rebalancer. For best results, both of these should avoid moving replicas to paused followers in the first place, which they already do, at least partially, so this change shouldn't have much of an impact in practice. Fixes https://github.com/cockroachdb/cockroach/issues/85479. Release note: None --- pkg/kv/kvserver/raft_snapshot_queue.go | 12 ------------ pkg/kv/kvserver/replica_command.go | 21 +++++++++++++++------ 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 223747f98e8d..61419699ecec 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -139,18 +139,6 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( return false, nil } } - repl.mu.RLock() - _, destPaused := repl.mu.pausedFollowers[id] - repl.mu.RUnlock() - if ioThresh := repl.store.ioOverloadedStores.Load()[repDesc.StoreID]; ioThresh != nil && destPaused { - // If the destination is paused, be more hesitant to send snapshots. The destination being - // paused implies that we have recently checked that it's not required for quorum, and that - // we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on - // the snapshot as well. 
- err := errors.Errorf("skipping snapshot; %s is overloaded: %s", repDesc, ioThresh) - repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err) - return false, err - } err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 5b65e682e968..f48c2423dd6b 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2592,10 +2592,23 @@ func (r *Replica) sendSnapshot( r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr) }() - sender, err := r.GetReplicaDescriptor() + r.mu.RLock() + sender, err := r.getReplicaDescriptorRLocked() + _, destPaused := r.mu.pausedFollowers[recipient.ReplicaID] + r.mu.RUnlock() + if err != nil { return err } + + if ioThresh := r.store.ioOverloadedStores.Load()[recipient.StoreID]; ioThresh != nil && destPaused { + // If the destination is paused, be more hesitant to send snapshots. The destination being + // paused implies that we have recently checked that it's not required for quorum, and that + // we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on + // the snapshot as well. + return errors.Errorf("skipping snapshot; %s is overloaded: %s", recipient, ioThresh) + } + // Check follower snapshots cluster setting. if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { sender, err = r.getSenderReplica(ctx) @@ -2607,10 +2620,6 @@ func (r *Replica) sendSnapshot( log.VEventf( ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, ) - desc, err := r.GetReplicaDescriptor() - if err != nil { - return err - } status := r.RaftStatus() if status == nil { // This code path is sometimes hit during scatter for replicas that @@ -2621,7 +2630,7 @@ func (r *Replica) sendSnapshot( // Create new delegate snapshot request with only required metadata. 
delegateRequest := &kvserverpb.DelegateSnapshotRequest{ RangeID: r.RangeID, - CoordinatorReplica: desc, + CoordinatorReplica: sender, RecipientReplica: recipient, Priority: priority, Type: snapType, From c3bbfe1ff5b83e91ed495f7069c6b0d5529bdf56 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 8 Aug 2022 13:34:29 +0200 Subject: [PATCH 2/7] kvserver: extract method updatePausedFollowersLocked Intentionally leaving this unsimplified as a purely mechanical commit. Release note: None --- pkg/kv/kvserver/replica_raft.go | 48 +--------------------- pkg/kv/kvserver/replica_raft_overload.go | 52 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 7f10109ab59a..b204747a9c2a 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1186,53 +1186,7 @@ func (r *Replica) tick( return false, nil } - if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) { - // When multiple followers are overloaded, we may not be able to exclude all - // of them from replication traffic due to quorum constraints. We would like - // a given Range to deterministically exclude the same store (chosen - // randomly), so that across multiple Ranges we have a chance of removing - // load from all overloaded Stores in the cluster. (It would be a bad idea - // to roll a per-Range dice here on every tick, since that would rapidly - // include and exclude individual followers from replication traffic, which - // would be akin to a high rate of packet loss. Once we've decided to ignore - // a follower, this decision should be somewhat stable for at least a few - // seconds). 
- // - // Note that we don't enable this mechanism for the liveness range (see - // quotaPoolEnabledForRange), simply to play it safe, as we know that the - // liveness range is unlikely to be a major contributor to any follower's - // I/O and wish to reduce the likelihood of a problem in replication pausing - // contributing to an outage of that critical range. - seed := int64(r.RangeID) - now := r.store.Clock().Now().GoTime() - d := computeExpendableOverloadedFollowersInput{ - replDescs: r.descRLocked().Replicas(), - ioOverloadMap: ioOverloadMap, - getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { - prs := r.mu.internalRaftGroup.Status().Progress - updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { - return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) - }) - return prs - }, - minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, - seed: seed, - } - r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) - for replicaID := range r.mu.pausedFollowers { - // We're dropping messages to those followers (see handleRaftReady) but - // it's a good idea to tell raft not to even bother sending in the first - // place. Raft will react to this by moving the follower to probing state - // where it will be contacted only sporadically until it responds to an - // MsgApp (which it can only do once we stop dropping messages). Something - // similar would result naturally if we didn't report as unreachable, but - // with more wasted work. - r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) - } - } else if len(r.mu.pausedFollowers) > 0 { - // No store in the cluster is overloaded, or this replica is not raft leader. 
- r.mu.pausedFollowers = nil - } + r.updatePausedFollowersLocked(ctx, ioOverloadMap) now := r.store.Clock().NowAsClockTimestamp() if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) { diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go index 0495417a2613..4a46a1ba01d0 100644 --- a/pkg/kv/kvserver/replica_raft_overload.go +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -206,3 +206,55 @@ func (osm *overloadedStoresMap) Swap( v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold) return v } + +func (r *Replica) updatePausedFollowersLocked( + ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold, +) { + if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) { + // When multiple followers are overloaded, we may not be able to exclude all + // of them from replication traffic due to quorum constraints. We would like + // a given Range to deterministically exclude the same store (chosen + // randomly), so that across multiple Ranges we have a chance of removing + // load from all overloaded Stores in the cluster. (It would be a bad idea + // to roll a per-Range dice here on every tick, since that would rapidly + // include and exclude individual followers from replication traffic, which + // would be akin to a high rate of packet loss. Once we've decided to ignore + // a follower, this decision should be somewhat stable for at least a few + // seconds). + // + // Note that we don't enable this mechanism for the liveness range (see + // quotaPoolEnabledForRange), simply to play it safe, as we know that the + // liveness range is unlikely to be a major contributor to any follower's + // I/O and wish to reduce the likelihood of a problem in replication pausing + // contributing to an outage of that critical range. 
+ seed := int64(r.RangeID) + now := r.store.Clock().Now().GoTime() + d := computeExpendableOverloadedFollowersInput{ + replDescs: r.descRLocked().Replicas(), + ioOverloadMap: ioOverloadMap, + getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { + prs := r.mu.internalRaftGroup.Status().Progress + updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { + return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) + }) + return prs + }, + minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, + seed: seed, + } + r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) + for replicaID := range r.mu.pausedFollowers { + // We're dropping messages to those followers (see handleRaftReady) but + // it's a good idea to tell raft not to even bother sending in the first + // place. Raft will react to this by moving the follower to probing state + // where it will be contacted only sporadically until it responds to an + // MsgApp (which it can only do once we stop dropping messages). Something + // similar would result naturally if we didn't report as unreachable, but + // with more wasted work. + r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) + } + } else if len(r.mu.pausedFollowers) > 0 { + // No store in the cluster is overloaded, or this replica is not raft leader. 
+ r.mu.pausedFollowers = nil + } +} From bc8c25a2608b80ba1d64d94db973f21ee9444a41 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 8 Aug 2022 13:46:02 +0200 Subject: [PATCH 3/7] kvserver: simplify updatePausedFollowersLocked Release note: None --- pkg/kv/kvserver/replica_raft_overload.go | 97 +++++++++++++----------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go index 4a46a1ba01d0..98e9a9ac33e2 100644 --- a/pkg/kv/kvserver/replica_raft_overload.go +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -210,51 +210,60 @@ func (osm *overloadedStoresMap) Swap( func (r *Replica) updatePausedFollowersLocked( ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold, ) { - if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) { - // When multiple followers are overloaded, we may not be able to exclude all - // of them from replication traffic due to quorum constraints. We would like - // a given Range to deterministically exclude the same store (chosen - // randomly), so that across multiple Ranges we have a chance of removing - // load from all overloaded Stores in the cluster. (It would be a bad idea - // to roll a per-Range dice here on every tick, since that would rapidly - // include and exclude individual followers from replication traffic, which - // would be akin to a high rate of packet loss. Once we've decided to ignore - // a follower, this decision should be somewhat stable for at least a few - // seconds). 
- // - // Note that we don't enable this mechanism for the liveness range (see - // quotaPoolEnabledForRange), simply to play it safe, as we know that the - // liveness range is unlikely to be a major contributor to any follower's + r.mu.pausedFollowers = nil + + if len(ioOverloadMap) == 0 { + return + } + + if r.replicaID != r.mu.leaderID { + // Only the raft leader pauses followers. Followers never send meaningful + // amounts of data in raft messages, so pausing doesn't make sense on them. + return + } + + if !quotaPoolEnabledForRange(*r.descRLocked()) { + // If the quota pool isn't enabled (like for the liveness range), play it + // safe. The range is unlikely to be a major contributor to any follower's // I/O and wish to reduce the likelihood of a problem in replication pausing // contributing to an outage of that critical range. - seed := int64(r.RangeID) - now := r.store.Clock().Now().GoTime() - d := computeExpendableOverloadedFollowersInput{ - replDescs: r.descRLocked().Replicas(), - ioOverloadMap: ioOverloadMap, - getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { - prs := r.mu.internalRaftGroup.Status().Progress - updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { - return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) - }) - return prs - }, - minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, - seed: seed, - } - r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) - for replicaID := range r.mu.pausedFollowers { - // We're dropping messages to those followers (see handleRaftReady) but - // it's a good idea to tell raft not to even bother sending in the first - // place. Raft will react to this by moving the follower to probing state - // where it will be contacted only sporadically until it responds to an - // MsgApp (which it can only do once we stop dropping messages). 
Something - // similar would result naturally if we didn't report as unreachable, but - // with more wasted work. - r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) - } - } else if len(r.mu.pausedFollowers) > 0 { - // No store in the cluster is overloaded, or this replica is not raft leader. - r.mu.pausedFollowers = nil + return + } + + // When multiple followers are overloaded, we may not be able to exclude all + // of them from replication traffic due to quorum constraints. We would like + // a given Range to deterministically exclude the same store (chosen + // randomly), so that across multiple Ranges we have a chance of removing + // load from all overloaded Stores in the cluster. (It would be a bad idea + // to roll a per-Range dice here on every tick, since that would rapidly + // include and exclude individual followers from replication traffic, which + // would be akin to a high rate of packet loss. Once we've decided to ignore + // a follower, this decision should be somewhat stable for at least a few + // seconds). + seed := int64(r.RangeID) + now := r.store.Clock().Now().GoTime() + d := computeExpendableOverloadedFollowersInput{ + replDescs: r.descRLocked().Replicas(), + ioOverloadMap: ioOverloadMap, + getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { + prs := r.mu.internalRaftGroup.Status().Progress + updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { + return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) + }) + return prs + }, + minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, + seed: seed, + } + r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) + for replicaID := range r.mu.pausedFollowers { + // We're dropping messages to those followers (see handleRaftReady) but + // it's a good idea to tell raft not to even bother sending in the first + // place. 
Raft will react to this by moving the follower to probing state + // where it will be contacted only sporadically until it responds to an + // MsgApp (which it can only do once we stop dropping messages). Something + // similar would result naturally if we didn't report as unreachable, but + // with more wasted work. + r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) } } From dfac2ff5071914fd6c37998e7ac84a51f37c5a9b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 8 Aug 2022 13:47:19 +0200 Subject: [PATCH 4/7] kvserver: only pause followers when holding active lease If the raft leader is not the leaseholder (which includes the case in which we just transferred the lease away), leave all followers unpaused. Otherwise, the leaseholder won't learn that the entries it submitted were committed which effectively causes range unavailability. Fixes #84884. Release note: None --- pkg/kv/kvserver/replica_raft_overload.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go index 98e9a9ac33e2..155a2d171726 100644 --- a/pkg/kv/kvserver/replica_raft_overload.go +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -230,6 +230,15 @@ func (r *Replica) updatePausedFollowersLocked( return } + status := r.leaseStatusAtRLocked(ctx, r.Clock().NowAsClockTimestamp()) + if !status.IsValid() || !status.OwnedBy(r.StoreID()) { + // If we're not the leaseholder (which includes the case in which we just + // transferred the lease away), leave all followers unpaused. Otherwise, the + // leaseholder won't learn that the entries it submitted were committed + // which effectively causes range unavailability. + return + } + // When multiple followers are overloaded, we may not be able to exclude all // of them from replication traffic due to quorum constraints. 
We would like // a given Range to deterministically exclude the same store (chosen From e4ae047573f90b952299148123a4a6bd138e60a4 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 8 Aug 2022 15:46:31 +0200 Subject: [PATCH 5/7] kvserver: never pause local replica As observed in #84884, an overloaded store that held leases could end up "pausing" replication traffic to itself. This (likely) had no practical effect since the leader never sends messages to itself, but it meant reporting bogus counts of paused replicas. This commit ensures that a raft leader will never pause itself. Release note: None --- pkg/kv/kvserver/replica_raft_overload.go | 5 +++-- pkg/kv/kvserver/replica_raft_overload_test.go | 4 ++++ pkg/kv/kvserver/testdata/replica_raft_overload/self.txt | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 pkg/kv/kvserver/testdata/replica_raft_overload/self.txt diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go index 155a2d171726..003ede45322e 100644 --- a/pkg/kv/kvserver/replica_raft_overload.go +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -43,6 +43,7 @@ var pauseReplicationIOThreshold = settings.RegisterFloatSetting( ) type computeExpendableOverloadedFollowersInput struct { + self roachpb.ReplicaID replDescs roachpb.ReplicaSet // TODO(tbg): all entries are overloaded, so consdier removing the IOThreshold here // because it's confusing. 
@@ -104,11 +105,10 @@ func computeExpendableOverloadedFollowers( var nonLive map[roachpb.ReplicaID]nonLiveReason var liveOverloadedVoterCandidates map[roachpb.ReplicaID]struct{} var liveOverloadedNonVoterCandidates map[roachpb.ReplicaID]struct{} - var prs map[uint64]tracker.Progress for _, replDesc := range d.replDescs.AsProto() { - if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded { + if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded || replDesc.ReplicaID == d.self { continue } // There's at least one overloaded follower, so initialize @@ -252,6 +252,7 @@ func (r *Replica) updatePausedFollowersLocked( seed := int64(r.RangeID) now := r.store.Clock().Now().GoTime() d := computeExpendableOverloadedFollowersInput{ + self: r.replicaID, replDescs: r.descRLocked().Replicas(), ioOverloadMap: ioOverloadMap, getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { diff --git a/pkg/kv/kvserver/replica_raft_overload_test.go b/pkg/kv/kvserver/replica_raft_overload_test.go index bc11a5b69a13..842d08cd268e 100644 --- a/pkg/kv/kvserver/replica_raft_overload_test.go +++ b/pkg/kv/kvserver/replica_raft_overload_test.go @@ -46,6 +46,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T) require.Equal(t, "run", d.Cmd) var seed uint64 var replDescs roachpb.ReplicaSet + var self roachpb.ReplicaID ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} snapshotMap := map[roachpb.ReplicaID]struct{}{} downMap := map[roachpb.ReplicaID]struct{}{} @@ -65,6 +66,8 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T) switch arg.Key { case "min-live-match-index": minLiveMatchIndex = id + case "self": + self = roachpb.ReplicaID(id) case "voters", "learners": replicaID := roachpb.ReplicaID(id) if matchS != "" { @@ -134,6 +137,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T) } m, _ := computeExpendableOverloadedFollowers(ctx, 
computeExpendableOverloadedFollowersInput{ + self: self, replDescs: replDescs, ioOverloadMap: ioOverloadMap, getProgressMap: getProgressMap, diff --git a/pkg/kv/kvserver/testdata/replica_raft_overload/self.txt b/pkg/kv/kvserver/testdata/replica_raft_overload/self.txt new file mode 100644 index 000000000000..c73da0d7eadf --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_raft_overload/self.txt @@ -0,0 +1,4 @@ +# Won't consider itself for pausing. +run voters=(1,2,3) overloaded=(1) self=1 +---- +[] From 7f713fb31c445262b7be4adc4f82ea643f7792a5 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 30 Dec 2021 16:41:09 -0500 Subject: [PATCH 6/7] kv: pool rangeCacheKey objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces a `sync.Pool` for `rangeCacheKey` objects. This is used to avoid heap allocations when querying and updating the `RangeCache`. ``` name old time/op new time/op delta KV/Scan/Native/rows=1-10 14.8µs ± 2% 14.9µs ± 4% ~ (p=0.356 n=9+10) KV/Scan/SQL/rows=1-10 92.1µs ± 4% 94.3µs ± 5% ~ (p=0.113 n=9+10) name old alloc/op new alloc/op delta KV/Scan/Native/rows=1-10 6.87kB ± 0% 6.85kB ± 0% -0.35% (p=0.000 n=10+10) KV/Scan/SQL/rows=1-10 20.0kB ± 0% 20.0kB ± 0% -0.25% (p=0.012 n=10+10) name old allocs/op new allocs/op delta KV/Scan/Native/rows=1-10 52.0 ± 0% 51.0 ± 0% -1.92% (p=0.000 n=10+10) KV/Scan/SQL/rows=1-10 244 ± 0% 242 ± 0% -0.78% (p=0.000 n=10+10) ``` --- pkg/kv/kvclient/rangecache/range_cache.go | 70 ++++++++++++++----- .../kvclient/rangecache/range_cache_test.go | 12 ++-- 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 47baa4b140f0..866a3342fa22 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -16,6 +16,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "github.com/biogo/store/llrb" @@ -41,16 +42,34 @@ 
import ( // RangeCache. type rangeCacheKey roachpb.RKey -var minCacheKey interface{} = rangeCacheKey(roachpb.RKeyMin) +var minCacheKey = newRangeCacheKey(roachpb.RKeyMin) -func (a rangeCacheKey) String() string { - return roachpb.Key(a).String() +var rangeCacheKeyPool = sync.Pool{ + New: func() interface{} { return &rangeCacheKey{} }, } -// Compare implements the llrb.Comparable interface for rangeCacheKey, so that +// newRangeCacheKey allocates a new rangeCacheKey using the supplied key. The +// objects escape to the heap because they are passed through an interface{} +// when handed to an OrderedCache, so the sync.Pool avoids a heap allocation. +func newRangeCacheKey(key roachpb.RKey) *rangeCacheKey { + k := rangeCacheKeyPool.Get().(*rangeCacheKey) + *k = rangeCacheKey(key) + return k +} + +func (k *rangeCacheKey) release() { + *k = rangeCacheKey{} + rangeCacheKeyPool.Put(k) +} + +func (k *rangeCacheKey) String() string { + return roachpb.Key(*k).String() +} + +// Compare implements the llrb.Comparable interface for *rangeCacheKey, so that // it can be used as a key for util.OrderedCache. 
-func (a rangeCacheKey) Compare(b llrb.Comparable) int { - return bytes.Compare(a, b.(rangeCacheKey)) +func (k *rangeCacheKey) Compare(o llrb.Comparable) int { + return bytes.Compare(*k, *o.(*rangeCacheKey)) } // RangeDescriptorDB is a type which can query range descriptors from an @@ -201,7 +220,7 @@ func (rc *RangeCache) String() string { func (rc *RangeCache) stringLocked() string { var buf strings.Builder rc.rangeCache.cache.Do(func(k, v interface{}) bool { - fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(k.(rangeCacheKey)), v) + fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(*k.(*rangeCacheKey)), v) return false }) return buf.String() @@ -566,6 +585,8 @@ func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSp func (rc *RangeCache) getCachedOverlappingRLocked( ctx context.Context, span roachpb.RSpan, ) []*cache.Entry { + from := newRangeCacheKey(span.EndKey) + defer from.release() var res []*cache.Entry rc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) (exit bool) { desc := rc.getValue(e).Desc() @@ -579,7 +600,7 @@ func (rc *RangeCache) getCachedOverlappingRLocked( } res = append(res, e) return false // continue iterating - }, rangeCacheKey(span.EndKey), minCacheKey) + }, from, minCacheKey) // Invert the results so the get sorted ascendingly. for i, j := 0, len(res)-1; i < j; i, j = i+1, j-1 { res[i], res[j] = res[j], res[i] @@ -847,7 +868,7 @@ func (rc *RangeCache) EvictByKey(ctx context.Context, descKey roachpb.RKey) bool return false } log.VEventf(ctx, 2, "evict cached descriptor: %s", cachedDesc) - rc.rangeCache.cache.DelEntry(entry) + rc.delEntryLocked(entry) return true } @@ -868,7 +889,7 @@ func (rc *RangeCache) evictDescLocked(ctx context.Context, desc *roachpb.RangeDe // equal because the desc that the caller supplied also came from the cache // and the cache is not expected to go backwards). Evict it. 
log.VEventf(ctx, 2, "evict cached descriptor: desc=%s", cachedEntry) - rc.rangeCache.cache.DelEntry(rawEntry) + rc.delEntryLocked(rawEntry) return true } @@ -897,14 +918,18 @@ func (rc *RangeCache) getCachedRLocked( // key, in the direction indicated by inverted. var rawEntry *cache.Entry if !inverted { + k := newRangeCacheKey(key) + defer k.release() var ok bool - rawEntry, ok = rc.rangeCache.cache.FloorEntry(rangeCacheKey(key)) + rawEntry, ok = rc.rangeCache.cache.FloorEntry(k) if !ok { return nil, nil } } else { + from := newRangeCacheKey(key) + defer from.release() rc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) bool { - startKey := roachpb.RKey(e.Key.(rangeCacheKey)) + startKey := roachpb.RKey(*e.Key.(*rangeCacheKey)) if key.Equal(startKey) { // DoRangeReverseEntry is inclusive on the higher key. We're iterating // backwards and we got a range that starts at key. We're not interested @@ -914,7 +939,7 @@ func (rc *RangeCache) getCachedRLocked( } rawEntry = e return true - }, rangeCacheKey(key), minCacheKey) + }, from, minCacheKey) // DoRangeReverseEntry is exclusive on the "to" part, so we need to check // manually if there's an entry for RKeyMin. 
if rawEntry == nil { @@ -998,11 +1023,10 @@ func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*CacheEntry) [ entries[i] = newerEntry continue } - rangeKey := ent.Desc().StartKey if log.V(2) { log.Infof(ctx, "adding cache entry: value=%s", ent) } - rc.rangeCache.cache.Add(rangeCacheKey(rangeKey), ent) + rc.addEntryLocked(ent) entries[i] = ent } return entries @@ -1043,7 +1067,7 @@ func (rc *RangeCache) clearOlderOverlappingLocked( if log.V(2) { log.Infof(ctx, "clearing overlapping descriptor: key=%s entry=%s", e.Key, rc.getValue(e)) } - rc.rangeCache.cache.DelEntry(e) + rc.delEntryLocked(e) } else { newest = false if descsCompatible(entry.Desc(), newEntry.Desc()) { @@ -1074,13 +1098,23 @@ func (rc *RangeCache) swapEntryLocked( } } - rc.rangeCache.cache.DelEntry(oldEntry) + rc.delEntryLocked(oldEntry) if newEntry != nil { log.VEventf(ctx, 2, "caching new entry: %s", newEntry) - rc.rangeCache.cache.Add(oldEntry.Key, newEntry) + rc.addEntryLocked(newEntry) } } +func (rc *RangeCache) addEntryLocked(entry *CacheEntry) { + key := newRangeCacheKey(entry.Desc().StartKey) + rc.rangeCache.cache.Add(key, entry) +} + +func (rc *RangeCache) delEntryLocked(entry *cache.Entry) { + rc.rangeCache.cache.DelEntry(entry) + entry.Key.(*rangeCacheKey).release() +} + // DB returns the descriptor database, for tests. 
func (rc *RangeCache) DB() RangeDescriptorDB { return rc.db diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index df909b2d92a3..1bd71eec9fed 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1011,7 +1011,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *defDesc}) + cache.addEntryLocked(&CacheEntry{desc: *defDesc}) // Now, add a new, overlapping set of descriptors. minToBDesc := &roachpb.RangeDescriptor{ @@ -1026,13 +1026,13 @@ func TestRangeCacheClearOverlapping(t *testing.T) { } curGeneration := roachpb.RangeGeneration(1) require.True(t, clearOlderOverlapping(ctx, cache, minToBDesc)) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("b"))), &CacheEntry{desc: *minToBDesc}) + cache.addEntryLocked(&CacheEntry{desc: *minToBDesc}) if desc := cache.GetCached(ctx, roachpb.RKey("b"), false); desc != nil { t.Errorf("descriptor unexpectedly non-nil: %s", desc) } require.True(t, clearOlderOverlapping(ctx, cache, bToMaxDesc)) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *bToMaxDesc}) + cache.addEntryLocked(&CacheEntry{desc: *bToMaxDesc}) ri := cache.GetCached(ctx, roachpb.RKey("b"), false) require.Equal(t, bToMaxDesc, ri.Desc()) @@ -1041,7 +1041,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { curGeneration++ defDescCpy.Generation = curGeneration require.True(t, clearOlderOverlapping(ctx, cache, &defDescCpy)) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: defDescCpy}) + cache.addEntryLocked(&CacheEntry{desc: defDescCpy}) for _, key := range []roachpb.RKey{roachpb.RKey("a"), roachpb.RKey("b")} { ri 
= cache.GetCached(ctx, key, false) require.Equal(t, &defDescCpy, ri.Desc()) @@ -1055,7 +1055,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { Generation: curGeneration, } require.True(t, clearOlderOverlapping(ctx, cache, bToCDesc)) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("c"))), &CacheEntry{desc: *bToCDesc}) + cache.addEntryLocked(&CacheEntry{desc: *bToCDesc}) ri = cache.GetCached(ctx, roachpb.RKey("c"), true) require.Equal(t, bToCDesc, ri.Desc()) @@ -1066,7 +1066,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { Generation: curGeneration, } require.True(t, clearOlderOverlapping(ctx, cache, aToBDesc)) - cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKey("b"))), ri) + cache.addEntryLocked(ri) ri = cache.GetCached(ctx, roachpb.RKey("c"), true) require.Equal(t, bToCDesc, ri.Desc()) } From dfc612342c0c815287ceac34e6fc4e43a152caef Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Mon, 8 Aug 2022 13:16:52 -0400 Subject: [PATCH 7/7] builtins: add strptime/strftime builtins without experimental prefix These are just an alias for the existing implementation. Release note (sql change): The strptime and strftime builtin functions were added as aliases for experimental_strptime and experimental_strftime. --- docs/generated/sql/functions.md | 8 + .../logictest/testdata/logic_test/pg_catalog | 2 +- pkg/sql/sem/builtins/builtins.go | 164 ++++++++++-------- 3 files changed, 96 insertions(+), 78 deletions(-) diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index 071d5890751a..3fd58113c9ab 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -637,6 +637,14 @@ has no relationship with the commit order of concurrent transactions.

Stable statement_timestamp() → timestamptz

Returns the start time of the current statement.

Stable +strftime(input: date, extract_format: string) → string

From input, extracts and formats the time as identified in extract_format using standard strftime notation (though not all formatting is supported).

+
Immutable +strftime(input: timestamp, extract_format: string) → string

From input, extracts and formats the time as identified in extract_format using standard strftime notation (though not all formatting is supported).

+
Immutable +strftime(input: timestamptz, extract_format: string) → string

From input, extracts and formats the time as identified in extract_format using standard strftime notation (though not all formatting is supported).

+
Immutable +strptime(input: string, format: string) → timestamptz

Returns input as a timestamptz using format (which uses standard strptime formatting).

+
Immutable timeofday() → string

Returns the current system time on one of the cluster nodes as a string.

Stable timezone(timezone: string, time: time) → timetz

Treat given time without time zone as located in the specified time zone.

diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 9e335baf9ba0..35b295cefc8a 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -4640,7 +4640,7 @@ FROM pg_proc p JOIN pg_type t ON t.typinput = p.oid WHERE t.typname = '_int4' ---- -2006 array_in array_in +2010 array_in array_in ## #16285 ## int2vectors should be 0-indexed diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 119092a20346..03e6781a432d 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -2403,84 +2403,11 @@ var regularBuiltins = map[string]builtinDefinition{ // Timestamp/Date functions. - "experimental_strftime": makeBuiltin( - tree.FunctionProperties{ - Category: builtinconstants.CategoryDateAndTime, - }, - tree.Overload{ - Types: tree.ArgTypes{{"input", types.Timestamp}, {"extract_format", types.String}}, - ReturnType: tree.FixedReturnType(types.String), - Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { - fromTime := args[0].(*tree.DTimestamp).Time - format := string(tree.MustBeDString(args[1])) - t, err := strtime.Strftime(fromTime, format) - if err != nil { - return nil, err - } - return tree.NewDString(t), nil - }, - Info: "From `input`, extracts and formats the time as identified in `extract_format` " + - "using standard `strftime` notation (though not all formatting is supported).", - Volatility: volatility.Immutable, - }, - tree.Overload{ - Types: tree.ArgTypes{{"input", types.Date}, {"extract_format", types.String}}, - ReturnType: tree.FixedReturnType(types.String), - Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { - fromTime, err := args[0].(*tree.DDate).ToTime() - if err != nil { - return nil, err - } - format := string(tree.MustBeDString(args[1])) - t, err := strtime.Strftime(fromTime, format) - if err != nil { - return nil, err - } - return 
tree.NewDString(t), nil - }, - Info: "From `input`, extracts and formats the time as identified in `extract_format` " + - "using standard `strftime` notation (though not all formatting is supported).", - Volatility: volatility.Immutable, - }, - tree.Overload{ - Types: tree.ArgTypes{{"input", types.TimestampTZ}, {"extract_format", types.String}}, - ReturnType: tree.FixedReturnType(types.String), - Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { - fromTime := args[0].(*tree.DTimestampTZ).Time - format := string(tree.MustBeDString(args[1])) - t, err := strtime.Strftime(fromTime, format) - if err != nil { - return nil, err - } - return tree.NewDString(t), nil - }, - Info: "From `input`, extracts and formats the time as identified in `extract_format` " + - "using standard `strftime` notation (though not all formatting is supported).", - Volatility: volatility.Immutable, - }, - ), + "strftime": strftimeImpl(), + "experimental_strftime": strftimeImpl(), - "experimental_strptime": makeBuiltin( - tree.FunctionProperties{ - Category: builtinconstants.CategoryDateAndTime, - }, - tree.Overload{ - Types: tree.ArgTypes{{"input", types.String}, {"format", types.String}}, - ReturnType: tree.FixedReturnType(types.TimestampTZ), - Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { - toParse := string(tree.MustBeDString(args[0])) - format := string(tree.MustBeDString(args[1])) - t, err := strtime.Strptime(toParse, format) - if err != nil { - return nil, err - } - return tree.MakeDTimestampTZ(t.UTC(), time.Microsecond) - }, - Info: "Returns `input` as a timestamptz using `format` (which uses standard " + - "`strptime` formatting).", - Volatility: volatility.Immutable, - }, - ), + "strptime": strptimeImpl(), + "experimental_strptime": strptimeImpl(), "to_char": makeBuiltin( defProps(), @@ -7568,6 +7495,89 @@ func generateRandomUUID4Impl() builtinDefinition { ) } +func strftimeImpl() builtinDefinition { + return makeBuiltin( + tree.FunctionProperties{ + 
Category: builtinconstants.CategoryDateAndTime, + }, + tree.Overload{ + Types: tree.ArgTypes{{"input", types.Timestamp}, {"extract_format", types.String}}, + ReturnType: tree.FixedReturnType(types.String), + Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { + fromTime := args[0].(*tree.DTimestamp).Time + format := string(tree.MustBeDString(args[1])) + t, err := strtime.Strftime(fromTime, format) + if err != nil { + return nil, err + } + return tree.NewDString(t), nil + }, + Info: "From `input`, extracts and formats the time as identified in `extract_format` " + + "using standard `strftime` notation (though not all formatting is supported).", + Volatility: volatility.Immutable, + }, + tree.Overload{ + Types: tree.ArgTypes{{"input", types.Date}, {"extract_format", types.String}}, + ReturnType: tree.FixedReturnType(types.String), + Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { + fromTime, err := args[0].(*tree.DDate).ToTime() + if err != nil { + return nil, err + } + format := string(tree.MustBeDString(args[1])) + t, err := strtime.Strftime(fromTime, format) + if err != nil { + return nil, err + } + return tree.NewDString(t), nil + }, + Info: "From `input`, extracts and formats the time as identified in `extract_format` " + + "using standard `strftime` notation (though not all formatting is supported).", + Volatility: volatility.Immutable, + }, + tree.Overload{ + Types: tree.ArgTypes{{"input", types.TimestampTZ}, {"extract_format", types.String}}, + ReturnType: tree.FixedReturnType(types.String), + Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { + fromTime := args[0].(*tree.DTimestampTZ).Time + format := string(tree.MustBeDString(args[1])) + t, err := strtime.Strftime(fromTime, format) + if err != nil { + return nil, err + } + return tree.NewDString(t), nil + }, + Info: "From `input`, extracts and formats the time as identified in `extract_format` " + + "using standard `strftime` notation (though not all formatting 
is supported).", + Volatility: volatility.Immutable, + }, + ) +} + +func strptimeImpl() builtinDefinition { + return makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategoryDateAndTime, + }, + tree.Overload{ + Types: tree.ArgTypes{{"input", types.String}, {"format", types.String}}, + ReturnType: tree.FixedReturnType(types.TimestampTZ), + Fn: func(_ *eval.Context, args tree.Datums) (tree.Datum, error) { + toParse := string(tree.MustBeDString(args[0])) + format := string(tree.MustBeDString(args[1])) + t, err := strtime.Strptime(toParse, format) + if err != nil { + return nil, err + } + return tree.MakeDTimestampTZ(t.UTC(), time.Microsecond) + }, + Info: "Returns `input` as a timestamptz using `format` (which uses standard " + + "`strptime` formatting).", + Volatility: volatility.Immutable, + }, + ) +} + func uuidV4Impl() builtinDefinition { return makeBuiltin( tree.FunctionProperties{