Skip to content

Commit

Permalink
code prune rd:6, for review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
setunapo committed May 25, 2022
1 parent 9628a04 commit 21723fd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 27 deletions.
22 changes: 3 additions & 19 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,8 +1234,7 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB {
isParallel: true,
parallel: parallel,
},
wbnbMakeUp: true,
balanceUpdateDepth: 0,
wbnbMakeUp: true,
}
// no need to copy preimages, comment out and remove later
// for hash, preimage := range s.preimages {
Expand Down Expand Up @@ -2313,9 +2312,8 @@ var parallelKvCheckResCh chan bool

type ParallelStateDB struct {
StateDB
wbnbMakeUp bool // default true, we can not do WBNB make up if its absolute balance is used.
balanceUpdateDepth int
wbnbMakeUpBalance *big.Int
wbnbMakeUp bool // default true, we can not do WBNB make up if its absolute balance is used.
wbnbMakeUpBalance *big.Int
}

func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash, val common.Hash, isStage2 bool) bool {
Expand Down Expand Up @@ -2888,10 +2886,6 @@ func (s *ParallelStateDB) HasSuicided(addr common.Address) bool {
func (s *ParallelStateDB) AddBalance(addr common.Address, amount *big.Int) {
// add balance will perform a read operation first
// if amount == 0, no balance change, but there is still an empty check.
s.balanceUpdateDepth++
defer func() {
s.balanceUpdateDepth--
}()
stateObject := s.GetOrNewStateObject(addr)
if stateObject != nil {
if addr == s.parallel.systemAddress {
Expand Down Expand Up @@ -2926,11 +2920,6 @@ func (s *ParallelStateDB) AddBalance(addr common.Address, amount *big.Int) {
// SubBalance subtracts amount from the account associated with addr.
func (s *ParallelStateDB) SubBalance(addr common.Address, amount *big.Int) {
// unlike add, sub 0 balance will not touch empty object
s.balanceUpdateDepth++
defer func() {
s.balanceUpdateDepth--
}()

stateObject := s.GetOrNewStateObject(addr)
if stateObject != nil {
if addr == s.parallel.systemAddress {
Expand Down Expand Up @@ -2964,11 +2953,6 @@ func (s *ParallelStateDB) SubBalance(addr common.Address, amount *big.Int) {
}

func (s *ParallelStateDB) SetBalance(addr common.Address, amount *big.Int) {
s.balanceUpdateDepth++
defer func() {
s.balanceUpdateDepth--
}()

stateObject := s.GetOrNewStateObject(addr)
if stateObject != nil {
if addr == s.parallel.systemAddress {
Expand Down
17 changes: 9 additions & 8 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
farDiffLayerTimeout = 2

parallelPrimarySlot = 0
parallelShadowlot = 1
parallelShadowSlot = 1
stage2CheckNumber = 30 // ConfirmStage2 will check this number of transaction, to avoid too busy stage2 check
stage2AheadNum = 3 // enter ConfirmStage2 in advance to avoid waiting for Fat Tx
)
Expand Down Expand Up @@ -474,7 +474,7 @@ func (p *ParallelStateProcessor) init() {
// It is back up of the primary slot to make sure transaction can be redo ASAP,
// since the primary slot could be busy at executing another transaction
go func(slotIndex int) {
p.runSlotLoop(slotIndex, 1) // this loop will be permanent live
p.runSlotLoop(slotIndex, parallelShadowSlot) // this loop will be permanent live
}(i)

}
Expand Down Expand Up @@ -521,7 +521,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
// ** reduce IPC cost by dispatch in Unit
// ** make sure same From in same slot
// ** try to make it balanced, queue to the most hungry slot for new Address
func (p *ParallelStateProcessor) doStaticDispatch(mainStatedb *state.StateDB, txReqs []*ParallelTxRequest) {
func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) {
fromSlotMap := make(map[common.Address]int, 100)
toSlotMap := make(map[common.Address]int, 100)
for _, txReq := range txReqs {
Expand Down Expand Up @@ -584,12 +584,12 @@ func (p *ParallelStateProcessor) hasConflict(txResult *ParallelTxResult, isStage

func (p *ParallelStateProcessor) switchSlot(slotIndex int) {
slot := p.slotState[slotIndex]
if atomic.CompareAndSwapInt32(&slot.activatedType, parallelPrimarySlot, parallelShadowlot) {
if atomic.CompareAndSwapInt32(&slot.activatedType, parallelPrimarySlot, parallelShadowSlot) {
// switch from normal to shadow slot
if len(slot.shadowWakeUpChan) == 0 {
slot.shadowWakeUpChan <- struct{}{} // only notify when target once
}
} else if atomic.CompareAndSwapInt32(&slot.activatedType, parallelShadowlot, parallelPrimarySlot) {
} else if atomic.CompareAndSwapInt32(&slot.activatedType, parallelShadowSlot, parallelPrimarySlot) {
// switch from shadow to normal slot
if len(slot.primaryWakeUpChan) == 0 {
slot.primaryWakeUpChan <- struct{}{} // only notify when target once
Expand Down Expand Up @@ -645,8 +645,8 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bool) *ParallelTxResult {
if isStage2 {
if targetTxIndex <= p.mergedTxIndex+1 {
// this is the one that can been merged,
// others are for likely conflict check, since it is not their tuen.
// `p.mergedTxIndex+1` is the one to be merged,
// in stage2, we do likely conflict check, for these not their turn.
return nil
}
}
Expand Down Expand Up @@ -963,7 +963,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.targetStage2Count = p.targetStage2Count - stage2AheadNum
}

p.doStaticDispatch(statedb, p.allTxReqs) // todo: put txReqs in unit?
p.doStaticDispatch(p.allTxReqs) // todo: put txReqs in unit?
// after static dispatch, we notify the slot to work.
for _, slot := range p.slotState {
slot.primaryWakeUpChan <- struct{}{}
Expand Down Expand Up @@ -1007,6 +1007,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
if result.err != nil {
log.Error("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
"resultTxIndex", result.txReq.txIndex, "result.err", result.err)
p.doCleanUp()
bloomProcessor.Close()
return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err)
}
Expand Down

0 comments on commit 21723fd

Please sign in to comment.