From 357057a158114ec524c709f10d221d514e330de1 Mon Sep 17 00:00:00 2001 From: Tangui-Bitfly <181825613+Tangui-Bitfly@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:08:36 +0100 Subject: [PATCH] (BEDS-536) Use raw db for resync (#2972) * rpc/erigon: use raw bigtable * rpc/erigon: use cache raw db * rpc/erigon: correct path for geth traces * cmd/reindex: improve performance + fix raw store cache * rpc/erigon: fix sender address * db2/store: add remote server + client --- cmd/eth1indexer/main.go | 2 +- cmd/misc/main.go | 144 ++++++++---- cmd/store/main.go | 38 ++++ db/bigtable_eth1.go | 55 ++++- db/db_test.go | 2 +- db2/cache.go | 119 ++++++++++ db2/client.go | 255 +++++++++++++++++++++ db2/client_test.go | 307 ++++++++++++++++++++++++++ db2/compress.go | 48 ++++ db2/hexutil.go | 56 +++++ db2/raw.go | 173 +++++++++++++++ db2/raw_test.go | 438 +++++++++++++++++++++++++++++++++++++ db2/store/bigtable.go | 337 ++++++++++++++++++++++++++++ db2/store/bigtable_test.go | 171 +++++++++++++++ db2/store/remote.go | 171 +++++++++++++++ db2/store/store.go | 18 ++ db2/storetest/bigtable.go | 38 ++++ db2/tables.go | 23 ++ go.mod | 9 +- go.sum | 13 +- rpc/erigon.go | 428 ++++++++++++++++++++---------------- types/config.go | 6 +- 22 files changed, 2600 insertions(+), 251 deletions(-) create mode 100644 cmd/store/main.go create mode 100644 db2/cache.go create mode 100644 db2/client.go create mode 100644 db2/client_test.go create mode 100644 db2/compress.go create mode 100644 db2/hexutil.go create mode 100644 db2/raw.go create mode 100644 db2/raw_test.go create mode 100644 db2/store/bigtable.go create mode 100644 db2/store/bigtable_test.go create mode 100644 db2/store/remote.go create mode 100644 db2/store/store.go create mode 100644 db2/storetest/bigtable.go create mode 100644 db2/tables.go diff --git a/cmd/eth1indexer/main.go b/cmd/eth1indexer/main.go index 65ff085fb7..cca2e4936e 100644 --- a/cmd/eth1indexer/main.go +++ b/cmd/eth1indexer/main.go @@ -187,7 +187,7 @@ func main() { return } - transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0) + transforms := make([]db.TransformFunc, 0) transforms = append(transforms, bt.TransformBlock, bt.TransformTx, diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 6d3d955d88..43e67b7552 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -318,7 +318,7 @@ func main() { case "index-missing-blocks": indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient) case "re-index-blocks": - reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers) + reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers, opts.BatchSize, opts.DataConcurrency) case "migrate-last-attestation-slot-bigtable": migrateLastAttestationSlotToBigtable() case "migrate-app-purchases": @@ -1619,7 +1619,15 @@ func indexMissingBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.E // // Both [start] and [end] are inclusive // Pass math.MaxInt64 as [end] to export from [start] to the last block in the blocks table -func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string) { +func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string, batchSize uint64, concurrency uint64) { + if start > 0 && end < start { + utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", end, start), 0) + return + } + if concurrency == 0 { + utils.LogError(nil, "concurrency must be greater than 0", 0) + return + } if end == math.MaxInt64 { lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable() if err != nil { @@ -1628,54 +1636,81 @@ func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.Erigon } end = uint64(lastBlockFromBlocksTable) } - - errFields := map[string]interface{}{ - "start": start, - "end": end, + transformers, importENSChanges, err := getTransformers(transformerFlag, bt) + if err != nil { + utils.LogError(nil, err, 0) + return } - - batchSize := uint64(10000) - for from := start; from <= end; from += batchSize { - targetCount := batchSize - if from+targetCount >= end { - targetCount = end - from + 1 + if importENSChanges { + if err := bt.ImportEnsUpdates(client.GetNativeClient(), math.MaxInt64); err != nil { + utils.LogError(err, "error importing ens from events", 0) + return } - to := from + targetCount - 1 + } - errFields["from"] = from - errFields["to"] = to - errFields["targetCount"] = targetCount + readGroup := errgroup.Group{} + readGroup.SetLimit(int(concurrency)) - for block := from; block <= to; block++ { - bc, _, err := client.GetBlock(int64(block), "parity/geth") - if err != nil { - utils.LogError(err, fmt.Sprintf("error getting block %v from the node", block), 0, errFields) - return - } - if err := bt.SaveBlock(bc); err != nil { - utils.LogError(err, fmt.Sprintf("error saving block: %v ", block), 0, errFields) - return + writeGroup := errgroup.Group{} + writeGroup.SetLimit(int(concurrency*concurrency) + 1) + + cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit + quit := make(chan struct{}) + + sink := make(chan *types.Eth1Block) + writeGroup.Go(func() error { + for { + select { + case block, ok := <-sink: + if !ok { + return nil + } + writeGroup.Go(func() error { + if err := bt.SaveBlock(block); err != nil { + return fmt.Errorf("error saving block %v: %w", block.Number, err) + } + err := bt.IndexBlocksWithTransformers([]*types.Eth1Block{block}, transformers, cache) + if err != nil { + return fmt.Errorf("error indexing from bigtable: %w", err) + } + logrus.Infof("%d indexed", block.Number) + return nil + }) + case <-quit: + return nil } } - indexOldEth1Blocks(from, to, batchSize, 1, transformerFlag, bt, client) - } -} + }) -func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) { - if endBlock > 0 && endBlock < startBlock { - utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0) - return + for i := start; i <= end; i = i + batchSize { + height := int64(i) + readGroup.Go(func() error { + heightEnd := height + int64(batchSize) - 1 + if heightEnd > int64(end) { + heightEnd = int64(end) + } + blocks, err := client.GetBlocks(height, heightEnd, "geth") + if err != nil { + return fmt.Errorf("error getting block %v from the node: %w", height, err) + } + for _, block := range blocks { + sink <- block + } + return nil + }) } - if concurrency == 0 { - utils.LogError(nil, "concurrency must be greater than 0", 0) - return + if err := readGroup.Wait(); err != nil { + panic(err) } - if bt == nil { - utils.LogError(nil, "no bigtable provided", 0) - return + quit <- struct{}{} + close(sink) + if err := writeGroup.Wait(); err != nil { + panic(err) } +} - transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0) +func getTransformers(transformerFlag string, bt *db.Bigtable) ([]db.TransformFunc, bool, error) { + transforms := make([]db.TransformFunc, 0) logrus.Infof("transformerFlag: %v", transformerFlag) transformerList := strings.Split(transformerFlag, ",") @@ -1683,13 +1718,11 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co transformerList = []string{"TransformBlock", "TransformTx", "TransformBlobTx", "TransformItx", "TransformERC20", "TransformERC721", "TransformERC1155", "TransformWithdrawals", "TransformUncle", "TransformEnsNameRegistered", "TransformContract"} } else if len(transformerList) == 0 { utils.LogError(nil, "no transformer functions provided", 0) - return + return nil, false, fmt.Errorf("no transformer functions provided") } logrus.Infof("transformers: %v", transformerList) + importENSChanges := false - /** - * Add additional transformers you want to sync to this switch case - **/ for _, t := range transformerList { switch t { case "TransformBlock": @@ -1716,10 +1749,31 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co case "TransformContract": transforms = append(transforms, bt.TransformContract) default: - utils.LogError(nil, "Invalid transformer flag %v", 0) - return + return nil, false, fmt.Errorf("invalid transformer flag %v", t) } } + return transforms, importENSChanges, nil +} + +func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) { + if endBlock > 0 && endBlock < startBlock { + utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0) + return + } + if concurrency == 0 { + utils.LogError(nil, "concurrency must be greater than 0", 0) + return + } + if bt == nil { + utils.LogError(nil, "no bigtable provided", 0) + return + } + + transforms, importENSChanges, err := getTransformers(transformerFlag, bt) + if err != nil { + utils.LogError(nil, err, 0) + return + } cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit diff --git a/cmd/store/main.go b/cmd/store/main.go new file mode 100644 index 0000000000..6ab7f98c8a --- /dev/null +++ b/cmd/store/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "errors" + "flag" + "net/http" + + "github.com/sirupsen/logrus" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2" + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" + "github.com/gobitfly/eth2-beaconchain-explorer/types" + "github.com/gobitfly/eth2-beaconchain-explorer/utils" +) + +func main() { + configPath := flag.String("config", "config/default.config.yml", "Path to the config file") + flag.Parse() + + cfg := &types.Config{} + err := utils.ReadConfig(cfg, *configPath) + if err != nil { + panic(err) + } + + bt, err := store.NewBigTable(cfg.RawBigtable.Bigtable.Project, cfg.RawBigtable.Bigtable.Instance, nil) + if err != nil { + panic(err) + } + remote := store.NewRemoteStore(store.Wrap(bt, db2.BlocksRawTable, "")) + go func() { + logrus.Info("starting remote raw store on port 8087") + if err := http.ListenAndServe("0.0.0.0:8087", remote.Routes()); err != nil && !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + }() + utils.WaitForCtrlC() +} diff --git a/db/bigtable_eth1.go b/db/bigtable_eth1.go index 815b75a462..fe2a437e92 100644 --- a/db/bigtable_eth1.go +++ b/db/bigtable_eth1.go @@ -659,7 +659,7 @@ func TimestampToBigtableTimeDesc(ts time.Time) string { return fmt.Sprintf("%04d%02d%02d%02d%02d%02d", 9999-ts.Year(), 12-ts.Month(), 31-ts.Day(), 23-ts.Hour(), 59-ts.Minute(), 59-ts.Second()) } -func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), concurrency int64, cache *freecache.Cache) error { +func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []TransformFunc, concurrency int64, cache *freecache.Cache) error { g := new(errgroup.Group) g.SetLimit(int(concurrency)) @@ -766,6 +766,59 @@ func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transfor return nil } +type TransformFunc func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error) + +func (bigtable *Bigtable) blockKeysMutation(blockNumber uint64, blockHash []byte, keys string) (string, *gcp_bigtable.Mutation) { + mut := gcp_bigtable.NewMutation() + mut.Set(METADATA_UPDATES_FAMILY_BLOCKS, "keys", gcp_bigtable.Now(), []byte(keys)) + + key := fmt.Sprintf("%s:BLOCK:%s:%x", bigtable.chainId, reversedPaddedBlockNumber(blockNumber), blockHash) + return key, mut +} + +func (bigtable *Bigtable) IndexBlocksWithTransformers(blocks []*types.Eth1Block, transforms []TransformFunc, cache *freecache.Cache) error { + bulkMutsData := types.BulkMutations{} + bulkMutsMetadataUpdate := types.BulkMutations{} + for _, block := range blocks { + for _, transform := range transforms { + mutsData, mutsMetadataUpdate, err := transform(block, cache) + if err != nil { + logrus.WithError(err).Errorf("error transforming block [%v]", block.Number) + } + bulkMutsData.Keys = append(bulkMutsData.Keys, mutsData.Keys...) + bulkMutsData.Muts = append(bulkMutsData.Muts, mutsData.Muts...) + + if mutsMetadataUpdate != nil { + bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, mutsMetadataUpdate.Keys...) + bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mutsMetadataUpdate.Muts...) + } + + if len(mutsData.Keys) > 0 { + metaKeys := strings.Join(bulkMutsData.Keys, ",") // save block keys in order to be able to handle chain reorgs + key, mut := bigtable.blockKeysMutation(block.Number, block.Hash, metaKeys) + bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, key) + bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mut) + } + } + } + + if len(bulkMutsData.Keys) > 0 { + err := bigtable.WriteBulk(&bulkMutsData, bigtable.tableData, DEFAULT_BATCH_INSERTS) + if err != nil { + return fmt.Errorf("error writing blocks [%v-%v] to bigtable data table: %w", blocks[0].Number, blocks[len(blocks)-1].Number, err) + } + } + + if len(bulkMutsMetadataUpdate.Keys) > 0 { + err := bigtable.WriteBulk(&bulkMutsMetadataUpdate, bigtable.tableMetadataUpdates, DEFAULT_BATCH_INSERTS) + if err != nil { + return fmt.Errorf("error writing blocks [%v-%v] to bigtable metadata updates table: %w", blocks[0].Number, blocks[len(blocks)-1].Number, err) + } + } + + return nil +} + // TransformBlock extracts blocks from bigtable more specifically from the table blocks. // It transforms the block and strips any information that is not necessary for a blocks view // It writes blocks to table data: diff --git a/db/db_test.go b/db/db_test.go index b16863ef42..e9acfb87d2 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -92,7 +92,7 @@ func TestTxRevertTransformer(t *testing.T) { if err := bt.SaveBlock(block); err != nil { t.Fatal(err) } - transformers := []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error){ + transformers := []TransformFunc{ bt.TransformItx, bt.TransformTx, } diff --git a/db2/cache.go b/db2/cache.go new file mode 100644 index 0000000000..c086b4eb51 --- /dev/null +++ b/db2/cache.go @@ -0,0 +1,119 @@ +package db2 + +import ( + "encoding/json" + "sync" + "time" +) + +const ( + oneBlockTTL = 1 * time.Second + blocksTTL = 30 * time.Second // default ttl, if read it will be deleted sooner +) + +type MinimalBlock struct { + Result struct { + Hash string `json:"hash"` + } `json:"result"` +} + +type CachedRawStore struct { + db RawStoreReader + // sync.Map with manual delete have better perf than freecache because we can handle this way a ttl < 1s + cache sync.Map + + locks map[string]*sync.RWMutex + mapLock sync.Mutex // to make the map safe concurrently +} + +func WithCache(reader RawStoreReader) *CachedRawStore { + return &CachedRawStore{ + db: reader, + locks: make(map[string]*sync.RWMutex), + } +} + +func (c *CachedRawStore) lockBy(key string) func() { + c.mapLock.Lock() + defer c.mapLock.Unlock() + + lock, found := c.locks[key] + if !found { + lock = &sync.RWMutex{} + c.locks[key] = lock + lock.Lock() + return lock.Unlock + } + lock.RLock() + return lock.RUnlock +} + +func (c *CachedRawStore) ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error) { + key := blockKey(chainID, number) + + unlock := c.lockBy(key) + defer unlock() + + v, ok := c.cache.Load(key) + if ok { + // once read ensure to delete it from the cache + go c.unCacheBlockAfter(key, "", oneBlockTTL) + return v.(*FullBlockRawData), nil + } + // TODO make warning not found in cache + block, err := c.db.ReadBlockByNumber(chainID, number) + if block != nil { + c.cacheBlock(block, oneBlockTTL) + } + return block, err +} + +func (c *CachedRawStore) cacheBlock(block *FullBlockRawData, ttl time.Duration) { + key := blockKey(block.ChainID, block.BlockNumber) + c.cache.Store(key, block) + + var mini MinimalBlock + if len(block.Uncles) != 0 { + // retrieve the block hash for caching but only if the block has uncle(s) + _ = json.Unmarshal(block.Block, &mini) + c.cache.Store(mini.Result.Hash, block.BlockNumber) + } + + go c.unCacheBlockAfter(key, mini.Result.Hash, ttl) +} + +func (c *CachedRawStore) unCacheBlockAfter(key, hash string, ttl time.Duration) { + time.Sleep(ttl) + c.cache.Delete(key) + c.mapLock.Lock() + if hash != "" { + c.cache.Delete(hash) + } + defer c.mapLock.Unlock() + delete(c.locks, key) +} + +func (c *CachedRawStore) ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error) { + v, ok := c.cache.Load(hash) + if !ok { + return c.db.ReadBlockByHash(chainID, hash) + } + + v, ok = c.cache.Load(blockKey(chainID, v.(int64))) + if !ok { + return c.db.ReadBlockByHash(chainID, hash) + } + + return v.(*FullBlockRawData), nil +} + +func (c *CachedRawStore) ReadBlocksByNumber(chainID uint64, start, end int64) ([]*FullBlockRawData, error) { + blocks, err := c.db.ReadBlocksByNumber(chainID, start, end) + if err != nil { + return nil, err + } + for _, block := range blocks { + c.cacheBlock(block, blocksTTL) + } + return blocks, nil +} diff --git a/db2/client.go b/db2/client.go new file mode 100644 index 0000000000..4985e0f5b1 --- /dev/null +++ b/db2/client.go @@ -0,0 +1,255 @@ +package db2 + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "math/big" + "net/http" + + "github.com/ethereum/go-ethereum/common/hexutil" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" +) + +var ErrNotFoundInCache = fmt.Errorf("cannot find hash in cache") +var ErrMethodNotSupported = fmt.Errorf("method not supported") + +type RawStoreReader interface { + ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error) + ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error) + ReadBlocksByNumber(chainID uint64, start, end int64) ([]*FullBlockRawData, error) +} + +type WithFallback struct { + roundTripper http.RoundTripper + fallback http.RoundTripper +} + +func NewWithFallback(roundTripper, fallback http.RoundTripper) *WithFallback { + return &WithFallback{ + roundTripper: roundTripper, + fallback: fallback, + } +} + +func (r WithFallback) RoundTrip(request *http.Request) (*http.Response, error) { + resp, err := r.roundTripper.RoundTrip(request) + if err == nil { + // no fallback needed + return resp, nil + } + + var e1 *json.SyntaxError + if !errors.As(err, &e1) && + !errors.Is(err, ErrNotFoundInCache) && + !errors.Is(err, ErrMethodNotSupported) && + !errors.Is(err, store.ErrNotFound) { + return nil, err + } + + return r.fallback.RoundTrip(request) +} + +type BigTableEthRaw struct { + db RawStoreReader + chainID uint64 +} + +func NewBigTableEthRaw(db RawStoreReader, chainID uint64) *BigTableEthRaw { + return &BigTableEthRaw{ + db: db, + chainID: chainID, + } +} + +func (r *BigTableEthRaw) RoundTrip(request *http.Request) (*http.Response, error) { + body, err := io.ReadAll(request.Body) + if err != nil { + return nil, err + } + defer func() { + request.Body = io.NopCloser(bytes.NewBuffer(body)) + }() + var messages []*jsonrpcMessage + var isSingle bool + if err := json.NewDecoder(bytes.NewReader(body)).Decode(&messages); err != nil { + isSingle = true + message := new(jsonrpcMessage) + if err := json.NewDecoder(bytes.NewReader(body)).Decode(message); err != nil { + return nil, err + } + messages = append(messages, message) + } + var resps []*jsonrpcMessage + for _, message := range messages { + resp, err := r.handle(request.Context(), message) + if err != nil { + return nil, err + } + resps = append(resps, resp) + } + + respBody, _ := makeBody(isSingle, resps) + return &http.Response{ + Body: respBody, + StatusCode: http.StatusOK, + }, nil +} + +func (r *BigTableEthRaw) handle(ctx context.Context, message *jsonrpcMessage) (*jsonrpcMessage, error) { + var args []interface{} + err := json.Unmarshal(message.Params, &args) + if err != nil { + return nil, err + } + + var respBody []byte + switch message.Method { + case "eth_getBlockByNumber": + // we decode only big.Int maybe we should also handle "latest" + block, err := hexutil.DecodeBig(args[0].(string)) + if err != nil { + return nil, err + } + + respBody, err = r.BlockByNumber(ctx, block) + if err != nil { + return nil, err + } + + case "debug_traceBlockByNumber": + block, err := hexutil.DecodeBig(args[0].(string)) + if err != nil { + return nil, err + } + + respBody, err = r.TraceBlockByNumber(ctx, block) + if err != nil { + return nil, err + } + + case "eth_getBlockReceipts": + block, err := hexutil.DecodeBig(args[0].(string)) + if err != nil { + return nil, err + } + + respBody, err = r.BlockReceipts(ctx, block) + if err != nil { + return nil, err + } + + case "eth_getUncleByBlockHashAndIndex": + index, err := hexutil.DecodeBig(args[1].(string)) + if err != nil { + return nil, err + } + respBody, err = r.UncleByBlockHashAndIndex(ctx, args[0].(string), index.Int64()) + if err != nil { + return nil, err + } + default: + return nil, ErrMethodNotSupported + } + var resp jsonrpcMessage + _ = json.Unmarshal(respBody, &resp) + if len(respBody) == 0 { + resp.Version = message.Version + resp.Result = []byte("[]") + } + resp.ID = message.ID + return &resp, nil +} + +func makeBody(isSingle bool, messages []*jsonrpcMessage) (io.ReadCloser, error) { + var b []byte + var err error + if isSingle { + b, err = json.Marshal(messages[0]) + } else { + b, err = json.Marshal(messages) + } + if err != nil { + return nil, err + } + return io.NopCloser(bytes.NewReader(b)), nil +} + +func (r *BigTableEthRaw) BlockByNumber(ctx context.Context, number *big.Int) ([]byte, error) { + block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64()) + if err != nil { + return nil, err + } + return block.Block, nil +} + +func (r *BigTableEthRaw) BlockReceipts(ctx context.Context, number *big.Int) ([]byte, error) { + block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64()) + if err != nil { + return nil, err + } + return block.Receipts, nil +} + +func (r *BigTableEthRaw) TraceBlockByNumber(ctx context.Context, number *big.Int) ([]byte, error) { + block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64()) + if err != nil { + return nil, err + } + return block.Traces, nil +} + +func (r *BigTableEthRaw) UncleByBlockNumberAndIndex(ctx context.Context, number *big.Int, index int64) ([]byte, error) { + block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64()) + if err != nil { + return nil, err + } + + var uncles []*jsonrpcMessage + if err := json.Unmarshal(block.Uncles, &uncles); err != nil { + var uncle *jsonrpcMessage + if err := json.Unmarshal(block.Uncles, &uncle); err != nil { + return nil, fmt.Errorf("cannot unmarshal uncle: %w", err) + } + return json.Marshal(uncle) + } + return json.Marshal(uncles[index]) +} + +func (r *BigTableEthRaw) UncleByBlockHashAndIndex(ctx context.Context, hash string, index int64) ([]byte, error) { + block, err := r.db.ReadBlockByHash(r.chainID, hash) + if err != nil { + return nil, err + } + + var uncles []*jsonrpcMessage + if err := json.Unmarshal(block.Uncles, &uncles); err != nil { + var uncle *jsonrpcMessage + if err := json.Unmarshal(block.Uncles, &uncle); err != nil { + return nil, fmt.Errorf("cannot unmarshal uncle: %w", err) + } + return json.Marshal(uncle) + } + return json.Marshal(uncles[index]) +} + +// A value of this type can a JSON-RPC request, notification, successful response or +// error response. Which one it is depends on the fields. +type jsonrpcMessage struct { + Version string `json:"jsonrpc,omitempty"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Error *jsonError `json:"error,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +type jsonError struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} diff --git a/db2/client_test.go b/db2/client_test.go new file mode 100644 index 0000000000..4173a96a02 --- /dev/null +++ b/db2/client_test.go @@ -0,0 +1,307 @@ +package db2 + +import ( + "context" + "math/big" + "net/http" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" + "github.com/gobitfly/eth2-beaconchain-explorer/db2/storetest" +) + +const ( + chainID uint64 = 1 +) + +func TestBigTableClientRealCondition(t *testing.T) { + project := os.Getenv("BIGTABLE_PROJECT") + instance := os.Getenv("BIGTABLE_INSTANCE") + if project == "" || instance == "" { + t.Skip("skipping test, set BIGTABLE_PROJECT and BIGTABLE_INSTANCE") + } + + tests := []struct { + name string + block int64 + }{ + { + name: "test block", + block: 6008149, + }, + { + name: "test block 2", + block: 141, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bg, err := store.NewBigTable(project, instance, nil) + if err != nil { + t.Fatal(err) + } + + rawStore := NewRawStore(store.Wrap(bg, BlocksRawTable, "")) + rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{ + Transport: NewBigTableEthRaw(rawStore, chainID), + })) + if err != nil { + t.Fatal(err) + } + ethClient := ethclient.NewClient(rpcClient) + + block, err := ethClient.BlockByNumber(context.Background(), big.NewInt(tt.block)) + if err != nil { + t.Fatalf("BlockByNumber() error = %v", err) + } + if got, want := block.Number().Int64(), tt.block; got != want { + t.Errorf("got %v, want %v", got, want) + } + + receipts, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(tt.block))) + if err != nil { + t.Fatalf("BlockReceipts() error = %v", err) + } + if len(block.Transactions()) != 0 && len(receipts) == 0 { + t.Errorf("receipts should not be empty") + } + + var traces []GethTraceCallResultWrapper + if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(block.Number()), gethTracerArg); err != nil { + t.Fatalf("debug_traceBlockByNumber() error = %v", err) + } + if len(block.Transactions()) != 0 && len(traces) == 0 { + t.Errorf("traces should not be empty") + } + }) + } +} + +func benchmarkBlockRetrieval(b *testing.B, ethClient *ethclient.Client, rpcClient *rpc.Client) { + b.ResetTimer() + for j := 0; j < b.N; j++ { + blockTestNumber := int64(20978000 + b.N) + _, err := ethClient.BlockByNumber(context.Background(), big.NewInt(blockTestNumber)) + if err != nil { + b.Fatalf("BlockByNumber() error = %v", err) + } + + if _, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockTestNumber))); err != nil { + b.Fatalf("BlockReceipts() error = %v", err) + } + + var traces []GethTraceCallResultWrapper + if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(big.NewInt(blockTestNumber)), gethTracerArg); err != nil { + b.Fatalf("debug_traceBlockByNumber() error = %v", err) + } + } +} + +func BenchmarkErigonNode(b *testing.B) { + node := os.Getenv("ETH1_ERIGON_ENDPOINT") + if node == "" { + b.Skip("skipping test, please set ETH1_ERIGON_ENDPOINT") + } + + rpcClient, err := rpc.DialOptions(context.Background(), node) + if err != nil { + b.Fatal(err) + } + + benchmarkBlockRetrieval(b, ethclient.NewClient(rpcClient), rpcClient) +} + +func BenchmarkRawBigTable(b *testing.B) { + project := os.Getenv("BIGTABLE_PROJECT") + instance := os.Getenv("BIGTABLE_INSTANCE") + if project == "" || instance == "" { + b.Skip("skipping test, set BIGTABLE_PROJECT and BIGTABLE_INSTANCE") + } + + bt, err := store.NewBigTable(project, instance, nil) + if err != nil { + b.Fatal(err) + } + + rawStore := WithCache(NewRawStore(store.Wrap(bt, BlocksRawTable, ""))) + rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{ + Transport: NewBigTableEthRaw(rawStore, chainID), + })) + if err != nil { + b.Fatal(err) + } + + benchmarkBlockRetrieval(b, ethclient.NewClient(rpcClient), rpcClient) +} + +func BenchmarkAll(b *testing.B) { + b.Run("BenchmarkErigonNode", func(b *testing.B) { + BenchmarkErigonNode(b) + }) + b.Run("BenchmarkRawBigTable", func(b *testing.B) { + BenchmarkRawBigTable(b) + }) +} + +func TestBigTableClient(t *testing.T) { + tests := []struct { + name string + block FullBlockRawData + }{ + { + name: "test block", + block: testFullBlock, + }, + { + name: "two uncles", + block: testTwoUnclesFullBlock, + }, + } + + client, admin := storetest.NewBigTable(t) + bg, err := store.NewBigTableWithClient(context.Background(), client, admin, raw) + if err != nil { + t.Fatal(err) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rawStore := NewRawStore(store.Wrap(bg, BlocksRawTable, "")) + if err := rawStore.AddBlocks([]FullBlockRawData{tt.block}); err != nil { + t.Fatal(err) + } + + rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{ + Transport: NewBigTableEthRaw(rawStore, tt.block.ChainID), + })) + if err != nil { + t.Fatal(err) + } + ethClient := ethclient.NewClient(rpcClient) + + block, err := ethClient.BlockByNumber(context.Background(), big.NewInt(tt.block.BlockNumber)) + if err != nil { + t.Fatalf("BlockByNumber() error = %v", err) + } + if got, want := block.Number().Int64(), tt.block.BlockNumber; got != want { + t.Errorf("got %v, want %v", got, want) + } + + receipts, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(tt.block.BlockNumber))) + if err != nil { + t.Fatalf("BlockReceipts() error = %v", err) + } + if len(block.Transactions()) != 0 && len(receipts) == 0 { + t.Errorf("receipts should not be empty") + } + + var traces []GethTraceCallResultWrapper + if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(block.Number()), gethTracerArg); err != nil { + t.Fatalf("debug_traceBlockByNumber() error = %v", err) + } + if len(block.Transactions()) != 0 && len(traces) == 0 { + t.Errorf("traces should not be empty") + } + }) + } +} + +func TestBigTableClientWithFallback(t *testing.T) { + node := os.Getenv("ETH1_ERIGON_ENDPOINT") + if node == "" { + t.Skip("skipping test, set ETH1_ERIGON_ENDPOINT") + } + + tests := []struct { + name string + block FullBlockRawData + }{ + { + name: "test block", + block: testFullBlock, + }, + } + + client, admin := storetest.NewBigTable(t) + bg, err := store.NewBigTableWithClient(context.Background(), client, admin, raw) + if err != nil { + t.Fatal(err) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rawStore := NewRawStore(store.Wrap(bg, BlocksRawTable, "")) + + rpcClient, err := rpc.DialOptions(context.Background(), node, rpc.WithHTTPClient(&http.Client{ + Transport: NewWithFallback(NewBigTableEthRaw(rawStore, tt.block.ChainID), http.DefaultTransport), + })) + if err != nil { + t.Fatal(err) + } + ethClient := ethclient.NewClient(rpcClient) + + balance, err := ethClient.BalanceAt(context.Background(), common.Address{}, big.NewInt(tt.block.BlockNumber)) + if err != nil { + t.Fatal(err) + } + if balance == nil { + t.Errorf("empty balance") + } + + block, err := ethClient.BlockByNumber(context.Background(), big.NewInt(tt.block.BlockNumber)) + if err != nil { + t.Fatalf("BlockByNumber() error = %v", err) + } + if got, want := block.Number().Int64(), tt.block.BlockNumber; got != want { + t.Errorf("got %v, want %v", got, want) + } + + receipts, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(tt.block.BlockNumber))) + if err != nil { + t.Fatalf("BlockReceipts() error = %v", err) + } + if len(block.Transactions()) != 0 && len(receipts) == 0 { + t.Errorf("receipts should not be empty") + } + + var traces []GethTraceCallResultWrapper + if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(block.Number()), gethTracerArg); err != nil { + t.Fatalf("debug_traceBlockByNumber() error = %v", err) + } + if len(block.Transactions()) != 0 && len(traces) == 0 { + t.Errorf("traces should not be empty") + } + }) + } +} + +// TODO import those 3 from somewhere +var gethTracerArg = map[string]string{ + "tracer": "callTracer", +} + +type GethTraceCallResultWrapper struct { + Result *GethTraceCallResult `json:"result,omitempty"` +} + +type GethTraceCallResult struct { + TransactionPosition int `json:"transaction_position,omitempty"` + Time string `json:"time,omitempty"` + GasUsed string `json:"gas_used,omitempty"` + From common.Address `json:"from,omitempty"` + To common.Address `json:"to,omitempty"` + Value string `json:"value,omitempty"` + Gas string `json:"gas,omitempty"` + Input string `json:"input,omitempty"` + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` + Type string `json:"type,omitempty"` + Calls []*GethTraceCallResult `json:"calls,omitempty"` +} diff --git a/db2/compress.go b/db2/compress.go new file mode 100644 index 0000000000..1c92672f22 --- /dev/null +++ b/db2/compress.go @@ -0,0 +1,48 @@ +package db2 + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" +) + +type gzipCompressor struct { +} + +func (gzipCompressor) compress(src []byte) ([]byte, error) { + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + if _, err := zw.Write(src); err != nil { + return nil, fmt.Errorf("gzip cannot compress data: %w", err) + } + if err := zw.Close(); err != nil { + return nil, fmt.Errorf("gzip cannot close writer: %w", err) + } + return buf.Bytes(), nil +} + +func (gzipCompressor) decompress(src []byte) ([]byte, error) { + if len(src) == 0 { + return nil, nil + } + zr, err := gzip.NewReader(bytes.NewReader(src)) + if err != nil { + return nil, fmt.Errorf("gzip cannot create reader: %w", err) + } + data, err := io.ReadAll(zr) + if err != nil { + return nil, fmt.Errorf("gzip cannot read: %w", err) + } + return data, nil +} + +type noOpCompressor struct{} + +func (n noOpCompressor) compress(src []byte) ([]byte, error) { + return src, nil +} + +func (n noOpCompressor) decompress(src []byte) ([]byte, error) { + return src, nil +} diff --git a/db2/hexutil.go b/db2/hexutil.go new file mode 100644 index 0000000000..bfaa2d51e5 --- /dev/null +++ b/db2/hexutil.go @@ -0,0 +1,56 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package db2 + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "strings" +) + +// Bytes marshals/unmarshals as a JSON string with 0x prefix. +// The empty slice marshals as "0x". +type Bytes []byte + +// UnmarshalJSON implements json.Unmarshaler. +func (b *Bytes) UnmarshalJSON(input []byte) error { + var v string + if err := json.Unmarshal(input, &v); err != nil { + return err + } + + v = strings.Replace(v, "0x", "", 1) + + // make sure to have an even length hex string by prefixing odd strings with a single 0, 0x0 will become 0x00 for example + // while hashes and addresses have always an even length, numbers usually don't + if len(v)%2 != 0 { + v = "0" + v + } + + var err error + *b, err = hex.DecodeString(v) + + if err != nil { + return fmt.Errorf("error decoding %s: %v", string(input), err) + } + return err +} + +func (b *Bytes) String() string { + return fmt.Sprintf("0x%x", *b) +} diff --git a/db2/raw.go b/db2/raw.go new file mode 100644 index 0000000000..1c6c742a95 --- /dev/null +++ b/db2/raw.go @@ -0,0 +1,173 @@ +package db2 + +import ( + "fmt" + "log/slog" + "math/big" + "strings" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" +) + +type compressor interface { + compress(src []byte) ([]byte, error) + decompress(src []byte) ([]byte, error) +} + +type RawStore struct { + store store.Store + compressor compressor +} + +func NewRawStore(store store.Store) RawStore { + return RawStore{ + store: store, + compressor: gzipCompressor{}, + } +} + +func (db RawStore) AddBlocks(blocks []FullBlockRawData) error { + itemsByKey := make(map[string][]store.Item) + for _, fullBlock := range blocks { + if len(fullBlock.Block) == 0 || len(fullBlock.BlockTxs) != 0 && len(fullBlock.Traces) == 0 { + return fmt.Errorf("block %d: empty data", fullBlock.BlockNumber) + } + key := blockKey(fullBlock.ChainID, fullBlock.BlockNumber) + + block, err := db.compressor.compress(fullBlock.Block) + if err != nil { + return fmt.Errorf("cannot compress block %d: %w", fullBlock.BlockNumber, err) + } + receipts, err := db.compressor.compress(fullBlock.Receipts) + if err != nil { + return fmt.Errorf("cannot compress receipts %d: %w", fullBlock.BlockNumber, err) + } + traces, err := db.compressor.compress(fullBlock.Traces) + if err != nil { + return fmt.Errorf("cannot compress traces %d: %w", fullBlock.BlockNumber, err) + } + itemsByKey[key] = []store.Item{ + { + Family: BT_COLUMNFAMILY_BLOCK, + Column: BT_COLUMN_BLOCK, + Data: block, + }, + { + Family: BT_COLUMNFAMILY_RECEIPTS, + Column: BT_COLUMN_RECEIPTS, + Data: receipts, + }, + { + Family: BT_COLUMNFAMILY_TRACES, + Column: BT_COLUMN_TRACES, + Data: traces, + }, + } + if len(fullBlock.Receipts) < 1 { + // todo move that log higher up + slog.Warn(fmt.Sprintf("empty receipts at block %d lRec %d lTxs %d", fullBlock.BlockNumber, len(fullBlock.Receipts), len(fullBlock.BlockTxs))) + } + if fullBlock.BlockUnclesCount > 0 { + uncles, err := db.compressor.compress(fullBlock.Uncles) + if err != nil { + return fmt.Errorf("cannot compress block %d: %w", fullBlock.BlockNumber, err) + } + itemsByKey[key] = append(itemsByKey[key], store.Item{ + Family: BT_COLUMNFAMILY_UNCLES, + Column: BT_COLUMN_UNCLES, + Data: uncles, + }) + } + } + return db.store.BulkAdd(itemsByKey) +} + +func (db RawStore) ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error) { + return db.readBlock(chainID, number) +} + +func (db RawStore) ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error) { + // todo use sql db to retrieve hash + return nil, fmt.Errorf("ReadBlockByHash not implemented") +} + +func (db RawStore) readBlock(chainID uint64, number int64) (*FullBlockRawData, error) { + key := blockKey(chainID, number) + data, err := db.store.GetRow(key) + if err != nil { + return nil, err + } + return db.parseRow(chainID, number, data) +} + +func (db RawStore) parseRow(chainID uint64, number int64, data map[string][]byte) (*FullBlockRawData, error) { + block, err := db.compressor.decompress(data[fmt.Sprintf("%s:%s", BT_COLUMNFAMILY_BLOCK, BT_COLUMN_BLOCK)]) + if err != nil { + return nil, fmt.Errorf("cannot decompress block %d: %w", number, err) + } + receipts, err := db.compressor.decompress(data[fmt.Sprintf("%s:%s", BT_COLUMNFAMILY_RECEIPTS, BT_COLUMN_RECEIPTS)]) + if err != nil { + return nil, fmt.Errorf("cannot decompress receipts %d: %w", number, err) + } + traces, err := db.compressor.decompress(data[fmt.Sprintf("%s:%s", BT_COLUMNFAMILY_TRACES, BT_COLUMN_TRACES)]) + if err != nil { + return nil, fmt.Errorf("cannot decompress traces %d: %w", number, err) + } + uncles, err := db.compressor.decompress(data[fmt.Sprintf("%s:%s", BT_COLUMNFAMILY_UNCLES, BT_COLUMN_UNCLES)]) + if err != nil { + return nil, fmt.Errorf("cannot decompress uncles %d: %w", number, err) + } + return &FullBlockRawData{ + ChainID: chainID, + BlockNumber: number, + BlockHash: nil, + BlockUnclesCount: 0, + BlockTxs: nil, + Block: block, + Receipts: receipts, + Traces: traces, + Uncles: uncles, + }, nil +} + +func (db RawStore) ReadBlocksByNumber(chainID uint64, start, end int64) ([]*FullBlockRawData, error) { + rows, err := db.store.GetRowsRange(blockKey(chainID, start-1), blockKey(chainID, end)) + if err != nil { + return nil, err + } + blocks := make([]*FullBlockRawData, 0, end-start+1) + for key, data := range rows { + number := blockKeyToNumber(chainID, key) + block, err := db.parseRow(chainID, number, data) + if err != nil { + return nil, err + } + blocks = append(blocks, block) + } + return blocks, nil +} + +func blockKey(chainID uint64, number int64) string { + return fmt.Sprintf("%d:%12d", chainID, MAX_EL_BLOCK_NUMBER-number) +} + +func blockKeyToNumber(chainID uint64, key string) int64 { + key = strings.TrimPrefix(key, fmt.Sprintf("%d:", chainID)) + reversed, _ := new(big.Int).SetString(key, 10) + + return MAX_EL_BLOCK_NUMBER - reversed.Int64() +} + +type FullBlockRawData struct { + ChainID uint64 + + BlockNumber int64 + BlockHash Bytes + BlockUnclesCount int + BlockTxs []string + + Block Bytes + Receipts Bytes + Traces Bytes + Uncles Bytes +} diff --git a/db2/raw_test.go b/db2/raw_test.go new file mode 100644 index 0000000000..011ef64dfb --- /dev/null +++ b/db2/raw_test.go @@ -0,0 +1,438 @@ +package db2 + +import ( + "context" + "testing" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" + "github.com/gobitfly/eth2-beaconchain-explorer/db2/storetest" +) + +func TestRaw(t *testing.T) { + client, admin := storetest.NewBigTable(t) + + s, err := store.NewBigTableWithClient(context.Background(), client, admin, raw) + if err != nil { + t.Fatal(err) + } + + db := RawStore{ + store: store.Wrap(s, BlocksRawTable, ""), + compressor: noOpCompressor{}, + } + + block := FullBlockRawData{ + ChainID: 1, + BlockNumber: testBlockNumber, + BlockHash: nil, + BlockUnclesCount: 1, + BlockTxs: nil, + Block: []byte(testBlock), + Receipts: []byte(testReceipts), + Traces: []byte(testTraces), + Uncles: []byte(testUncles), + } + + if err := db.AddBlocks([]FullBlockRawData{block}); err != nil { + t.Fatal(err) + } + + res, err := db.ReadBlockByNumber(block.ChainID, block.BlockNumber) + if err != nil { + t.Fatal(err) + } + + if got, want := string(res.Block), testBlock; got != want { + t.Errorf("got %v, want %v", got, want) + } + if got, want := string(res.Receipts), testReceipts; got != want { + t.Errorf("got %v, want %v", got, want) + } + if got, want := string(res.Traces), testTraces; got != want { + t.Errorf("got %v, want %v", got, want) + } + if got, want := string(res.Uncles), testUncles; got != want { + t.Errorf("got %v, want %v", got, want) + } +} + +var testFullBlock = FullBlockRawData{ + ChainID: 1, + BlockNumber: testBlockNumber, + BlockUnclesCount: 1, + Block: []byte(testBlock), + Receipts: []byte(testReceipts), + Traces: []byte(testTraces), + Uncles: []byte(testUncles), +} + +var testTwoUnclesFullBlock = FullBlockRawData{ + ChainID: 1, + BlockNumber: testTwoUnclesBlockNumber, + BlockUnclesCount: 2, + Block: []byte(testTwoUnclesBlock), + Receipts: nil, + Traces: nil, + Uncles: []byte(testTwoUnclesBlockUncles), +} + +const ( + testBlockNumber = 6008149 + testBlock = `{ + "id":1, + "jsonrpc":"2.0", + "result":{ + "difficulty":"0xbfabcdbd93dda", + "extraData":"0x737061726b706f6f6c2d636e2d6e6f64652d3132", + "gasLimit":"0x79f39e", + "gasUsed":"0x79ccd3", + "hash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "logsBloom":"0x4848112002a2020aaa0812180045840210020005281600c80104264300080008000491220144461026015300100000128005018401002090a824a4150015410020140400d808440106689b29d0280b1005200007480ca950b15b010908814e01911000054202a020b05880b914642a0000300003010044044082075290283516be82504082003008c4d8d14462a8800c2990c88002a030140180036c220205201860402001014040180002006860810ec0a1100a14144148408118608200060461821802c081000042d0810104a8004510020211c088200420822a082040e10104c00d010064004c122692020c408a1aa2348020445403814002c800888208b1", + "miner":"0x5a0b54d5dc17e0aadc383d2db43b0a0d3e029c4c", + "mixHash":"0x3d1fdd16f15aeab72e7db1013b9f034ee33641d92f71c0736beab4e67d34c7a7", + "nonce":"0x4db7a1c01d8a8072", + "number":"0x5bad55", + "parentHash":"0x61a8ad530a8a43e3583f8ec163f773ad370329b2375d66433eb82f005e1d6202", + "receiptsRoot":"0x5eced534b3d84d3d732ddbc714f5fd51d98a941b28182b6efe6df3a0fe90004b", + "sha3Uncles":"0x8a562e7634774d3e3a36698ac4915e37fc84a2cd0044cb84fa5d80263d2af4f6", + "size":"0x41c7", + "stateRoot":"0xf5208fffa2ba5a3f3a2f64ebd5ca3d098978bedd75f335f56b705d8715ee2305", + "timestamp":"0x5b541449", + "totalDifficulty":"0x12ac11391a2f3872fcd", + "transactions":[ + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0xfbb1b73c4f0bda4f67dca266ce6ef42f520fbb98", + "gas":"0x249f0", + "gasPrice":"0x174876e800", + "hash":"0x8784d99762bccd03b2086eabccee0d77f14d05463281e121a62abfebcf0d2d5f", + "input":"0x6ea056a9000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000bd8d7fa6f8cc00", + "nonce":"0x5e4724", + "r":"0xd1556332df97e3bd911068651cfad6f975a30381f4ff3a55df7ab3512c78b9ec", + "s":"0x66b51cbb10cd1b2a09aaff137d9f6d4255bf73cb7702b666ebd5af502ffa4410", + "to":"0x4b9c25ca0224aef6a7522cabdbc3b2e125b7ca50", + "transactionIndex":"0x0", + "type":"0x0", + "v":"0x25", + "value":"0x0" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0xc837f51a0efa33f8eca03570e3d01a4b2cf97ffd", + "gas":"0x15f90", + "gasPrice":"0x14b8d03a00", + "hash":"0x311be6a9b58748717ac0f70eb801d29973661aaf1365960d159e4ec4f4aa2d7f", + "input":"0x", + "nonce":"0x4241", + "r":"0xe9ef2f6fcff76e45fac6c2e8080094370082cfb47e8fde0709312f9aa3ec06ad", + "s":"0x421ebc4ebe187c173f13b1479986dcbff5c4997c0dfeb1fd149a982ad4bcdfe7", + "to":"0xf49bd0367d830850456d2259da366a054038dc46", + "transactionIndex":"0x1", + "type":"0x0", + "v":"0x25", + "value":"0x1bafa9ee16e78000" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0x532a2bae845abe7e5115808b832d34f9c3d41eed", + "gas":"0x910c", + "gasPrice":"0xe6f7cec00", + "hash":"0xe42b0256058b7cad8a14b136a0364acda0b4c36f5b02dea7e69bfd82cef252a2", + "input":"0xa9059cbb000000000000000000000000398a58b2e3790431fdac1ea56017e65401fa998800000000000000000000000000000000000000000007bcadb57b861109080000", + "nonce":"0x0", + "r":"0x4e3fdc1ad7ac52439791a8a48bc8ed70040170fa9c4b6cef6317f63d45e9a142", + "s":"0x6e5feaefdbc8f99c5d036b31d6386fb49c1a97812f13d48742a1b77b7e690858", + "to":"0x818fc6c2ec5986bc6e2cbf00939d90556ab12ce5", + "transactionIndex":"0x2", + "type":"0x0", + "v":"0x26", + "value":"0x0" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "from":"0x2a9847093ad514639e8cdec960b5e51686960291", + "gas":"0x4f588", + "gasPrice":"0xc22a75840", + "hash":"0x4eb05376055c6456ed883fc843bc43df1dcf739c321ba431d518aecd7f98ca11", + "input":"0x000101fa27134d5320", + "nonce":"0xd50", + "r":"0x980e463d70e67c49477883a55cdb42829c9e5746e95d63b738d7390c7d685551", + "s":"0x647babbe3a96df447da960812c88833c6b5aa009f1c361c6adae818100d15007", + "to":"0xc7ed8919c70dd8ccf1a57c0ed75b25ceb2dd22d1", + "transactionIndex":"0x3", + "type":"0x0", + "v":"0x1b", + "value":"0x0" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0xe12c32af0ca83fe12c58b1daef82ebe6333f7b10", + "gas":"0x5208", + "gasPrice":"0xba43b7400", + "hash":"0x994dd9e72b212b7dc5fd0466ab75adf7d391cf4f206a65b7ad2a1fd032bb06d7", + "input":"0x", + "nonce":"0x1d", + "r":"0xedf9e958bbd3f7d2fd9831678a3166cf0373a4436a63c152c4aa84f864bb7e6e", + "s":"0xacbc0cfcc7d3264de55c0af45b6c280ea237e28d273f7ddba0eea05204c2101", + "to":"0x5343222c6f7e2af4d9d3e844fb8f3f18f7be0e55", + "transactionIndex":"0x4", + "type":"0x0", + "v":"0x26", + "value":"0x31f64d59c01f6000" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0x80c779504c3a3a39dbd0356f5d8e851cb6dbba0a", + "gas":"0x57e40", + "gasPrice":"0x9c35a3cc8", + "hash":"0xf6feecbb9ab0ac58591a4bc287059b1133089c499517e91a274e6a1f5e7dce53", + "input":"0x010b01000d0670", + "nonce":"0xb9bf", + "r":"0x349d0601e24f0128ecfce3665edd2a0727a043fa62ccf587fded784aed46c3f6", + "s":"0x77127ddf76cb2b9e12074006a504fbd6893d5bd29a18a8efb193907f4565404", + "to":"0x3714e5671be406fc1920351984f4429237831477", + "transactionIndex":"0x5", + "type":"0x0", + "v":"0x26", + "value":"0x0" + }, + { + "blockHash":"0xb3b20624f8f0f86eb50dd04688409e5cea4bd02d700bf6e79e9384d47d6a5a35", + "blockNumber":"0x5bad55", + "chainId":"0x1", + "from":"0xc5b373618d4d01a38f822f56ca6d2ff5080cc4f2", + "gas":"0x4f588", + "gasPrice":"0x9c355a8e8", + "hash":"0x7e537d687a5525259480440c6ea2e1a8469cd98906eaff8597f3d2a44422ff97", + "input":"0x0108e9000d0670136b", + "nonce":"0x109f3", + "r":"0x7736e11b03c6702eb6aaea8c45ed6b8a510878bb7741028d82938b9207448e9b", + "s":"0x70bcd4c0ec2b0c67eb9eefb53a6ff7e114b45888589a5aaf4c1a1f00fa704775", + "to":"0xc5f60fa4613493931b605b6da1e9febbdeb61e16", + "transactionIndex":"0x6", + "type":"0x0", + "v":"0x25", + "value":"0x0" + } + ], + "transactionsRoot":"0xf98631e290e88f58a46b7032f025969039aa9b5696498efc76baf436fa69b262", + "uncles":[ + "0x824cce7c7c2ec6874b9fa9a9a898eb5f27cbaf3991dfa81084c3af60d1db618c" + ] + } +}` + testTraces = `{ + "jsonrpc": "2.0", + "id": 1, + "result": [ + { + "result": { + "from": "0xa5ba45f484bc67fe293cf01f7d92d5ba3514dd42", + "gas": "0x5208", + "gasUsed": "0x5208", + "input": "0x", + "to": "0x45a318273749d6eb00f5f6ca3bc7cd3de26d642a", + "type": "CALL", + "value": "0x2ca186f5fda8004" + } + }, + { + "result": { + "from": "0x25f2650cc9e8ad863bf5da6a7598e24271574e29", + "gas": "0xfe0e", + "gasUsed": "0xafee", + "input": "0xd0e30db0", + "to": "0xe5d7c2a44ffddf6b295a15c148167daaaf5cf34f", + "type": "CALL", + "value": "0x2386f26fc10000" + } + } + ] +}` + testReceipts = `{ + "jsonrpc": "2.0", + "id": 1, + "result": [ + { + "blockHash": "0x19514ce955c65e4dd2cd41f435a75a46a08535b8fc16bc660f8092b32590b182", + "blockNumber": "0x6f55", + "contractAddress": null, + "cumulativeGasUsed": "0x18c36", + "from": "0x22896bfc68814bfd855b1a167255ee497006e730", + "gasUsed": "0x18c36", + "effectiveGasPrice": "0x9502f907", + "logs": [ + { + "address": "0xfd584430cafa2f451b4e2ebcf3986a21fff04350", + "topics": [ + "0x2f8788117e7eff1d82e926ec794901d17c78024a50270940304540a733656f0d", + "0x4be29e0e4eb91f98f709d98803cba271592782e293b84a625e025cbb40197ba8", + "0x000000000000000000000000835281a2563db4ebf1b626172e085dc406bfc7d2", + "0x00000000000000000000000022896bfc68814bfd855b1a167255ee497006e730" + ], + "data": "0x", + "blockNumber": "0x6f55", + "transactionHash": "0x4a481e4649da999d92db0585c36cba94c18a33747e95dc235330e6c737c6f975", + "transactionIndex": "0x0", + "blockHash": "0x19514ce955c65e4dd2cd41f435a75a46a08535b8fc16bc660f8092b32590b182", + "logIndex": "0x0", + "removed": false + } + ], + "logsBloom": "0x00000004000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000000080020000000000000200010000000000000000000001000000800000000000000000000000000000000000000000000000000000100100000000000000000000008000000000000000000000000000000002000000000000000000000", + "status": "0x1", + "to": "0xfd584430cafa2f451b4e2ebcf3986a21fff04350", + "transactionHash": "0x4a481e4649da999d92db0585c36cba94c18a33747e95dc235330e6c737c6f975", + "transactionIndex": "0x0", + "type": "0x0" + }, + { + "blockHash": "0x19514ce955c65e4dd2cd41f435a75a46a08535b8fc16bc660f8092b32590b182", + "blockNumber": "0x6f55", + "contractAddress": null, + "cumulativeGasUsed": "0x1de3e", + "from": "0x712e3a792c974b3e3dbe41229ad4290791c75a82", + "gasUsed": "0x5208", + "effectiveGasPrice": "0x9502f907", + "logs": [], + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "status": "0x1", + "to": "0xd42e2b1c14d02f1df5369a9827cb8e6f3f75f338", + "transactionHash": "0xefb83b4e3f1c317e8da0f8e2fbb2fe964f34ee184466032aeecac79f20eacaf6", + "transactionIndex": "0x1", + "type": "0x2" + } + ] +}` + testUncles = `[{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "difficulty": "0x57f117f5c", + "extraData": "0x476574682f76312e302e302f77696e646f77732f676f312e342e32", + "gasLimit": "0x1388", + "gasUsed": "0x0", + "hash": "0x932bdf904546a2287a2c9b2ede37925f698a7657484b172d4e5184f80bdd464d", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner": "0x5bf5e9cf9b456d6591073513de7fd69a9bef04bc", + "mixHash": "0x4500aa4ee2b3044a155252e35273770edeb2ab6f8cb19ca8e732771484462169", + "nonce": "0x24732773618192ac", + "number": "0x299", + "parentHash": "0xa779859b1ee558258b7008bbabff272280136c5dd3eb3ea3bfa8f6ae03bf91e5", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size": "0x21d", + "stateRoot": "0x2604fbf5183f5360da249b51f1b9f1e0f315d2ff3ffa1a4143ff221ad9ca1fec", + "timestamp": "0x55ba4827", + "totalDifficulty": null, + "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles": [] + } +}]` + + testTwoUnclesBlockNumber = 141 + testTwoUnclesBlock = `{ + "jsonrpc":"2.0", + "id":0, + "result":{ + "difficulty":"0x4417decf7", + "extraData":"0x426974636f696e2069732054484520426c6f636b636861696e2e", + "gasLimit":"0x1388", + "gasUsed":"0x0", + "hash":"0xeafbe76fdcadc1b69ba248589eb2a674b60b00c84374c149c9deaf5596183932", + "logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner":"0x1b7047b4338acf65be94c1a3e8c5c9338ad7d67c", + "mixHash":"0x21eabda67c3151855389a5a968e50daa7b356b3046e2f119ef46c97d204a541e", + "nonce":"0x85378a3fc5e608e1", + "number":"0x8d", + "parentHash":"0xe2c1e8200ef2e9fba09979f0b504dc52c068719623c7064904c7bd3e9365acc1", + "receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles":"0x393f5f01182846b91386f8b00759fd54f83998a6a1064b8ac72fc8eca1bcf81b", + "size":"0x653", + "stateRoot":"0x3e1eea9a01178945535230b6f5839201f594d9be20618bb4edaa383f4f0c850f", + "timestamp":"0x55ba4444", + "totalDifficulty":"0x24826e73469", + "transactions":[ + + ], + "transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles":[ + "0x61beeeb3e11e89d19fed2e988c8017b55c3ddb8895f531072363ce2abaf56b95", + "0xf84d9d74415364c3a7569f315ff831b910968c7dd637fffaab51278c9e7f9306" + ] + } +}` + testTwoUnclesBlockUncles = `[ + { + "jsonrpc":"2.0", + "id":141, + "result":{ + "difficulty":"0x4406dc086", + "extraData":"0x476574682f4c5649562f76312e302e302f6c696e75782f676f312e342e32", + "gasLimit":"0x1388", + "gasUsed":"0x0", + "hash":"0x61beeeb3e11e89d19fed2e988c8017b55c3ddb8895f531072363ce2abaf56b95", + "logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner":"0xbb7b8287f3f0a933474a79eae42cbca977791171", + "mixHash":"0x87547a998fe63f18b36180ca918131b6b20fc5d67390e2ac2f66be3fee8fb7d2", + "nonce":"0x1dc5b79704350bee", + "number":"0x8b", + "parentHash":"0x2253b8f79c23b6ff67cb2ef6fabd9ec59e1edf2d07c16d98a19378041f96624d", + "receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size":"0x21f", + "stateRoot":"0x940131b162b07452ea31b5335c4dedfdddc13338142f71f261d51dea664033b4", + "timestamp":"0x55ba4441", + "totalDifficulty":"0x24826e73469", + "transactions":[ + + ], + "transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles":[ + + ] + } + }, + { + "jsonrpc":"2.0", + "id":141, + "result":{ + "difficulty":"0x4406dc086", + "extraData":"0x476574682f6b6c6f737572652f76312e302e302d66633739643332642f6c696e", + "gasLimit":"0x1388", + "gasUsed":"0x0", + "hash":"0xf84d9d74415364c3a7569f315ff831b910968c7dd637fffaab51278c9e7f9306", + "logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner":"0xd7e30ae310c1d1800f5b641baa7af95b2e1fd98c", + "mixHash":"0x6039f236ebb70ec71091df5770aef0f0faa13ef334c4c68daaffbfdf7961a3d3", + "nonce":"0x7d8ec05d330e6e99", + "number":"0x8b", + "parentHash":"0x2253b8f79c23b6ff67cb2ef6fabd9ec59e1edf2d07c16d98a19378041f96624d", + "receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size":"0x221", + "stateRoot":"0x302bb7708752013f46f009dec61cad586c35dc185d20cdde0071b7487f7c2008", + "timestamp":"0x55ba4440", + "totalDifficulty":"0x24826e73469", + "transactions":[ + + ], + "transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles":[ + + ] + } + } +]` +) diff --git a/db2/store/bigtable.go b/db2/store/bigtable.go new file mode 100644 index 0000000000..db1fc04eee --- /dev/null +++ b/db2/store/bigtable.go @@ -0,0 +1,337 @@ +package store + +import ( + "context" + "fmt" + "slices" + "strings" + "time" + + "cloud.google.com/go/bigtable" + "golang.org/x/exp/maps" +) + +var ErrNotFound = fmt.Errorf("not found") + +const ( + timeout = time.Minute // Timeout duration for Bigtable operations +) + +type TableWrapper struct { + *BigTableStore + table string + family string +} + +func Wrap(db *BigTableStore, table string, family string) TableWrapper { + return TableWrapper{ + BigTableStore: db, + table: table, + family: family, + } +} + +func (w TableWrapper) Add(key, column string, data []byte, allowDuplicate bool) error { + return w.BigTableStore.Add(w.table, w.family, key, column, data, allowDuplicate) +} + +func (w TableWrapper) Read(prefix string) ([][]byte, error) { + return w.BigTableStore.Read(w.table, w.family, prefix) +} + +func (w TableWrapper) GetLatestValue(key string) ([]byte, error) { + return w.BigTableStore.GetLatestValue(w.table, w.family, key) +} + +func (w TableWrapper) GetRow(key string) (map[string][]byte, error) { + return w.BigTableStore.GetRow(w.table, key) +} + +func (w TableWrapper) GetRowKeys(prefix string) ([]string, error) { + return w.BigTableStore.GetRowKeys(w.table, prefix) +} + +func (w TableWrapper) BulkAdd(itemsByKey map[string][]Item) error { + return w.BigTableStore.BulkAdd(w.table, itemsByKey) +} + +func (w TableWrapper) GetRowsRange(high, low string) (map[string]map[string][]byte, error) { + return w.BigTableStore.GetRowsRange(w.table, high, low) +} + +// BigTableStore is a wrapper around Google Cloud Bigtable for storing and retrieving data +type BigTableStore struct { + client *bigtable.Client + admin *bigtable.AdminClient +} + +func NewBigTableWithClient(ctx context.Context, client *bigtable.Client, adminClient *bigtable.AdminClient, tablesAndFamilies map[string][]string) (*BigTableStore, error) { + // Initialize the Bigtable table and column family + if err := initTable(ctx, adminClient, tablesAndFamilies); err != nil { + return nil, err + } + + return &BigTableStore{client: client, admin: adminClient}, nil +} + +// NewBigTable initializes a new BigTableStore +// It returns a BigTableStore and an error if any part of the setup fails +func NewBigTable(project, instance string, tablesAndFamilies map[string][]string) (*BigTableStore, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Create an admin client to manage Bigtable tables + adminClient, err := bigtable.NewAdminClient(ctx, project, instance) + if err != nil { + return nil, fmt.Errorf("could not create admin client: %v", err) + } + + // Create a Bigtable client for performing data operations + client, err := bigtable.NewClient(ctx, project, instance) + if err != nil { + return nil, fmt.Errorf("could not create data operations client: %v", err) + } + + return NewBigTableWithClient(ctx, client, adminClient, tablesAndFamilies) +} + +// initTable creates the tables and column family in the Bigtable +func initTable(ctx context.Context, adminClient *bigtable.AdminClient, tablesAndFamilies map[string][]string) error { + for table, families := range tablesAndFamilies { + if err := createTableAndFamilies(ctx, adminClient, table, families...); err != nil { + return err + } + } + return nil +} + +func createTableAndFamilies(ctx context.Context, admin *bigtable.AdminClient, tableName string, familyNames ...string) error { + // Get the list of existing tables + tables, err := admin.Tables(ctx) + if err != nil { + return fmt.Errorf("could not fetch table list: %v", err) + } + + // Create the table if it doesn't exist + if !slices.Contains(tables, tableName) { + if err := admin.CreateTable(ctx, tableName); err != nil { + return fmt.Errorf("could not create table %s: %v", tableName, err) + } + } + + // Retrieve information about the table + tblInfo, err := admin.TableInfo(ctx, tableName) + if err != nil { + return fmt.Errorf("could not read info for table %s: %v", tableName, err) + } + + for _, familyName := range familyNames { + // Create the column family if it doesn't exist + if !slices.Contains(tblInfo.Families, familyName) { + if err := admin.CreateColumnFamily(ctx, tableName, familyName); err != nil { + return fmt.Errorf("could not create column family %s: %v", familyName, err) + } + } + } + return nil +} + +type Item struct { + Family string + Column string + Data []byte +} + +func (b BigTableStore) BulkAdd(table string, itemsByKey map[string][]Item) error { + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var muts []*bigtable.Mutation + for _, items := range itemsByKey { + mut := bigtable.NewMutation() + for _, item := range items { + mut.Set(item.Family, item.Column, bigtable.Timestamp(0), item.Data) + } + muts = append(muts, mut) + } + errs, err := tbl.ApplyBulk(ctx, maps.Keys(itemsByKey), muts) + if err != nil { + return fmt.Errorf("cannot ApplyBulk err: %w", err) + } + // TODO aggregate errs + for _, e := range errs { + return fmt.Errorf("cannot ApplyBulk elem err: %w", e) + } + return nil +} + +// Add inserts a new row with the given key, column, and data into the Bigtable +// It applies a mutation that stores data in the receiver column family +// It returns error if the operation fails +func (b BigTableStore) Add(table, family string, key string, column string, data []byte, allowDuplicate bool) error { + // Open the transfer table for data operations + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Create a new mutation to store data in the given column + mut := bigtable.NewMutation() + mut.Set(family, column, bigtable.Now(), data) + + if !allowDuplicate { + mut = bigtable.NewCondMutation(bigtable.RowKeyFilter(key), nil, mut) + } + // Apply the mutation to the table using the given key + if err := tbl.Apply(ctx, key, mut); err != nil { + return fmt.Errorf("could not apply row mutation: %v", err) + } + return nil +} + +// Read retrieves all rows from the Bigtable's receiver column family +// It returns the data in the form of a 2D byte slice and an error if the operation fails +func (b BigTableStore) Read(table, family, prefix string) ([][]byte, error) { + // Open the transfer table for reading + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var data [][]byte + // Read all rows from the table and collect values from the receiver column family + err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool { + for _, item := range row[family] { + // Append each value from the receiver family to the data slice + data = append(data, item.Value) + } + return true + }) + if err != nil { + return nil, fmt.Errorf("could not read rows: %v", err) + } + + return data, nil +} + +func (b BigTableStore) GetLatestValue(table, family, key string) ([]byte, error) { + // Open the transfer table for reading + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var data []byte + err := tbl.ReadRows(ctx, bigtable.PrefixRange(key), func(row bigtable.Row) bool { + data = row[family][0].Value + return true + }) + + if err != nil { + return nil, fmt.Errorf("could not read rows: %v", err) + } + + return data, nil +} + +func (b BigTableStore) GetRow(table, key string) (map[string][]byte, error) { + // Open the transfer table for reading + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + data := make(map[string][]byte) + err := tbl.ReadRows(ctx, bigtable.PrefixRange(key), func(row bigtable.Row) bool { + for _, family := range row { + for _, item := range family { + data[item.Column] = item.Value + } + } + return true + }) + + if err != nil { + return nil, fmt.Errorf("could not read rows: %v", err) + } + if len(data) == 0 { + return nil, ErrNotFound + } + + return data, nil +} + +func (b BigTableStore) GetRowsRange(table, high, low string) (map[string]map[string][]byte, error) { + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + rowRange := bigtable.NewRange(low, high) + data := make(map[string]map[string][]byte) + err := tbl.ReadRows(ctx, rowRange, func(row bigtable.Row) bool { + data[row.Key()] = make(map[string][]byte) + for _, family := range row { + for _, item := range family { + data[row.Key()][item.Column] = item.Value + } + } + return true + }) + + if err != nil { + return nil, fmt.Errorf("could not read rows: %v", err) + } + if len(data) == 0 { + return nil, ErrNotFound + } + + return data, nil +} + +func (b BigTableStore) GetRowKeys(table, prefix string) ([]string, error) { + // Open the transfer table for reading + tbl := b.client.Open(table) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var data []string + // Read all rows from the table and collect all the row keys + err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool { + data = append(data, row.Key()) + return true + }) + + if err != nil { + return nil, fmt.Errorf("could not read rows: %v", err) + } + + return data, nil +} + +func (b BigTableStore) Clear() error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + tables, err := b.admin.Tables(ctx) + if err != nil { + return err + } + for _, table := range tables { + if err := b.admin.DropAllRows(ctx, table); err != nil { + return fmt.Errorf("could not drop all rows: %v", err) + } + } + return nil +} + +// Close shuts down the BigTableStore by closing the Bigtable client connection +// It returns an error if the operation fails +func (b BigTableStore) Close() error { + if err := b.client.Close(); err != nil { + return fmt.Errorf("could not close client: %v", err) + } + if err := b.admin.Close(); err != nil { + if !strings.Contains(err.Error(), "the client connection is closing") { + return fmt.Errorf("could not close admin client: %v", err) + } + } + + return nil +} diff --git a/db2/store/bigtable_test.go b/db2/store/bigtable_test.go new file mode 100644 index 0000000000..da2f84027c --- /dev/null +++ b/db2/store/bigtable_test.go @@ -0,0 +1,171 @@ +package store + +import ( + "context" + "slices" + "strings" + "testing" + + "github.com/gobitfly/eth2-beaconchain-explorer/db2/storetest" +) + +func TestBigTableStore(t *testing.T) { + type item struct { + key string + column string + data string + } + tests := []struct { + name string + bulk bool + items []item + expected []string + }{ + { + name: "simple add", + items: []item{{ + key: "foo", + column: "bar", + data: "foobar", + }}, + expected: []string{"foobar"}, + }, + { + name: "bulk add", + bulk: true, + items: []item{{ + key: "key1", + column: "col1", + data: "foobar", + }, { + key: "key2", + column: "col2", + data: "foobar", + }, { + key: "key3", + column: "col3", + data: "foobar", + }}, + expected: []string{"foobar", "foobar", "foobar"}, + }, + { + name: "dont duplicate", + items: []item{{ + key: "foo", + column: "bar", + data: "foobar", + }, { + key: "foo", + column: "bar", + data: "foobar", + }}, + expected: []string{"foobar"}, + }, + { + name: "with a prefix", + items: []item{{ + key: "foo", + }, { + key: "foofoo", + }, { + key: "foofoofoo", + }, { + key: "bar", + }}, + expected: []string{"", "", "", ""}, + }, + } + tables := map[string][]string{"testTable": {"testFamily"}} + client, admin := storetest.NewBigTable(t) + store, err := NewBigTableWithClient(context.Background(), client, admin, tables) + if err != nil { + t.Fatal(err) + } + db := Wrap(store, "testTable", "testFamily") + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + _ = db.Clear() + }() + + if tt.bulk { + itemsByKey := make(map[string][]Item) + for _, item := range tt.items { + itemsByKey[item.key] = append(itemsByKey[item.key], Item{ + Family: "testFamily", + Column: item.column, + Data: []byte(item.data), + }) + } + if err := db.BulkAdd(itemsByKey); err != nil { + t.Error(err) + } + } else { + for _, it := range tt.items { + if err := db.Add(it.key, it.column, []byte(it.data), false); err != nil { + t.Error(err) + } + } + } + + t.Run("Read", func(t *testing.T) { + res, err := db.Read("") + if err != nil { + t.Error(err) + } + if got, want := len(res), len(tt.expected); got != want { + t.Errorf("got %v want %v", got, want) + } + for _, data := range res { + if !slices.Contains(tt.expected, string(data)) { + t.Errorf("wrong data %s", data) + } + } + }) + + t.Run("GetLatestValue", func(t *testing.T) { + for _, it := range tt.items { + v, err := db.GetLatestValue(it.key) + if err != nil { + t.Error(err) + } + if got, want := string(v), it.data; got != want { + t.Errorf("got %v want %v", got, want) + } + } + }) + + t.Run("GetRowKeys", func(t *testing.T) { + for _, it := range tt.items { + keys, err := db.GetRowKeys(it.key) + if err != nil { + t.Error(err) + } + count, found := 0, false + for _, expected := range tt.items { + if !strings.HasPrefix(expected.key, it.key) { + continue + } + // don't count duplicate inputs since the add prevent duplicate keys + if expected.key == it.key && found { + continue + } + found = expected.key == it.key + count++ + if !slices.Contains(keys, expected.key) { + t.Errorf("missing %v in %v", expected.key, keys) + } + } + if got, want := len(keys), count; got != want { + t.Errorf("got %v want %v", got, want) + } + } + }) + }) + } + + if err := db.Close(); err != nil { + t.Errorf("cannot close db: %v", err) + } +} diff --git a/db2/store/remote.go b/db2/store/remote.go new file mode 100644 index 0000000000..444c55c7cf --- /dev/null +++ b/db2/store/remote.go @@ -0,0 +1,171 @@ +package store + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" +) + +const ( + routeGetRowsRange = "/rowRange" + routeGetRow = "/row" +) + +type RemoteServer struct { + store Store +} + +func NewRemoteStore(store Store) RemoteServer { + return RemoteServer{store: store} +} + +func (api RemoteServer) Routes() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc(routeGetRowsRange, api.GetRowsRange) + mux.HandleFunc(routeGetRow, api.GetRow) + + return mux +} + +type ParamsGetRowsRange struct { + High string `json:"high"` + Low string `json:"low"` +} + +func (api RemoteServer) GetRowsRange(w http.ResponseWriter, r *http.Request) { + var args ParamsGetRowsRange + err := json.NewDecoder(r.Body).Decode(&args) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(err.Error())) + return + } + rows, err := api.store.GetRowsRange(args.High, args.Low) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + data, _ := json.Marshal(rows) + _, _ = w.Write(data) +} + +type ParamsGetRow struct { + Key string `json:"key"` +} + +func (api RemoteServer) GetRow(w http.ResponseWriter, r *http.Request) { + var args ParamsGetRow + err := json.NewDecoder(r.Body).Decode(&args) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(err.Error())) + return + } + row, err := api.store.GetRow(args.Key) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + data, _ := json.Marshal(row) + _, _ = w.Write(data) +} + +type RemoteClient struct { + url string +} + +func NewRemoteClient(url string) *RemoteClient { + return &RemoteClient{url: url} +} + +func (r RemoteClient) Add(key, column string, data []byte, allowDuplicate bool) error { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) BulkAdd(itemsByKey map[string][]Item) error { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) Read(prefix string) ([][]byte, error) { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) GetRow(key string) (map[string][]byte, error) { + b, err := json.Marshal(ParamsGetRow{Key: key}) + if err != nil { + return nil, err + } + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", r.url, routeGetRow), bytes.NewReader(b)) + if err != nil { + return nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, b) + } + var row map[string][]byte + if err := json.NewDecoder(resp.Body).Decode(&row); err != nil { + return nil, err + } + return row, nil +} + +func (r RemoteClient) GetRowKeys(prefix string) ([]string, error) { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) GetLatestValue(key string) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) GetRowsRange(high, low string) (map[string]map[string][]byte, error) { + b, err := json.Marshal(ParamsGetRowsRange{ + High: high, + Low: low, + }) + if err != nil { + return nil, err + } + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", r.url, routeGetRowsRange), bytes.NewReader(b)) + if err != nil { + return nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, b) + } + var rows map[string]map[string][]byte + if err := json.NewDecoder(resp.Body).Decode(&rows); err != nil { + return nil, err + } + return rows, nil +} + +func (r RemoteClient) Close() error { + //TODO implement me + panic("implement me") +} + +func (r RemoteClient) Clear() error { + //TODO implement me + panic("implement me") +} diff --git a/db2/store/store.go b/db2/store/store.go new file mode 100644 index 0000000000..595ddb4f14 --- /dev/null +++ b/db2/store/store.go @@ -0,0 +1,18 @@ +package store + +type Store interface { + Add(key, column string, data []byte, allowDuplicate bool) error + BulkAdd(itemsByKey map[string][]Item) error + Read(prefix string) ([][]byte, error) + GetRow(key string) (map[string][]byte, error) + GetRowKeys(prefix string) ([]string, error) + GetLatestValue(key string) ([]byte, error) + GetRowsRange(high, low string) (map[string]map[string][]byte, error) + Close() error + Clear() error +} + +var ( + _ Store = (*TableWrapper)(nil) + _ Store = (*RemoteClient)(nil) +) diff --git a/db2/storetest/bigtable.go b/db2/storetest/bigtable.go new file mode 100644 index 0000000000..4d6415438e --- /dev/null +++ b/db2/storetest/bigtable.go @@ -0,0 +1,38 @@ +package storetest + +import ( + "context" + "testing" + + "cloud.google.com/go/bigtable" + "cloud.google.com/go/bigtable/bttest" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func NewBigTable(t testing.TB) (*bigtable.Client, *bigtable.AdminClient) { + srv, err := bttest.NewServer("localhost:0") + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatal(err) + } + + project, instance := "proj", "instance" + adminClient, err := bigtable.NewAdminClient(ctx, project, instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatal(err) + } + + client, err := bigtable.NewClientWithConfig(ctx, project, instance, bigtable.ClientConfig{}, option.WithGRPCConn(conn)) + if err != nil { + t.Fatal(err) + } + + return client, adminClient +} diff --git a/db2/tables.go b/db2/tables.go new file mode 100644 index 0000000000..63e6084464 --- /dev/null +++ b/db2/tables.go @@ -0,0 +1,23 @@ +package db2 + +const BlocksRawTable = "blocks-raw" + +const BT_COLUMNFAMILY_BLOCK = "b" +const BT_COLUMN_BLOCK = "b" +const BT_COLUMNFAMILY_RECEIPTS = "r" +const BT_COLUMN_RECEIPTS = "r" +const BT_COLUMNFAMILY_TRACES = "t" +const BT_COLUMN_TRACES = "t" +const BT_COLUMNFAMILY_UNCLES = "u" +const BT_COLUMN_UNCLES = "u" + +const MAX_EL_BLOCK_NUMBER = int64(1_000_000_000_000 - 1) + +var raw = map[string][]string{ + BlocksRawTable: { + BT_COLUMNFAMILY_BLOCK, + BT_COLUMNFAMILY_RECEIPTS, + BT_COLUMNFAMILY_TRACES, + BT_COLUMNFAMILY_UNCLES, + }, +} diff --git a/go.mod b/go.mod index ffa56a4ebf..d92da7d6c1 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ toolchain go1.22.0 require ( cloud.google.com/go/bigtable v1.16.0 cloud.google.com/go/secretmanager v1.11.5 - firebase.google.com/go v3.13.0+incompatible firebase.google.com/go/v4 v4.14.1 github.com/Gurpartap/storekit-go v0.0.0-20201205024111-36b6cd5c6a21 github.com/alexedwards/scs/redisstore v0.0.0-20230217120314-6b1bedc0f08c @@ -35,7 +34,6 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e - github.com/jackc/pgx/v4 v4.18.1 github.com/jackc/pgx/v5 v5.4.3 github.com/jmoiron/sqlx v1.2.0 github.com/juliangruber/go-intersect v1.1.0 @@ -74,6 +72,7 @@ require ( golang.org/x/text v0.14.0 golang.org/x/time v0.5.0 google.golang.org/api v0.170.0 + google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -138,6 +137,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/btree v1.1.2 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect @@ -158,9 +158,6 @@ require ( github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipld/go-codec-dagpb v1.6.0 // indirect github.com/ipld/go-ipld-prime v0.20.0 // indirect - github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.0 // indirect - github.com/jackc/pgproto3/v2 v2.3.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jbenet/goprocess v0.1.4 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -205,8 +202,8 @@ require ( google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect - google.golang.org/grpc v1.62.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect + rsc.io/binaryregexp v0.2.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/go.sum b/go.sum index c842fab76f..22b72aaa48 100644 --- a/go.sum +++ b/go.sum @@ -52,8 +52,6 @@ cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2u contrib.go.opencensus.io/exporter/jaeger v0.2.1 h1:yGBYzYMewVL0yO9qqJv3Z5+IRhPdU7e9o/2oKpX4YvI= contrib.go.opencensus.io/exporter/jaeger v0.2.1/go.mod h1:Y8IsLgdxqh1QxYxPC5IgXVmBaeLUeQFfBeBi9PbeZd0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= -firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= firebase.google.com/go/v4 v4.14.1 h1:4qiUETaFRWoFGE1XP5VbcEdtPX93Qs+8B/7KvP2825g= firebase.google.com/go/v4 v4.14.1/go.mod h1:fgk2XshgNDEKaioKco+AouiegSI9oTWVqRaBdTTGBoM= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= @@ -70,7 +68,6 @@ github.com/Gurpartap/storekit-go v0.0.0-20201205024111-36b6cd5c6a21/go.mod h1:7P github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= -github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o= github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= @@ -188,7 +185,6 @@ github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= -github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= @@ -388,7 +384,6 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= -github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.1/go.mod h1:2lpHqI5OcWCtVElxXnPt+s8oJvMpySlOyM6xDCrzib4= @@ -623,6 +618,7 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/ github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk= github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -639,10 +635,10 @@ github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= -github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -674,7 +670,6 @@ github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSlj github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -1227,7 +1222,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1318,7 +1312,6 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1423,7 +1416,6 @@ golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1436,7 +1428,6 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/rpc/erigon.go b/rpc/erigon.go index 8ab0d3c09e..e4b1ad4caf 100644 --- a/rpc/erigon.go +++ b/rpc/erigon.go @@ -6,12 +6,16 @@ import ( "encoding/json" "fmt" "math/big" + "net/http" + "os" "strconv" "strings" "sync" "time" "github.com/gobitfly/eth2-beaconchain-explorer/contracts/oneinchoracle" + "github.com/gobitfly/eth2-beaconchain-explorer/db2" + "github.com/gobitfly/eth2-beaconchain-explorer/db2/store" "github.com/gobitfly/eth2-beaconchain-explorer/erc20" "github.com/gobitfly/eth2-beaconchain-explorer/metrics" "github.com/gobitfly/eth2-beaconchain-explorer/types" @@ -37,6 +41,8 @@ type ErigonClient struct { ethClient *ethclient.Client chainID *big.Int multiChecker *Balance + + rawStore db2.RawStoreReader } var CurrentErigonClient *ErigonClient @@ -47,17 +53,40 @@ func NewErigonClient(endpoint string) (*ErigonClient, error) { endpoint: endpoint, } - rpcClient, err := geth_rpc.Dial(client.endpoint) - if err != nil { - return nil, fmt.Errorf("error dialing rpc node: %w", err) + var opts []geth_rpc.ClientOption + if utils.Config != nil { + if utils.Config.RawBigtable.Bigtable.Project != "" && utils.Config.RawBigtable.Bigtable.Instance != "" { + if utils.Config.RawBigtable.Bigtable.Emulator { + err := os.Setenv("BIGTABLE_EMULATOR_HOST", fmt.Sprintf("%s:%d", utils.Config.RawBigtable.Bigtable.EmulatorHost, utils.Config.RawBigtable.Bigtable.EmulatorPort)) + if err != nil { + return nil, fmt.Errorf("error while setting BIGTABLE_EMULATOR_HOST env: %w", err) + } + } + project, instance := utils.Config.RawBigtable.Bigtable.Project, utils.Config.RawBigtable.Bigtable.Instance + var db store.Store + bt, err := store.NewBigTable(project, instance, nil) + if err != nil { + return nil, err + } + db = store.Wrap(bt, db2.BlocksRawTable, "") + if utils.Config.RawBigtable.Remote != "" { + db = store.NewRemoteClient(utils.Config.RawBigtable.Remote) + } + rawStore := db2.WithCache(db2.NewRawStore(db)) + roundTripper := db2.NewBigTableEthRaw(rawStore, utils.Config.Chain.Id) + opts = append(opts, geth_rpc.WithHTTPClient(&http.Client{ + Transport: db2.NewWithFallback(roundTripper, http.DefaultTransport), + })) + client.rawStore = rawStore + } } - client.rpcClient = rpcClient - ethClient, err := ethclient.Dial(client.endpoint) + rpcClient, err := geth_rpc.DialOptions(context.Background(), client.endpoint, opts...) if err != nil { return nil, fmt.Errorf("error dialing rpc node: %w", err) } - client.ethClient = ethClient + client.rpcClient = rpcClient + client.ethClient = ethclient.NewClient(rpcClient) client.multiChecker, err = NewBalance(common.HexToAddress("0xb1F8e55c7f64D203C1400B9D8555d050F94aDF39"), client.ethClient) if err != nil { @@ -93,144 +122,35 @@ func (client *ErigonClient) GetRPCClient() *geth_rpc.Client { } func (client *ErigonClient) GetBlock(number int64, traceMode string) (*types.Eth1Block, *types.GetBlockTimings, error) { - startTime := time.Now() + start := time.Now() + timings := &types.GetBlockTimings{} + mu := sync.Mutex{} + defer func() { - metrics.TaskDuration.WithLabelValues("rpc_el_get_block").Observe(time.Since(startTime).Seconds()) + metrics.TaskDuration.WithLabelValues("rpc_el_get_block").Observe(time.Since(start).Seconds()) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - timings := &types.GetBlockTimings{} - mu := sync.Mutex{} - - block, err := client.ethClient.BlockByNumber(ctx, big.NewInt(number)) - if err != nil { - return nil, nil, err - } - timings.Headers = time.Since(startTime) - - c := &types.Eth1Block{ - Hash: block.Hash().Bytes(), - ParentHash: block.ParentHash().Bytes(), - UncleHash: block.UncleHash().Bytes(), - Coinbase: block.Coinbase().Bytes(), - Root: block.Root().Bytes(), - TxHash: block.TxHash().Bytes(), - ReceiptHash: block.ReceiptHash().Bytes(), - Difficulty: block.Difficulty().Bytes(), - Number: block.NumberU64(), - GasLimit: block.GasLimit(), - GasUsed: block.GasUsed(), - Time: timestamppb.New(time.Unix(int64(block.Time()), 0)), - Extra: block.Extra(), - MixDigest: block.MixDigest().Bytes(), - Bloom: block.Bloom().Bytes(), - Uncles: []*types.Eth1Block{}, - Transactions: []*types.Eth1Transaction{}, - Withdrawals: []*types.Eth1Withdrawal{}, - } - blobGasUsed := block.BlobGasUsed() - if blobGasUsed != nil { - c.BlobGasUsed = *blobGasUsed - } - excessBlobGas := block.ExcessBlobGas() - if excessBlobGas != nil { - c.ExcessBlobGas = *excessBlobGas - } - - if block.BaseFee() != nil { - c.BaseFee = block.BaseFee().Bytes() - } - - for _, uncle := range block.Uncles() { - pbUncle := &types.Eth1Block{ - Hash: uncle.Hash().Bytes(), - ParentHash: uncle.ParentHash.Bytes(), - UncleHash: uncle.UncleHash.Bytes(), - Coinbase: uncle.Coinbase.Bytes(), - Root: uncle.Root.Bytes(), - TxHash: uncle.TxHash.Bytes(), - ReceiptHash: uncle.ReceiptHash.Bytes(), - Difficulty: uncle.Difficulty.Bytes(), - Number: uncle.Number.Uint64(), - GasLimit: uncle.GasLimit, - GasUsed: uncle.GasUsed, - Time: timestamppb.New(time.Unix(int64(uncle.Time), 0)), - Extra: uncle.Extra, - MixDigest: uncle.MixDigest.Bytes(), - Bloom: uncle.Bloom.Bytes(), - } - - c.Uncles = append(c.Uncles, pbUncle) - } - - receipts := make([]*geth_types.Receipt, len(block.Transactions())) - - if len(block.Withdrawals()) > 0 { - withdrawalsIndexed := make([]*types.Eth1Withdrawal, 0, len(block.Withdrawals())) - for _, w := range block.Withdrawals() { - withdrawalsIndexed = append(withdrawalsIndexed, &types.Eth1Withdrawal{ - Index: w.Index, - ValidatorIndex: w.Validator, - Address: w.Address.Bytes(), - Amount: new(big.Int).SetUint64(w.Amount).Bytes(), - }) - } - c.Withdrawals = withdrawalsIndexed - } - - txs := block.Transactions() - - for _, tx := range txs { - - var from []byte - sender, err := geth_types.Sender(geth_types.NewCancunSigner(tx.ChainId()), tx) - if err != nil { - from, _ = hex.DecodeString("abababababababababababababababababababab") - logrus.Errorf("error converting tx %v to msg: %v", tx.Hash(), err) - } else { - from = sender.Bytes() - } - - pbTx := &types.Eth1Transaction{ - Type: uint32(tx.Type()), - Nonce: tx.Nonce(), - GasPrice: tx.GasPrice().Bytes(), - MaxPriorityFeePerGas: tx.GasTipCap().Bytes(), - MaxFeePerGas: tx.GasFeeCap().Bytes(), - Gas: tx.Gas(), - Value: tx.Value().Bytes(), - Data: tx.Data(), - From: from, - ChainId: tx.ChainId().Bytes(), - AccessList: []*types.AccessList{}, - Hash: tx.Hash().Bytes(), - Itx: []*types.Eth1InternalTransaction{}, - BlobVersionedHashes: [][]byte{}, - } - - if tx.BlobGasFeeCap() != nil { - pbTx.MaxFeePerBlobGas = tx.BlobGasFeeCap().Bytes() - } - for _, h := range tx.BlobHashes() { - pbTx.BlobVersionedHashes = append(pbTx.BlobVersionedHashes, h.Bytes()) - } - - if tx.To() != nil { - pbTx.To = tx.To().Bytes() - } - - c.Transactions = append(c.Transactions, pbTx) - - } - var traces []*Eth1InternalTransactionWithPosition + var block *geth_types.Block + var receipts []*geth_types.Receipt g := new(errgroup.Group) g.Go(func() error { - start := time.Now() - if err = client.rpcClient.CallContext(ctx, &receipts, "eth_getBlockReceipts", fmt.Sprintf("0x%x", block.NumberU64())); err != nil { - return fmt.Errorf("error retrieving receipts for block %v: %w", block.Number(), err) + b, err := client.ethClient.BlockByNumber(ctx, big.NewInt(number)) + if err != nil { + return err + } + mu.Lock() + timings.Headers = time.Since(start) + mu.Unlock() + block = b + return nil + }) + g.Go(func() error { + if err := client.rpcClient.CallContext(ctx, &receipts, "eth_getBlockReceipts", fmt.Sprintf("0x%x", number)); err != nil { + return fmt.Errorf("error retrieving receipts for block %v: %w", number, err) } mu.Lock() timings.Receipts = time.Since(start) @@ -238,11 +158,11 @@ func (client *ErigonClient) GetBlock(number int64, traceMode string) (*types.Eth return nil }) g.Go(func() error { - start := time.Now() - traces, err = client.getTrace(traceMode, block) + t, err := client.getTrace(traceMode, big.NewInt(number)) if err != nil { - return fmt.Errorf("error retrieving traces for block %v: %w", block.Number(), err) + return fmt.Errorf("error retrieving traces for block %v: %w", number, err) } + traces = t mu.Lock() timings.Traces = time.Since(start) mu.Unlock() @@ -251,40 +171,174 @@ func (client *ErigonClient) GetBlock(number int64, traceMode string) (*types.Eth if err := g.Wait(); err != nil { return nil, nil, err } - traceIndex := 0 - for txPosition, receipt := range receipts { - c.Transactions[txPosition].ContractAddress = receipt.ContractAddress[:] - c.Transactions[txPosition].CommulativeGasUsed = receipt.CumulativeGasUsed - c.Transactions[txPosition].GasUsed = receipt.GasUsed - c.Transactions[txPosition].LogsBloom = receipt.Bloom[:] - c.Transactions[txPosition].Logs = make([]*types.Eth1Log, 0, len(receipt.Logs)) - c.Transactions[txPosition].Status = receipt.Status - if receipt.BlobGasPrice != nil { - c.Transactions[txPosition].BlobGasPrice = receipt.BlobGasPrice.Bytes() + withdrawals := make([]*types.Eth1Withdrawal, len(block.Withdrawals())) + for i, withdrawal := range block.Withdrawals() { + withdrawals[i] = &types.Eth1Withdrawal{ + Index: withdrawal.Index, + ValidatorIndex: withdrawal.Validator, + Address: withdrawal.Address.Bytes(), + Amount: new(big.Int).SetUint64(withdrawal.Amount).Bytes(), } - c.Transactions[txPosition].BlobGasUsed = receipt.BlobGasUsed + } - for _, l := range receipt.Logs { - topics := make([][]byte, 0, len(l.Topics)) - for _, t := range l.Topics { - topics = append(topics, t.Bytes()) + transactions := make([]*types.Eth1Transaction, len(block.Transactions())) + traceIndex := 0 + for txPosition, receipt := range receipts { + logs := make([]*types.Eth1Log, len(receipt.Logs)) + for i, log := range receipt.Logs { + topics := make([][]byte, len(log.Topics)) + for j, topic := range log.Topics { + topics[j] = topic.Bytes() } - c.Transactions[txPosition].Logs = append(c.Transactions[txPosition].Logs, &types.Eth1Log{ - Address: l.Address.Bytes(), - Data: l.Data, - Removed: l.Removed, + logs[i] = &types.Eth1Log{ + Address: log.Address.Bytes(), + Data: log.Data, + Removed: log.Removed, Topics: topics, - }) - } - if len(traces) == 0 { - continue + } } + + var internals []*types.Eth1InternalTransaction for ; traceIndex < len(traces) && traces[traceIndex].txPosition == txPosition; traceIndex++ { - c.Transactions[txPosition].Itx = append(c.Transactions[txPosition].Itx, &traces[traceIndex].Eth1InternalTransaction) + internals = append(internals, &traces[traceIndex].Eth1InternalTransaction) + } + + tx := block.Transactions()[txPosition] + transactions[txPosition] = &types.Eth1Transaction{ + Type: uint32(tx.Type()), + Nonce: tx.Nonce(), + GasPrice: tx.GasPrice().Bytes(), + MaxPriorityFeePerGas: tx.GasTipCap().Bytes(), + MaxFeePerGas: tx.GasFeeCap().Bytes(), + Gas: tx.Gas(), + Value: tx.Value().Bytes(), + Data: tx.Data(), + To: func() []byte { + if tx.To() != nil { + return tx.To().Bytes() + } + return nil + }(), + From: func() []byte { + // this won't make a request in most cases as the sender is already present in the cache + // context https://github.com/ethereum/go-ethereum/blob/v1.14.11/ethclient/ethclient.go#L268 + sender, err := client.ethClient.TransactionSender(context.Background(), tx, block.Hash(), uint(txPosition)) + if err != nil { + sender = common.HexToAddress("abababababababababababababababababababab") + logrus.Errorf("could not retrieve tx sender %v: %v", tx.Hash(), err) + } + return sender.Bytes() + }(), + ChainId: tx.ChainId().Bytes(), + AccessList: []*types.AccessList{}, + Hash: tx.Hash().Bytes(), + ContractAddress: receipt.ContractAddress[:], + CommulativeGasUsed: receipt.CumulativeGasUsed, + GasUsed: receipt.GasUsed, + LogsBloom: receipt.Bloom[:], + Status: receipt.Status, + Logs: logs, + Itx: internals, + MaxFeePerBlobGas: func() []byte { + if tx.BlobGasFeeCap() != nil { + return tx.BlobGasFeeCap().Bytes() + } + return nil + }(), + BlobVersionedHashes: func() (b [][]byte) { + for _, h := range tx.BlobHashes() { + b = append(b, h.Bytes()) + } + return b + }(), + BlobGasPrice: func() []byte { + if receipt.BlobGasPrice != nil { + return receipt.BlobGasPrice.Bytes() + } + return nil + }(), + BlobGasUsed: receipt.BlobGasUsed, + } + } + + uncles := make([]*types.Eth1Block, len(block.Uncles())) + for i, uncle := range block.Uncles() { + uncles[i] = &types.Eth1Block{ + Hash: uncle.Hash().Bytes(), + ParentHash: uncle.ParentHash.Bytes(), + UncleHash: uncle.UncleHash.Bytes(), + Coinbase: uncle.Coinbase.Bytes(), + Root: uncle.Root.Bytes(), + TxHash: uncle.TxHash.Bytes(), + ReceiptHash: uncle.ReceiptHash.Bytes(), + Difficulty: uncle.Difficulty.Bytes(), + Number: uncle.Number.Uint64(), + GasLimit: uncle.GasLimit, + GasUsed: uncle.GasUsed, + Time: timestamppb.New(time.Unix(int64(uncle.Time), 0)), + Extra: uncle.Extra, + MixDigest: uncle.MixDigest.Bytes(), + Bloom: uncle.Bloom.Bytes(), + } + } + + return &types.Eth1Block{ + Hash: block.Hash().Bytes(), + ParentHash: block.ParentHash().Bytes(), + UncleHash: block.UncleHash().Bytes(), + Coinbase: block.Coinbase().Bytes(), + Root: block.Root().Bytes(), + TxHash: block.TxHash().Bytes(), + ReceiptHash: block.ReceiptHash().Bytes(), + Difficulty: block.Difficulty().Bytes(), + Number: block.NumberU64(), + GasLimit: block.GasLimit(), + GasUsed: block.GasUsed(), + Time: timestamppb.New(time.Unix(int64(block.Time()), 0)), + Extra: block.Extra(), + MixDigest: block.MixDigest().Bytes(), + Bloom: block.Bloom().Bytes(), + BaseFee: func() []byte { + if block.BaseFee() != nil { + return block.BaseFee().Bytes() + } + return nil + }(), + Uncles: uncles, + Transactions: transactions, + Withdrawals: withdrawals, + BlobGasUsed: func() uint64 { + blobGasUsed := block.BlobGasUsed() + if blobGasUsed != nil { + return *blobGasUsed + } + return 0 + }(), + ExcessBlobGas: func() uint64 { + excessBlobGas := block.ExcessBlobGas() + if excessBlobGas != nil { + return *excessBlobGas + } + return 0 + }(), + }, timings, nil +} + +func (client *ErigonClient) GetBlocks(start, end int64, traceMode string) ([]*types.Eth1Block, error) { + _, err := client.rawStore.ReadBlocksByNumber(client.chainID.Uint64(), start, end) + if err != nil { + return nil, err + } + blocks := make([]*types.Eth1Block, end-start+1) + for i := start; i <= end; i++ { + block, _, err := client.GetBlock(i, traceMode) + if err != nil { + return nil, err } + blocks[i-start] = block } - return c, timings, nil + return blocks, nil } func (client *ErigonClient) GetBlockNumberByHash(hash string) (uint64, error) { @@ -358,10 +412,10 @@ func extractCalls(r *GethTraceCallResult, d *[]*GethTraceCallResult) { } } -func (client *ErigonClient) TraceGeth(blockHash common.Hash) ([]*GethTraceCallResult, error) { +func (client *ErigonClient) TraceGeth(blockNumber *big.Int) ([]*GethTraceCallResult, error) { var res []*GethTraceCallResultWrapper - err := client.rpcClient.Call(&res, "debug_traceBlockByHash", blockHash, gethTracerArg) + err := client.rpcClient.Call(&res, "debug_traceBlockByNumber", hexutil.EncodeBig(blockNumber), gethTracerArg) if err != nil { return nil, err } @@ -672,32 +726,31 @@ func toCallArg(msg ethereum.CallMsg) interface{} { return arg } -func (client *ErigonClient) getTrace(traceMode string, block *geth_types.Block) ([]*Eth1InternalTransactionWithPosition, error) { - if block.NumberU64() == 0 { // genesis block is not traceable +func (client *ErigonClient) getTrace(traceMode string, blockNumber *big.Int) ([]*Eth1InternalTransactionWithPosition, error) { + if blockNumber.Uint64() == 0 { // genesis block is not traceable return nil, nil } switch traceMode { case "parity": - return client.getTraceParity(block) + return client.getTraceParity(blockNumber) case "parity/geth": - traces, err := client.getTraceParity(block) + traces, err := client.getTraceParity(blockNumber) if err == nil { return traces, nil } - logger.Errorf("error tracing block via parity style traces (%v), %v: %v", block.Number(), block.Hash(), err) + logger.Errorf("error tracing block via parity style traces (%v): %v", blockNumber, err) // fallback to geth traces fallthrough case "geth": - return client.getTraceGeth(block) + return client.getTraceGeth(blockNumber) } return nil, fmt.Errorf("unknown trace mode '%s'", traceMode) } -func (client *ErigonClient) getTraceParity(block *geth_types.Block) ([]*Eth1InternalTransactionWithPosition, error) { - traces, err := client.TraceParity(block.NumberU64()) - +func (client *ErigonClient) getTraceParity(blockNumber *big.Int) ([]*Eth1InternalTransactionWithPosition, error) { + traces, err := client.TraceParity(blockNumber.Uint64()) if err != nil { - return nil, fmt.Errorf("error tracing block via parity style traces (%v), %v: %w", block.Number(), block.Hash(), err) + return nil, fmt.Errorf("error tracing block via parity style traces (%v): %w", blockNumber, err) } var indexedTraces []*Eth1InternalTransactionWithPosition @@ -708,9 +761,6 @@ func (client *ErigonClient) getTraceParity(block *geth_types.Block) ([]*Eth1Inte if trace.TransactionHash == "" { continue } - if trace.TransactionPosition >= len(block.Transactions()) { - return nil, fmt.Errorf("error transaction position %v out of range", trace.TransactionPosition) - } from, to, value, traceType := trace.ConvertFields() indexedTraces = append(indexedTraces, &Eth1InternalTransactionWithPosition{ @@ -728,29 +778,37 @@ func (client *ErigonClient) getTraceParity(block *geth_types.Block) ([]*Eth1Inte return indexedTraces, nil } -func (client *ErigonClient) getTraceGeth(block *geth_types.Block) ([]*Eth1InternalTransactionWithPosition, error) { - traces, err := client.TraceGeth(block.Hash()) +func (client *ErigonClient) getTraceGeth(blockNumber *big.Int) ([]*Eth1InternalTransactionWithPosition, error) { + traces, err := client.TraceGeth(blockNumber) if err != nil { - return nil, fmt.Errorf("error tracing block via geth style traces (%v), %v: %w", block.Number(), block.Hash(), err) + return nil, fmt.Errorf("error tracing block via geth style traces (%v): %w", blockNumber, err) } var indexedTraces []*Eth1InternalTransactionWithPosition + var txPosition int //, tracePath int + paths := make(map[*GethTraceCallResult]string) for i, trace := range traces { switch trace.Type { case "CREATE2": trace.Type = "CREATE" case "CREATE", "SELFDESTRUCT", "SUICIDE", "CALL", "DELEGATECALL", "STATICCALL": case "": - logrus.WithFields(logrus.Fields{"type": trace.Type, "block.Number": block.Number(), "block.Hash": block.Hash()}).Errorf("geth style trace without type") + logrus.WithFields(logrus.Fields{"type": trace.Type, "block.Number": blockNumber}).Errorf("geth style trace without type") spew.Dump(trace) continue default: spew.Dump(trace) logrus.Fatalf("unknown trace type %v in tx %v", trace.Type, trace.TransactionPosition) } + if txPosition != trace.TransactionPosition { + txPosition = trace.TransactionPosition + paths = make(map[*GethTraceCallResult]string) + } + for index, call := range trace.Calls { + paths[call] = fmt.Sprintf("%s %d", paths[trace], index) + } - logger.Tracef("appending trace %v to tx %d:%x from %v to %v value %v", i, block.Number(), trace.TransactionPosition, trace.From, trace.To, trace.Value) - + logger.Tracef("appending trace %v to tx %d:%x from %v to %v value %v", i, blockNumber, trace.TransactionPosition, trace.From, trace.To, trace.Value) indexedTraces = append(indexedTraces, &Eth1InternalTransactionWithPosition{ Eth1InternalTransaction: types.Eth1InternalTransaction{ Type: strings.ToLower(trace.Type), @@ -758,7 +816,7 @@ func (client *ErigonClient) getTraceGeth(block *geth_types.Block) ([]*Eth1Intern To: trace.To.Bytes(), Value: common.FromHex(trace.Value), ErrorMsg: trace.Error, - Path: "0", + Path: fmt.Sprintf("[%s]", strings.TrimPrefix(paths[trace], " ")), }, txPosition: trace.TransactionPosition, }) diff --git a/types/config.go b/types/config.go index a59b160d45..3547eae043 100644 --- a/types/config.go +++ b/types/config.go @@ -29,7 +29,11 @@ type Config struct { MaxIdleConns int `yaml:"maxIdleConns" envconfig:"WRITER_DB_MAX_IDLE_CONNS"` SSL bool `yaml:"ssl" envconfig:"WRITER_DB_SSL"` } `yaml:"writerDatabase"` - Bigtable `yaml:"bigtable"` + Bigtable Bigtable `yaml:"bigtable"` + RawBigtable struct { + Bigtable Bigtable `yaml:"bigtable"` + Remote string `yaml:"remote"` + } `yaml:"rawBigtable"` BlobIndexer struct { S3 struct { Endpoint string `yaml:"endpoint" envconfig:"BLOB_INDEXER_S3_ENDPOINT"`