Skip to content

Commit

Permalink
introducing Map.Drain API to traverse a map while also deleting entries
Browse files Browse the repository at this point in the history
This commit introduces the `Map.Drain` API to traverse the map while also
removing its entries. It leverages the same `MapIterator` structure, with
the introduction of a new unexported method to handle the map draining.
The tests make sure that the behavior is as expected, and that this API
returns an error while invoked on the wrong map, such as arrays, for which
`Map.Iterate` should be used instead.
To support the adoption of the `LookupAndDelete` system call at different
kernel releases (e.g., queues and hash support this operation from different
kernel versions), this implementation introduces a fallback method to
iterate through the map using the sequential `Lookup` -> `Delete`
operations. As for the `MapIterate.Next`, concurrent operations might
modify/remove entries, but the method would try to check whether there is
more data to lookup in the map and proceed without failing (e.g., key
removed in the meantime doesn't necessarily mean that it should fail,
instead try with `nextKey`). From the user perspective, the usage should
be similar to `Map.Iterate`, as shown as follows:

```go
m, err := NewMap(&MapSpec{
	Type:       Hash,
	KeySize:    4,
	ValueSize:  8,
	MaxEntries: 10,
})
// populate here the map and defer close
it := m.Drain()
for it.Next(keyPtr, &value) {
    // here the entry doesn't exist anymore in the underlying map.
    ...
}
```

Signed-off-by: Simone Magnani <[email protected]>
  • Loading branch information
smagnani96 committed Sep 3, 2024
1 parent 2613a2c commit 3d9f4e0
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 22 deletions.
179 changes: 157 additions & 22 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,10 +1286,31 @@ func batchCount(keys, values any) (int, error) {
//
// It's not possible to guarantee that all keys in a map will be
// returned if there are concurrent modifications to the map.
//
// Iterating a hash map from which keys are being deleted is not
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Iterating a queue/stack map returns an error (NextKey) as the
// Map.Drain API should be used instead.
func (m *Map) Iterate() *MapIterator {
return newMapIterator(m)
}

// Drain traverses a map while also removing entries.
//
// It's safe to create multiple drainers at the same time,
// but their respective outputs will differ.
//
// Iterating a map that does not support entry removal such as
// an array return an error (Delete/LookupAndDelete) as the
// Map.Iterate API should be used instead.
func (m *Map) Drain() *MapIterator {
it := newMapIterator(m)
it.drain = true
return it
}

// Close the Map's underlying file descriptor, which could unload the
// Map from the kernel if it is not pinned or in use by a loaded Program.
func (m *Map) Close() error {
Expand Down Expand Up @@ -1548,8 +1569,10 @@ type MapIterator struct {
// of []byte to avoid allocations.
cursor any
count, maxEntries uint32
done bool
err error
done, drain bool
// Used in Map.Drain when LookupAndDelete is not supported.
fallback bool
err error
}

func newMapIterator(target *Map) *MapIterator {
Expand All @@ -1561,10 +1584,6 @@ func newMapIterator(target *Map) *MapIterator {

// Next decodes the next key and value.
//
// Iterating a hash map from which keys are being deleted is not
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
//
Expand All @@ -1573,26 +1592,28 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
if mi.err != nil || mi.done {
return false
}
if mi.drain {
return mi.nextDrain(keyOut, valueOut)
}
return mi.nextIterate(keyOut, valueOut)
}

// For array-like maps NextKey returns nil only after maxEntries
// iterations.
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
var key interface{}

// For array-like maps NextKey returns nil only after maxEntries iterations.
for mi.count <= mi.maxEntries {
if mi.cursor == nil {
// Pass nil interface to NextKey to make sure the Map's first key
// is returned. If we pass an uninitialized []byte instead, it'll see a
// non-nil interface and try to marshal it.
mi.cursor = make([]byte, mi.target.keySize)
mi.err = mi.target.NextKey(nil, mi.cursor)
key = nil
} else {
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
key = mi.cursor
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
} else if mi.err != nil {
mi.err = fmt.Errorf("get next key: %w", mi.err)
if !mi.fetchNextKey(key) {
return false
}

Expand All @@ -1614,20 +1635,134 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
return false
}

buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
return mi.copyCursorToKeyOut(keyOut)
}

mi.err = fmt.Errorf("%w", ErrIterationAborted)
return false
}

func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
if mi.isKeylessMap() {
return mi.handleDrainKeylessMap(keyOut, valueOut)
}

// Allocate only once data for retrieving the next key in the map.
if mi.cursor == nil {
mi.cursor = make([]byte, mi.target.keySize)
}

// Always retrieve first key in the map. This should ensure that the whole map
// is traversed, despite concurrent operations (ordering of items might differ).
for mi.err == nil && mi.fetchNextKey(nil) {
if mi.tryLookupAndDelete(valueOut) {
return mi.copyCursorToKeyOut(keyOut)
}
}
return false
}

func (mi *MapIterator) tryLookupAndDelete(valueOut interface{}) bool {
// Default try using the updated `Map.LookupAndDelete` API.
if !mi.fallback {
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
if mi.err == nil {
mi.count++
return true
}

switch {
case errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL):
mi.fallback = true
case errors.Is(mi.err, ErrKeyNotExist):
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
return false
default:
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
return false
}
}

// Fallback to sequential `Map.Lookup` -> `Map.Delete` when `Map.LookupAndDelete` is not supported.
mi.err = mi.target.Lookup(mi.cursor, valueOut)
if mi.err != nil {
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
mi.err = fmt.Errorf("look up next key: %w", mi.err)
}
return false
}

return mi.err == nil
mi.err = mi.target.Delete(mi.cursor)
if mi.err != nil {
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
} else {
mi.err = fmt.Errorf("delete key: %w", mi.err)
}
return false
}

mi.count++
return true
}

func (mi *MapIterator) isKeylessMap() bool {
return mi.target.keySize == 0
}

func (mi *MapIterator) handleDrainKeylessMap(keyOut, valueOut interface{}) bool {
if keyOut != nil {
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
return false
}

mi.err = mi.target.LookupAndDelete(nil, valueOut)
if mi.err == nil {
mi.count++
return true
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
} else {
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
}

mi.err = fmt.Errorf("%w", ErrIterationAborted)
return false
}

func (mi *MapIterator) fetchNextKey(key interface{}) bool {
mi.err = mi.target.NextKey(key, mi.cursor)
if mi.err == nil {
return true
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
} else {
mi.err = fmt.Errorf("get next key: %w", mi.err)
}

return false
}

func (mi *MapIterator) copyCursorToKeyOut(keyOut interface{}) bool {
buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}
return mi.err == nil
}

// Err returns any encountered error.
//
// The method must be called after Next returns nil.
Expand Down
155 changes: 155 additions & 0 deletions map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,161 @@ func TestMapIteratorAllocations(t *testing.T) {
qt.Assert(t, qt.Equals(allocs, float64(0)))
}

func TestDrainEmptyMap(t *testing.T) {
for _, mapType := range []MapType{
Hash,
Queue,
} {
t.Run(mapType.String(), func(t *testing.T) {
var (
keySize = uint32(4)
key string
value uint64
keyPtr interface{} = &key
)

if mapType == Queue {
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
keySize = 0
keyPtr = nil
}

m, err := NewMap(&MapSpec{
Type: mapType,
KeySize: keySize,
ValueSize: 8,
MaxEntries: 2,
})
qt.Assert(t, qt.IsNil(err))
defer m.Close()

entries := m.Drain()
if entries.Next(keyPtr, &value) {
t.Errorf("Empty %v should not be drainable", mapType)
}

qt.Assert(t, qt.IsNil(entries.Err()))
})
}
}

func TestMapDrain(t *testing.T) {
for _, mapType := range []MapType{
Hash,
Queue,
} {
t.Run(Hash.String(), func(t *testing.T) {
var (
key, value uint32
values []uint32
anyKey interface{}
keyPtr interface{} = &key
keySize uint32 = 4
data = []uint32{0, 1}
)

if mapType == Queue {
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
keySize = 0
keyPtr = nil
}

m, err := NewMap(&MapSpec{
Type: mapType,
KeySize: keySize,
ValueSize: 4,
MaxEntries: 2,
})
qt.Assert(t, qt.IsNil(err))
defer m.Close()

for _, v := range data {
if keySize != 0 {
anyKey = uint32(v)
}
err := m.Put(anyKey, uint32(v))
qt.Assert(t, qt.IsNil(err))
}

entries := m.Drain()
for entries.Next(keyPtr, &value) {
values = append(values, value)
}
qt.Assert(t, qt.IsNil(entries.Err()))

sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
qt.Assert(t, qt.DeepEquals(values, data))
})
}
}

func TestDrainWrongMap(t *testing.T) {
arr, err := NewMap(&MapSpec{
Type: Array,
KeySize: 4,
ValueSize: 4,
MaxEntries: 10,
})
qt.Assert(t, qt.IsNil(err))
defer arr.Close()

var key, value uint32
entries := arr.Drain()

qt.Assert(t, qt.IsFalse(entries.Next(&key, &value)))
qt.Assert(t, qt.IsNotNil(entries.Err()))
}

func TestMapDrainerAllocations(t *testing.T) {
for _, mapType := range []MapType{
Hash,
Queue,
} {
t.Run(mapType.String(), func(t *testing.T) {
var (
key, value uint32
anyKey interface{}
keyPtr interface{} = &key
keySize uint32 = 4
)

if mapType == Queue {
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
keySize = 0
keyPtr = nil
}

m, err := NewMap(&MapSpec{
Type: mapType,
KeySize: keySize,
ValueSize: 4,
MaxEntries: 10,
})
qt.Assert(t, qt.ErrorIs(err, nil))
defer m.Close()

for i := 0; i < int(m.MaxEntries()); i++ {
if keySize != 0 {
anyKey = uint32(i)
}
if err := m.Put(anyKey, uint32(i)); err != nil {
t.Fatal(err)
}
}

iter := m.Drain()

allocs := testing.AllocsPerRun(int(m.MaxEntries()-1), func() {
if !iter.Next(keyPtr, &value) {
t.Fatal("Next failed while draining: %w", iter.Err())
}
})

qt.Assert(t, qt.Equals(allocs, float64(0)))
})
}
}

func TestMapBatchLookupAllocations(t *testing.T) {
testutils.SkipIfNotSupported(t, haveBatchAPI())

Expand Down

0 comments on commit 3d9f4e0

Please sign in to comment.