Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add builder SSZ flow #6859

Merged
merged 21 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions beacon_node/builder_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ authors = ["Sean Anderson <[email protected]>"]

[dependencies]
eth2 = { workspace = true }
ethereum_ssz = { workspace = true }
lighthouse_version = { workspace = true }
reqwest = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
179 changes: 174 additions & 5 deletions beacon_node/builder_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
use eth2::types::builder_bid::SignedBuilderBid;
use eth2::types::fork_versioned_response::EmptyMetadata;
use eth2::types::{
EthSpec, ExecutionBlockHash, ForkVersionedResponse, PublicKeyBytes,
SignedValidatorRegistrationData, Slot,
ContentType, EthSpec, ExecutionBlockHash, ForkName, ForkVersionDecode, ForkVersionDeserialize,
ForkVersionedResponse, PublicKeyBytes, SignedValidatorRegistrationData, Slot,
};
use eth2::types::{FullPayloadContents, SignedBlindedBeaconBlock};
pub use eth2::Error;
use eth2::{ok_or_error, StatusCode, CONSENSUS_VERSION_HEADER};
use reqwest::header::{HeaderMap, HeaderValue};
use eth2::{
ok_or_error, StatusCode, CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER,
JSON_CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER,
};
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
use reqwest::{IntoUrl, Response};
use sensitive_url::SensitiveUrl;
use serde::de::DeserializeOwned;
use serde::Serialize;
use ssz::Encode;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use std::sync::Arc;

pub const DEFAULT_TIMEOUT_MILLIS: u64 = 15000;

/// This timeout is in accordance with v0.2.0 of the [builder specs](https://github.com/flashbots/mev-boost/pull/20).
Expand Down Expand Up @@ -49,6 +58,7 @@ pub struct BuilderHttpClient {
server: SensitiveUrl,
timeouts: Timeouts,
user_agent: String,
ssz_enabled: Arc<AtomicBool>,
}

impl BuilderHttpClient {
Expand All @@ -64,13 +74,85 @@ impl BuilderHttpClient {
server,
timeouts: Timeouts::new(builder_header_timeout),
user_agent,
ssz_enabled: Arc::new(false.into()),
eserilev marked this conversation as resolved.
Show resolved Hide resolved
})
}

pub fn get_user_agent(&self) -> &str {
&self.user_agent
}

fn fork_name_from_header(&self, headers: &HeaderMap) -> Result<Option<ForkName>, String> {
headers
.get(CONSENSUS_VERSION_HEADER)
.map(|fork_name| {
fork_name
.to_str()
.map_err(|e| e.to_string())
.and_then(ForkName::from_str)
})
.transpose()
}

fn content_type_from_header(&self, headers: &HeaderMap) -> ContentType {
let Some(content_type) = headers.get(CONTENT_TYPE_HEADER).map(|content_type| {
let content_type = content_type.to_str();
match content_type {
Ok(SSZ_CONTENT_TYPE_HEADER) => ContentType::Ssz,
_ => ContentType::Json,
}
}) else {
return ContentType::Json;
};
content_type
}

async fn get_with_header<
T: DeserializeOwned + ForkVersionDecode + ForkVersionDeserialize,
U: IntoUrl,
>(
&self,
url: U,
timeout: Duration,
headers: HeaderMap,
) -> Result<ForkVersionedResponse<T>, Error> {
let response = self
.get_response_with_header(url, Some(timeout), headers)
.await?;

let headers = response.headers().clone();
let response_bytes = response.bytes().await?;

let Ok(Some(fork_name)) = self.fork_name_from_header(&headers) else {
// if no fork version specified, attempt to fallback to JSON
self.ssz_enabled.store(false, Ordering::SeqCst);
return serde_json::from_slice(&response_bytes).map_err(Error::InvalidJson);
};

let content_type = self.content_type_from_header(&headers);

match content_type {
ContentType::Ssz => {
self.ssz_enabled.store(true, Ordering::SeqCst);
T::from_ssz_bytes_by_fork(&response_bytes, fork_name)
.map(|data| ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data,
})
.map_err(Error::InvalidSsz)
}
ContentType::Json => {
eserilev marked this conversation as resolved.
Show resolved Hide resolved
self.ssz_enabled.store(false, Ordering::SeqCst);
serde_json::from_slice(&response_bytes).map_err(Error::InvalidJson)
}
}
}

pub fn is_ssz_enabled(&self) -> bool {
self.ssz_enabled.load(Ordering::SeqCst)
}

async fn get_with_timeout<T: DeserializeOwned, U: IntoUrl>(
&self,
url: U,
Expand All @@ -83,6 +165,21 @@ impl BuilderHttpClient {
.map_err(Into::into)
}

/// Perform a HTTP GET request, returning the `Response` for further processing.
async fn get_response_with_header<U: IntoUrl>(
&self,
url: U,
timeout: Option<Duration>,
headers: HeaderMap,
) -> Result<Response, Error> {
let mut builder = self.client.get(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let response = builder.headers(headers).send().await.map_err(Error::from)?;
ok_or_error(response).await
}

/// Perform a HTTP GET request, returning the `Response` for further processing.
async fn get_response_with_timeout<U: IntoUrl>(
&self,
Expand Down Expand Up @@ -112,6 +209,32 @@ impl BuilderHttpClient {
ok_or_error(response).await
}

async fn post_ssz_with_raw_response<U: IntoUrl>(
&self,
url: U,
ssz_body: Vec<u8>,
mut headers: HeaderMap,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

headers.insert(
CONTENT_TYPE_HEADER,
HeaderValue::from_static(SSZ_CONTENT_TYPE_HEADER),
);

let response = builder
.headers(headers)
.body(ssz_body)
.send()
.await
.map_err(Error::from)?;
ok_or_error(response).await
}

async fn post_with_raw_response<T: Serialize, U: IntoUrl>(
&self,
url: U,
Expand Down Expand Up @@ -152,6 +275,42 @@ impl BuilderHttpClient {
Ok(())
}

/// `POST /eth/v1/builder/blinded_blocks` with SSZ serialized request body
pub async fn post_builder_blinded_blocks_ssz<E: EthSpec>(
&self,
blinded_block: &SignedBlindedBeaconBlock<E>,
) -> Result<FullPayloadContents<E>, Error> {
let mut path = self.server.full.clone();

let body = blinded_block.as_ssz_bytes();

path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("eth")
.push("v1")
.push("builder")
.push("blinded_blocks");

let mut headers = HeaderMap::new();
if let Ok(value) = HeaderValue::from_str(&blinded_block.fork_name_unchecked().to_string()) {
headers.insert(CONSENSUS_VERSION_HEADER, value);
}

let result = self
.post_ssz_with_raw_response(
path,
body,
headers,
Some(self.timeouts.post_blinded_blocks),
)
.await?
.bytes()
.await?;

FullPayloadContents::from_ssz_bytes_by_fork(&result, blinded_block.fork_name_unchecked())
.map_err(Error::InvalidSsz)
}

/// `POST /eth/v1/builder/blinded_blocks`
pub async fn post_builder_blinded_blocks<E: EthSpec>(
&self,
Expand Down Expand Up @@ -202,7 +361,17 @@ impl BuilderHttpClient {
.push(format!("{parent_hash:?}").as_str())
.push(pubkey.as_hex_string().as_str());

let resp = self.get_with_timeout(path, self.timeouts.get_header).await;
let mut headers = HeaderMap::new();
if let Ok(ssz_content_type_header) = HeaderValue::from_str(&format!(
"{}; q=1.0,{}; q=0.9",
SSZ_CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE_HEADER
)) {
headers.insert(ACCEPT, ssz_content_type_header);
};

let resp = self
.get_with_header(path, self.timeouts.get_header, headers)
.await;

if matches!(resp, Err(Error::StatusCode(StatusCode::NO_CONTENT))) {
Ok(None)
Expand Down
17 changes: 12 additions & 5 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1900,11 +1900,18 @@ impl<E: EthSpec> ExecutionLayer<E> {
if let Some(builder) = self.builder() {
let (payload_result, duration) =
timed_future(metrics::POST_BLINDED_PAYLOAD_BUILDER, async {
builder
.post_builder_blinded_blocks(block)
.await
.map_err(Error::Builder)
.map(|d| d.data)
if builder.is_ssz_enabled() {
builder
.post_builder_blinded_blocks_ssz(block)
.await
.map_err(Error::Builder)
} else {
builder
.post_builder_blinded_blocks(block)
.await
.map_err(Error::Builder)
.map(|d| d.data)
}
})
.await;

Expand Down
Loading