From e57c78503fbe4a4bf065956f4208a1353f62c68e Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 22 Jan 2022 21:55:12 +0800 Subject: [PATCH] feat: improve client side caching with pttl == -2 by MULTI EXEC --- internal/cmds/cmds.go | 6 ++++++ internal/cmds/cmds_test.go | 15 +++++++++++++++ lru.go | 11 +++-------- lru_test.go | 13 +++++++++---- pipe.go | 27 +++++++++++++++++---------- pipe_test.go | 26 ++++++++++++++++++++------ redis_test.go | 15 +++++++++++++-- 7 files changed, 83 insertions(+), 30 deletions(-) create mode 100644 internal/cmds/cmds_test.go diff --git a/internal/cmds/cmds.go b/internal/cmds/cmds.go index 5660a082..d397970b 100644 --- a/internal/cmds/cmds.go +++ b/internal/cmds/cmds.go @@ -16,6 +16,12 @@ var ( cs: &CommandSlice{s: []string{"CLIENT", "CACHING", "YES"}}, cf: optInTag, } + MultiCmd = Completed{ + cs: &CommandSlice{s: []string{"MULTI"}}, + } + ExecCmd = Completed{ + cs: &CommandSlice{s: []string{"EXEC"}}, + } QuitCmd = Completed{ cs: &CommandSlice{s: []string{"QUIT"}}, } diff --git a/internal/cmds/cmds_test.go b/internal/cmds/cmds_test.go new file mode 100644 index 00000000..17f6e845 --- /dev/null +++ b/internal/cmds/cmds_test.go @@ -0,0 +1,15 @@ +package cmds + +import "testing" + +func TestCacheable_CacheKey(t *testing.T) { + key, cmd := (&Cacheable{cs: &CommandSlice{s: []string{"GET", "A"}}}).CacheKey() + if key != "A" || cmd != "GET" { + t.Fatalf("unexpected ret %v %v", key, cmd) + } + + key, cmd = (&Cacheable{cs: &CommandSlice{s: []string{"HMGET", "A", "B", "C"}}}).CacheKey() + if key != "A" || cmd != "HMGETBC" { + t.Fatalf("unexpected ret %v %v", key, cmd) + } +} diff --git a/lru.go b/lru.go index 7c66465d..cc5a72bc 100644 --- a/lru.go +++ b/lru.go @@ -150,14 +150,9 @@ func (c *lru) Update(key, cmd string, value RedisMessage, pttl int64) { ele = ele.Next() } } - if pttl == -2 { - store.ttl = time.Time{} - } else { - if pttl != -1 { - ttl := time.Now().Add(time.Duration(pttl) * time.Millisecond) - if ttl.Before(store.ttl) { - store.ttl = ttl - } + if pttl >= 0 { + if ttl := time.Now().Add(time.Duration(pttl) * time.Millisecond); ttl.Before(store.ttl) { + store.ttl = ttl } } } diff --git a/lru_test.go b/lru_test.go index 8e90ec3e..91d8df86 100644 --- a/lru_test.go +++ b/lru_test.go @@ -36,11 +36,16 @@ func TestLRU(t *testing.T) { } }) - t.Run("Cache Expire By PTTL -2", func(t *testing.T) { + t.Run("Cache Should Not Expire By PTTL -2", func(t *testing.T) { lru := setup(t) - lru.Update("0", "GET", RedisMessage{typ: '+', string: "0"}, -2) - if v, _ := lru.GetOrPrepare("1", "GET", TTL); v.typ != 0 { - t.Fatalf("got unexpected value from the first GetOrPrepare: %v", v) + if v, entry := lru.GetOrPrepare("1", "GET", TTL); v.typ != 0 || entry != nil { + t.Fatalf("got unexpected value from the GetOrPrepare after pttl: %v %v", v, entry) + } + lru.Update("1", "GET", RedisMessage{typ: '+', string: "1"}, -2) + if v, _ := lru.GetOrPrepare("1", "GET", TTL); v.typ == 0 { + t.Fatalf("did not get the value from the second GetOrPrepare") + } else if v.string != "1" { + t.Fatalf("got unexpected value from the second GetOrPrepare: %v", v) } }) diff --git a/pipe.go b/pipe.go index 30cfedaf..82391218 100644 --- a/pipe.go +++ b/pipe.go @@ -209,7 +209,6 @@ func (p *pipe) _backgroundRead() { var ( err error msg RedisMessage - tmp RedisMessage ones = make([]cmds.Completed, 1) multi []cmds.Completed ch chan RedisResult @@ -226,15 +225,13 @@ func (p *pipe) _backgroundRead() { continue } // if unfulfilled multi commands are lead by opt-in and get success response - if ff != len(multi) && len(multi) == 3 && multi[0].IsOptIn() { - if ff == 1 { - tmp = msg - } else if ff == 2 { - cacheable := cmds.Cacheable(multi[ff-1]) + if ff != len(multi) && len(multi) == 5 && multi[0].IsOptIn() { + if ff == 4 { + cacheable := cmds.Cacheable(multi[3]) ck, cc := cacheable.CacheKey() - tmp.attrs = cacheMark - p.cache.Update(ck, cc, tmp, msg.integer) - tmp = RedisMessage{} + cp := msg.values[1] + cp.attrs = cacheMark + p.cache.Update(ck, cc, cp, msg.values[0].integer) } } nextCMD: @@ -437,7 +434,17 @@ func (p *pipe) DoCache(cmd cmds.Cacheable, ttl time.Duration) RedisResult { } else if entry != nil { return newResult(entry.Wait(), nil) } - return p.DoMulti(cmds.OptInCmd, cmds.Completed(cmd), cmds.NewCompleted([]string{"PTTL", ck}))[1] + exec, err := p.DoMulti( + cmds.OptInCmd, + cmds.MultiCmd, + cmds.NewCompleted([]string{"PTTL", ck}), + cmds.Completed(cmd), + cmds.ExecCmd, + )[4].ToArray() + if err != nil { + return newErrResult(err) + } + return newResult(exec[1], nil) } func (p *pipe) Error() error { diff --git a/pipe_test.go b/pipe_test.go index 33b4bbe8..e760f2a2 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -325,11 +325,18 @@ func TestClientSideCaching(t *testing.T) { go func() { mock.Expect("CLIENT", "CACHING", "YES"). - Expect("GET", "a"). + Expect("MULTI"). Expect("PTTL", "a"). + Expect("GET", "a"). + Expect("EXEC"). + ReplyString("OK"). ReplyString("OK"). - ReplyString("1"). - ReplyInteger(-1) + ReplyString("OK"). + ReplyString("OK"). + Reply(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: ':', integer: -1}, + {typ: '+', string: "1"}, + }}) }() // single flight @@ -371,11 +378,18 @@ func TestClientSideCaching(t *testing.T) { }) go func() { mock.Expect("CLIENT", "CACHING", "YES"). - Expect("GET", "a"). + Expect("MULTI"). Expect("PTTL", "a"). + Expect("GET", "a"). + Expect("EXEC"). + ReplyString("OK"). ReplyString("OK"). - ReplyString("2"). - ReplyInteger(-1) + ReplyString("OK"). + ReplyString("OK"). + Reply(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: ':', integer: -1}, + {typ: '+', string: "2"}, + }}) }() for { diff --git a/redis_test.go b/redis_test.go index a13f10e8..7bff9942 100644 --- a/redis_test.go +++ b/redis_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "strconv" "sync" + "sync/atomic" "testing" "time" ) @@ -64,16 +65,26 @@ func testSETGET(t *testing.T, client Client) { t.Logf("testing client side caching with %d interations and %d parallelism\n", keys*5, para) jobs, wait = parallel(para) - for i := 0; i < keys*5; i++ { + hits, miss := int64(0), int64(0) + for i := 0; i < keys*10; i++ { key := strconv.Itoa(rand.Intn(keys / 100)) jobs <- func() { - val, err := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute).ToString() + resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute) + val, err := resp.ToString() if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) { t.Errorf("unexpected set response %v %v %v", val, err, ok) } + if resp.IsCacheHit() { + atomic.AddInt64(&hits, 1) + } else { + atomic.AddInt64(&miss, 1) + } } } wait() + if atomic.LoadInt64(&miss) != 100 || atomic.LoadInt64(&hits) != int64(keys*10-100) { + t.Fatalf("unexpected client side caching hits and miss %v %v", atomic.LoadInt64(&hits), atomic.LoadInt64(&miss)) + } t.Logf("testing DEL caching with %d keys and %d parallelism\n", keys*2, para) jobs, wait = parallel(para)