Skip to content

Commit

Permalink
Support check regions after rule updated. (#2664) (#2696)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 29, 2020
1 parent 4108b4b commit 4520e3e
Show file tree
Hide file tree
Showing 9 changed files with 719 additions and 38 deletions.
38 changes: 34 additions & 4 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,36 @@ type testRegionCacheSuite struct {
func (s *testRegionCacheSuite) TestExpireRegionCache(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cache := NewTTL(ctx, time.Second, 2*time.Second)
cache := NewIDTTL(ctx, time.Second, 2*time.Second)
// Test Pop
cache.PutWithTTL(9, "9", 5*time.Second)
cache.PutWithTTL(10, "10", 5*time.Second)
c.Assert(cache.Len(), Equals, 2)
k, v, success := cache.pop()
c.Assert(success, Equals, true)
c.Assert(cache.Len(), Equals, 1)
k2, v2, success := cache.pop()
c.Assert(success, Equals, true)
// we can't ensure the order which the key/value pop from cache, so we save into a map
kvMap := map[uint64]string{
9: "9",
10: "10",
}
expV, ok := kvMap[k.(uint64)]
c.Assert(ok, Equals, true)
c.Assert(expV, Equals, v.(string))
expV, ok = kvMap[k2.(uint64)]
c.Assert(ok, Equals, true)
c.Assert(expV, Equals, v2.(string))

cache.PutWithTTL(11, "11", 1*time.Second)
time.Sleep(5 * time.Second)
k, v, success = cache.pop()
c.Assert(success, Equals, false)
c.Assert(k, IsNil)
c.Assert(v, IsNil)

// Test Get
cache.PutWithTTL(1, 1, 1*time.Second)
cache.PutWithTTL(2, "v2", 5*time.Second)
cache.PutWithTTL(3, 3.0, 5*time.Second)
Expand All @@ -53,7 +82,7 @@ func (s *testRegionCacheSuite) TestExpireRegionCache(c *C) {

c.Assert(cache.Len(), Equals, 3)

c.Assert(sortIDs(cache.GetKeys()), DeepEquals, []uint64{1, 2, 3})
c.Assert(sortIDs(cache.GetAllID()), DeepEquals, []uint64{1, 2, 3})

time.Sleep(2 * time.Second)

Expand All @@ -70,7 +99,7 @@ func (s *testRegionCacheSuite) TestExpireRegionCache(c *C) {
c.Assert(value, Equals, 3.0)

c.Assert(cache.Len(), Equals, 2)
c.Assert(sortIDs(cache.GetKeys()), DeepEquals, []uint64{2, 3})
c.Assert(sortIDs(cache.GetAllID()), DeepEquals, []uint64{2, 3})

cache.Remove(2)

Expand All @@ -83,7 +112,7 @@ func (s *testRegionCacheSuite) TestExpireRegionCache(c *C) {
c.Assert(value, Equals, 3.0)

c.Assert(cache.Len(), Equals, 1)
c.Assert(sortIDs(cache.GetKeys()), DeepEquals, []uint64{3})
c.Assert(sortIDs(cache.GetAllID()), DeepEquals, []uint64{3})
}

func sortIDs(ids []uint64) []uint64 {
Expand All @@ -94,6 +123,7 @@ func sortIDs(ids []uint64) []uint64 {

func (s *testRegionCacheSuite) TestLRUCache(c *C) {
cache := newLRU(3)

cache.Put(1, "1")
cache.Put(2, "2")
cache.Put(3, "3")
Expand Down
120 changes: 94 additions & 26 deletions pkg/cache/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ type ttlCacheItem struct {
expire time.Time
}

// TTL is a cache that assigns TTL(Time-To-Live) for each items.
type TTL struct {
// ttlCache is a cache that assigns TTL(Time-To-Live) for each items.
type ttlCache struct {
sync.RWMutex
ctx context.Context

items map[uint64]ttlCacheItem
items map[interface{}]ttlCacheItem
ttl time.Duration
gcInterval time.Duration
}

// NewTTL returns a new TTL cache.
func NewTTL(ctx context.Context, gcInterval time.Duration, ttl time.Duration) *TTL {
c := &TTL{
func newTTL(ctx context.Context, gcInterval time.Duration, duration time.Duration) *ttlCache {
c := &ttlCache{
ctx: ctx,
items: make(map[uint64]ttlCacheItem),
ttl: ttl,
items: make(map[interface{}]ttlCacheItem),
ttl: duration,
gcInterval: gcInterval,
}

Expand All @@ -51,12 +51,12 @@ func NewTTL(ctx context.Context, gcInterval time.Duration, ttl time.Duration) *T
}

// Put puts an item into cache.
func (c *TTL) Put(key uint64, value interface{}) {
c.PutWithTTL(key, value, c.ttl)
func (c *ttlCache) put(key interface{}, value interface{}) {
c.putWithTTL(key, value, c.ttl)
}

// PutWithTTL puts an item into cache with specified TTL.
func (c *TTL) PutWithTTL(key uint64, value interface{}, ttl time.Duration) {
func (c *ttlCache) putWithTTL(key interface{}, value interface{}, ttl time.Duration) {
c.Lock()
defer c.Unlock()

Expand All @@ -67,7 +67,7 @@ func (c *TTL) PutWithTTL(key uint64, value interface{}, ttl time.Duration) {
}

// Get retrives an item from cache.
func (c *TTL) Get(key uint64) (interface{}, bool) {
func (c *ttlCache) get(key interface{}) (interface{}, bool) {
c.RLock()
defer c.RUnlock()

Expand All @@ -84,11 +84,11 @@ func (c *TTL) Get(key uint64) (interface{}, bool) {
}

// GetKeys returns all keys that are not expired.
func (c *TTL) GetKeys() []uint64 {
func (c *ttlCache) getKeys() []interface{} {
c.RLock()
defer c.RUnlock()

var keys []uint64
var keys []interface{}

now := time.Now()
for key, item := range c.items {
Expand All @@ -100,23 +100,38 @@ func (c *TTL) GetKeys() []uint64 {
}

// Remove eliminates an item from cache.
func (c *TTL) Remove(key uint64) {
func (c *ttlCache) remove(key interface{}) {
c.Lock()
defer c.Unlock()

delete(c.items, key)
}

// pop one key/value that is not expired. If boolean is false, it means that it didn't find the valid one.
func (c *ttlCache) pop() (interface{}, interface{}, bool) {
c.Lock()
defer c.Unlock()
now := time.Now()
for k, item := range c.items {
if item.expire.After(now) {
value := item.value
delete(c.items, k)
return k, value, true
}
}
return nil, nil, false
}

// Len returns current cache size.
func (c *TTL) Len() int {
func (c *ttlCache) Len() int {
c.RLock()
defer c.RUnlock()

return len(c.items)
}

// Clear removes all items in the ttl cache.
func (c *TTL) Clear() {
func (c *ttlCache) Clear() {
c.Lock()
defer c.Unlock()

Expand All @@ -125,7 +140,7 @@ func (c *TTL) Clear() {
}
}

func (c *TTL) doGC() {
func (c *ttlCache) doGC() {
ticker := time.NewTicker(c.gcInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -153,28 +168,81 @@ func (c *TTL) doGC() {

// TTLUint64 is simple TTL saves only uint64s.
type TTLUint64 struct {
*TTL
*ttlCache
}

// NewIDTTL creates a new TTLUint64 cache.
func NewIDTTL(ctx context.Context, gcInterval, ttl time.Duration) *TTLUint64 {
return &TTLUint64{
TTL: NewTTL(ctx, gcInterval, ttl),
ttlCache: newTTL(ctx, gcInterval, ttl),
}
}

// Put saves an ID in cache.
func (c *TTLUint64) Put(id uint64) {
c.TTL.Put(id, nil)
// Get return the value by key id
func (c *TTLUint64) Get(id uint64) (interface{}, bool) {
return c.ttlCache.get(id)
}

// GetAll returns all ids.
func (c *TTLUint64) GetAll() []uint64 {
return c.TTL.GetKeys()
// Put saves an ID in cache.
func (c *TTLUint64) Put(id uint64, value interface{}) {
c.ttlCache.put(id, value)
}

// GetAllID returns all ids.
func (c *TTLUint64) GetAllID() []uint64 {
keys := c.ttlCache.getKeys()
var ids []uint64
for _, key := range keys {
id, ok := key.(uint64)
if ok {
ids = append(ids, id)
}
}
return ids
}

// Exists checks if an ID exists in cache.
func (c *TTLUint64) Exists(id uint64) bool {
_, ok := c.TTL.Get(id)
_, ok := c.ttlCache.get(id)
return ok
}

// Remove remove key
func (c *TTLUint64) Remove(key uint64) {
c.ttlCache.remove(key)
}

// PutWithTTL puts an item into cache with specified TTL.
func (c *TTLUint64) PutWithTTL(key uint64, value interface{}, ttl time.Duration) {
c.ttlCache.putWithTTL(key, value, ttl)
}

// TTLString is simple TTL saves key string and value.
type TTLString struct {
*ttlCache
}

// NewStringTTL creates a new TTLString cache.
func NewStringTTL(ctx context.Context, gcInterval, ttl time.Duration) *TTLString {
return &TTLString{
ttlCache: newTTL(ctx, gcInterval, ttl),
}
}

// Put put the string key with the value
func (c *TTLString) Put(key string, value interface{}) {
c.ttlCache.put(key, value)
}

// Pop one key/value that is not expired
func (c *TTLString) Pop() (string, interface{}, bool) {
k, v, success := c.ttlCache.pop()
if !success {
return "", nil, false
}
key, ok := k.(string)
if !ok {
return "", nil, false
}
return key, v, true
}
24 changes: 24 additions & 0 deletions pkg/keyutil/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package keyutil

import (
"encoding/hex"
"fmt"
)

// BuildKeyRangeKey build key for a keyRange
func BuildKeyRangeKey(startKey, endKey []byte) string {
return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
36 changes: 36 additions & 0 deletions pkg/keyutil/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package keyutil

import (
"testing"

. "github.com/pingcap/check"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testKeyUtilSuite{})

type testKeyUtilSuite struct {
}

func (s *testKeyUtilSuite) TestKeyUtil(c *C) {
startKey := []byte("a")
endKey := []byte("b")
key := BuildKeyRangeKey(startKey, endKey)
c.Assert(key, Equals, "61-62")
}
26 changes: 24 additions & 2 deletions server/api/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/gorilla/mux"
"github.com/pingcap/pd/v4/pkg/apiutil"
"github.com/pingcap/pd/v4/pkg/codec"
"github.com/pingcap/pd/v4/pkg/keyutil"
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/placement"
Expand Down Expand Up @@ -179,11 +180,23 @@ func (h *ruleHandler) Set(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
oldRule := cluster.GetRuleManager().GetRule(rule.GroupID, rule.ID)
if err := cluster.GetRuleManager().SetRule(&rule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
newSuspectKeyRange := [2][]byte{
rule.StartKey,
rule.EndKey,
}
cluster.AddSuspectKeyRange(keyutil.BuildKeyRangeKey(rule.StartKey, rule.EndKey), newSuspectKeyRange)
if oldRule != nil {
cluster.AddSuspectKeyRange(keyutil.BuildKeyRangeKey(oldRule.StartKey, oldRule.EndKey), [2][]byte{
oldRule.StartKey,
oldRule.EndKey,
})
}
h.rd.JSON(w, http.StatusOK, "Update rule successfully.")
}

func (h *ruleHandler) checkRule(r *placement.Rule) error {
Expand Down Expand Up @@ -232,9 +245,18 @@ func (h *ruleHandler) Delete(w http.ResponseWriter, r *http.Request) {
return
}
group, id := mux.Vars(r)["group"], mux.Vars(r)["id"]
rule := cluster.GetRuleManager().GetRule(group, id)
if err := cluster.GetRuleManager().DeleteRule(group, id); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
if rule != nil {
suspectKeyRanges := [2][]byte{
rule.StartKey,
rule.EndKey,
}
cluster.AddSuspectKeyRange(keyutil.BuildKeyRangeKey(rule.StartKey, rule.EndKey), suspectKeyRanges)
}

h.rd.JSON(w, http.StatusOK, "Delete rule successfully.")
}
Loading

0 comments on commit 4520e3e

Please sign in to comment.