diff --git a/Cargo.lock b/Cargo.lock index d8306072905..a10acb3b8fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,6 +422,22 @@ dependencies = [ "unreachable", ] +[[package]] +name = "console" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a50aab2529019abfabfa93f1e6c41ef392f91fbf179b347a7e96abb524884a08" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "regex", + "terminal_size", + "unicode-width", + "winapi", + "winapi-util", +] + [[package]] name = "const_fn_assert" version = "0.1.2" @@ -826,6 +842,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -889,6 +911,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.28" @@ -1549,6 +1577,7 @@ dependencies = [ "graph-store-postgres", "graphql-parser", "http", + "json-structural-diff", "lazy_static", "prometheus", "regex", @@ -2106,6 +2135,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-structural-diff" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25c7940d3c84d2079306c176c7b2b37622b6bc5e43fbd1541b1e4a4e1fd02045" +dependencies = [ + "console", + "difflib", + "regex", + "serde_json", +] + [[package]] name = "jsonrpc-core" version = "18.0.0" @@ -3855,6 +3896,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "test-store" version = "0.25.2" diff --git a/node/Cargo.toml b/node/Cargo.toml index 85996450858..a1245ac7984 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -44,6 +44,7 @@ diesel = "1.4.8" fail = "0.5" http = "0.2.5" # must be compatible with the version rust-web3 uses prometheus = { version ="0.13.0", features = ["push"] } +json-structural-diff = {version = "0.1", features = ["colorize"] } [dev-dependencies] assert_cli = "0.6" diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 838fd5f8eb0..3c631aae3fd 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -1,43 +1,38 @@ -use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration}; - use config::PoolSize; use git_testament::{git_testament, render_testament}; use graph::{data::graphql::effort::LoadManager, prelude::chrono, prometheus::Registry}; -use graph_core::MetricsRegistry; -use graph_graphql::prelude::GraphQlRunner; -use lazy_static::lazy_static; -use structopt::StructOpt; - use graph::{ log::logger, - prelude::{info, o, slog, tokio, Logger, NodeId, ENV_VARS}, + prelude::{ + anyhow::{self, Context as AnyhowContextTrait}, + info, o, slog, tokio, Logger, NodeId, ENV_VARS, + }, url::Url, }; +use graph_chain_ethereum::{EthereumAdapter, EthereumNetworks}; +use graph_core::MetricsRegistry; +use graph_graphql::prelude::GraphQlRunner; +use graph_node::config::{self, Config as Cfg}; +use graph_node::manager::commands; use graph_node::{ + chain::create_ethereum_networks, manager::{deployment::DeploymentSearch, PanicSubscriptionManager}, store_builder::StoreBuilder, MetricsContext, }; +use graph_store_postgres::ChainStore; use graph_store_postgres::{ connection_pool::ConnectionPool, BlockStore, Shard, Store, SubgraphStore, SubscriptionManager, PRIMARY_SHARD, }; - -use graph_node::config::{self, Config as Cfg}; -use graph_node::manager::commands; +use lazy_static::lazy_static; +use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration}; +use structopt::StructOpt; const VERSION_LABEL_KEY: &str = "version"; git_testament!(TESTAMENT); -macro_rules! die { - ($fmt:expr, $($arg:tt)*) => {{ - use std::io::Write; - writeln!(&mut ::std::io::stderr(), $fmt, $($arg)*).unwrap(); - ::std::process::exit(1) - }} -} - lazy_static! { static ref RENDERED_TESTAMENT: String = render_testament!(TESTAMENT); } @@ -346,6 +341,25 @@ pub enum ChainCommand { /// There must be no deployments using that chain. If there are, the /// subgraphs and/or deployments using the chain must first be removed Remove { name: String }, + + /// Compares cached blocks with fresh ones and clears the block cache when they differ. + CheckBlocks { + #[structopt(subcommand)] // Note that we mark a field as a subcommand + method: CheckBlockMethod, + + /// Chain name (must be an existing chain, see 'chain list') + #[structopt(empty_values = false)] + chain_name: String, + }, + /// Truncates the whole block cache for the given chain. + Truncate { + /// Chain name (must be an existing chain, see 'chain list') + #[structopt(empty_values = false)] + chain_name: String, + /// Skips confirmation prompt + #[structopt(long, short)] + force: bool, + }, } #[derive(Clone, Debug, StructOpt)] @@ -445,6 +459,23 @@ pub enum IndexCommand { }, } +#[derive(Clone, Debug, StructOpt)] +pub enum CheckBlockMethod { + /// The number of the target block + ByHash { hash: String }, + + /// The hash of the target block + ByNumber { number: i32 }, + + /// A block number range, inclusive on both ends. + ByRange { + #[structopt(long, short)] + from: Option, + #[structopt(long, short)] + to: Option, + }, +} + impl From for config::Opt { fn from(opt: Opt) -> Self { let mut config_opt = config::Opt::default(); @@ -521,7 +552,7 @@ impl Context { &self.node_id, PRIMARY_SHARD.as_str(), primary, - self.registry, + self.metrics_registry(), Arc::new(vec![]), ); pool.skip_setup(); @@ -624,10 +655,42 @@ impl Context { registry, )) } + + async fn ethereum_networks(&self) -> anyhow::Result { + let logger = self.logger.clone(); + let registry = self.metrics_registry(); + create_ethereum_networks(logger, registry, &self.config).await + } + + fn chain_store(self, chain_name: &str) -> anyhow::Result> { + use graph::components::store::BlockStore; + self.store() + .block_store() + .chain_store(&chain_name) + .ok_or_else(|| anyhow::anyhow!("Could not find a network named '{}'", chain_name)) + } + + async fn chain_store_and_adapter( + self, + chain_name: &str, + ) -> anyhow::Result<(Arc, Arc)> { + let ethereum_networks = self.ethereum_networks().await?; + let chain_store = self.chain_store(chain_name)?; + let ethereum_adapter = ethereum_networks + .networks + .get(chain_name) + .map(|adapters| adapters.cheapest()) + .flatten() + .ok_or(anyhow::anyhow!( + "Failed to obtain an Ethereum adapter for chain '{}'", + chain_name + ))?; + Ok((chain_store, ethereum_adapter)) + } } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); let version_label = opt.version_label.clone(); @@ -644,13 +707,8 @@ async fn main() { render_testament!(TESTAMENT) ); - let mut config = match Cfg::load(&logger, &opt.clone().into()) { - Err(e) => { - eprintln!("configuration error: {}", e); - std::process::exit(1); - } - Ok(config) => config, - }; + let mut config = Cfg::load(&logger, &opt.clone().into()).context("Configuration error")?; + if opt.pool_size > 0 && !opt.cmd.use_configured_pool_size() { // Override pool size from configuration for shard in config.stores.values_mut() { @@ -701,7 +759,7 @@ async fn main() { ); use Command::*; - let result = match opt.cmd { + match opt.cmd { TxnSpeed { delay } => commands::txn_speed::run(ctx.primary_pool(), delay), Info { deployment, @@ -862,6 +920,29 @@ async fn main() { let (block_store, primary) = ctx.block_store_and_primary_pool(); commands::chain::remove(primary, block_store, name) } + CheckBlocks { method, chain_name } => { + use commands::check_blocks::{by_hash, by_number, by_range}; + use CheckBlockMethod::*; + let logger = ctx.logger.clone(); + let (chain_store, ethereum_adapter) = + ctx.chain_store_and_adapter(&chain_name).await?; + match method { + ByHash { hash } => { + by_hash(&hash, chain_store, ðereum_adapter, &logger).await + } + ByNumber { number } => { + by_number(number, chain_store, ðereum_adapter, &logger).await + } + ByRange { from, to } => { + by_range(chain_store, ðereum_adapter, from, to, &logger).await + } + } + } + Truncate { chain_name, force } => { + use commands::check_blocks::truncate; + let chain_store = ctx.chain_store(&chain_name)?; + truncate(chain_store, force) + } } } Stats(cmd) => { @@ -916,9 +997,6 @@ async fn main() { } } } - }; - if let Err(e) = result { - die!("error: {}", e) } } diff --git a/node/src/manager/commands/check_blocks.rs b/node/src/manager/commands/check_blocks.rs new file mode 100644 index 00000000000..5d6ad7a9793 --- /dev/null +++ b/node/src/manager/commands/check_blocks.rs @@ -0,0 +1,269 @@ +use graph::{ + anyhow::{bail, ensure}, + components::store::ChainStore as ChainStoreTrait, + prelude::{ + anyhow::{self, anyhow, Context}, + web3::types::H256, + }, + slog::Logger, +}; +use graph_chain_ethereum::{EthereumAdapter, EthereumAdapterTrait}; +use graph_store_postgres::ChainStore; +use std::sync::Arc; + +pub async fn by_hash( + hash: &str, + chain_store: Arc, + ethereum_adapter: &EthereumAdapter, + logger: &Logger, +) -> anyhow::Result<()> { + let block_hash = helpers::parse_block_hash(hash)?; + run(&block_hash, &chain_store, ethereum_adapter, logger).await +} + +pub async fn by_number( + number: i32, + chain_store: Arc, + ethereum_adapter: &EthereumAdapter, + logger: &Logger, +) -> anyhow::Result<()> { + let block_hash = steps::resolve_block_hash_from_block_number(number, &chain_store)?; + run(&block_hash, &chain_store, ethereum_adapter, logger).await +} + +pub async fn by_range( + chain_store: Arc, + ethereum_adapter: &EthereumAdapter, + range_from: Option, + range_to: Option, + logger: &Logger, +) -> anyhow::Result<()> { + // Resolve a range of block numbers into a collection of blocks hashes + let range = ranges::Range::new(range_from, range_to)?; + let max = match range.upper_bound { + // When we have an open upper bound, we use the chain head's block number + None => steps::find_chain_head(&chain_store)?, + Some(x) => x, + }; + // FIXME: This performs poorly. + // TODO: This could be turned into async code + for block_number in range.lower_bound..=max { + println!("Fixing block [{block_number}/{max}]"); + let block_hash = steps::resolve_block_hash_from_block_number(block_number, &chain_store)?; + run(&block_hash, &chain_store, ethereum_adapter, logger).await? + } + Ok(()) +} + +pub fn truncate(chain_store: Arc, skip_confirmation: bool) -> anyhow::Result<()> { + if !skip_confirmation && !helpers::prompt_for_confirmation()? { + println!("Aborting."); + return Ok(()); + } + + chain_store + .truncate_block_cache() + .with_context(|| format!("Failed to truncate block cache for {}", chain_store.chain)) +} + +async fn run( + block_hash: &H256, + chain_store: &ChainStore, + ethereum_adapter: &EthereumAdapter, + logger: &Logger, +) -> anyhow::Result<()> { + let cached_block = steps::fetch_single_cached_block(block_hash, &chain_store)?; + let provider_block = + steps::fetch_single_provider_block(&block_hash, ethereum_adapter, logger).await?; + let diff = steps::diff_block_pair(&cached_block, &provider_block); + steps::report_difference(diff.as_deref(), &block_hash); + if diff.is_some() { + steps::delete_block(&block_hash, &chain_store)?; + } + Ok(()) +} + +mod steps { + use super::*; + use futures::compat::Future01CompatExt; + use graph::prelude::serde_json::{self, Value}; + use json_structural_diff::{colorize as diff_to_string, JsonDiff}; + + /// Queries the [`ChainStore`] about the block hash for the given block number. + /// + /// Errors on a non-unary result. + pub(super) fn resolve_block_hash_from_block_number( + number: i32, + chain_store: &ChainStore, + ) -> anyhow::Result { + let block_hashes = chain_store.block_hashes_by_block_number(number)?; + helpers::get_single_item("block hash", block_hashes) + .with_context(|| format!("Failed to locate block number {} in store", number)) + } + + /// Queries the [`ChainStore`] for a cached block given a block hash. + /// + /// Errors on a non-unary result. + pub(super) fn fetch_single_cached_block( + block_hash: &H256, + chain_store: &ChainStore, + ) -> anyhow::Result { + let blocks = chain_store.blocks(&[*block_hash])?; + if blocks.is_empty() { + bail!("Could not find a block with hash={block_hash:?} in cache") + } + helpers::get_single_item("block", blocks) + .with_context(|| format!("Failed to locate block {} in store.", block_hash)) + } + + /// Fetches a block from a JRPC endpoint. + /// + /// Errors on a non-unary result. + pub(super) async fn fetch_single_provider_block( + block_hash: &H256, + ethereum_adapter: &EthereumAdapter, + logger: &Logger, + ) -> anyhow::Result { + let provider_block = ethereum_adapter + .block_by_hash(&logger, *block_hash) + .compat() + .await + .with_context(|| format!("failed to fetch block {block_hash}"))? + .ok_or_else(|| anyhow!("JRPC provider found no block {block_hash}"))?; + ensure!( + provider_block.hash == Some(*block_hash), + "Provider responded with a different block hash" + ); + serde_json::to_value(provider_block) + .context("failed to parse provider block as a JSON value") + } + + /// Compares two [`serde_json::Value`] values. + /// + /// If they are different, returns a user-friendly string ready to be displayed. + pub(super) fn diff_block_pair(a: &Value, b: &Value) -> Option { + if a == b { + None + } else { + match JsonDiff::diff(a, &b, false).diff { + // The diff could potentially be a `Value::Null`, which is equivalent to not being + // different at all. + None | Some(Value::Null) => None, + Some(diff) => { + // Convert the JSON diff to a pretty-formatted text that will be displayed to + // the user + Some(diff_to_string(&diff, false)) + } + } + } + } + + /// Prints the difference between two [`serde_json::Value`] values to the user. + pub(super) fn report_difference(difference: Option<&str>, hash: &H256) { + if let Some(diff) = difference { + eprintln!("block {hash} diverges from cache:"); + eprintln!("{diff}"); + } else { + println!("Cached block is equal to the same block from provider.") + } + } + + /// Attempts to delete a block from the block cache. + pub(super) fn delete_block(hash: &H256, chain_store: &ChainStore) -> anyhow::Result<()> { + println!("Deleting block {hash} from cache."); + chain_store.delete_blocks(&[&hash])?; + println!("Done."); + Ok(()) + } + + /// Queries the [`ChainStore`] about the chain head. + pub(super) fn find_chain_head(chain_store: &ChainStore) -> anyhow::Result { + let chain_head: Option = chain_store.chain_head_block(&chain_store.chain)?; + chain_head.ok_or_else(|| anyhow!("Could not find the chain head for {}", chain_store.chain)) + } +} + +mod helpers { + use super::*; + use graph::prelude::hex; + use std::io::{self, Write}; + + /// Tries to parse a [`H256`] from a hex string. + pub(super) fn parse_block_hash(hash: &str) -> anyhow::Result { + let hash = hash.trim_start_matches("0x"); + let hash = hex::decode(hash) + .with_context(|| format!("Cannot parse H256 value from string `{}`", hash))?; + Ok(H256::from_slice(&hash)) + } + + /// Asks users if they are certain about truncating the whole block cache. + pub(super) fn prompt_for_confirmation() -> anyhow::Result { + print!("This will delete all cached blocks.\nProceed? [y/N] "); + io::stdout().flush()?; + + let mut answer = String::new(); + io::stdin().read_line(&mut answer)?; + answer.make_ascii_lowercase(); + + match answer.trim() { + "y" | "yes" => Ok(true), + _ => Ok(false), + } + } + + /// Convenience function for extracting values from unary sets. + pub(super) fn get_single_item(name: &'static str, collection: I) -> anyhow::Result + where + I: IntoIterator, + { + let mut iterator = collection.into_iter(); + match (iterator.next(), iterator.next()) { + (Some(a), None) => Ok(a), + (None, None) => bail!("Expected a single {name} but found none."), + _ => bail!("Expected a single {name} but found multiple occurrences."), + } + } +} + +/// Custom range type +mod ranges { + use graph::prelude::anyhow::{self, bail}; + + pub(super) struct Range { + pub(super) lower_bound: i32, + pub(super) upper_bound: Option, + } + + impl Range { + pub fn new(lower_bound: Option, upper_bound: Option) -> anyhow::Result { + let (lower_bound, upper_bound) = match (lower_bound, upper_bound) { + // Invalid cases: + (None, None) => { + bail!( + "This would wipe the whole cache. \ + Use `graphman chain truncate` instead" + ) + } + (Some(0), _) => bail!("Genesis block can't be removed"), + (Some(x), _) | (_, Some(x)) if x < 0 => { + bail!("Negative block number used as range bound: {}", x) + } + (Some(lower), Some(upper)) if upper < lower => bail!( + "Upper bound ({}) can't be smaller than lower bound ({})", + upper, + lower + ), + + // Valid cases: + // Open lower bounds are set to the lowest possible block number + (None, upper @ Some(_)) => (1, upper), + (Some(lower), upper) => (lower, upper), + }; + + Ok(Self { + lower_bound, + upper_bound, + }) + } + } +} diff --git a/node/src/manager/commands/mod.rs b/node/src/manager/commands/mod.rs index 9fd201c0c2c..c3ae31bbb79 100644 --- a/node/src/manager/commands/mod.rs +++ b/node/src/manager/commands/mod.rs @@ -1,5 +1,6 @@ pub mod assign; pub mod chain; +pub mod check_blocks; pub mod config; pub mod copy; pub mod create; diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index e6a09f2efcb..de0a8502743 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -47,13 +47,20 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { - use diesel::connection::SimpleConnection; - use diesel::dsl::sql; - use diesel::pg::{Pg, PgConnection}; - use diesel::serialize::Output; - use diesel::sql_types::{BigInt, Binary, Bytea, Integer, Jsonb, Text}; - use diesel::types::{FromSql, ToSql}; - use diesel::{delete, insert_into, prelude::*, sql_query, update}; + use diesel::sql_types::{Array, Binary}; + use diesel::{connection::SimpleConnection, insert_into}; + use diesel::{delete, prelude::*, sql_query}; + use diesel::{dsl::sql, pg::PgConnection}; + use diesel::{ + pg::Pg, + serialize::Output, + sql_types::Text, + types::{FromSql, ToSql}, + }; + use diesel::{ + sql_types::{BigInt, Bytea, Integer, Jsonb}, + update, + }; use diesel_dynamic_schema as dds; use graph::blockchain::{Block, BlockHash}; use graph::constraint_violation; @@ -63,7 +70,6 @@ mod data { use graph::prelude::{ serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError, }; - use std::fmt; use std::iter::FromIterator; use std::{convert::TryFrom, io::Write}; @@ -886,6 +892,46 @@ mod data { } } + pub(super) fn delete_blocks_by_hash( + &self, + conn: &PgConnection, + chain: &str, + block_hashes: &[&H256], + ) -> Result { + use diesel::dsl::any; + match self { + Storage::Shared => { + use public::ethereum_blocks as b; + + let hashes: Vec = block_hashes + .iter() + .map(|hash| format!("{hash:x}")) + .collect(); + + diesel::delete(b::table) + .filter(b::network_name.eq(chain)) + .filter(b::hash.eq(any(hashes))) + .filter(b::number.gt(0)) // keep genesis + .execute(conn) + .map_err(Error::from) + } + Storage::Private(Schema { blocks, .. }) => { + let query = format!( + "delete from {} where hash = any($1) and number > 0", + blocks.qname + ); + + let hashes: Vec<&[u8]> = + block_hashes.iter().map(|hash| hash.as_bytes()).collect(); + + sql_query(query) + .bind::, _>(hashes) + .execute(conn) + .map_err(Error::from) + } + } + } + pub(super) fn get_call_and_access( &self, conn: &PgConnection, @@ -1332,6 +1378,12 @@ impl ChainStore { .set_chain(&conn, &self.chain, genesis_hash, chain); } + pub fn delete_blocks(&self, block_hashes: &[&H256]) -> Result { + let conn = self.get_conn()?; + self.storage + .delete_blocks_by_hash(&conn, &self.chain, block_hashes) + } + pub fn truncate_block_cache(&self) -> Result<(), StoreError> { let conn = self.get_conn()?; self.storage.truncate_block_cache(&conn)?;