From 441bdd436f959fab94eb631a8917af97f5203dda Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 28 Jun 2021 12:28:42 +0200 Subject: [PATCH 1/6] core/rawdb: implement sequential reads in freezer_table --- core/rawdb/freezer_table.go | 206 +++++++++++++++++++++++-------- core/rawdb/freezer_table_test.go | 62 +++++++++- 2 files changed, 218 insertions(+), 50 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d7bfe18e020f..98070c4bea59 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte { return b } +// bounds returns the start- and end- offsets, and the file number of where to +// read there data item marked by the given indexEntry, which are assumed to be +// sequential. +func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { + if start.filenum != end.filenum { + // If a piece of data 'crosses' a data-file, + // it's actually in one piece on the second data-file. + // We return a zero-indexEntry for the second file as start + return 0, end.offset, end.filenum + } + return start.offset, end.offset, end.filenum +} + // freezerTable represents a single chained data table within the freezer (e.g. blocks). // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry // file (uncompressed 64 bit indices into the data file). 
@@ -546,84 +559,179 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool return false, nil } -// getBounds returns the indexes for the item -// returns start, end, filenumber and error -func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { - buffer := make([]byte, indexEntrySize) - var startIdx, endIdx indexEntry - // Read second index - if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { - return 0, 0, 0, err +// getIndices returns the index entries for the given from-item, covering 'count' items. +// N.B: The actual number of returned indices for N items will always be N+1 (unless an +// error is returned). +func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { + // Apply the table-offset + from = from - uint64(t.itemOffset) + // For reading N items, we need N+1 indices. + buffer := make([]byte, (count+1)*indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { + return nil, err } - endIdx.unmarshalBinary(buffer) - // Read first index (unless it's the very first item) - if item != 0 { - if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { - return 0, 0, 0, err - } - startIdx.unmarshalBinary(buffer) - } else { + var ( + indices []*indexEntry + offset int + ) + for i := from; i <= from+count; i++ { + var startIndex = new(indexEntry) + startIndex.unmarshalBinary(buffer[offset:]) + offset += indexEntrySize + indices = append(indices, startIndex) + } + if from == 0 { // Special case if we're reading the first item in the freezer. We assume that // the first item always start from zero(regarding the deletion, we // only support deletion by files, so that the assumption is held). 
// This means we can use the first item metadata to carry information about // the 'global' offset, for the deletion-case - return 0, endIdx.offset, endIdx.filenum, nil + indices[0].offset = 0 + indices[0].filenum = indices[1].filenum } - if startIdx.filenum != endIdx.filenum { - // If a piece of data 'crosses' a data-file, - // it's actually in one piece on the second data-file. - // We return a zero-indexEntry for the second file as start - return 0, endIdx.offset, endIdx.filenum, nil + return indices, nil +} + +// getBounds returns the indexes for the item +// returns start, end, filenumber and error +func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { + indices, err := t.getIndices(item, 1) + if err != nil { + return 0, 0, 0, err } - return startIdx.offset, endIdx.offset, endIdx.filenum, nil + start, end, fileNum := indices[0].bounds(indices[1]) + return start, end, fileNum, nil } // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - blob, err := t.retrieve(item) + if items, err := t.RetrieveItems(item, 1, 0); err != nil { + return nil, err + } else { + return items[0], nil + } +} + +// RetrieveItems returns multiple items in sequence, starting from the index 'start'. +// It will return at most 'max' items, but will abort earlier to respect the +// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one +// item, it _will_ return one element and possibly overflow the maxBytes. +func (t *freezerTable) RetrieveItems(start, max, maxBytes uint64) ([][]byte, error) { + // First we read the 'raw' data, which might be compressed. 
+ diskData, sizes, err := t.retrieveItems(start, max, maxBytes) if err != nil { return nil, err } - if t.noCompression { - return blob, nil + var ( + output = make([][]byte, 0, max) + offset int // offset for reading + outputSize int // size of uncompressed data + ) + // Now slice up the data and decompress. + for i, diskSize := range sizes { + item := diskData[offset : offset+diskSize] + offset += diskSize + decompressedSize := diskSize + if !t.noCompression { + decompressedSize, _ = snappy.DecodedLen(item) + } + if i > 0 && uint64(outputSize+decompressedSize) > maxBytes { + break + } + if !t.noCompression { + if data, err := snappy.Decode(nil, item); err != nil { + return nil, err + } else { + output = append(output, data) + } + } else { + output = append(output, item) + } + outputSize += decompressedSize } - return snappy.Decode(nil, blob) + return output, nil } -// retrieve looks up the data offset of an item with the given number and retrieves -// the raw binary blob from the data file. OBS! This method does not decode -// compressed data. 
-func (t *freezerTable) retrieve(item uint64) ([]byte, error) { +func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { - return nil, errClosed + return nil, nil, errClosed } - if atomic.LoadUint64(&t.items) <= item { - return nil, errOutOfBounds + itemCount := atomic.LoadUint64(&t.items) // max number + // Ensure the start is written, not deleted from the tail, and that the + // caller actually wants something + if itemCount <= start || uint64(t.itemOffset) > start || max == 0 { + return nil, nil, errOutOfBounds } - // Ensure the item was not deleted from the tail either - if uint64(t.itemOffset) > item { - return nil, errOutOfBounds + if start+max > itemCount { + max = itemCount - start } - startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) - if err != nil { - return nil, err + var ( + output = make([]byte, maxBytes) // Buffer to read data into + outputSize int // Used size of that buffer + ) + // Read the data + readData := func(fileId, start uint32, length int) error { + // In case a small limit is used, and the elements are large, may need to + // realloc the read-buffer when reading the first (and only) item. 
+ if len(output) < length { + output = make([]byte, length) + } + if dataFile, exist := t.files[uint32(fileId)]; !exist { + return fmt.Errorf("missing data file %d", fileId) + } else if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { + return err + } + outputSize += length + return nil } - dataFile, exist := t.files[filenum] - if !exist { - return nil, fmt.Errorf("missing data file %d", filenum) + // Read all the indexes in one go + indices, err := t.getIndices(start, max) + if err != nil { + return nil, nil, err } - // Retrieve the data itself, decompress and return - blob := make([]byte, endOffset-startOffset) - if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { - return nil, err + var ( + sizes []int // The sizes for each element + totalSize = 0 // The total size of all data read so far + readStart = indices[0].offset // Where, in the file, to start reading + unreadSize = 0 // The size of the as-yet-unread data + ) + + for i, firstIndex := range indices[:len(indices)-1] { + secondIndex := indices[i+1] + // Determine the size of the item. + offset1, offset2, _ := firstIndex.bounds(secondIndex) + size := int(offset2 - offset1) + // Crossing a file boundary? + if secondIndex.filenum != firstIndex.filenum { + if unreadSize > 0 { + // If we have unread data in the first file, we need to do that read now. 
+ if err := readData(firstIndex.filenum, uint32(readStart), unreadSize); err != nil { + return nil, nil, err + } + unreadSize = 0 + } + readStart = 0 + } + if i > 0 && uint64(totalSize+size) > maxBytes { + break + } + // Defer the read for later + unreadSize += size + totalSize += size + sizes = append(sizes, size) + if i == len(indices)-2 || uint64(totalSize) > maxBytes { + // Last item, need to do the read now + if err := readData(secondIndex.filenum, uint32(readStart), unreadSize); err != nil { + return nil, nil, err + } + break + } } - t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) - return blob, nil + return output[:outputSize], sizes, nil } // has returns an indicator whether the specified number data diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 0df28f236d2d..3ad3cba3cb5e 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -74,7 +74,7 @@ func TestFreezerBasics(t *testing.T) { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { - t.Fatal(err) + t.Fatalf("reading item %d: %v", y, err) } if !bytes.Equal(got, exp) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) @@ -692,3 +692,63 @@ func TestAppendTruncateParallel(t *testing.T) { } } } + +// TestSequentialRead does some basic tests on the RetrieveItems. 
+func TestSequentialRead(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + for x := 0; x < 30; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + f.DumpIndex(0, 30) + f.Close() + } + { // Open it, iterate, verify iteration + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 100000) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 30; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } + { // Open it, iterate, verify byte limit. 
The byte limit is less than item + // size, so each lookup should only return one otem + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 10) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 1; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } +} From d8a24b47f415722ac2cd455bdb7de7f5e515c987 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 28 Jun 2021 16:43:35 +0200 Subject: [PATCH 2/6] core/rawdb, ethdb: add sequential reader to db interface --- core/rawdb/database.go | 5 +++++ core/rawdb/freezer.go | 12 ++++++++++++ core/rawdb/table.go | 6 ++++++ ethdb/database.go | 7 +++++++ 4 files changed, 30 insertions(+) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index c8bfdbace14e..90619169a09b 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -89,6 +89,11 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { return nil, errNotSupported } +// ReadAncients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { + return nil, errNotSupported +} + // Ancients returns an error as we don't have a backing chain freezer. func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index ff8919b59e92..d40e54c05702 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -180,6 +180,18 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { return nil, errUnknownTable } +// Ancient retrieves multiple items in sequence, starting from the index 'start'. 
+// It will return +// - at most 'max' items, +// - at least 1 item (even if exceeding the maxByteSize), but will otherwise +// return as many items as fit into maxByteSize. +func (f *freezer) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { + if table := f.tables[kind]; table != nil { + return table.RetrieveItems(start, max, maxByteSize) + } + return nil, errUnknownTable +} + // Ancients returns the length of the frozen items. func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil diff --git a/core/rawdb/table.go b/core/rawdb/table.go index d5ef60ae50ef..d6d299ec9719 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -62,6 +62,12 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) { return t.db.Ancient(kind, number) } +// ReadAncients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { + return t.db.ReadAncients(kind, start, max, maxByteSize) +} + // Ancients is a noop passthrough that just forwards the request to the underlying // database. func (t *table) Ancients() (uint64, error) { diff --git a/ethdb/database.go b/ethdb/database.go index 0dc14624b98e..fd2a71420eac 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -76,6 +76,13 @@ type AncientReader interface { // Ancient retrieves an ancient binary blob from the append-only immutable files. Ancient(kind string, number uint64) ([]byte, error) + // Ancient retrieves multiple items in sequence, starting from the index 'start'. + // It will return + // - at most 'max' items, + // - at least 1 item (even if exceeding the maxByteSize), but will otherwise + // return as many items as fit into maxByteSize. + ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) + // Ancients returns the ancient item numbers in the ancient store. 
Ancients() (uint64, error) From ff8267c234674d7740b4542ed392eaac7b0df898 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 29 Jun 2021 13:02:56 +0200 Subject: [PATCH 3/6] core/rawdb: lint nitpicks --- core/rawdb/freezer_table.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 98070c4bea59..3e5b98dc9412 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -680,7 +680,7 @@ func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int if len(output) < length { output = make([]byte, length) } - if dataFile, exist := t.files[uint32(fileId)]; !exist { + if dataFile, exist := t.files[fileId]; !exist { return fmt.Errorf("missing data file %d", fileId) } else if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { return err @@ -709,7 +709,7 @@ func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int if secondIndex.filenum != firstIndex.filenum { if unreadSize > 0 { // If we have unread data in the first file, we need to do that read now. 
- if err := readData(firstIndex.filenum, uint32(readStart), unreadSize); err != nil { + if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } unreadSize = 0 @@ -725,7 +725,7 @@ func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int sizes = append(sizes, size) if i == len(indices)-2 || uint64(totalSize) > maxBytes { // Last item, need to do the read now - if err := readData(secondIndex.filenum, uint32(readStart), unreadSize); err != nil { + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } break From d6b40b83f759c813310d6c23e015c6b0ac951e7a Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 4 Aug 2021 11:46:12 +0200 Subject: [PATCH 4/6] core/rawdb: fix some nitpicks --- core/rawdb/freezer.go | 6 ++-- core/rawdb/freezer_table.go | 49 ++++++++++++++++++-------------- core/rawdb/freezer_table_test.go | 2 +- core/rawdb/table.go | 4 +-- ethdb/database.go | 10 +++---- 5 files changed, 38 insertions(+), 33 deletions(-) diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index d40e54c05702..253de9f7cfc3 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -180,14 +180,14 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { return nil, errUnknownTable } -// Ancient retrieves multiple items in sequence, starting from the index 'start'. +// ReadAncients retrieves multiple items in sequence, starting from the index 'start'. // It will return // - at most 'max' items, // - at least 1 item (even if exceeding the maxByteSize), but will otherwise // return as many items as fit into maxByteSize. 
-func (f *freezer) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { +func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { if table := f.tables[kind]; table != nil { - return table.RetrieveItems(start, max, maxByteSize) + return table.RetrieveItems(start, count, maxBytes) } return nil, errUnknownTable } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 3e5b98dc9412..f87896f41a4c 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -71,8 +71,8 @@ func (i *indexEntry) marshallBinary() []byte { } // bounds returns the start- and end- offsets, and the file number of where to -// read there data item marked by the given indexEntry, which are assumed to be -// sequential. +// read the data item marked by the two index entries. The two entries are +// assumed to be sequential. func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { if start.filenum != end.filenum { // If a piece of data 'crosses' a data-file, @@ -575,10 +575,10 @@ func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { offset int ) for i := from; i <= from+count; i++ { - var startIndex = new(indexEntry) - startIndex.unmarshalBinary(buffer[offset:]) + index := new(indexEntry) + index.unmarshalBinary(buffer[offset:]) offset += indexEntrySize - indices = append(indices, startIndex) + indices = append(indices, index) } if from == 0 { // Special case if we're reading the first item in the freezer. We assume that @@ -606,25 +606,25 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. 
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - if items, err := t.RetrieveItems(item, 1, 0); err != nil { + items, err := t.RetrieveItems(item, 1, 0) + if err != nil { return nil, err - } else { - return items[0], nil } + return items[0], nil } // RetrieveItems returns multiple items in sequence, starting from the index 'start'. // It will return at most 'max' items, but will abort earlier to respect the // 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one // item, it _will_ return one element and possibly overflow the maxBytes. -func (t *freezerTable) RetrieveItems(start, max, maxBytes uint64) ([][]byte, error) { +func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) { // First we read the 'raw' data, which might be compressed. - diskData, sizes, err := t.retrieveItems(start, max, maxBytes) + diskData, sizes, err := t.retrieveItems(start, count, maxBytes) if err != nil { return nil, err } var ( - output = make([][]byte, 0, max) + output = make([][]byte, 0, count) offset int // offset for reading outputSize int // size of uncompressed data ) @@ -640,11 +640,11 @@ func (t *freezerTable) RetrieveItems(start, max, maxBytes uint64) ([][]byte, err break } if !t.noCompression { - if data, err := snappy.Decode(nil, item); err != nil { + data, err := snappy.Decode(nil, item) + if err != nil { return nil, err - } else { - output = append(output, data) } + output = append(output, data) } else { output = append(output, item) } @@ -653,7 +653,10 @@ func (t *freezerTable) RetrieveItems(start, max, maxBytes uint64) ([][]byte, err return output, nil } -func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int, error) { +// retrieveItems reads up to 'count' items from the table. It reads at least +// one item, but otherwise avoids reading more than maxBytes bytes. +// It returns the (potentially compressed) data, and the sizes. 
+func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() // Ensure the table and the item is accessible @@ -663,33 +666,35 @@ func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int itemCount := atomic.LoadUint64(&t.items) // max number // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || uint64(t.itemOffset) > start || max == 0 { + if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { return nil, nil, errOutOfBounds } - if start+max > itemCount { - max = itemCount - start + if start+count > itemCount { + count = itemCount - start } var ( output = make([]byte, maxBytes) // Buffer to read data into outputSize int // Used size of that buffer ) - // Read the data + // readData is a helper method to read a single data item from disk. readData := func(fileId, start uint32, length int) error { // In case a small limit is used, and the elements are large, may need to // realloc the read-buffer when reading the first (and only) item. 
if len(output) < length { output = make([]byte, length) } - if dataFile, exist := t.files[fileId]; !exist { + dataFile, exist := t.files[fileId] + if !exist { return fmt.Errorf("missing data file %d", fileId) - } else if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { + } + if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { return err } outputSize += length return nil } // Read all the indexes in one go - indices, err := t.getIndices(start, max) + indices, err := t.getIndices(start, count) if err != nil { return nil, nil, err } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 3ad3cba3cb5e..b04ff309da5d 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -731,7 +731,7 @@ func TestSequentialRead(t *testing.T) { f.Close() } { // Open it, iterate, verify byte limit. The byte limit is less than item - // size, so each lookup should only return one otem + // size, so each lookup should only return one item f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index d6d299ec9719..586451c0644d 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -64,8 +64,8 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) { // ReadAncients is a noop passthrough that just forwards the request to the underlying // database. 
-func (t *table) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { - return t.db.ReadAncients(kind, start, max, maxByteSize) +func (t *table) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { + return t.db.ReadAncients(kind, start, count, maxBytes) } // Ancients is a noop passthrough that just forwards the request to the underlying diff --git a/ethdb/database.go b/ethdb/database.go index fd2a71420eac..bdc09d5e9877 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -76,12 +76,12 @@ type AncientReader interface { // Ancient retrieves an ancient binary blob from the append-only immutable files. Ancient(kind string, number uint64) ([]byte, error) - // Ancient retrieves multiple items in sequence, starting from the index 'start'. + // ReadAncients retrieves multiple items in sequence, starting from the index 'start'. // It will return - // - at most 'max' items, - // - at least 1 item (even if exceeding the maxByteSize), but will otherwise - // return as many items as fit into maxByteSize. - ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) + // - at most 'count' items, + // - at least 1 item (even if exceeding the maxBytes), but will otherwise + // return as many items as fit into maxBytes. + ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) // Ancients returns the ancient item numbers in the ancient store. 
Ancients() (uint64, error) From 821c696ef1858284921907b65f92c0d25cb5f8e8 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 7 Aug 2021 20:15:05 +0200 Subject: [PATCH 5/6] core/rawdb: fix flaw with deferred reads not being performed --- core/rawdb/freezer_table.go | 9 +++++- core/rawdb/freezer_table_test.go | 55 ++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index f87896f41a4c..79ddd44dcaf5 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -712,8 +712,8 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i size := int(offset2 - offset1) // Crossing a file boundary? if secondIndex.filenum != firstIndex.filenum { + // If we have unread data in the first file, we need to do that read now. if unreadSize > 0 { - // If we have unread data in the first file, we need to do that read now. if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } @@ -722,6 +722,13 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i readStart = 0 } if i > 0 && uint64(totalSize+size) > maxBytes { + // About to break out due to byte limit being exceeded. We don't + // read this last item, but we need to do the deferred reads now. + if unreadSize > 0 { + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + } break } // Defer the read for later diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index b04ff309da5d..e8a8b5c46309 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -752,3 +752,58 @@ func TestSequentialRead(t *testing.T) { f.Close() } } + +// TestSequentialReadByteLimit does some more advanced tests on batch reads. 
+// These tests check that when the byte limit hits, we correctly abort in time, +// but also properly do all the deferred reads for the previous data, regardless +// of whether the data crosses a file boundary or not. +func TestSequentialReadByteLimit(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-2-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + if err != nil { + t.Fatal(err) + } + // Write 10 bytes 30 times, + // Splitting it at every 100 bytes (10 items) + for x := 0; x < 30; x++ { + data := getChunk(10, x) + f.Append(uint64(x), data) + } + f.Close() + } + for i, tc := range []struct { + items uint64 + limit uint64 + want int + }{ + {9, 89, 8}, + {10, 99, 9}, + {11, 109, 10}, + {100, 89, 8}, + {100, 99, 9}, + {100, 109, 10}, + } { + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, tc.items, tc.limit) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), tc.want; have != want { + t.Fatalf("test %d: want %d items, have %d ", i, want, have) + } + for ii, have := range items { + want := getChunk(10, ii) + if !bytes.Equal(want, have) { + t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want) + } + } + f.Close() + } + } +} From 4e5b6f9f81bb5bd1354d50137e5874349a1eda02 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 7 Aug 2021 20:23:43 +0200 Subject: [PATCH 6/6] core/rawdb: better documentation --- core/rawdb/freezer_table.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 79ddd44dcaf5..9d052f7cd8d3 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -562,6 +562,9 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool // 
getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). +// OBS: This method assumes that the caller has already verified (and/or trimmed) the range +// so that the items are within bounds. If this method is used to read out of bounds, +// it will return an error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { // Apply the table-offset from = from - uint64(t.itemOffset) @@ -592,17 +595,6 @@ func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { return indices, nil } -// getBounds returns the indexes for the item -// returns start, end, filenumber and error -func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { - indices, err := t.getIndices(item, 1) - if err != nil { - return 0, 0, 0, err - } - start, end, fileNum := indices[0].bounds(indices[1]) - return start, end, fileNum, nil -} - // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {