feat: improve client side caching with pttl == -2 by MULTI EXEC
rueian committed Jan 22, 2022
1 parent 6cd2aab commit e57c785
Showing 7 changed files with 83 additions and 30 deletions.
6 changes: 6 additions & 0 deletions internal/cmds/cmds.go
@@ -16,6 +16,12 @@ var (
cs: &CommandSlice{s: []string{"CLIENT", "CACHING", "YES"}},
cf: optInTag,
}
MultiCmd = Completed{
cs: &CommandSlice{s: []string{"MULTI"}},
}
ExecCmd = Completed{
cs: &CommandSlice{s: []string{"EXEC"}},
}
QuitCmd = Completed{
cs: &CommandSlice{s: []string{"QUIT"}},
}
15 changes: 15 additions & 0 deletions internal/cmds/cmds_test.go
@@ -0,0 +1,15 @@
package cmds

import "testing"

func TestCacheable_CacheKey(t *testing.T) {
key, cmd := (&Cacheable{cs: &CommandSlice{s: []string{"GET", "A"}}}).CacheKey()
if key != "A" || cmd != "GET" {
t.Fatalf("unexpected ret %v %v", key, cmd)
}

key, cmd = (&Cacheable{cs: &CommandSlice{s: []string{"HMGET", "A", "B", "C"}}}).CacheKey()
if key != "A" || cmd != "HMGETBC" {
t.Fatalf("unexpected ret %v %v", key, cmd)
}
}
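
For context, the CacheKey test above exercises how a cacheable command is split into a cache key and a command signature: the first argument after the command name becomes the key, and the remaining tokens are appended to the command name so that, for example, HMGET A B C and HMGET A C B occupy different cache entries. A minimal, self-contained sketch of that idea (illustrative code, not the library's implementation):

package main

import (
	"fmt"
	"strings"
)

// cacheKey mirrors the behavior the test expects: tokens[1] is the Redis key,
// and the command name plus any extra fields form the per-command signature.
func cacheKey(tokens []string) (key, cmd string) {
	return tokens[1], tokens[0] + strings.Join(tokens[2:], "")
}

func main() {
	fmt.Println(cacheKey([]string{"GET", "A"}))             // A GET
	fmt.Println(cacheKey([]string{"HMGET", "A", "B", "C"})) // A HMGETBC
}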
11 changes: 3 additions & 8 deletions lru.go
@@ -150,14 +150,9 @@ func (c *lru) Update(key, cmd string, value RedisMessage, pttl int64) {
ele = ele.Next()
}
}
if pttl == -2 {
store.ttl = time.Time{}
} else {
if pttl != -1 {
ttl := time.Now().Add(time.Duration(pttl) * time.Millisecond)
if ttl.Before(store.ttl) {
store.ttl = ttl
}
if pttl >= 0 {
if ttl := time.Now().Add(time.Duration(pttl) * time.Millisecond); ttl.Before(store.ttl) {
store.ttl = ttl
}
}
}
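The rewritten branch above changes how a PTTL reply adjusts the client-side TTL: a non-negative PTTL can only shorten the TTL requested by the caller, while PTTL -1 (no expiration) and PTTL -2 (missing key) now leave the stored TTL untouched instead of zeroing it. A standalone sketch of that rule, assuming a caller-supplied TTL of one minute (illustrative code, not the lru internals):

package main

import (
	"fmt"
	"time"
)

// effectiveTTL applies the post-commit rule: only a non-negative PTTL from
// the server can shorten the TTL the client already chose.
func effectiveTTL(clientTTL time.Time, pttl int64) time.Time {
	if pttl >= 0 {
		if serverTTL := time.Now().Add(time.Duration(pttl) * time.Millisecond); serverTTL.Before(clientTTL) {
			return serverTTL
		}
	}
	return clientTTL
}

func main() {
	clientTTL := time.Now().Add(time.Minute)
	fmt.Println(effectiveTTL(clientTTL, 1000)) // shortened to ~1s from now
	fmt.Println(effectiveTTL(clientTTL, -1))   // unchanged: key has no expiration
	fmt.Println(effectiveTTL(clientTTL, -2))   // unchanged: previously this zeroed the TTL
}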
13 changes: 9 additions & 4 deletions lru_test.go
@@ -36,11 +36,16 @@ func TestLRU(t *testing.T) {
}
})

t.Run("Cache Expire By PTTL -2", func(t *testing.T) {
t.Run("Cache Should Not Expire By PTTL -2", func(t *testing.T) {
lru := setup(t)
lru.Update("0", "GET", RedisMessage{typ: '+', string: "0"}, -2)
if v, _ := lru.GetOrPrepare("1", "GET", TTL); v.typ != 0 {
t.Fatalf("got unexpected value from the first GetOrPrepare: %v", v)
if v, entry := lru.GetOrPrepare("1", "GET", TTL); v.typ != 0 || entry != nil {
t.Fatalf("got unexpected value from the GetOrPrepare after pttl: %v %v", v, entry)
}
lru.Update("1", "GET", RedisMessage{typ: '+', string: "1"}, -2)
if v, _ := lru.GetOrPrepare("1", "GET", TTL); v.typ == 0 {
t.Fatalf("did not get the value from the second GetOrPrepare")
} else if v.string != "1" {
t.Fatalf("got unexpected value from the second GetOrPrepare: %v", v)
}
})

27 changes: 17 additions & 10 deletions pipe.go
@@ -209,7 +209,6 @@ func (p *pipe) _backgroundRead() {
var (
err error
msg RedisMessage
tmp RedisMessage
ones = make([]cmds.Completed, 1)
multi []cmds.Completed
ch chan RedisResult
@@ -226,15 +225,13 @@
continue
}
// if unfulfilled multi commands are led by opt-in and get success response
if ff != len(multi) && len(multi) == 3 && multi[0].IsOptIn() {
if ff == 1 {
tmp = msg
} else if ff == 2 {
cacheable := cmds.Cacheable(multi[ff-1])
if ff != len(multi) && len(multi) == 5 && multi[0].IsOptIn() {
if ff == 4 {
cacheable := cmds.Cacheable(multi[3])
ck, cc := cacheable.CacheKey()
tmp.attrs = cacheMark
p.cache.Update(ck, cc, tmp, msg.integer)
tmp = RedisMessage{}
cp := msg.values[1]
cp.attrs = cacheMark
p.cache.Update(ck, cc, cp, msg.values[0].integer)
}
}
nextCMD:
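
With the five-command pipeline (CLIENT CACHING YES, MULTI, PTTL, GET, EXEC), the fifth reply is the EXEC array, whose first element is the PTTL integer and whose second element is the GET value that gets marked and written into the LRU cache, as the ff == 4 branch above does. A simplified stand-in for that unpacking, using a local struct rather than the internal RedisMessage type:

package main

import "fmt"

// message is a stand-in for the internal RedisMessage: typ is the RESP type
// byte, str and integer carry scalar replies, values carries array replies.
type message struct {
	typ     byte
	str     string
	integer int64
	values  []message
}

// cacheEntryFromExec unpacks the EXEC reply: values[0] is the PTTL reply and
// values[1] is the GET reply, matching the order the commands were queued
// inside MULTI.
func cacheEntryFromExec(exec message) (value message, pttl int64) {
	return exec.values[1], exec.values[0].integer
}

func main() {
	exec := message{typ: '*', values: []message{
		{typ: ':', integer: -1},
		{typ: '+', str: "cached-value"},
	}}
	v, pttl := cacheEntryFromExec(exec)
	fmt.Println(v.str, pttl) // cached-value -1
}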
@@ -437,7 +434,17 @@ func (p *pipe) DoCache(cmd cmds.Cacheable, ttl time.Duration) RedisResult {
} else if entry != nil {
return newResult(entry.Wait(), nil)
}
return p.DoMulti(cmds.OptInCmd, cmds.Completed(cmd), cmds.NewCompleted([]string{"PTTL", ck}))[1]
exec, err := p.DoMulti(
cmds.OptInCmd,
cmds.MultiCmd,
cmds.NewCompleted([]string{"PTTL", ck}),
cmds.Completed(cmd),
cmds.ExecCmd,
)[4].ToArray()
if err != nil {
return newErrResult(err)
}
return newResult(exec[1], nil)
}

func (p *pipe) Error() error {
26 changes: 20 additions & 6 deletions pipe_test.go
@@ -325,11 +325,18 @@ func TestClientSideCaching(t *testing.T) {

go func() {
mock.Expect("CLIENT", "CACHING", "YES").
Expect("GET", "a").
Expect("MULTI").
Expect("PTTL", "a").
Expect("GET", "a").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("1").
ReplyInteger(-1)
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '*', values: []RedisMessage{
{typ: ':', integer: -1},
{typ: '+', string: "1"},
}})
}()

// single flight
@@ -371,11 +378,18 @@
})
go func() {
mock.Expect("CLIENT", "CACHING", "YES").
Expect("GET", "a").
Expect("MULTI").
Expect("PTTL", "a").
Expect("GET", "a").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("2").
ReplyInteger(-1)
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '*', values: []RedisMessage{
{typ: ':', integer: -1},
{typ: '+', string: "2"},
}})
}()

for {
15 changes: 13 additions & 2 deletions redis_test.go
@@ -5,6 +5,7 @@ import (
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -64,16 +65,26 @@ func testSETGET(t *testing.T, client Client) {

t.Logf("testing client side caching with %d iterations and %d parallelism\n", keys*5, para)
jobs, wait = parallel(para)
for i := 0; i < keys*5; i++ {
hits, miss := int64(0), int64(0)
for i := 0; i < keys*10; i++ {
key := strconv.Itoa(rand.Intn(keys / 100))
jobs <- func() {
val, err := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute).ToString()
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
val, err := resp.ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Errorf("unexpected set response %v %v %v", val, err, ok)
}
if resp.IsCacheHit() {
atomic.AddInt64(&hits, 1)
} else {
atomic.AddInt64(&miss, 1)
}
}
}
wait()
if atomic.LoadInt64(&miss) != 100 || atomic.LoadInt64(&hits) != int64(keys*10-100) {
t.Fatalf("unexpected client side caching hits and miss %v %v", atomic.LoadInt64(&hits), atomic.LoadInt64(&miss))
}

t.Logf("testing DEL caching with %d keys and %d parallelism\n", keys*2, para)
jobs, wait = parallel(para)
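The updated test counts hits and misses via IsCacheHit, which reports whether a DoCache reply was served from the client-side cache rather than from a round trip to Redis. A hedged usage sketch of that pattern, assuming an already-constructed client and the import path the repository used at the time:

package example

import (
	"context"
	"time"

	"github.com/rueian/rueidis"
)

// lookup issues a client-side-cached GET; on a miss the client now sends
// CLIENT CACHING YES, MULTI, PTTL, GET, EXEC and caches the result, so
// subsequent lookups of the same key report IsCacheHit() == true.
func lookup(ctx context.Context, client rueidis.Client, key string) (val string, hit bool, err error) {
	resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
	val, err = resp.ToString()
	return val, resp.IsCacheHit(), err
}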
