Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp to meta block #3738

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,14 @@ pub trait ChainStore: Send + Sync + 'static {
/// may purge any other blocks with that number
fn confirm_block_hash(&self, number: BlockNumber, hash: &BlockHash) -> Result<usize, Error>;

/// Find the block with `block_hash` and return the network name and number
fn block_number(&self, hash: &BlockHash) -> Result<Option<(String, BlockNumber)>, StoreError>;
/// Find the block with `block_hash` and return the network name, number and timestamp if present.
/// Currently, the timestamp is only returned if it's present in the top level block. This format is
/// depends on the chain and the implementation of Blockchain::Block for the specific chain.
/// eg: {"block": { "timestamp": 123123123 } }
fn block_number(
&self,
hash: &BlockHash,
) -> Result<Option<(String, BlockNumber, Option<String>)>, StoreError>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tuple is getting too complex, seems worth introducing a struct. Also there might be a better name than block_number since this also returns other info.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this refactoring should go on the same PR though. I'm happy to make the changes and raise a new PR for it


/// Tries to retrieve all transactions receipts for a given block.
async fn transaction_receipts_in_block(
Expand Down Expand Up @@ -409,6 +415,14 @@ pub trait QueryStore: Send + Sync {

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError>;

/// Returns the blocknumber as well as the timestamp. Timestamp depends on the chain block type
/// and can have multiple formats, it can also not be prevent. For now this is only available
/// for EVM chains both firehose and rpc.
fn block_number_with_timestamp(
&self,
block_hash: &BlockHash,
) -> Result<Option<(BlockNumber, Option<String>)>, StoreError>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have a struct BlockPtrTs (not a fan of the name but ok), we could also return it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's an package internal name, I went with I thought was an accurate description 😛 happy to rename if you have suggestions


fn wait_stats(&self) -> Result<PoolWaitStats, StoreError>;

async fn has_deterministic_errors(&self, block: BlockNumber) -> Result<bool, StoreError>;
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
let query_res = execute_query(
query.clone(),
Some(selection_set),
resolver.block_ptr.clone(),
resolver.block_ptr.as_ref().map(Into::into).clone(),
QueryExecutionOptions {
resolver,
deadline: ENV_VARS.graphql.query_timeout.map(|t| Instant::now() + t),
Expand Down
2 changes: 2 additions & 0 deletions graphql/src/schema/meta.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type _Block_ {
hash: Bytes
"The block number"
number: Int!
"Timestamp of the block if available, format depends on the chain"
timestamp: String
}

enum _SubgraphErrorPolicy_ {
Expand Down
84 changes: 70 additions & 14 deletions graphql/src/store/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,34 @@ pub struct StoreResolver {
logger: Logger,
pub(crate) store: Arc<dyn QueryStore>,
subscription_manager: Arc<dyn SubscriptionManager>,
pub(crate) block_ptr: Option<BlockPtr>,
pub(crate) block_ptr: Option<BlockPtrTs>,
deployment: DeploymentHash,
has_non_fatal_errors: bool,
error_policy: ErrorPolicy,
graphql_metrics: Arc<GraphQLMetrics>,
}

#[derive(Clone, Debug)]
pub(crate) struct BlockPtrTs {
pub ptr: BlockPtr,
pub timestamp: Option<String>,
}

impl From<BlockPtr> for BlockPtrTs {
fn from(ptr: BlockPtr) -> Self {
Self {
ptr,
timestamp: None,
}
}
}

impl From<&BlockPtrTs> for BlockPtr {
fn from(ptr: &BlockPtrTs) -> Self {
ptr.ptr.cheap_clone()
}
}

impl CheapClone for StoreResolver {}

impl StoreResolver {
Expand Down Expand Up @@ -80,7 +101,7 @@ impl StoreResolver {
let block_ptr = Self::locate_block(store_clone.as_ref(), bc, state).await?;

let has_non_fatal_errors = store
.has_deterministic_errors(block_ptr.block_number())
.has_deterministic_errors(block_ptr.ptr.block_number())
.await?;

let resolver = StoreResolver {
Expand All @@ -99,15 +120,16 @@ impl StoreResolver {
pub fn block_number(&self) -> BlockNumber {
self.block_ptr
.as_ref()
.map(|ptr| ptr.number as BlockNumber)
.map(|ptr| ptr.ptr.number as BlockNumber)
.unwrap_or(BLOCK_NUMBER_MAX)
}

/// locate_block returns the block pointer and it's timestamp when available.
async fn locate_block(
store: &dyn QueryStore,
bc: BlockConstraint,
state: &DeploymentState,
) -> Result<BlockPtr, QueryExecutionError> {
) -> Result<BlockPtrTs, QueryExecutionError> {
fn block_queryable(
state: &DeploymentState,
block: BlockNumber,
Expand All @@ -117,23 +139,39 @@ impl StoreResolver {
.map_err(|msg| QueryExecutionError::ValueParseError("block.number".to_owned(), msg))
}

fn get_block_ts(
store: &dyn QueryStore,
ptr: &BlockPtr,
) -> Result<Option<String>, QueryExecutionError> {
match store
.block_number_with_timestamp(&ptr.hash)
.map_err(Into::<QueryExecutionError>::into)?
{
Some((_, Some(ts))) => Ok(Some(ts)),
_ => Ok(None),
}
}

match bc {
BlockConstraint::Hash(hash) => {
let ptr = store
.block_number(&hash)
.block_number_with_timestamp(&hash)
.map_err(Into::into)
.and_then(|number| {
number
.and_then(|result| {
result
.ok_or_else(|| {
QueryExecutionError::ValueParseError(
"block.hash".to_owned(),
"no block with that hash found".to_owned(),
)
})
.map(|number| BlockPtr::new(hash, number))
.map(|(number, ts)| BlockPtrTs {
ptr: BlockPtr::new(hash, number),
timestamp: ts,
})
})?;

block_queryable(state, ptr.number)?;
block_queryable(state, ptr.ptr.number)?;
Ok(ptr)
}
BlockConstraint::Number(number) => {
Expand All @@ -144,7 +182,7 @@ impl StoreResolver {
// always return an all zeroes hash when users specify
// a block number
// See 7a7b9708-adb7-4fc2-acec-88680cb07ec1
Ok(BlockPtr::from((web3::types::H256::zero(), number as u64)))
Ok(BlockPtr::from((web3::types::H256::zero(), number as u64)).into())
}
BlockConstraint::Min(min) => {
let ptr = state.latest_block.cheap_clone();
Expand All @@ -158,9 +196,18 @@ impl StoreResolver {
),
));
}
Ok(ptr)
let timestamp = get_block_ts(store, &state.latest_block)?;

Ok(BlockPtrTs { ptr, timestamp })
}
BlockConstraint::Latest => {
let timestamp = get_block_ts(store, &state.latest_block)?;

Ok(BlockPtrTs {
ptr: state.latest_block.cheap_clone(),
timestamp,
})
}
BlockConstraint::Latest => Ok(state.latest_block.cheap_clone()),
}
}

Expand All @@ -181,7 +228,7 @@ impl StoreResolver {
// locate_block indicates that we do not have a block hash
// by setting the hash to `zero`
// See 7a7b9708-adb7-4fc2-acec-88680cb07ec1
let hash_h256 = ptr.hash_as_h256();
let hash_h256 = ptr.ptr.hash_as_h256();
if hash_h256 == web3::types::H256::zero() {
None
} else {
Expand All @@ -192,12 +239,21 @@ impl StoreResolver {
let number = self
.block_ptr
.as_ref()
.map(|ptr| r::Value::Int((ptr.number as i32).into()))
.map(|ptr| r::Value::Int((ptr.ptr.number as i32).into()))
.unwrap_or(r::Value::Null);

let timestamp = self.block_ptr.as_ref().map(|ptr| {
ptr.timestamp
.clone()
.map(|ts| r::Value::String(ts))
.unwrap_or(r::Value::Null)
});

let mut map = BTreeMap::new();
let block = object! {
hash: hash,
number: number,
timestamp: timestamp,
__typename: BLOCK_FIELD_TYPE
};
map.insert("prefetch:block".into(), r::Value::List(vec![block]));
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async fn execute_subscription_event(
Err(e) => return Arc::new(e.into()),
};

let block_ptr = resolver.block_ptr.clone();
let block_ptr = resolver.block_ptr.as_ref().map(Into::into);

// Create a fresh execution context with deadline.
let ctx = Arc::new(ExecutionContext {
Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn block_ptr(
None => bail!("can not find chain store for {}", chain),
Some(store) => store,
};
if let Some((_, number)) = chain_store.block_number(&block_ptr_to.hash)? {
if let Some((_, number, _)) = chain_store.block_number(&block_ptr_to.hash)? {
if number != block_ptr_to.number {
bail!(
"the given hash is for block number {} but the command specified block number {}",
Expand Down
5 changes: 3 additions & 2 deletions server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ impl<S: Store> IndexNodeResolver<S> {
let chain_store = chain.chain_store();
let call_cache = chain.call_cache();

let block_number = match chain_store.block_number(&block_hash) {
Ok(Some((_, n))) => n,
let (block_number, timestamp) = match chain_store.block_number(&block_hash) {
Ok(Some((_, n, timestamp))) => (n, timestamp),
Ok(None) => {
error!(
self.logger,
Expand Down Expand Up @@ -277,6 +277,7 @@ impl<S: Store> IndexNodeResolver<S> {
block: object! {
hash: cached_call.block_ptr.hash.hash_hex(),
number: cached_call.block_ptr.number,
timestamp: timestamp.clone(),
},
contractAddress: &cached_call.contract_address[..],
returnValue: &cached_call.return_value[..],
Expand Down
37 changes: 24 additions & 13 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,34 +589,42 @@ mod data {
}
}

/// timestamp's representation depends the blockchain::Block implementation, on
/// ethereum this is a U256 but on different chains it will most likely be different.
pub(super) fn block_number(
&self,
conn: &PgConnection,
hash: &BlockHash,
) -> Result<Option<BlockNumber>, StoreError> {
) -> Result<Option<(BlockNumber, Option<String>)>, StoreError> {
const TIMESTAMP_QUERY: &str =
"coalesce(data->'block'->>'timestamp', data->>'timestamp')";

let number = match self {
Storage::Shared => {
use public::ethereum_blocks as b;

b::table
.select(b::number)
.select((b::number, sql(TIMESTAMP_QUERY)))
.filter(b::hash.eq(format!("{:x}", hash)))
.first::<i64>(conn)
.first::<(i64, Option<String>)>(conn)
.optional()?
}
Storage::Private(Schema { blocks, .. }) => blocks
.table()
.select(blocks.number())
.select((blocks.number(), sql(TIMESTAMP_QUERY)))
.filter(blocks.hash().eq(hash.as_slice()))
.first::<i64>(conn)
.first::<(i64, Option<String>)>(conn)
.optional()?,
};
number
.map(|number| {
BlockNumber::try_from(number)
.map_err(|e| StoreError::QueryExecutionError(e.to_string()))
})
.transpose()

match number {
None => Ok(None),
Some((number, ts)) => {
let number = BlockNumber::try_from(number)
.map_err(|e| StoreError::QueryExecutionError(e.to_string()))?;
Ok(Some((number, ts)))
}
}
}

/// Find the first block that is missing from the database needed to
Expand Down Expand Up @@ -1690,12 +1698,15 @@ impl ChainStoreTrait for ChainStore {
.confirm_block_hash(&conn, &self.chain, number, hash)
}

fn block_number(&self, hash: &BlockHash) -> Result<Option<(String, BlockNumber)>, StoreError> {
fn block_number(
&self,
hash: &BlockHash,
) -> Result<Option<(String, BlockNumber, Option<String>)>, StoreError> {
let conn = self.get_conn()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would ideally be asyncified to use with_conn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is also used in a few places so I'd rather take that as a separate PR

Ok(self
.storage
.block_number(&conn, hash)?
.map(|number| (self.chain.clone(), number)))
.map(|(number, timestamp)| (self.chain.clone(), number, timestamp)))
}

async fn transaction_receipts_in_block(
Expand Down
15 changes: 11 additions & 4 deletions store/postgres/src/query_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ impl QueryStoreTrait for QueryStore {
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
self.store.block_ptr(self.site.cheap_clone()).await
}

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> {
fn block_number_with_timestamp(
&self,
block_hash: &BlockHash,
) -> Result<Option<(BlockNumber, Option<String>)>, StoreError> {
// We should also really check that the block with the given hash is
// on the chain starting at the subgraph's current head. That check is
// very expensive though with the data structures we have currently
Expand All @@ -71,9 +73,9 @@ impl QueryStoreTrait for QueryStore {
let subgraph_network = self.network_name();
self.chain_store
.block_number(block_hash)?
.map(|(network_name, number)| {
.map(|(network_name, number, timestamp)| {
if network_name == subgraph_network {
Ok(number)
Ok((number, timestamp))
} else {
Err(StoreError::QueryExecutionError(format!(
"subgraph {} belongs to network {} but block {:x} belongs to network {}",
Expand All @@ -84,6 +86,11 @@ impl QueryStoreTrait for QueryStore {
.transpose()
}

fn block_number(&self, block_hash: &BlockHash) -> Result<Option<BlockNumber>, StoreError> {
self.block_number_with_timestamp(block_hash)
.map(|opt| opt.map(|(number, _)| number))
}

fn wait_stats(&self) -> Result<PoolWaitStats, StoreError> {
self.store.wait_stats(self.replica_id)
}
Expand Down
Loading