diff --git a/core/state/statedb.go b/core/state/statedb.go
index 75c52ce15f..49dc2e2419 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -1234,8 +1234,7 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB {
             isParallel: true,
             parallel:   parallel,
         },
-        wbnbMakeUp:         true,
-        balanceUpdateDepth: 0,
+        wbnbMakeUp: true,
     }
     // no need to copy preimages, comment out and remove later
     // for hash, preimage := range s.preimages {
@@ -2313,9 +2312,8 @@ var parallelKvCheckResCh chan bool
 
 type ParallelStateDB struct {
     StateDB
-    wbnbMakeUp         bool // default true, we can not do WBNB make up if its absolute balance is used.
-    balanceUpdateDepth int
-    wbnbMakeUpBalance  *big.Int
+    wbnbMakeUp        bool // default true, we can not do WBNB make up if its absolute balance is used.
+    wbnbMakeUpBalance *big.Int
 }
 
 func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash, val common.Hash, isStage2 bool) bool {
@@ -2888,10 +2886,6 @@ func (s *ParallelStateDB) HasSuicided(addr common.Address) bool {
 func (s *ParallelStateDB) AddBalance(addr common.Address, amount *big.Int) {
     // add balance will perform a read operation first
     // if amount == 0, no balance change, but there is still an empty check.
-    s.balanceUpdateDepth++
-    defer func() {
-        s.balanceUpdateDepth--
-    }()
     stateObject := s.GetOrNewStateObject(addr)
     if stateObject != nil {
         if addr == s.parallel.systemAddress {
@@ -2926,11 +2920,6 @@ func (s *ParallelStateDB) AddBalance(addr common.Address, amount *big.Int) {
 // SubBalance subtracts amount from the account associated with addr.
 func (s *ParallelStateDB) SubBalance(addr common.Address, amount *big.Int) {
     // unlike add, sub 0 balance will not touch empty object
-    s.balanceUpdateDepth++
-    defer func() {
-        s.balanceUpdateDepth--
-    }()
-
     stateObject := s.GetOrNewStateObject(addr)
     if stateObject != nil {
         if addr == s.parallel.systemAddress {
@@ -2964,11 +2953,6 @@ func (s *ParallelStateDB) SubBalance(addr common.Address, amount *big.Int) {
 }
 
 func (s *ParallelStateDB) SetBalance(addr common.Address, amount *big.Int) {
-    s.balanceUpdateDepth++
-    defer func() {
-        s.balanceUpdateDepth--
-    }()
-
     stateObject := s.GetOrNewStateObject(addr)
     if stateObject != nil {
         if addr == s.parallel.systemAddress {
diff --git a/core/state_processor.go b/core/state_processor.go
index b69df9063f..9df75b9c7a 100644
--- a/core/state_processor.go
+++ b/core/state_processor.go
@@ -50,7 +50,7 @@ const (
     farDiffLayerTimeout = 2
 
     parallelPrimarySlot = 0
-    parallelShadowlot   = 1
+    parallelShadowSlot  = 1
     stage2CheckNumber   = 30 // ConfirmStage2 will check this number of transaction, to avoid too busy stage2 check
     stage2AheadNum      = 3  // enter ConfirmStage2 in advance to avoid waiting for Fat Tx
 )
@@ -474,7 +474,7 @@ func (p *ParallelStateProcessor) init() {
         // It is back up of the primary slot to make sure transaction can be redo ASAP,
         // since the primary slot could be busy at executing another transaction
         go func(slotIndex int) {
-            p.runSlotLoop(slotIndex, 1) // this loop will be permanent live
+            p.runSlotLoop(slotIndex, parallelShadowSlot) // this loop will be permanent live
         }(i)
     }
 
@@ -521,7 +521,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
 // ** reduce IPC cost by dispatch in Unit
 // ** make sure same From in same slot
 // ** try to make it balanced, queue to the most hungry slot for new Address
-func (p *ParallelStateProcessor) doStaticDispatch(mainStatedb *state.StateDB, txReqs []*ParallelTxRequest) {
+func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) {
     fromSlotMap := make(map[common.Address]int, 100)
     toSlotMap := make(map[common.Address]int, 100)
     for _, txReq := range txReqs {
@@ -584,12 +584,12 @@ func (p *ParallelStateProcessor) hasConflict(txResult *ParallelTxResult, isStage
 
 func (p *ParallelStateProcessor) switchSlot(slotIndex int) {
     slot := p.slotState[slotIndex]
-    if atomic.CompareAndSwapInt32(&slot.activatedType, parallelPrimarySlot, parallelShadowlot) {
+    if atomic.CompareAndSwapInt32(&slot.activatedType, parallelPrimarySlot, parallelShadowSlot) {
         // switch from normal to shadow slot
         if len(slot.shadowWakeUpChan) == 0 {
             slot.shadowWakeUpChan <- struct{}{} // only notify when target once
         }
-    } else if atomic.CompareAndSwapInt32(&slot.activatedType, parallelShadowlot, parallelPrimarySlot) {
+    } else if atomic.CompareAndSwapInt32(&slot.activatedType, parallelShadowSlot, parallelPrimarySlot) {
         // switch from shadow to normal slot
         if len(slot.primaryWakeUpChan) == 0 {
             slot.primaryWakeUpChan <- struct{}{} // only notify when target once
@@ -645,8 +645,8 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
 func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bool) *ParallelTxResult {
     if isStage2 {
         if targetTxIndex <= p.mergedTxIndex+1 {
-            // this is the one that can been merged,
-            // others are for likely conflict check, since it is not their tuen.
+            // `p.mergedTxIndex+1` is the next transaction to be merged,
+            // stage2 only does likely-conflict checks for transactions whose turn has not come yet.
             return nil
         }
     }
@@ -963,7 +963,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
         p.targetStage2Count = p.targetStage2Count - stage2AheadNum
     }
 
-    p.doStaticDispatch(statedb, p.allTxReqs) // todo: put txReqs in unit?
+    p.doStaticDispatch(p.allTxReqs) // todo: put txReqs in unit?
     // after static dispatch, we notify the slot to work.
     for _, slot := range p.slotState {
         slot.primaryWakeUpChan <- struct{}{}
@@ -1007,6 +1007,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
         if result.err != nil {
             log.Error("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
                 "resultTxIndex", result.txReq.txIndex, "result.err", result.err)
+            p.doCleanUp()
             bloomProcessor.Close()
             return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err)
         }
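
Note (not part of the patch): the rules listed above doStaticDispatch -- keep transactions with the same From address in the same slot, and send a new address to the most hungry (least loaded) slot -- can be illustrated with a minimal standalone sketch. The types, names, and function below are simplified assumptions for illustration only, not the actual doStaticDispatch body.

package main

import "fmt"

// txReq is a simplified stand-in for ParallelTxRequest; only the sender matters here.
type txReq struct {
	from string
}

// staticDispatch assigns each tx to a slot: same sender always lands in the same
// slot, a new sender goes to the slot with the fewest queued txs so far.
func staticDispatch(txs []txReq, slotCount int) []int {
	fromSlotMap := make(map[string]int) // sender -> slot index
	slotLoad := make([]int, slotCount)  // queued tx count per slot
	assignment := make([]int, len(txs))

	for i, tx := range txs {
		slot, seen := fromSlotMap[tx.from]
		if !seen {
			// new sender: pick the least loaded ("most hungry") slot
			slot = 0
			for s := 1; s < slotCount; s++ {
				if slotLoad[s] < slotLoad[slot] {
					slot = s
				}
			}
			fromSlotMap[tx.from] = slot
		}
		slotLoad[slot]++
		assignment[i] = slot
	}
	return assignment
}

func main() {
	txs := []txReq{{"a"}, {"b"}, {"a"}, {"c"}, {"b"}}
	fmt.Println(staticDispatch(txs, 2)) // prints [0 1 0 1 1]
}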