Skip to content

Commit

Permalink
add TestConcurrentBufferDuplicateKeys (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
leeym authored Mar 1, 2024
1 parent 9fa00a2 commit 310e1c4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
17 changes: 6 additions & 11 deletions concurrent_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (in
var oldNodes []SortedSetNode
_ = gob.NewDecoder(b).Decode(&oldNodes)
for _, node := range oldNodes {
if node.CreatedAt > now.UnixNano() {
if node.CreatedAt > now.UnixNano() && node.Value != element {
newNodes = append(newNodes, node)
}
}
Expand Down Expand Up @@ -239,25 +239,20 @@ func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) erro
var err error
now := c.clock.Now()
var newNodes []SortedSetNode
var casId uint64 = 0
deleted := false
var casID uint64
item, err := c.cli.Get(c.key)
if err != nil {
if errors.Is(err, memcache.ErrCacheMiss) {
return nil
}
return errors.Wrap(err, "failed to Get")
}
casId = item.CasID
casID = item.CasID
var oldNodes []SortedSetNode
_ = gob.NewDecoder(bytes.NewBuffer(item.Value)).Decode(&oldNodes)
for _, node := range oldNodes {
if node.CreatedAt > now.UnixNano() {
if node.Value == key && !deleted {
deleted = true
} else {
newNodes = append(newNodes, node)
}
if node.CreatedAt > now.UnixNano() && node.Value != key {
newNodes = append(newNodes, node)
}
}

Expand All @@ -266,7 +261,7 @@ func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) erro
item = &memcache.Item{
Key: c.key,
Value: b.Bytes(),
CasID: casId,
CasID: casID,
}
err = c.cli.CompareAndSwap(item)
if err != nil && (errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss)) {
Expand Down
12 changes: 12 additions & 0 deletions concurrent_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,15 @@ func (s *LimitersTestSuite) TestConcurrentBufferExpiredKeys() {
s.NoError(buffer.Limit(context.TODO(), "key3"))
}
}

func (s *LimitersTestSuite) TestConcurrentBufferDuplicateKeys() {
clock := newFakeClock()
capacity := int64(2)
ttl := time.Second
for _, buffer := range s.concurrentBuffers(capacity, ttl, clock) {
s.Require().NoError(buffer.Limit(context.TODO(), "key1"))
s.Require().NoError(buffer.Limit(context.TODO(), "key2"))
// No error is expected as it should just update the timestamp of the existing key.
s.NoError(buffer.Limit(context.TODO(), "key1"))
}
}

0 comments on commit 310e1c4

Please sign in to comment.