Skip to content

Commit

Permalink
Further remove EE redundancy (#3324)
Browse files Browse the repository at this point in the history
## Issue Addressed

Resolves #3176

## Proposed Changes

Continues from PRs by @divagant-martian to gradually remove EL redundancy (see #3284, #3257).

This PR achieves:

- Removes the `broadcast` and `first_success` methods. The functional impact is that every request to the EE will always be tried immediately, regardless of the cached `EngineState` (this resolves #3176). Previously we would check the engine state before issuing requests, this doesn't make sense in a single-EE world; there's only one EE so we might as well try it for every request.
- Runs the upcheck/watchdog routine once per slot rather than thrice. When we had multiple EEs frequent polling was useful to try and detect when the primary EE had come back online and we could switch to it. That's not as relevant now.
- Always creates logs in the `Engines::upcheck` function. Previously we would mute some logs since they could get really noisy when one EE was down but others were functioning fine. Now we only have one EE and are upcheck-ing it less, it makes sense to always produce logs.

This PR purposefully does not achieve:

- Updating all occurrences of "engines" to "engine". I'm trying to keep the diff small and manageable. We can come back for this.

## Additional Info

NA
  • Loading branch information
paulhauner committed Jul 13, 2022
1 parent a390695 commit 7a6e692
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 244 deletions.
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,11 @@ async fn payload_preparation_before_transition_block() {
let rig = InvalidPayloadRig::new();
let el = rig.execution_layer();

// Run the watchdog routine so that the status of the execution engine is set. This ensures
// that we don't end up with `eth_syncing` requests later in this function that will impede
// testing.
el.watchdog_task().await;

let head = rig.harness.chain.head_snapshot();
assert_eq!(
head.beacon_block
Expand Down
253 changes: 97 additions & 156 deletions beacon_node/execution_layer/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::engine_api::{
};
use crate::HttpJsonRpc;
use lru::LruCache;
use slog::{crit, debug, info, warn, Logger};
use slog::{debug, error, info, Logger};
use std::future::Future;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::{Mutex, RwLock};
use types::{Address, ExecutionBlockHash, Hash256};

Expand All @@ -16,7 +18,7 @@ use types::{Address, ExecutionBlockHash, Hash256};
const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;

/// Stores the remembered state of an engine.
#[derive(Copy, Clone, PartialEq)]
#[derive(Copy, Clone, PartialEq, Debug)]
enum EngineState {
Synced,
Offline,
Expand All @@ -31,22 +33,6 @@ pub struct ForkChoiceState {
pub finalized_block_hash: ExecutionBlockHash,
}

/// Switch controlling whether a task should emit log output.
#[derive(Copy, Clone, PartialEq)]
pub enum Logging {
    Enabled,
    Disabled,
}

impl Logging {
    /// Returns `true` iff logging is switched on for this task.
    pub fn is_enabled(&self) -> bool {
        matches!(self, Logging::Enabled)
    }
}

#[derive(Hash, PartialEq, std::cmp::Eq)]
struct PayloadIdCacheKey {
pub head_block_hash: ExecutionBlockHash,
Expand All @@ -69,17 +55,19 @@ pub struct Engine {
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub executor: TaskExecutor,
pub log: Logger,
}

impl Engine {
/// Creates a new, offline engine.
pub fn new(api: HttpJsonRpc, log: &Logger) -> Self {
pub fn new(api: HttpJsonRpc, executor: TaskExecutor, log: &Logger) -> Self {
Self {
api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline),
latest_forkchoice_state: Default::default(),
executor,
log: log.clone(),
}
}
Expand Down Expand Up @@ -179,164 +167,117 @@ impl Engine {
/// Returns `true` iff the cached `EngineState` is `Synced`.
///
/// NOTE(review): this only inspects the locally cached state behind the
/// `self.state` read-lock; it does not issue any request to the execution
/// engine itself.
pub async fn is_synced(&self) -> bool {
*self.state.read().await == EngineState::Synced
}

/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let mut state_lock = self.state.write().await;
if *state_lock != EngineState::Synced {
match self.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
);
}
pub async fn upcheck(&self) {
let state: EngineState = match self.api.upcheck().await {
Ok(()) => {
let mut state = self.state.write().await;

if *state != EngineState::Synced {
info!(
self.log,
"Execution engine online";
);

// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state().await;

*state_lock = EngineState::Synced
} else {
debug!(
self.log,
"Execution engine online";
);
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
)
}

// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state().await;

*state_lock = EngineState::Syncing
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
);
}

*state_lock = EngineState::AuthFailed
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
)
}
}
*state = EngineState::Synced;
*state
}
}

if *state_lock != EngineState::Synced && logging.is_enabled() {
crit!(
self.log,
"No synced execution engines";
)
}
}
Err(EngineApiError::IsSyncing) => {
let mut state = self.state.write().await;
*state = EngineState::Syncing;
*state
}
Err(EngineApiError::Auth(err)) => {
error!(
self.log,
"Failed jwt authorization";
"error" => ?err,
);

/// Run `func` on the node.
///
/// This function might try to run `func` twice. If the node returns an error it will try to
/// upcheck it and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
let mut state = self.state.write().await;
*state = EngineState::AuthFailed;
*state
}
Err(e) => {
debug!(self.log, "First engine call failed. Retrying"; "err" => ?e);
// Try to recover the node.
self.upcheck_not_synced(Logging::Enabled).await;
// Try again.
self.first_success_without_retry(func).await
error!(
self.log,
"Error during execution engine upcheck";
"error" => ?e,
);

let mut state = self.state.write().await;
*state = EngineState::Offline;
*state
}
}
};

debug!(
self.log,
"Execution engine upcheck complete";
"state" => ?state,
);
}

/// Run `func` on the node.
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, EngineError>
/// Run `func` on the node regardless of the node's current state.
///
/// ## Note
///
/// This function takes locks on `self.state`, holding a conflicting lock might cause a
/// deadlock.
pub async fn request<'a, F, G, H>(self: &'a Arc<Self>, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let (engine_synced, engine_auth_failed) = {
let state = self.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(self).await {
Ok(result) => Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
match func(self).await {
Ok(result) => {
// Take a clone *without* holding the read-lock since the `upcheck` function will
// take a write-lock.
let state: EngineState = *self.state.read().await;

// If this request just returned successfully but we don't think this node is
// synced, check to see if it just became synced. This helps to ensure that the
// networking stack can get fast feedback about a synced engine.
if state != EngineState::Synced {
// Spawn the upcheck in another task to avoid slowing down this request.
let inner_self = self.clone();
self.executor.spawn(
async move { inner_self.upcheck().await },
"upcheck_after_success",
);
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
}
} else if engine_auth_failed {
Err(EngineError::Auth)
} else {
Err(EngineError::Offline)
}
}

/// Runs `func` on the node.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.broadcast_without_retry(func).await {
Err(EngineError::Offline { .. }) => {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
Ok(result)
}
other => other,
}
}
Err(error) => {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
);

/// Runs `func` on the node if it's last state is not offline.
pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
if *self.state.read().await == EngineState::Offline {
Err(EngineError::Offline)
} else {
match func(self).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
// The node just returned an error, run an upcheck so we can update the endpoint
// state.
//
// Spawn the upcheck in another task to avoid slowing down this request.
let inner_self = self.clone();
self.executor.spawn(
async move { inner_self.upcheck().await },
"upcheck_after_error",
);

Err(EngineError::Api { error })
}
}
}
Expand Down
Loading

0 comments on commit 7a6e692

Please sign in to comment.