Skip to content

Commit

Permalink
Global ratelimiter helper: a small atomic-like map (cadence-workflow#…
Browse files Browse the repository at this point in the history
…6027)

This commit largely exists to break this out for separate review, as it's relatively simple and isolated.

In a later PR, this will be used by the "limiter"-side logic to hold known ratelimiters, and simplify its logic.  So it does just two core things, and exposes a *very* small API that's intended to be difficult or impossible to misuse:

1. Tracks length, so a "iterate over everything and collect data for sending to aggregators" loop can pre-allocate a collection that is likely large enough to store everything.
2. Implicitly initializes missing values, so the using logic does not ever need to check for existence or explicitly handle fallbacks or decide how to order atomic operations.

It could have other methods, but so far they do not seem necessary.  Delete is not currently planned to be used, but it seems important to reserve / guarantee it's possible to build, so I've included that as well.

---

This is not intended to be broadly reusable as we do not currently have any other locations that will use it, and the auto-initializing behavior is potentially unique.  It's mostly just isolating and simplifying some logic to prevent accidental misuse, because early iterations had some easily-missed flaws.

We very likely *should* have a "just a type-safe `sync.Map`" wrapper, which this could easily be changed to use, but the vast majority of that API would be unused here so I haven't built that.
  • Loading branch information
Groxx authored and timl3136 committed Jun 6, 2024
1 parent 8a2a9df commit 4516ea9
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 0 deletions.
104 changes: 104 additions & 0 deletions common/quotas/global/collection/internal/atomicmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package internal

import (
"sync"
"sync/atomic"
)

// AtomicMap adds type safety around a sync.Map (which has atomic-like behavior), and:
// - implicitly constructs values as needed, not relying on zero values
// - simplifies the API quite a bit because very few methods are in use.
// in particular there is no "Store" currently because it is not needed.
// - tracks length (atomically, so values are only an estimate)
//
// Due to length tracking, this is marginally more costly when modifying contents
// than "just" a type-safe sync.Map. It should only be used when length is needed.
type AtomicMap[Key comparable, Value any] struct {
contents sync.Map
create func(key Key) Value
len int64
}

// NewAtomicMap makes a simplified type-safe [sync.Map] that creates values as needed, and tracks length.
//
// The `create` callback will be called when creating a new value, possibly multiple times,
// without synchronization.
// It must be concurrency safe and should return ASAP to reduce the window for storage races,
// so ideally it should be simple and non-blocking, or pulling from a pre-populated cache if not.
//
// Due to length tracking, this is marginally more costly when modifying contents
// than "just" a type-safe [sync.Map]. It should only be used when length is needed.
func NewAtomicMap[Key comparable, Value any](create func(key Key) Value) *AtomicMap[Key, Value] {
return &AtomicMap[Key, Value]{
contents: sync.Map{},
create: create,
len: 0,
}
}

// Load will get the current Value for a Key, initializing it if necessary.
func (t *AtomicMap[Key, Value]) Load(key Key) Value {
val, loaded := t.contents.Load(key)
if loaded {
return val.(Value)
}
created := t.create(key)
val, loaded = t.contents.LoadOrStore(key, created)
if !loaded {
// stored a new value
atomic.AddInt64(&t.len, 1)
}
return val.(Value)
}

// Delete removes an entry from the map, and updates the length.
//
// Like the underlying [sync.Map.LoadAndDelete], this can be called concurrently with Range.
func (t *AtomicMap[Key, Value]) Delete(k Key) {
// whether used or not, this is included to ensure it is possible to build
// while maintaining length so collections can be pruned later if needed.
_, loaded := t.contents.LoadAndDelete(k)
if loaded {
atomic.AddInt64(&t.len, -1)
}
}

// Range calls [sync.Map.Range] on the underlying [sync.Map], and has the same semantics.
//
// This can be used while concurrently modifying the map, and it may result
// in ranging over more or fewer entries than Len would imply.
func (t *AtomicMap[Key, Value]) Range(f func(k Key, v Value) bool) {
t.contents.Range(func(k, v any) bool {
return f(k.(Key), v.(Value))
})
}

// Len returns the currently-known size of the collection. It cannot be guaranteed to
// be precise, as the collection may change at any time during or after this call.
//
// In particular, Range may iterate over more or fewer entries.
func (t *AtomicMap[Key, Value]) Len() int {
return int(atomic.LoadInt64(&t.len))
}
185 changes: 185 additions & 0 deletions common/quotas/global/collection/internal/atomicmap_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package internal_test

import (
"math/rand"
"runtime"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common/quotas/global/collection/internal"
)

func TestMapBasics(t *testing.T) {
const (
loaded1 = "four"
loaded2 = "tenletters"
tried = "should not exist"
)
type custom struct{ value int }

assertContentsEqual := func(t *testing.T, m *internal.AtomicMap[string, *custom], expected map[string]int) {
dup := make(map[string]int) // to avoid mutating the original
for k, v := range expected {
dup[k] = v
}
m.Range(func(k string, v *custom) bool {
if _, ok := dup[k]; ok {
delete(dup, k)
} else {
t.Errorf("ranged over unexpected or duplicate key %q", k)
}
return true
})
assert.Empty(t, dup, "did not find some contents")
}

m := internal.NewAtomicMap(func(key string) *custom {
return &custom{value: len(key)}
})

t.Run("load should work", func(t *testing.T) {
v := m.Load(loaded1)
assert.Equal(t, len(loaded1), v.value, "should use the constructed value initially, not a zero value")
v.value = 10
reload := m.Load(loaded1)
assert.Equal(t, v, reload, "should return the same object when loaded more than once")
})
t.Run("range should walk over only the one key", func(t *testing.T) {
assertContentsEqual(t, m, map[string]int{
loaded1: len(loaded1),
})
})
t.Run("loading a second value should range over two", func(t *testing.T) {
// init this value too
v2 := m.Load(loaded2)
assert.Equal(t, len(loaded2), v2.value, "sanity check: loaded2 should be created correctly, like loaded1")
assertContentsEqual(t, m, map[string]int{
loaded1: len(loaded1),
loaded2: len(loaded2),
})
})
}

func TestMapNotRacy(t *testing.T) {
creates := atomic.NewInt64(0)
// using a string pointer just to make things a bit riskier / more sensitive to races since mutation is possible.
// no mutation currently occurs, but it seems slightly safer to leave it here for future changes.
m := internal.NewAtomicMap(func(key string) *string {
s := key
s += "-"
s += strconv.Itoa(int(creates.Inc())) // just to be recognizable
return &s
})

// call ALL the methods concurrently
var g errgroup.Group
const loops = 1000 // using 1,000 because 100 had some coverage flapping
for i := 0; i < loops; i++ {
key := strconv.Itoa(i)
g.Go(func() error {
v := m.Load(key)
assert.NotEmpty(t, *v) // "never nil" also asserted by crashing
return nil
})
// try to load the same key multiple times
g.Go(func() error {
v := m.Load(key)
assert.NotEmpty(t, *v)
return nil
})
// range over it while reading/writing
g.Go(func() error {
m.Range(func(k string, v *string) bool {
assert.NotEmpty(t, k)
assert.NotEmpty(t, *v)
return true
})
return nil
})
g.Go(func() error {
_ = m.Len() // value does not matter / hard to check usefully
return nil
})
// delete ~10% of keys to exercise that logic, and mostly ensure coverage
if rand.Intn(10) == 0 {
g.Go(func() error {
m.Delete(key)
return nil
})
}
}
require.NoError(t, g.Wait())

// sanity-check to show decent concurrency:
// - out-of-order inits (values can be both higher and lower than the key)
// - duplicate inits (values higher than 100)
same, higher, lower, upper := 0, 0, 0, int64(0)
m.Range(func(k string, v *string) bool {
parts := strings.SplitN(*v, "-", 2)

// sanity check that keys and values stay associated
assert.Equal(t, k, parts[0], "key %q and first part of value must match: %q", k, *v)

if parts[0] == parts[1] {
same++
} else if parts[0] < parts[1] {
higher++
} else {
lower++
}

vint, err := strconv.ParseInt(parts[1], 10, 64)
assert.NoError(t, err, "creates-%v should be parse-able as an int", parts[1])
if vint > upper {
upper = vint
}
return true
})

assert.LessOrEqual(t,
int64(loops), upper,
// regrettably not guaranteed due to deletions, but I have yet to see it.
// if this becomes an issue, probably just delete it.
"did not observe a value at least as high as the number of loops. "+
"not technically impossible, just very unlikely",
)

t.Logf(
"Metrics for cpu %v:\n"+
"\tKey == value (1=>1-1): %v\n"+
"\tValue higher (5=>5-100): %v\n"+
"\tValue lower (100=>100-5): %v\n"+
"\tNumber of iterations: %v\n"+
"\tHighest saved create: %v\n"+ // same or higher than iterations
"\tTotal num of creates: %v", // same or higher than saved
runtime.GOMAXPROCS(0), same, higher, lower, loops, upper, creates.Load(),
)
}

0 comments on commit 4516ea9

Please sign in to comment.