Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(arrow/memory): Align allocations always #289

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 51 additions & 27 deletions arrow/memory/mallocator/mallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@ package mallocator

// #include <stdlib.h>
// #include <string.h>
//
// void* realloc_and_initialize(void* ptr, size_t old_len, size_t new_len) {
// void* new_ptr = realloc(ptr, new_len);
// if (new_ptr && new_len > old_len) {
// memset(new_ptr + old_len, 0, new_len - old_len);
// }
// return new_ptr;
// }
import "C"

import (
"sync"
"sync/atomic"
"unsafe"
)

// roundToPowerOf2 rounds v up to the next multiple of round, which
// must be a power of two. Values already on a multiple (including 0)
// are returned unchanged.
func roundToPowerOf2(v, round uintptr) uintptr {
	mask := round - 1
	if v&mask == 0 {
		return v
	}
	return (v | mask) + 1
}

// Mallocator is an allocator which defers to libc malloc.
//
// The primary reason to use this is when exporting data across the C Data
Expand All @@ -45,9 +44,22 @@ import (
// The build tag 'mallocator' will also make this the default allocator.
type Mallocator struct {
	// allocatedBytes tracks the total outstanding allocation size in
	// bytes; updated atomically by Allocate/Free/Reallocate.
	allocatedBytes uint64
	// We want to align allocations, but since we only get/return []byte,
	// we need to remember the "real" address for Free somehow.
	// Maps aligned address (uintptr) -> real malloc'd address (uintptr).
	realAllocations sync.Map
	// alignment is the byte alignment applied to every allocation;
	// validated to be a positive power of two by the constructors.
	alignment int
}

func NewMallocator() *Mallocator { return &Mallocator{} }
func NewMallocator() *Mallocator { return &Mallocator{alignment: 64} }

// NewMallocatorWithAlignment returns a Mallocator that aligns every
// allocation to the given byte alignment. It panics unless alignment
// is a positive power of two (1 is accepted).
func NewMallocatorWithAlignment(alignment int) *Mallocator {
	switch {
	case alignment < 1:
		panic("mallocator: invalid alignment (must be positive)")
	case alignment > 1 && alignment&(alignment-1) != 0:
		panic("mallocator: invalid alignment (must be power of 2)")
	}
	return &Mallocator{alignment: alignment}
}

func (alloc *Mallocator) Allocate(size int) []byte {
// Use calloc to zero-initialize memory.
Expand All @@ -58,28 +70,44 @@ func (alloc *Mallocator) Allocate(size int) []byte {
if size < 0 {
panic("mallocator: negative size")
}
ptr, err := C.calloc(C.size_t(size), 1)
paddedSize := C.size_t(size) + C.size_t(alloc.alignment)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C.size_t(size + alloc.alignment) instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

ptr, err := C.calloc(paddedSize, 1)
if err != nil {
// under some circumstances and allocation patterns, we can end up in a scenario
// where for some reason calloc return ENOMEM even though there is definitely memory
// available for use. So we attempt to fallback to simply doing malloc + memset in
// this case. If malloc returns a nil pointer, then we know we're out of memory
// and will surface the error.
if ptr = C.malloc(C.size_t(size)); ptr == nil {
if ptr = C.malloc(paddedSize); ptr == nil {
panic(err)
}
C.memset(ptr, 0, C.size_t(size))
C.memset(ptr, 0, C.size_t(paddedSize))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paddedSize is already a C.size_t now, so you don't need the cast

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

} else if ptr == nil {
panic("mallocator: out of memory")
}

buf := unsafe.Slice((*byte)(ptr), paddedSize)
aligned := roundToPowerOf2(uintptr(ptr), uintptr(alloc.alignment))
alloc.realAllocations.Store(aligned, uintptr(ptr))
atomic.AddUint64(&alloc.allocatedBytes, uint64(size))
return unsafe.Slice((*byte)(ptr), size)

if uintptr(ptr) != aligned {
shift := aligned - uintptr(ptr)
return buf[shift : uintptr(size)+shift : uintptr(size)+shift]
}
return buf[:size:size]
}

// Free releases a buffer previously returned by Allocate. Because the
// slice handed out points at an aligned offset inside the real
// allocation, the real base pointer is looked up (and removed) from
// realAllocations before being passed to libc free.
func (alloc *Mallocator) Free(b []byte) {
	n := len(b)
	alignedAddr := uintptr(getPtr(b))
	base, ok := alloc.realAllocations.LoadAndDelete(alignedAddr)
	if !ok {
		// Unknown pointer — likely a double-free; nothing to release.
		return
	}
	C.free(unsafe.Pointer(base.(uintptr)))
	// atomic offers no subtract for uint64; add the two's complement.
	atomic.AddUint64(&alloc.allocatedBytes, ^(uint64(n) - 1))
}
Expand All @@ -88,20 +116,16 @@ func (alloc *Mallocator) Reallocate(size int, b []byte) []byte {
if size < 0 {
panic("mallocator: negative size")
}
cp := cap(b)
ptr, err := C.realloc_and_initialize(getPtr(b), C.size_t(cp), C.size_t(size))
if err != nil {
panic(err)
} else if ptr == nil && size != 0 {
panic("mallocator: out of memory")
}
delta := size - len(b)
if delta >= 0 {
atomic.AddUint64(&alloc.allocatedBytes, uint64(delta))
} else {
atomic.AddUint64(&alloc.allocatedBytes, ^(uint64(-delta) - 1))

if cap(b) >= size {
diff := size - len(b)
atomic.AddUint64(&alloc.allocatedBytes, uint64(diff))
return b[:size]
}
return unsafe.Slice((*byte)(ptr), size)
newBuf := alloc.Allocate(size)
copy(newBuf, b)
alloc.Free(b)
return newBuf
}

func (alloc *Mallocator) AllocatedBytes() int64 {
Expand Down
45 changes: 45 additions & 0 deletions arrow/memory/mallocator/mallocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package mallocator_test
import (
"fmt"
"testing"
"unsafe"

"github.com/apache/arrow-go/v18/arrow/memory/mallocator"
"github.com/stretchr/testify/assert"
Expand All @@ -41,6 +42,32 @@ func TestMallocatorAllocate(t *testing.T) {
for idx, c := range buf {
assert.Equal(t, uint8(0), c, fmt.Sprintf("Buf not zero-initialized at %d", idx))
}
// check aligned
if size > 0 {
assert.Equal(t, uintptr(0), uintptr(unsafe.Pointer(&buf[0]))%64)
}
})
}
}

// TestMallocatorAllocateAligned verifies that a custom 16-byte
// alignment yields zeroed, correctly sized, 16-byte-aligned buffers
// across a range of sizes.
func TestMallocatorAllocateAligned(t *testing.T) {
	for _, size := range []int{0, 1, 4, 33, 65, 4095, 4096, 8193} {
		t.Run(fmt.Sprint(size), func(t *testing.T) {
			a := mallocator.NewMallocatorWithAlignment(16)
			buf := a.Allocate(size)
			defer a.Free(buf)

			assert.Equal(t, size, len(buf))
			assert.LessOrEqual(t, size, cap(buf))
			// Every byte must start out zeroed.
			for i, b := range buf {
				assert.Equal(t, uint8(0), b, fmt.Sprintf("Buf not zero-initialized at %d", i))
			}
			// The base address must honor the requested alignment.
			if size > 0 {
				assert.Equal(t, uintptr(0), uintptr(unsafe.Pointer(&buf[0]))%16)
			}
		})
	}
}
Expand Down Expand Up @@ -68,6 +95,7 @@ func TestMallocatorReallocate(t *testing.T) {
for idx, c := range buf {
assert.Equal(t, uint8(0), c, fmt.Sprintf("Buf not zero-initialized at %d", idx))
}
a.AssertSize(t, test.before)

buf = a.Reallocate(test.after, buf)
defer a.Free(buf)
Expand All @@ -77,10 +105,27 @@ func TestMallocatorReallocate(t *testing.T) {
for idx, c := range buf {
assert.Equal(t, uint8(0), c, fmt.Sprintf("Buf not zero-initialized at %d", idx))
}
a.AssertSize(t, test.after)
})
}
}

// TestMallocatorReallocateInPlace verifies that a reallocation fitting
// within the existing capacity keeps the same base address.
//
// Two fixes versus the previous version: (1) addr2 read &buf[0]
// instead of &buf2[0], so the equality assertion compared an address
// with itself and could never fail; (2) growing 80 -> 81 can never be
// in place, because Allocate caps the slice at cap == size, so only a
// shrink exercises the in-place path. The buffer is now also freed.
func TestMallocatorReallocateInPlace(t *testing.T) {
	a := mallocator.NewMallocator()
	buf := a.Allocate(80)
	assert.Equal(t, 80, len(buf))
	assert.LessOrEqual(t, 80, cap(buf))
	a.AssertSize(t, 80)
	addr := uintptr(unsafe.Pointer(&buf[0]))

	buf2 := a.Reallocate(40, buf)
	assert.Equal(t, 40, len(buf2))
	a.AssertSize(t, 40)
	addr2 := uintptr(unsafe.Pointer(&buf2[0]))
	assert.Equal(t, addr, addr2)
	a.Free(buf2)
}

func TestMallocatorAssertSize(t *testing.T) {
a := mallocator.NewMallocator()
assert.Equal(t, int64(0), a.AllocatedBytes())
Expand Down
Loading