Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb: add ability to retrieve freezer data iteratively #20308

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/golang/snappy"
Expand Down Expand Up @@ -579,6 +580,146 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
return snappy.Decode(nil, blob)
}

// freezerTableRS represents a resultset: a batch of consecutive freezer items
// that RetrieveN read into memory in one go, exposed through iterator-style
// methods (Next, Key, Value, Error, Release).
type freezerTableRS struct {
	// data holds the raw bytes of the items that have not been stepped past
	// yet, stored back to back. After a call to Next it starts at the
	// current item.
	data []byte
	// sizes[0] is the byte size of the current item (a zero placeholder
	// before the first call to Next); the remaining entries are the sizes of
	// the items that follow.
	sizes []uint32
	// current is the number of the current item. RetrieveN initialises it to
	// one less than the first item, and Next increments it.
	// NOTE(review): item numbers are uint64 in RetrieveN, so this narrows
	// them to 32 bits — confirm that is acceptable for the expected range.
	current int32
	// compressed tells Value whether the raw bytes need to be snappy-decoded.
	compressed bool
	// err is the last decoding error recorded by Value, reported by Error.
	err error
}

// Next moves the resultset forward one step, and returns true if the iterator
// is exhausted, i.e. if there is no current item left to read.
//
// Note that this is the inverse of the usual iterator convention: a return
// value of false means an item is available through Key and Value. Callers
// therefore loop with `for !rs.Next() { ... }`.
//
// Calling Next on an already exhausted (or released) resultset is a no-op
// that keeps returning true.
func (rs *freezerTableRS) Next() bool {
	// Nothing left to step over. Without this guard, indexing sizes[0]
	// below would panic once the iterator has been drained or released.
	if len(rs.sizes) == 0 {
		return true
	}
	// sizes[0] is the size of the item being left behind. On the first call
	// to Next after initialization it is a zero placeholder, so no data is
	// skipped and the first item becomes current.
	skip := int(rs.sizes[0])
	rs.data = rs.data[skip:]
	rs.sizes = rs.sizes[1:]
	rs.current++
	return len(rs.sizes) == 0
}

// Value returns the current value of the iterator, decompressing if necessary.
//
// It returns nil if there is no current item (the resultset is exhausted or
// has been released), or if the item fails to decompress; in the latter case
// the failure is recorded and can be obtained through Error.
//
// For uncompressed tables the returned slice aliases the resultset's internal
// buffer rather than being a copy.
func (rs *freezerTableRS) Value() []byte {
	// No current item: indexing sizes[0] below would panic.
	if len(rs.sizes) == 0 {
		return nil
	}
	// The current item occupies the first sizes[0] bytes of data.
	raw := rs.data[:int(rs.sizes[0])]
	if !rs.compressed {
		return raw
	}
	res, err := snappy.Decode(nil, raw)
	if err != nil {
		rs.err = err
		return nil
	}
	return res
}

// Key encodes the number of the current item as four little-endian bytes.
// The item number is converted to uint32 before encoding.
func (rs *freezerTableRS) Key() []byte {
	var key [4]byte
	binary.LittleEndian.PutUint32(key[:], uint32(rs.current))
	return key[:]
}

// Release drops the resultset's references to the item data and the size
// list, so that the memory they occupy can be reclaimed.
func (rs *freezerTableRS) Release() {
	rs.data, rs.sizes = nil, nil
}

// Error reports the error recorded on the resultset, or nil if there is
// none. Value stores a decompression failure there instead of returning it.
func (rs *freezerTableRS) Error() error {
	return rs.err
}

// RetrieveN returns an iterator with a number of items, starting from the index 'start'.
// The number of items retrieved is determined by
// 1. It tries to read 'max' number of items, but
// 2. Stops reading if it comes to a file boundary, and
// 3. Stops reading if the total bytesize exceeds maxBytes
//
// OBS! if the underlying table is compressed, the data will be decompressed
// when returned by the iterator. Thus, the total size of the decompressed item(s)
// may be larger than the `maxBytes` argument
func (t *freezerTable) RetrieveN(start, max, maxBytes uint64) (ethdb.Iterator, error) {
// Ensure the table and the item is accessible
if t.index == nil || t.head == nil {
return nil, errClosed
}
count := atomic.LoadUint64(&t.items)
if count <= start {
return nil, errOutOfBounds
}
// Check if requested max count is even possible
if count < start+max {
max = count - start
}
if max == 0 {
// Why are you requesting zero items? Sheesh
return nil, errOutOfBounds
}
// Ensure the item was not deleted from the tail either
offset := atomic.LoadUint32(&t.itemOffset)
if uint64(offset) > start {
return nil, errOutOfBounds
}
// Now, keep iterating the index until we hit one of the limits
t.lock.RLock()
defer t.lock.RUnlock()
var (
fileId = -1
totalSize = uint64(0)
sizes = []uint32{0}
dataStart = uint32(0)
dataEnd = uint32(0)
nItems = uint64(0)
)
current := start
for nItems < max {
nextStart, nextEnd, nextFile, err := t.getBounds(current)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we really want to go nuts, we can optimize this too and do t.getBoundsN, to read n index entries in one go. But I'll leave that for later/someone else

size := (nextEnd - nextStart)
if err != nil {
return nil, err
}
if fileId == -1 {
// This is the first item
fileId = int(nextFile)
dataStart = nextStart
} else if int(nextFile) != fileId {
// Crossing file boundary
break
}
if nItems > 0 && totalSize+uint64(size) > maxBytes {
// Too large (but return at least one item)
break
}
// We include this one too
nItems++
totalSize += uint64(size)
sizes = append(sizes, size)
dataEnd = nextEnd
current++
}
dataFile, exist := t.files[uint32(fileId)]
if !exist {
return nil, fmt.Errorf("missing data file %d", fileId)
}
// Now, read the actual data
// Retrieve the data itself, decompress and return
blob := make([]byte, dataEnd-dataStart)
if _, err := dataFile.ReadAt(blob, int64(dataStart)); err != nil {
return nil, err
}
return &freezerTableRS{
data: blob,
sizes: sizes,
current: int32(start - 1), // Set to one less than first item, a 'Next' is required
compressed: !t.noCompression,
}, nil
}

// has returns an indicator whether the specified number data
// exists in the freezer table.
func (t *freezerTable) has(number uint64) bool {
Expand Down
106 changes: 106 additions & 0 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,109 @@ func TestOffset(t *testing.T) {
// However, all 'normal' failure modes arising due to failing to sync() or save a file should be
// handled already, and the case described above can only (?) happen if an external process/user
// deletes files from the filesystem.

// TestIterator does some basic tests on RetrieveN and on iterating the
// resultset it returns. A table is filled with 30 items of 15 bytes each,
// then re-opened and read back in full under different item/byte limits,
// checking both the returned data and the number of lookups needed.
func TestIterator(t *testing.T) {
	rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
	fname := fmt.Sprintf("iterator-%d", rand.Uint64())
	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 30 times
		for x := 0; x < 30; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		f.Close()
	}
	tests := []struct {
		name     string
		max      uint64 // item limit handed to RetrieveN
		maxBytes uint64 // byte limit handed to RetrieveN
		perFile  bool   // expect one lookup per data file instead of one per item
	}{
		// Generous limits: only file boundaries end a batch, so there should
		// be one lookup per file (zero-indexed files).
		{name: "no-limit", max: 10000, maxBytes: 100000, perFile: true},
		// The byte limit is less than the item size, so each lookup should
		// only return one item.
		{name: "byte-limit", max: 10000, maxBytes: 10},
		// Item limit: max one item per lookup.
		{name: "item-limit", max: 1, maxBytes: 1000},
	}
	for _, tc := range tests {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			// Open it, iterate, verify iteration
			f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
			if err != nil {
				t.Fatal(err)
			}
			defer f.Close()

			exp := f.items
			if exp != 30 {
				t.Fatalf("table has %d items, expected 30", exp)
			}
			itemCount := uint64(0)
			lookupCount := uint64(0)
			for {
				it, err := f.RetrieveN(itemCount, tc.max, tc.maxBytes)
				if err == errOutOfBounds {
					// Read past the last item, we're done
					break
				}
				if err != nil {
					t.Fatalf("lookup at item %d failed: %v", itemCount, err)
				}
				lookupCount++
				// Next returns true once the resultset is exhausted
				for !it.Next() {
					want := getChunk(15, int(itemCount))
					if got := it.Value(); string(got) != string(want) {
						t.Errorf("item %d: got %x, expected %x", itemCount, got, want)
					}
					itemCount++
				}
				if err := it.Error(); err != nil {
					t.Errorf("iterator error before item %d: %v", itemCount, err)
				}
				it.Release()
			}
			wantLookups := exp
			if tc.perFile {
				wantLookups = uint64(f.headId) + 1
			}
			if lookupCount != wantLookups {
				t.Errorf("did %d lookups, expected %d", lookupCount, wantLookups)
			}
			if itemCount != exp {
				t.Errorf("got %d items, expected %d", itemCount, exp)
			}
		})
	}
}