[Merged by Bors] - Initial work to remove engines fallback from the execution_layer crate #3257

279 changes: 119 additions & 160 deletions beacon_node/execution_layer/src/engines.rs
@@ -156,10 +156,11 @@ impl Builder for Engine<BuilderApi> {
}
}

/// Holds multiple execution engines and provides functionality for managing them in a fallback
/// manner.
// This structure used to hold multiple execution engines managed in a fallback manner. This
// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this
// struct will likely be removed in the future.
pub struct Engines {
pub engines: Vec<Engine<EngineApi>>,
pub engine: Engine<EngineApi>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
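
// Illustrative sketch, not part of the diff: with the fallback removed, the struct wraps a
// single engine. The field names come from the diff above; the `engine` and `log` values are
// assumed to be built elsewhere (e.g. by the existing `ExecutionLayer` setup code).
fn new_single_engine_sketch(engine: Engine<EngineApi>, log: Logger) -> Engines {
    Engines {
        engine, // previously: engines: Vec<Engine<EngineApi>>
        latest_forkchoice_state: RwLock::new(None),
        log,
    }
}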
@@ -185,7 +186,7 @@ impl Engines {
*self.latest_forkchoice_state.write().await = Some(state);
}

async fn send_latest_forkchoice_state(&self, engine: &Engine<EngineApi>) {
async fn send_latest_forkchoice_state(&self) {
let latest_forkchoice_state = self.get_latest_forkchoice_state().await;

if let Some(forkchoice_state) = latest_forkchoice_state {
@@ -194,7 +195,7 @@ impl Engines {
self.log,
"No need to call forkchoiceUpdated";
"msg" => "head does not have execution enabled",
"id" => &engine.id,
"id" => &self.engine.id,
);
return;
}
@@ -203,12 +204,13 @@ impl Engines {
self.log,
"Issuing forkchoiceUpdated";
"forkchoice_state" => ?forkchoice_state,
"id" => &engine.id,
"id" => &self.engine.id,
);

// For simplicity, payload attributes are never included in this call. It may be
// reasonable to include them in the future.
if let Err(e) = engine
if let Err(e) = self
.engine
.api
.forkchoice_updated_v1(forkchoice_state, None)
.await
@@ -217,98 +219,77 @@ impl Engines {
self.log,
"Failed to issue latest head to engine";
"error" => ?e,
"id" => &engine.id,
"id" => &self.engine.id,
);
}
} else {
debug!(
self.log,
"No head, not sending to engine";
"id" => &engine.id,
"id" => &self.engine.id,
);
}
}

/// Returns `true` if there is at least one engine with a "synced" status.
pub async fn any_synced(&self) -> bool {
for engine in &self.engines {
if *engine.state.read().await == EngineState::Synced {
return true;
}
}
false
/// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool {
*self.engine.state.read().await == EngineState::Synced
}

/// Run the `EngineApi::upcheck` function on all nodes which are currently offline.
///
/// This can be used to try and recover any offline nodes.
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state_lock = engine.state.write().await;
if *state_lock != EngineState::Synced {
match engine.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
}

// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state(engine).await;

*state_lock = EngineState::Synced
let mut state_lock = self.engine.state.write().await;
if *state_lock != EngineState::Synced {
match self.engine.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
);
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
"id" => &engine.id
)
}

// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state(engine).await;

*state_lock = EngineState::Syncing
// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state().await;

*state_lock = EngineState::Synced
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
)
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
"id" => &engine.id
);
}

*state_lock = EngineState::AuthFailed

// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state().await;

*state_lock = EngineState::Syncing
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
);
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}

*state_lock = EngineState::AuthFailed
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
)
}
}
}
*state_lock
});

let num_synced = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| *state == EngineState::Synced)
.count();
}

if num_synced == 0 && logging.is_enabled() {
if *state_lock != EngineState::Synced && logging.is_enabled() {
crit!(
self.log,
"No synced execution engines";
@@ -355,111 +336,89 @@ impl Engines {
{
let mut errors = vec![];

for engine in &self.engines {
let (engine_synced, engine_auth_failed) = {
let state = engine.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
})
}
let (engine_synced, engine_auth_failed) = {
let state = self.engine.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(&self.engine).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &&self.engine.id
);
*self.engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: self.engine.id.clone(),
error,
})
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: engine.id.clone(),
})
} else {
errors.push(EngineError::Offline {
id: engine.id.clone(),
})
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: self.engine.id.clone(),
})
} else {
errors.push(EngineError::Offline {
id: self.engine.id.clone(),
})
}

Err(errors)
}

/// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline
/// will be ignored, however all synced or unsynced nodes will receive the broadcast.
/// Runs `func` on the node.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;

let mut any_offline = false;
for result in &first_results {
match result {
Ok(_) => return first_results,
Err(EngineError::Offline { .. }) => any_offline = true,
_ => (),
match self.broadcast_without_retry(func).await {
Err(EngineError::Offline { .. }) => {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
}
}

if any_offline {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
} else {
first_results
other => other,
}
}
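
// Illustrative usage sketch, not part of the diff: `broadcast` now returns a single
// `Result` (retried once via `upcheck_not_synced` if the engine was offline) instead of a
// `Vec` of per-engine results. The closure reuses `forkchoice_updated_v1` from the diff
// above and assumes `ForkChoiceState` is `Copy`, so the closure stays `Fn + Copy`.
async fn send_forkchoice_sketch(
    engines: &Engines,
    state: ForkChoiceState,
) -> Result<(), EngineError> {
    engines
        .broadcast(|engine| engine.api.forkchoice_updated_v1(state, None))
        .await
        .map(|_response| ())
}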

/// Runs `func` on all nodes concurrently, returning all results.
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
/// Runs `func` on the node if its last state is not offline.
pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let is_offline = *engine.state.read().await == EngineState::Offline;
if !is_offline {
match func(engine).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: engine.id.clone(),
error,
})
}
if *self.engine.state.read().await == EngineState::Offline {
Err(EngineError::Offline {
id: self.engine.id.clone(),
})
} else {
match func(&self.engine).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: self.engine.id.clone(),
error,
})
}
} else {
Err(EngineError::Offline {
id: engine.id.clone(),
})
}
});

join_all(futures).await
}
}
}
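
// Illustrative usage sketch, not part of the diff: a caller can still poll the single engine
// with the methods kept by this PR. `is_synced`, `upcheck_not_synced` and `Logging::Enabled`
// appear in the diff above; running this from a periodic task is an assumption.
async fn ensure_engine_online_sketch(engines: &Engines) {
    if !engines.is_synced().await {
        // A successful upcheck also re-sends the latest fork choice state to the engine.
        engines.upcheck_not_synced(Logging::Enabled).await;
    }
}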
