Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rerun Staking OCW periodically. #7976

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions frame/staking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1363,22 +1363,42 @@ decl_module! {
consumed_weight
}

/// Check if the current block number is the one at which the election window has been set
/// to open. If so, it runs the offchain worker code.
/// Offchain worker logic.
fn offchain_worker(now: T::BlockNumber) {
use offchain_election::{set_check_offchain_execution_status, compute_offchain_election};
use offchain_election::{
set_check_offchain_execution_status, compute_save_and_submit,
restore_or_compute_then_submit, OFFCHAIN_REPEAT,
};
// ensure that we don't run OCW in any case more at least with 5 blocks delay.
let threshold: T::BlockNumber = OFFCHAIN_REPEAT.into();

if Self::era_election_status().is_open_at(now) {
let offchain_status = set_check_offchain_execution_status::<T>(now);
if let Err(why) = offchain_status {
log!(warn, "💸 skipping offchain worker in open election window due to [{}]", why);
} else {
if let Err(e) = compute_offchain_election::<T>() {
log!(error, "💸 Error in election offchain worker: {:?}", e);
} else {
log!(debug, "💸 Executed offchain worker thread without errors.");
let election_status = Self::era_election_status();

log!(
trace,
"Running OCW at {:?}, election status = {:?}",
now,
election_status,
);
match Self::era_election_status() {
ElectionStatus::Open(opened) if opened == now => {
// If era election status is open at the current block, mine a new solution
// then save and submit it.
let initial_output = set_check_offchain_execution_status::<T>(now, threshold)
.and_then(|_| compute_save_and_submit::<T>());
log!(info, "initial OCW output at {:?} = {:?}", now, initial_output);
},
ElectionStatus::Open(opened) if now > opened => {
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
if Self::queued_score().is_none() {
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
// If the election window is open, and we don't have a queued solution,
// constantly try to challenge it by either resubmitting a saved solution,
// or mining a new one (just in the case that the previous was skipped).
let resubmit_output = set_check_offchain_execution_status::<T>(now, threshold)
.and_then(|_| restore_or_compute_then_submit::<T>());
log!(info, "resubmit OCW output at {:?} = {:?}", now, resubmit_output);
}
}
},
_ => {}
}
}

Expand Down
125 changes: 101 additions & 24 deletions frame/staking/src/offchain_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ use sp_npos_elections::{
ExtendedBalance, CompactSolution,
};
use sp_runtime::{
offchain::storage::StorageValueRef, traits::TrailingZeroInput, RuntimeDebug,
offchain::storage::StorageValueRef, traits::TrailingZeroInput,
};
use sp_std::{convert::TryInto, prelude::*};
use sp_std::{convert::TryInto, prelude::*, fmt::Debug};

/// Error types related to the offchain election machinery.
#[derive(RuntimeDebug)]
#[derive(Debug)]
#[cfg_attr(feature = "std", derive(PartialEq, Eq))]
pub enum OffchainElectionError {
/// election returned None. This means less candidate that minimum number of needed
/// validators were present. The chain is in trouble and not much that we can do about it.
ElectionFailed,
/// Submission to the transaction pool failed.
PoolSubmissionFailed,
/// The snapshot data is not available.
SnapshotUnavailable,
/// Error from npos-election crate. This usually relates to compact operation.
Expand All @@ -49,6 +48,22 @@ pub enum OffchainElectionError {
InvalidWinner,
/// A nominator is not available in the snapshot.
NominatorSnapshotCorrupt,
/// Failed to write some data to the offchain worker storage.
DBWriteFailed,
/// An offchain thread was not executed because fork was detected (executed with a block
/// number less than the last record one).
Fork,
/// An offchain thread was not executed because the last executed one is too recent (less than
/// `OFFCHAIN_REPEAT`).
TooRecent,
/// An unreachable state was reached. Should never happen.
Unreachable,
/// Submission to the transaction pool failed.
PoolSubmissionFailed,
/// No solution is stored in the offchain DB.
SolutionUnavailable,
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
/// The stored solution belongs to an old era and cannot be used.
SolutionOld,
}

impl From<sp_npos_elections::Error> for OffchainElectionError {
Expand All @@ -73,42 +88,108 @@ pub(crate) const DEFAULT_LONGEVITY: u64 = 25;
/// Returns `Ok(())` if offchain worker should happen, `Err(reason)` otherwise.
pub(crate) fn set_check_offchain_execution_status<T: Config>(
now: T::BlockNumber,
) -> Result<(), &'static str> {
threshold: T::BlockNumber,
) -> Result<(), OffchainElectionError> {
let storage = StorageValueRef::persistent(&OFFCHAIN_HEAD_DB);
let threshold = T::BlockNumber::from(OFFCHAIN_REPEAT);

let mutate_stat =
storage.mutate::<_, &'static str, _>(|maybe_head: Option<Option<T::BlockNumber>>| {
storage.mutate(|maybe_head: Option<Option<T::BlockNumber>>| {
match maybe_head {
Some(Some(head)) if now < head => Err("fork."),
Some(Some(head)) if now < head => Err(OffchainElectionError::Fork),
Some(Some(head)) if now >= head && now <= head + threshold => {
Err("recently executed.")
Err(OffchainElectionError::TooRecent)
}
Some(Some(head)) if now > head + threshold => {
// we can run again now. Write the new head.
// we can allow again now. Write the new head.
Ok(now)
}
_ => {
// value doesn't exists. Probably this node just booted up. Write, and run
// value doesn't exists. Probably this node just booted up. Write, and allow.
Ok(now)
}
}
});
});

crate::log!(trace, "attempting to acquire the OCW lock at {:?} = {:?}", now, mutate_stat);
match mutate_stat {
// all good
Ok(Ok(_)) => Ok(()),
// failed to write.
Ok(Err(_)) => Err("failed to write to offchain db."),
Ok(Err(_)) => Err(OffchainElectionError::DBWriteFailed),
// fork etc.
Err(why) => Err(why),
}
}

/// Storage path for the solution `call`.
pub(crate) const OFFCHAIN_QUEUED_CALL: &[u8] = b"parity/staking-election/call";

/// Save a given call OCW storage.
pub(crate) fn save_solution<T: Config>(call: Call<T>) -> Result<(), OffchainElectionError> {
let storage = StorageValueRef::persistent(&OFFCHAIN_QUEUED_CALL);
// in all cases, just write the new value regardless of the the old one, if any.
let set_outcome = storage.mutate::<_, OffchainElectionError, _>(|_| Ok(call));
kianenigma marked this conversation as resolved.
Show resolved Hide resolved

match set_outcome {
Ok(Ok(_)) => Ok(()),
// failed to write.
Ok(Err(_)) => Err(OffchainElectionError::DBWriteFailed),
_ => {
// Defensive only: should not happen. Inner mutate closure always returns ok.
Err(OffchainElectionError::Unreachable)
}
}
}

/// Get a saved OCW solution, if it exists.
pub(crate) fn get_solution<T: Config>() -> Option<Call<T>> {
StorageValueRef::persistent(&OFFCHAIN_QUEUED_CALL).get().flatten()
}

/// Submit a given solution as an unsigned transaction.
pub(crate) fn submit_solution<T: Config>(call: Call<T>) -> Result<(), OffchainElectionError> {
SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call.into())
.map_err(|_| OffchainElectionError::PoolSubmissionFailed)
}

/// Ensure that the given solution call belongs to the current era. Returns `Ok(call)` if so to be
/// used with `Result::and`.
pub(crate) fn ensure_solution_is_recent<T: Config>(
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
call: Call<T>,
) -> Result<Call<T>, OffchainElectionError> {
let current_era = <Module<T>>::current_era().unwrap_or_default();
match call {
Call::submit_election_solution_unsigned(_, _, _, era, _) if era == current_era => Ok(call),
_ => Err(OffchainElectionError::SolutionOld),
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Compute a new solution and save it to the OCW storage.
pub(crate) fn compute_and_save<T: Config>() -> Result<Call<T>, OffchainElectionError> {
let call = compute_offchain_election::<T>()?;
save_solution::<T>(call.clone())?;
Ok(call)
}

/// Restore an old solution if exist, else compute a new one and save it, finally submit it.
pub(crate) fn restore_or_compute_then_submit<T: Config>() -> Result<(), OffchainElectionError> {
let call = get_solution::<T>()
.ok_or(OffchainElectionError::SolutionUnavailable)
.and_then(ensure_solution_is_recent)
.or_else(|_| compute_and_save::<T>())?;
kianenigma marked this conversation as resolved.
Show resolved Hide resolved
submit_solution::<T>(call)
}

/// Compute the solution, save it, and submit it.
pub(crate) fn compute_save_and_submit<T: Config>() -> Result<(), OffchainElectionError> {
let call = compute_and_save::<T>()?;
submit_solution::<T>(call)
}

/// The internal logic of the offchain worker of this module. This runs the phragmen election,
/// compacts and reduces the solution, computes the score and submits it back to the chain as an
/// unsigned transaction, without any signature.
pub(crate) fn compute_offchain_election<T: Config>() -> Result<(), OffchainElectionError> {
/// compacts and reduces the solution, computes the score returns a call that can be submitted back
/// to the chain.
pub(crate) fn compute_offchain_election<T: Config>() -> Result<Call<T>, OffchainElectionError> {
let iters = get_balancing_iters::<T>();
// compute raw solution. Note that we use `OffchainAccuracy`.
let ElectionResult {
Expand All @@ -135,17 +216,13 @@ pub(crate) fn compute_offchain_election<T: Config>() -> Result<(), OffchainElect
// defensive-only: current era can never be none except genesis.
let current_era = <Module<T>>::current_era().unwrap_or_default();

// send it.
let call = Call::submit_election_solution_unsigned(
Ok(Call::submit_election_solution_unsigned(
winners,
compact,
score,
current_era,
size,
).into();

SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call)
.map_err(|_| OffchainElectionError::PoolSubmissionFailed)
))
}

/// Get a random number of iterations to run the balancing.
Expand Down Expand Up @@ -252,7 +329,7 @@ pub fn maximum_compact_len<W: crate::WeightInfo>(
/// Thus, we reside to stripping away some voters. This means only changing the `compact` struct.
///
/// Note that the solution is already computed, and the winners are elected based on the merit of
/// teh entire stake in the system. Nonetheless, some of the voters will be removed further down the
/// the entire stake in the system. Nonetheless, some of the voters will be removed further down the
/// line.
///
/// Indeed, the score must be computed **after** this step. If this step reduces the score too much,
Expand Down
19 changes: 10 additions & 9 deletions frame/staking/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4134,35 +4134,36 @@ mod offchain_election {
ext.execute_with(|| {
use offchain_election::OFFCHAIN_HEAD_DB;
use sp_runtime::offchain::storage::StorageValueRef;
use crate::offchain_election::{OffchainElectionError, OFFCHAIN_REPEAT};
let storage = StorageValueRef::persistent(&OFFCHAIN_HEAD_DB);

run_to_block(12);

// first run -- ok
assert_eq!(
offchain_election::set_check_offchain_execution_status::<Test>(12),
offchain_election::set_check_offchain_execution_status::<Test>(12, OFFCHAIN_REPEAT.into()),
Ok(()),
);
assert_eq!(storage.get::<BlockNumber>().unwrap().unwrap(), 12);

// re-execute after the next. not allowed.
assert_eq!(
offchain_election::set_check_offchain_execution_status::<Test>(13),
Err("recently executed."),
offchain_election::set_check_offchain_execution_status::<Test>(13, OFFCHAIN_REPEAT.into()),
Err(OffchainElectionError::TooRecent),
);

// a fork like situation -- re-execute 10, 11, 12. But it won't go through.
assert_eq!(
offchain_election::set_check_offchain_execution_status::<Test>(10),
Err("fork."),
offchain_election::set_check_offchain_execution_status::<Test>(10, OFFCHAIN_REPEAT.into()),
Err(OffchainElectionError::Fork),
);
assert_eq!(
offchain_election::set_check_offchain_execution_status::<Test>(11),
Err("fork."),
offchain_election::set_check_offchain_execution_status::<Test>(11, OFFCHAIN_REPEAT.into()),
Err(OffchainElectionError::Fork),
);
assert_eq!(
offchain_election::set_check_offchain_execution_status::<Test>(12),
Err("recently executed."),
offchain_election::set_check_offchain_execution_status::<Test>(12, OFFCHAIN_REPEAT.into()),
Err(OffchainElectionError::TooRecent),
);
})
}
Expand Down