Skip to content

Commit

Permalink
fix: executedSlotIndex & staticSlotIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
setunapo committed May 5, 2022
1 parent 531fd8f commit a1b9eed
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,9 @@ type ParallelTxResult struct {
}

type ParallelTxRequest struct {
txIndex int
tx *types.Transaction
txIndex int
staticSlotIndex int // static dispatched id
tx *types.Transaction
// slotDB *state.ParallelStateDB
gasLimit uint64
msg types.Message
Expand Down Expand Up @@ -647,6 +648,7 @@ func (p *ParallelStateProcessor) doStaticDispatch(mainStatedb *state.StateDB, tx
}

slot := p.slotState[slotIndex]
txReq.staticSlotIndex = slotIndex // txreq is better to be executed in this slot
slot.pendingTxReqList = append(slot.pendingTxReqList, txReq)
}
}
Expand Down Expand Up @@ -1030,28 +1032,35 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
"merged TxIndex", p.mergedTxIndex, "isStage2", isStage2)

valid := p.toConfirmTxIndexResult(lastResult, isStage2)
slotIndex := lastResult.slotIndex
executedSlotIndex := lastResult.slotIndex // could be stolen and executed in other slot.
staticSlotIndex := lastResult.txReq.staticSlotIndex
if !valid {
if resultsLen == 1 || isStage2 { // for Stage 2, we only check its latest result.
p.debugConflictRedoNum++
lastResult.txReq.runnable = 1 // needs redo
slot := p.slotState[slotIndex]
log.Debug("runConfirmLoop conflict", "slotIndex", slotIndex,
slot := p.slotState[staticSlotIndex]
log.Debug("runConfirmLoop conflict",
"staticSlotIndex", staticSlotIndex,
"executedSlotIndex", executedSlotIndex,
"txIndex", lastResult.txReq.txIndex,
"activatedId", slot.activatedId,
"isStage2", isStage2, "slot.activatedId", slot.activatedId)
// if hit { // switch only it is hit, for none-hit, it is not that necessary,
p.switchSlot(slot, slotIndex)
p.switchSlot(slot, staticSlotIndex)
//}
log.Debug("runConfirmLoop conflict, switched", "slotIndex", slotIndex,
log.Debug("runConfirmLoop conflict, switched",
"staticSlotIndex", staticSlotIndex,
"executedSlotIndex", executedSlotIndex,
"txIndex", lastResult.txReq.txIndex)
// this the last result for this txIndex,
// interrupt its current routine, and reschedule from the the other routine(shadow?)
return false
} else {
// try next
log.Debug("runConfirmLoop conflict, try next result of same txIndex",
"slotIndex", slotIndex, "txIndex", lastResult.txReq.txIndex)
"staticSlotIndex", staticSlotIndex,
"executedSlotIndex", executedSlotIndex,
"txIndex", lastResult.txReq.txIndex)
}
continue
}
Expand All @@ -1060,7 +1069,9 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
// fixme: need to handle txResult repeatedly check?
return false
}
log.Debug("runConfirmLoop result to deliver", "slotIndex", slotIndex,
log.Debug("runConfirmLoop result to deliver",
"staticSlotIndex", staticSlotIndex,
"executedSlotIndex", executedSlotIndex,
"txIndex", lastResult.txReq.txIndex, "mergedTxIndex", p.mergedTxIndex)
region2 := debug.Handler.StartTrace("valid, deliver to process")
// result is valid, deliver it to main processor
Expand All @@ -1070,7 +1081,9 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
// close(result.txReq.curTxChan) // fixme: to close
debug.Handler.EndTrace(region2)

log.Debug("runConfirmLoop result is delivered", "slotIndex", slotIndex,
log.Debug("runConfirmLoop result is delivered",
"staticSlotIndex", staticSlotIndex,
"executedSlotIndex", executedSlotIndex,
"txIndex", lastResult.txReq.txIndex, "mergedTxIndex", p.mergedTxIndex)
if p.mergedTxIndex != (targetTxIndex) {
log.Warn("runConfirmLoop result delivered, but unexpected mergedTxIndex",
Expand Down Expand Up @@ -1337,17 +1350,18 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
bloomProcessor: bloomProcessor,
usedGas: usedGas,
curTxChan: make(chan int, 1),
systemAddrRedo: false, // set to true, when systemAddr access is detected.
runnable: 1, // 0: not runnable, 1: runnable
txIndex: i,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
bloomProcessor: bloomProcessor,
usedGas: usedGas,
curTxChan: make(chan int, 1),
systemAddrRedo: false, // set to true, when systemAddr access is detected.
runnable: 1, // 0: not runnable, 1: runnable
}
p.allTxReqs = append(p.allTxReqs, txReq)
}
Expand Down

0 comments on commit a1b9eed

Please sign in to comment.