From ba377066811a473e880fe5880ff89e3488f475b2 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 20 Mar 2021 10:52:00 +0700 Subject: [PATCH] move remote eth backend from ethdb to core package (#1567) --- cmd/rpcdaemon/cli/config.go | 10 ++-- cmd/state/commands/gas_limits.go | 2 +- cmd/state/commands/state_growth.go | 2 +- common/dbutils/composite_keys.go | 1 - common/dbutils/composite_keys_test.go | 2 - core/eth_backend.go | 64 ++++++++++++++++++++ ethdb/kv_abstract_test.go | 2 +- ethdb/kv_remote.go | 84 +++++---------------------- 8 files changed, 86 insertions(+), 81 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 340fed6e720..4a11e6b9224 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ledgerwatch/turbo-geth/cmd/utils" + "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/internal/debug" "github.com/ledgerwatch/turbo-geth/log" @@ -110,13 +111,14 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) { } } if cfg.PrivateApiAddr != "" { - var remoteDb ethdb.KV - remoteDb, ethBackend, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert) + var remoteKv ethdb.KV + remoteKv, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert) if err != nil { - return nil, nil, fmt.Errorf("could not connect to remoteDb: %w", err) + return nil, nil, fmt.Errorf("could not connect to remoteKv: %w", err) } + core.NewRemoteBackend(remoteKv) if db == nil { - db = remoteDb + db = remoteKv } } else { return nil, nil, fmt.Errorf("either remote db or lmdb must be specified") diff --git a/cmd/state/commands/gas_limits.go b/cmd/state/commands/gas_limits.go index 3b8347bfd2c..0ec3c026392 100644 --- a/cmd/state/commands/gas_limits.go +++ b/cmd/state/commands/gas_limits.go @@ -28,7 +28,7 @@ var gasLimitsCmd = &cobra.Command{ } }).MustOpen() - remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "") + remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "") if err != nil { return err } diff --git a/cmd/state/commands/state_growth.go b/cmd/state/commands/state_growth.go index 5e892a53aa6..5316106a761 100644 --- a/cmd/state/commands/state_growth.go +++ b/cmd/state/commands/state_growth.go @@ -27,7 +27,7 @@ func init() { } }).MustOpen() - remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "") + remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "") if err != nil { return err } diff --git a/common/dbutils/composite_keys.go b/common/dbutils/composite_keys.go index e1ccca25cfb..dd5ebbc05e8 100644 --- a/common/dbutils/composite_keys.go +++ b/common/dbutils/composite_keys.go @@ -18,7 +18,6 @@ func HeaderKey(number uint64, hash common.Hash) []byte { return append(EncodeBlockNumber(number), hash.Bytes()...) } - // blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash func BlockBodyKey(number uint64, hash common.Hash) []byte { return append(EncodeBlockNumber(number), hash.Bytes()...) diff --git a/common/dbutils/composite_keys_test.go b/common/dbutils/composite_keys_test.go index df282b7d42e..3c24b87fb30 100644 --- a/common/dbutils/composite_keys_test.go +++ b/common/dbutils/composite_keys_test.go @@ -7,8 +7,6 @@ import ( "github.com/stretchr/testify/assert" ) - - func TestPlainParseStoragePrefix(t *testing.T) { expectedAddr := common.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c") expectedIncarnation := uint64(999000999) diff --git a/core/eth_backend.go b/core/eth_backend.go index 25700c2ccb0..66854527da3 100644 --- a/core/eth_backend.go +++ b/core/eth_backend.go @@ -1,9 +1,15 @@ package core import ( + "context" + "io" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/gointerfaces" "github.com/ledgerwatch/turbo-geth/gointerfaces/remote" + "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/rlp" ) @@ -34,3 +40,61 @@ func (back *EthBackend) Subscribe(func(*remote.SubscribeReply)) error { // do nothing return nil } + +type RemoteBackend struct { + remoteEthBackend remote.ETHBACKENDClient + log log.Logger +} + +func NewRemoteBackend(kv ethdb.KV) *RemoteBackend { + return &RemoteBackend{ + remoteEthBackend: remote.NewETHBACKENDClient(kv.(*ethdb.RemoteKV).GrpcConn()), + log: log.New("remote_db"), + } +} + +func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) { + res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx}) + if err != nil { + return common.Hash{}.Bytes(), err + } + return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil +} + +func (back *RemoteBackend) Etherbase() (common.Address, error) { + res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{}) + if err != nil { + return common.Address{}, err + } + + return gointerfaces.ConvertH160toAddress(res.Address), nil +} + +func (back *RemoteBackend) NetVersion() (uint64, error) { + res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{}) + if err != nil { + return 0, err + } + + return res.Id, nil +} + +func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error { + subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{}) + if err != nil { + return err + } + for { + event, err := subscription.Recv() + if err == io.EOF { + log.Info("rpcdaemon: the subscription channel was closed") + break + } + if err != nil { + return err + } + + onNewEvent(event) + } + return nil +} diff --git a/ethdb/kv_abstract_test.go b/ethdb/kv_abstract_test.go index 1b9c9007a5c..d7569b06a5c 100644 --- a/ethdb/kv_abstract_test.go +++ b/ethdb/kv_abstract_test.go @@ -142,7 +142,7 @@ func setupDatabases(f ethdb.BucketConfigsFunc) (writeDBs []ethdb.KV, readDBs []e conn := bufconn.Listen(1024 * 1024) - rdb, _ := ethdb.NewRemote().InMem(conn).MustOpen() + rdb := ethdb.NewRemote().InMem(conn).MustOpen() readDBs = []ethdb.KV{ writeDBs[0], writeDBs[1], diff --git a/ethdb/kv_remote.go b/ethdb/kv_remote.go index 0ed2b80d8a7..2f573d7580d 100644 --- a/ethdb/kv_remote.go +++ b/ethdb/kv_remote.go @@ -14,9 +14,7 @@ import ( "unsafe" "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" - "github.com/ledgerwatch/turbo-geth/gointerfaces" "github.com/ledgerwatch/turbo-geth/gointerfaces/remote" "github.com/ledgerwatch/turbo-geth/log" "google.golang.org/grpc" @@ -88,14 +86,7 @@ func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts { return opts } -type RemoteBackend struct { - opts remoteOpts - remoteEthBackend remote.ETHBACKENDClient - conn *grpc.ClientConn - log log.Logger -} - -func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, error) { +func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, error) { var dialOpts []grpc.DialOption dialOpts = []grpc.DialOption{ grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}), @@ -111,19 +102,19 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro creds, err = credentials.NewClientTLSFromFile(certFile, "") if err != nil { - return nil, nil, err + return nil, err } } else { // load peer cert/key, ca cert peerCert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { log.Error("load peer cert/key error:%v", err) - return nil, nil, err + return nil, err } caCert, err := ioutil.ReadFile(caCert) if err != nil { log.Error("read ca cert file error:%v", err) - return nil, nil, err + return nil, err } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -150,7 +141,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...) if err != nil { - return nil, nil, err + return nil, err } db := &RemoteKV{ @@ -166,22 +157,15 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro db.buckets[name] = cfg } - eth := &RemoteBackend{ - opts: opts, - remoteEthBackend: remote.NewETHBACKENDClient(conn), - conn: conn, - log: log.New("remote_db", opts.DialAddress), - } - - return db, eth, nil + return db, nil } -func (opts remoteOpts) MustOpen() (KV, Backend) { - db, txPool, err := opts.Open("", "", "") +func (opts remoteOpts) MustOpen() KV { + db, err := opts.Open("", "", "") if err != nil { panic(err) } - return db, txPool + return db } func NewRemote() remoteOpts { @@ -192,6 +176,10 @@ func (db *RemoteKV) AllBuckets() dbutils.BucketsCfg { return db.buckets } +func (db *RemoteKV) GrpcConn() *grpc.ClientConn { + return db.conn +} + // Close // All transactions must be closed before closing the database. func (db *RemoteKV) Close() { @@ -642,49 +630,3 @@ func (c *remoteCursorDupSort) LastDup() ([]byte, error) { } return c.lastDup() } - -func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) { - res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx}) - if err != nil { - return common.Hash{}.Bytes(), err - } - return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil -} - -func (back *RemoteBackend) Etherbase() (common.Address, error) { - res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{}) - if err != nil { - return common.Address{}, err - } - - return gointerfaces.ConvertH160toAddress(res.Address), nil -} - -func (back *RemoteBackend) NetVersion() (uint64, error) { - res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{}) - if err != nil { - return 0, err - } - - return res.Id, nil -} - -func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error { - subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{}) - if err != nil { - return err - } - for { - event, err := subscription.Recv() - if err == io.EOF { - log.Info("rpcdaemon: the subscription channel was closed") - break - } - if err != nil { - return err - } - - onNewEvent(event) - } - return nil -}