Skip to content

Commit

Permalink
Merge pull request ethereum#397 from thanhson1085/org_master
Browse files Browse the repository at this point in the history
refactor cache blocksigners
  • Loading branch information
ngtuna authored Jan 12, 2019
2 parents e9a3e7b + 381864d commit f7bd222
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 145 deletions.
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
UnvoteMethod = "0x02aa9be2"
ProposeMethod = "0x01267951"
ResignMethod = "0xae6e43f5"
SignMethod = "0xe341eaa4"
)

var (
Expand Down
48 changes: 12 additions & 36 deletions consensus/posv/posv.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
const (
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
blockSignersCacheLimit = 36000
votingCacheLimit = 1500000
M2ByteLength = 4
)

Expand Down Expand Up @@ -226,9 +225,7 @@ type Posv struct {
signFn clique.SignerFn // Signer function to authorize hashes with
lock sync.RWMutex // Protects the signer fields

EnableCache bool
BlockSigners *lru.Cache
Votes *lru.Cache
HookReward func(chain consensus.ChainReader, state *state.StateDB, header *types.Header) (error, map[string]interface{})
HookPenalty func(chain consensus.ChainReader, blockNumberEpoc uint64) ([]common.Address, error)
HookValidator func(header *types.Header, signers []common.Address) ([]byte, error)
Expand All @@ -245,17 +242,14 @@ func New(config *params.PosvConfig, db ethdb.Database) *Posv {
}
// Allocate the snapshot caches and create the engine
BlockSigners, _ := lru.New(blockSignersCacheLimit)
Votes, _ := lru.New(votingCacheLimit)
recents, _ := lru.NewARC(inmemorySnapshots)
signatures, _ := lru.NewARC(inmemorySnapshots)
validatorSignatures, _ := lru.NewARC(inmemorySnapshots)
verifiedHeaders, _ := lru.NewARC(inmemorySnapshots)
return &Posv{
config: &conf,
db: db,
EnableCache: false,
BlockSigners: BlockSigners,
Votes: Votes,
recents: recents,
signatures: signatures,
verifiedHeaders: verifiedHeaders,
Expand Down Expand Up @@ -859,12 +853,9 @@ func (c *Posv) Finalize(chain consensus.ChainReader, header *types.Header, state
number := header.Number.Uint64()
rCheckpoint := chain.Config().Posv.RewardCheckpoint

if c.HookReward != nil && number%rCheckpoint == 0 {
if !c.EnableCache && int(c.BlockSigners.Len()) >= int(rCheckpoint*3) {
log.Debug("EnableCache true c.BlockSigners.Len() ", "BlockSigners.Len", c.BlockSigners.Len())
c.EnableCache = true
}
// _ = c.CacheData(header, txs, receipts)

if c.HookReward != nil && number%rCheckpoint == 0 {
err, rewards := c.HookReward(chain, state, header)
if err != nil {
return nil, err
Expand All @@ -880,8 +871,6 @@ func (c *Posv) Finalize(chain consensus.ChainReader, header *types.Header, state
}
}

_ = c.cacheData(txs, receipts)

// the state remains as is and uncles are dropped
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
header.UncleHash = types.CalcUncleHash(nil)
Expand Down Expand Up @@ -1036,12 +1025,10 @@ func (c *Posv) GetMasternodesFromCheckpointHeader(preCheckpointHeader *types.Hea
return masternodes
}

func (c *Posv) cacheData(txs []*types.Transaction, receipts []*types.Receipt) error {
func (c *Posv) CacheData(header *types.Header, txs []*types.Transaction, receipts []*types.Receipt) error {
var signTxs []*types.Transaction
for _, tx := range txs {
if tx.IsSigningTransaction() {
blkHash := common.BytesToHash(tx.Data()[len(tx.Data())-32:])
from := *tx.From()

var b uint
for _, r := range receipts {
if r.TxHash == tx.Hash() {
Expand All @@ -1054,31 +1041,20 @@ func (c *Posv) cacheData(txs []*types.Transaction, receipts []*types.Receipt) er
continue
}

var lAddr []common.Address
if cached, ok := c.BlockSigners.Get(blkHash); ok {
lAddr = cached.([]common.Address)
lAddr = append(lAddr, from)
} else {
lAddr = []common.Address{from}
}
c.BlockSigners.Add(blkHash, lAddr)
} else {

b, addr := tx.IsVotingTransaction()
if b && addr != nil {
var vote common.Vote
vote.Masternode = *addr
vote.Voter = *tx.From()

log.Debug("Remove from Votes cache ", "Masternode", vote.Masternode.String(), "Voter", vote.Voter.String())
c.Votes.Remove(vote)
}
signTxs = append(signTxs, tx)
}
}

log.Debug("Save tx signers to cache", "hash", header.Hash().String(), "number", header.Number, "len(txs)", len(signTxs))
c.BlockSigners.Add(header.Hash(), signTxs)

return nil
}

func (c *Posv) GetDb() ethdb.Database {
return c.db
}

// Extract validators from byte array.
func RemovePenaltiesFromBlock(chain consensus.ChainReader, masternodes []common.Address, epochNumber uint64) []common.Address {
if epochNumber <= 0 {
Expand Down
169 changes: 68 additions & 101 deletions contracts/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,23 +199,19 @@ func BuildTxOpeningRandomize(nonce uint64, randomizeAddr common.Address, randomi
}

// Get signers signed for blockNumber from blockSigner contract.
func GetSignersFromContract(c *posv.Posv, addrBlockSigner common.Address, client bind.ContractBackend, blockHash common.Hash) ([]common.Address, error) {
func GetSignersFromContract(addrBlockSigner common.Address, client bind.ContractBackend, blockHash common.Hash) ([]common.Address, error) {
blockSigner, err := contract.NewBlockSigner(addrBlockSigner, client)
if err != nil {
log.Error("Fail get instance of blockSigner", "error", err)
return nil, err
}
if caddrs, ok := c.BlockSigners.Get(blockHash); !ok || !c.EnableCache {
opts := new(bind.CallOpts)
addrs, err := blockSigner.GetSigners(opts, blockHash)
if err != nil {
log.Error("Fail get block signers", "error", err)
return nil, err
}
return addrs, nil
} else {
return caddrs.([]common.Address), nil
opts := new(bind.CallOpts)
addrs, err := blockSigner.GetSigners(opts, blockHash)
if err != nil {
log.Error("Fail get block signers", "error", err)
return nil, err
}
return addrs, nil
}

// Get random from randomize contract.
Expand Down Expand Up @@ -310,7 +306,7 @@ func DecryptRandomizeFromSecretsAndOpening(secrets [][32]byte, opening [32]byte)
}

// Calculate reward for reward checkpoint.
func GetRewardForCheckpoint(c *posv.Posv, chain consensus.ChainReader, blockSignerAddr common.Address, number uint64, rCheckpoint uint64, client bind.ContractBackend, totalSigner *uint64) (map[common.Address]*rewardLog, error) {
func GetRewardForCheckpoint(c *posv.Posv, chain consensus.ChainReader, number uint64, rCheckpoint uint64, totalSigner *uint64) (map[common.Address]*rewardLog, error) {
// Not reward for singer of genesis block and only calculate reward at checkpoint block.
prevCheckpoint := number - (rCheckpoint * 2)
startBlockNumber := prevCheckpoint + 1
Expand All @@ -321,88 +317,76 @@ func GetRewardForCheckpoint(c *posv.Posv, chain consensus.ChainReader, blockSign

if len(masternodes) > 0 {

if !c.EnableCache {
for i := startBlockNumber; i <= endBlockNumber; i++ {
block := chain.GetHeaderByNumber(i)
addrs, err := GetSignersFromContract(c, blockSignerAddr, client, block.Hash())
if err != nil {
log.Error("Fail to get signers from smartcontract.", "error", err, "blockNumber", i)
return nil, err
data := make(map[common.Hash][]common.Address)
for i := startBlockNumber; i <= prevCheckpoint+(rCheckpoint*2)-1; i++ {
header := chain.GetHeaderByNumber(i)

if signData, ok := c.BlockSigners.Get(header.Hash()); ok {
txs := signData.([]*types.Transaction)
for _, tx := range txs {
blkHash := common.BytesToHash(tx.Data()[len(tx.Data())-32:])
from := *tx.From()
data[blkHash] = append(data[blkHash], from)
}
// Filter duplicate address.
if len(addrs) > 0 {
addrSigners := make(map[common.Address]bool)
for _, masternode := range masternodes {
for _, addr := range addrs {
if addr == masternode {
if _, ok := addrSigners[addr]; !ok {
addrSigners[addr] = true
}
} else {
log.Debug("Failed get from cached", "hash", header.Hash().String(), "number", i)
block := chain.GetBlock(header.Hash(), i)
txs := block.Transactions()
receipts := core.GetBlockReceipts(c.GetDb(), header.Hash(), i)

var signTxs []*types.Transaction
for _, tx := range txs {
if tx.IsSigningTransaction() {
var b uint
for _, r := range receipts {
if r.TxHash == tx.Hash() {
b = r.Status
break
}
}
}

for addr := range addrSigners {
_, exist := signers[addr]
if exist {
signers[addr].Sign++
} else {
signers[addr] = &rewardLog{1, new(big.Int)}
if b == types.ReceiptStatusFailed {
continue
}
*totalSigner++

signTxs = append(signTxs, tx)
blkHash := common.BytesToHash(tx.Data()[len(tx.Data())-32:])
from := *tx.From()
data[blkHash] = append(data[blkHash], from)
}
}
c.BlockSigners.Add(header.Hash(), signTxs)

}
} else {
var wg sync.WaitGroup
squeue := make(chan []common.Address, 1)
wg.Add(int(rCheckpoint))

for i := startBlockNumber; i <= endBlockNumber; i++ {
go func(i uint64) {
block := chain.GetHeaderByNumber(i)
addrs, err := GetSignersFromContract(c, blockSignerAddr, client, block.Hash())
if err != nil {
log.Crit("Fail to get signers from smartcontract.", "error", err, "blockNumber", i)
}
squeue <- addrs
}(i)
}
}

fsigner := func() {
for addrs := range squeue {
// Filter duplicate address.
if len(addrs) > 0 {
addrSigners := make(map[common.Address]bool)
for _, masternode := range masternodes {
for _, addr := range addrs {
if addr == masternode {
if _, ok := addrSigners[addr]; !ok {
addrSigners[addr] = true
}
break
}
for i := startBlockNumber; i <= endBlockNumber; i++ {
block := chain.GetHeaderByNumber(i)
addrs := data[block.Hash()]
// Filter duplicate address.
if len(addrs) > 0 {
addrSigners := make(map[common.Address]bool)
for _, masternode := range masternodes {
for _, addr := range addrs {
if addr == masternode {
if _, ok := addrSigners[addr]; !ok {
addrSigners[addr] = true
}
break
}
}
}

for addr := range addrSigners {
_, exist := signers[addr]
if exist {
signers[addr].Sign++
} else {
signers[addr] = &rewardLog{1, new(big.Int)}
}
*totalSigner++
}
for addr := range addrSigners {
_, exist := signers[addr]
if exist {
signers[addr].Sign++
} else {
signers[addr] = &rewardLog{1, new(big.Int)}
}
wg.Done()
*totalSigner++
}
}

go fsigner()

wg.Wait()
}
}

Expand Down Expand Up @@ -449,8 +433,8 @@ func GetCandidatesOwnerBySigner(validator *contractValidator.TomoValidator, sign
}

// Calculate reward for holders.
func CalculateRewardForHolders(c *posv.Posv, foudationWalletAddr common.Address, validator *contractValidator.TomoValidator, state *state.StateDB, signer common.Address, calcReward *big.Int) (error, map[common.Address]*big.Int) {
rewards, err := GetRewardBalancesRate(c, foudationWalletAddr, signer, calcReward, validator)
func CalculateRewardForHolders(foudationWalletAddr common.Address, validator *contractValidator.TomoValidator, state *state.StateDB, signer common.Address, calcReward *big.Int) (error, map[common.Address]*big.Int) {
rewards, err := GetRewardBalancesRate(foudationWalletAddr, signer, calcReward, validator)
if err != nil {
return err, nil
}
Expand All @@ -463,7 +447,7 @@ func CalculateRewardForHolders(c *posv.Posv, foudationWalletAddr common.Address,
}

// Get reward balance rates for master node, founder and holders.
func GetRewardBalancesRate(c *posv.Posv, foudationWalletAddr common.Address, masterAddr common.Address, totalReward *big.Int, validator *contractValidator.TomoValidator) (map[common.Address]*big.Int, error) {
func GetRewardBalancesRate(foudationWalletAddr common.Address, masterAddr common.Address, totalReward *big.Int, validator *contractValidator.TomoValidator) (map[common.Address]*big.Int, error) {
owner := GetCandidatesOwnerBySigner(validator, masterAddr)
balances := make(map[common.Address]*big.Int)
rewardMaster := new(big.Int).Mul(totalReward, new(big.Int).SetInt64(common.RewardMasterPercent))
Expand All @@ -484,28 +468,11 @@ func GetRewardBalancesRate(c *posv.Posv, foudationWalletAddr common.Address, mas
// Get voters capacities.
voterCaps := make(map[common.Address]*big.Int)
for _, voteAddr := range voters {
var vote common.Vote
var voterCap *big.Int

vote.Masternode = masterAddr
vote.Voter = voteAddr

if c != nil {
if vCap, ok := c.Votes.Get(vote); ok {
voterCap = vCap.(*big.Int)
} else {
voterCap, err = validator.GetVoterCap(opts, masterAddr, voteAddr)
if err != nil {
log.Crit("Fail to get vote capacity", "error", err)
}
log.Debug("Add to Votes cache ", "vote.Masternode", vote.Masternode.String(), "vote.Voter", vote.Voter.String(), "voterCap", voterCap.String())
c.Votes.Add(vote, voterCap)
}
} else {
voterCap, err = validator.GetVoterCap(opts, masterAddr, voteAddr)
if err != nil {
log.Crit("Fail to get vote capacity", "error", err)
}
voterCap, err = validator.GetVoterCap(opts, masterAddr, voteAddr)
if err != nil {
log.Crit("Fail to get vote capacity", "error", err)
}

totalCap.Add(totalCap, voterCap)
Expand Down
2 changes: 1 addition & 1 deletion contracts/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestRewardBalance(t *testing.T) {

foundationAddr := common.HexToAddress(common.FoudationAddr)
totalReward := new(big.Int).SetInt64(15 * 1000)
rewards, err := contracts.GetRewardBalancesRate(nil, foundationAddr, acc3Addr, totalReward, baseValidator)
rewards, err := contracts.GetRewardBalancesRate(foundationAddr, acc3Addr, totalReward, baseValidator)
if err != nil {
t.Error("Fail to get reward balances rate.", err)
}
Expand Down
4 changes: 4 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ func (bc *BlockChain) insert(block *types.Block) {
}
bc.currentBlock.Store(block)

// save cache BlockSigners
engine := bc.Engine().(*posv.Posv)
engine.CacheData(block.Header(), block.Transactions(), bc.GetReceiptsByHash(block.Hash()))

// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
Expand Down
Loading

0 comments on commit f7bd222

Please sign in to comment.