Skip to content

Commit

Permalink
Merge pull request ethereum#85 from prestonvanloon/refactoring
Browse files Browse the repository at this point in the history
Refactoring sharding package for proposer/client separation
  • Loading branch information
prestonvanloon authored Apr 1, 2018
2 parents 173fda2 + ff59640 commit 5efb934
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 415 deletions.
16 changes: 6 additions & 10 deletions cmd/geth/shardingcmd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/collator"
"github.com/ethereum/go-ethereum/sharding/proposer"

"fmt"
"github.com/ethereum/go-ethereum/cmd/utils"
cli "gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -36,15 +36,11 @@ Launches a sharding proposer client that connects to a running geth node and pro
)

func collatorClient(ctx *cli.Context) error {
c := sharding.MakeCollatorClient(ctx)
if err := c.Start(); err != nil {
return err
}
c.Wait()
return nil
c := collator.NewCollator(ctx)
return c.Start()
}

func proposerClient(ctx *cli.Context) error {
fmt.Println("Starting proposer client")
return nil
p := proposer.NewProposer(ctx)
return p.Start()
}
142 changes: 91 additions & 51 deletions sharding/collator_client.go → sharding/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sharding

//go:generate abigen --sol contracts/sharding_manager.sol --pkg contracts --out contracts/sharding_manager.go
// Package client provides an interface for interacting with a running ethereum full node.
// As part of the initial phases of sharding, actors will need access to the sharding management
// contract on the main PoW chain.
package client

import (
"bufio"
Expand All @@ -9,8 +10,9 @@ import (
"fmt"
"math/big"
"os"
"time"

"github.com/ethereum/go-ethereum"
ethereum "github.com/ethereum/go-ethereum"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/contracts"
cli "gopkg.in/urfave/cli.v1"
)
Expand All @@ -30,17 +33,28 @@ const (
clientIdentifier = "geth" // Used to determine the ipc name.
)

// Client for Collator. Communicates to Geth node via JSON RPC.
type collatorClient struct {
endpoint string // Endpoint to JSON RPC
client *ethclient.Client // Ethereum RPC client.
keystore *keystore.KeyStore // Keystore containing the single signer
ctx *cli.Context // Command line context
smc *contracts.SMC // The deployed sharding management contract
// General client for Collator/Proposer. Communicates to Geth node via JSON RPC.

type shardingClient struct {
endpoint string // Endpoint to JSON RPC
client *ethclient.Client // Ethereum RPC client.
keystore *keystore.KeyStore // Keystore containing the single signer
ctx *cli.Context // Command line context
smc *contracts.SMC // The deployed sharding management contract
rpcClient *rpc.Client // The RPC client connection to the main geth node
}

// MakeCollatorClient for interfacing with Geth full node.
func MakeCollatorClient(ctx *cli.Context) *collatorClient {
type Client interface {
Start() error
Close()
CreateTXOps(*big.Int) (*bind.TransactOpts, error)
ChainReader() ethereum.ChainReader
Account() *accounts.Account
SMCCaller() *contracts.SMCCaller
SMCTransactor() *contracts.SMCTransactor
}

func NewClient(ctx *cli.Context) *shardingClient {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
Expand All @@ -64,24 +78,23 @@ func MakeCollatorClient(ctx *cli.Context) *collatorClient {
}
ks := keystore.NewKeyStore(keydir, scryptN, scryptP)

return &collatorClient{
return &shardingClient{
endpoint: endpoint,
keystore: ks,
ctx: ctx,
}
}

// Start the collator client.
// Start the sharding client.
// * Connects to Geth node.
// * Verifies or deploys the sharding manager contract.
func (c *collatorClient) Start() error {
log.Info("Starting collator client")
func (c *shardingClient) Start() error {
rpcClient, err := dialRPC(c.endpoint)
if err != nil {
return err
return fmt.Errorf("cannot start rpc client. %v", err)
}
c.rpcClient = rpcClient
c.client = ethclient.NewClient(rpcClient)
defer rpcClient.Close()

// Check account existence and unlock account before starting collator client
accounts := c.keystore.Accounts()
Expand All @@ -93,42 +106,22 @@ func (c *collatorClient) Start() error {
return fmt.Errorf("cannot unlock account. %v", err)
}

if err := initSMC(c); err != nil {
return err
}

// Deposit 100ETH into the collator set in the SMC. Checks if account
// is already a collator in the SMC (in the case the client restarted).
// Once that's done we can subscribe to block headers.
//
// TODO: this function should store the collator's SMC index as a property
// in the client's struct
if err := joinCollatorPool(c); err != nil {
smc, err := initSMC(c)
if err != nil {
return err
}
c.smc = smc

// Listens to block headers from the Geth node and if we are an eligible
// collator, we fetch pending transactions and collator a collation
if err := subscribeBlockHeaders(c); err != nil {
return err
}
return nil
}

// Wait until collator client is shutdown.
func (c *collatorClient) Wait() {
log.Info("Sharding client has been shutdown...")
}

// WatchCollationHeaders checks the logs for add_header func calls
// and updates the head collation of the client. We can probably store
// this as a property of the client struct
func (c *collatorClient) WatchCollationHeaders() {

// Close the RPC client connection
func (c *shardingClient) Close() {
c.rpcClient.Close()
}

// UnlockAccount will unlock the specified account using utils.PasswordFileFlag or empty string if unset.
func (c *collatorClient) unlockAccount(account accounts.Account) error {
func (c *shardingClient) unlockAccount(account accounts.Account) error {
pass := ""

if c.ctx.GlobalIsSet(utils.PasswordFileFlag.Name) {
Expand All @@ -152,7 +145,8 @@ func (c *collatorClient) unlockAccount(account accounts.Account) error {
return c.keystore.Unlock(account, pass)
}

func (c *collatorClient) createTXOps(value *big.Int) (*bind.TransactOpts, error) {
// CreateTXOps creates a *TransactOpts with a signer using the default account on the keystore.
func (c *shardingClient) CreateTXOps(value *big.Int) (*bind.TransactOpts, error) {
account := c.Account()

return &bind.TransactOpts{
Expand All @@ -169,31 +163,77 @@ func (c *collatorClient) createTXOps(value *big.Int) (*bind.TransactOpts, error)
}

// Account to use for sharding transactions.
func (c *collatorClient) Account() *accounts.Account {
func (c *shardingClient) Account() *accounts.Account {
accounts := c.keystore.Accounts()

return &accounts[0]
}

// ChainReader for interacting with the chain.
func (c *collatorClient) ChainReader() ethereum.ChainReader {
func (c *shardingClient) ChainReader() ethereum.ChainReader {
return ethereum.ChainReader(c.client)
}

// Client to interact with ethereum node.
func (c *collatorClient) Client() *ethclient.Client {
func (c *shardingClient) ethereumClient() *ethclient.Client {
return c.client
}

// SMCCaller to interact with the sharding manager contract.
func (c *collatorClient) SMCCaller() *contracts.SMCCaller {
func (c *shardingClient) SMCCaller() *contracts.SMCCaller {
return &c.smc.SMCCaller
}

func (c *shardingClient) SMCTransactor() *contracts.SMCTransactor {
return &c.smc.SMCTransactor
}

// dialRPC endpoint to node.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint(clientIdentifier)
}
return rpc.Dial(endpoint)
}

// initSMC initializes the sharding manager contract bindings.
// If the SMC does not exist, it will be deployed.
func initSMC(c *shardingClient) (*contracts.SMC, error) {
b, err := c.client.CodeAt(context.Background(), sharding.ShardingManagerAddress, nil)
if err != nil {
return nil, fmt.Errorf("unable to get contract code at %s: %v", sharding.ShardingManagerAddress, err)
}

// Deploy SMC for development only.
// TODO: Separate contract deployment from the sharding client. It would only need to be deployed
// once on the mainnet, so this code would not need to ship with the client.
if len(b) == 0 {
log.Info(fmt.Sprintf("No sharding manager contract found at %s. Deploying new contract.", sharding.ShardingManagerAddress.String()))

txOps, err := c.CreateTXOps(big.NewInt(0))
if err != nil {
return nil, fmt.Errorf("unable to intiate the transaction: %v", err)
}

addr, tx, contract, err := contracts.DeploySMC(txOps, c.client)
if err != nil {
return nil, fmt.Errorf("unable to deploy sharding manager contract: %v", err)
}

for pending := true; pending; _, pending, err = c.client.TransactionByHash(context.Background(), tx.Hash()) {
if err != nil {
return nil, fmt.Errorf("unable to get transaction by hash: %v", err)
}
time.Sleep(1 * time.Second)
}

log.Info(fmt.Sprintf("New contract deployed at %s", addr.String()))
return contract, nil
}

contract, err := contracts.NewSMC(sharding.ShardingManagerAddress, c.client)
if err != nil {

}
return contract, nil
}
56 changes: 34 additions & 22 deletions sharding/collator.go → sharding/collator/collator.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
package sharding
package collator

import (
"context"
"fmt"
"math/big"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/contracts"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/client"
)

type collator interface {
Account() *accounts.Account
ChainReader() ethereum.ChainReader
SMCCaller() *contracts.SMCCaller
}

// SubscribeBlockHeaders checks incoming block headers and determines if
// we are an eligible collator for collations. Then, it finds the pending tx's
// from the running geth node and sorts them by descending order of gas price,
// eliminates those that ask for too much gas, and routes them over
// to the SMC to create a collation
func subscribeBlockHeaders(c collator) error {
func subscribeBlockHeaders(c client.Client) error {
headerChan := make(chan *types.Header, 16)

account := c.Account()

_, err := c.ChainReader().SubscribeNewHead(context.Background(), headerChan)
if err != nil {
return fmt.Errorf("unable to subscribe to incoming headers. %v", err)
Expand All @@ -53,7 +45,6 @@ func subscribeBlockHeaders(c collator) error {
return fmt.Errorf("unable to watch shards. %v", err)
}
} else {
log.Warn(fmt.Sprintf("Account %s not in collator pool.", account.Address.String()))
}

}
Expand All @@ -63,12 +54,10 @@ func subscribeBlockHeaders(c collator) error {
// collation for the available shards in the SMC. The function calls
// getEligibleCollator from the SMC and collator a collation if
// conditions are met
func checkSMCForCollator(c collator, head *types.Header) error {
account := c.Account()

func checkSMCForCollator(c client.Client, head *types.Header) error {
log.Info("Checking if we are an eligible collation collator for a shard...")
period := big.NewInt(0).Div(head.Number, big.NewInt(periodLength))
for s := int64(0); s < shardCount; s++ {
period := big.NewInt(0).Div(head.Number, big.NewInt(sharding.PeriodLength))
for s := int64(0); s < sharding.ShardCount; s++ {
// Checks if we are an eligible collator according to the SMC
addr, err := c.SMCCaller().GetEligibleCollator(&bind.CallOpts{}, big.NewInt(s), period)

Expand All @@ -77,7 +66,7 @@ func checkSMCForCollator(c collator, head *types.Header) error {
}

// If output is non-empty and the addr == coinbase
if addr == account.Address {
if addr == c.Account().Address {
log.Info(fmt.Sprintf("Selected as collator on shard: %d", s))
err := submitCollation(s)
if err != nil {
Expand All @@ -93,10 +82,14 @@ func checkSMCForCollator(c collator, head *types.Header) error {
// we can't guarantee our tx for deposit will be in the next block header we receive.
// The function calls IsCollatorDeposited from the SMC and returns true if
// the client is in the collator pool
func isAccountInCollatorPool(c collator) (bool, error) {
func isAccountInCollatorPool(c client.Client) (bool, error) {
account := c.Account()
// Checks if our deposit has gone through according to the SMC
return c.SMCCaller().IsCollatorDeposited(&bind.CallOpts{}, account.Address)
b, err := c.SMCCaller().IsCollatorDeposited(&bind.CallOpts{}, account.Address)
if !b && err != nil {
log.Warn(fmt.Sprintf("Account %s not in collator pool.", account.Address.String()))
}
return b, err
}

// submitCollation interacts with the SMC directly to add a collation header
Expand Down Expand Up @@ -128,3 +121,22 @@ func submitCollation(shardID int64) error {
log.Info("Submit collation function called")
return nil
}

// joinCollatorPool checks if the account is a collator in the SMC. If
// the account is not in the set, it will deposit 100ETH into contract.
func joinCollatorPool(c client.Client) error {

log.Info("Joining collator pool")
txOps, err := c.CreateTXOps(sharding.DepositSize)
if err != nil {
return fmt.Errorf("unable to intiate the deposit transaction: %v", err)
}

tx, err := c.SMCTransactor().Deposit(txOps)
if err != nil {
return fmt.Errorf("unable to deposit eth and become a collator: %v", err)
}
log.Info(fmt.Sprintf("Deposited %dETH into contract with transaction hash: %s", new(big.Int).Div(sharding.DepositSize, big.NewInt(params.Ether)), tx.Hash().String()))

return nil
}
Loading

0 comments on commit 5efb934

Please sign in to comment.