Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move remote eth backend from ethdb to core package #1567

Merged
merged 1 commit into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/internal/debug"
"github.com/ledgerwatch/turbo-geth/log"
Expand Down Expand Up @@ -110,13 +111,14 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
}
}
if cfg.PrivateApiAddr != "" {
var remoteDb ethdb.KV
remoteDb, ethBackend, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
var remoteKv ethdb.KV
remoteKv, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
if err != nil {
return nil, nil, fmt.Errorf("could not connect to remoteDb: %w", err)
return nil, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
}
core.NewRemoteBackend(remoteKv)
if db == nil {
db = remoteDb
db = remoteKv
}
} else {
return nil, nil, fmt.Errorf("either remote db or lmdb must be specified")
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/gas_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var gasLimitsCmd = &cobra.Command{
}
}).MustOpen()

remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/state_growth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func init() {
}
}).MustOpen()

remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion common/dbutils/composite_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func HeaderKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
}


// blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash
func BlockBodyKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
Expand Down
2 changes: 0 additions & 2 deletions common/dbutils/composite_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)



func TestPlainParseStoragePrefix(t *testing.T) {
expectedAddr := common.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c")
expectedIncarnation := uint64(999000999)
Expand Down
64 changes: 64 additions & 0 deletions core/eth_backend.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package core

import (
"context"
"io"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/gointerfaces"
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
)

Expand Down Expand Up @@ -34,3 +40,61 @@ func (back *EthBackend) Subscribe(func(*remote.SubscribeReply)) error {
// do nothing
return nil
}

type RemoteBackend struct {
remoteEthBackend remote.ETHBACKENDClient
log log.Logger
}

func NewRemoteBackend(kv ethdb.KV) *RemoteBackend {
return &RemoteBackend{
remoteEthBackend: remote.NewETHBACKENDClient(kv.(*ethdb.RemoteKV).GrpcConn()),
log: log.New("remote_db"),
}
}

func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) {
res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx})
if err != nil {
return common.Hash{}.Bytes(), err
}
return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil
}

func (back *RemoteBackend) Etherbase() (common.Address, error) {
res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{})
if err != nil {
return common.Address{}, err
}

return gointerfaces.ConvertH160toAddress(res.Address), nil
}

func (back *RemoteBackend) NetVersion() (uint64, error) {
res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{})
if err != nil {
return 0, err
}

return res.Id, nil
}

func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error {
subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{})
if err != nil {
return err
}
for {
event, err := subscription.Recv()
if err == io.EOF {
log.Info("rpcdaemon: the subscription channel was closed")
break
}
if err != nil {
return err
}

onNewEvent(event)
}
return nil
}
2 changes: 1 addition & 1 deletion ethdb/kv_abstract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func setupDatabases(f ethdb.BucketConfigsFunc) (writeDBs []ethdb.KV, readDBs []e

conn := bufconn.Listen(1024 * 1024)

rdb, _ := ethdb.NewRemote().InMem(conn).MustOpen()
rdb := ethdb.NewRemote().InMem(conn).MustOpen()
readDBs = []ethdb.KV{
writeDBs[0],
writeDBs[1],
Expand Down
84 changes: 13 additions & 71 deletions ethdb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"unsafe"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/gointerfaces"
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
"github.com/ledgerwatch/turbo-geth/log"
"google.golang.org/grpc"
Expand Down Expand Up @@ -88,14 +86,7 @@ func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
return opts
}

type RemoteBackend struct {
opts remoteOpts
remoteEthBackend remote.ETHBACKENDClient
conn *grpc.ClientConn
log log.Logger
}

func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, error) {
func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, error) {
var dialOpts []grpc.DialOption
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}),
Expand All @@ -111,19 +102,19 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro
creds, err = credentials.NewClientTLSFromFile(certFile, "")

if err != nil {
return nil, nil, err
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Error("load peer cert/key error:%v", err)
return nil, nil, err
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
log.Error("read ca cert file error:%v", err)
return nil, nil, err
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
Expand All @@ -150,7 +141,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro

conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, nil, err
return nil, err
}

db := &RemoteKV{
Expand All @@ -166,22 +157,15 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro
db.buckets[name] = cfg
}

eth := &RemoteBackend{
opts: opts,
remoteEthBackend: remote.NewETHBACKENDClient(conn),
conn: conn,
log: log.New("remote_db", opts.DialAddress),
}

return db, eth, nil
return db, nil
}

func (opts remoteOpts) MustOpen() (KV, Backend) {
db, txPool, err := opts.Open("", "", "")
func (opts remoteOpts) MustOpen() KV {
db, err := opts.Open("", "", "")
if err != nil {
panic(err)
}
return db, txPool
return db
}

func NewRemote() remoteOpts {
Expand All @@ -192,6 +176,10 @@ func (db *RemoteKV) AllBuckets() dbutils.BucketsCfg {
return db.buckets
}

func (db *RemoteKV) GrpcConn() *grpc.ClientConn {
return db.conn
}

// Close
// All transactions must be closed before closing the database.
func (db *RemoteKV) Close() {
Expand Down Expand Up @@ -642,49 +630,3 @@ func (c *remoteCursorDupSort) LastDup() ([]byte, error) {
}
return c.lastDup()
}

func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) {
res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx})
if err != nil {
return common.Hash{}.Bytes(), err
}
return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil
}

func (back *RemoteBackend) Etherbase() (common.Address, error) {
res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{})
if err != nil {
return common.Address{}, err
}

return gointerfaces.ConvertH160toAddress(res.Address), nil
}

func (back *RemoteBackend) NetVersion() (uint64, error) {
res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{})
if err != nil {
return 0, err
}

return res.Id, nil
}

func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error {
subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{})
if err != nil {
return err
}
for {
event, err := subscription.Recv()
if err == io.EOF {
log.Info("rpcdaemon: the subscription channel was closed")
break
}
if err != nil {
return err
}

onNewEvent(event)
}
return nil
}