Skip to content

Commit f84e6b8

Browse files
authored
node/manager: graphman fix block command
1 parent 67873ac commit f84e6b8

File tree

6 files changed

+492
-40
lines changed

6 files changed

+492
-40
lines changed

Cargo.lock

+51
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ diesel = "1.4.8"
4444
fail = "0.5"
4545
http = "0.2.5" # must be compatible with the version rust-web3 uses
4646
prometheus = { version ="0.13.0", features = ["push"] }
47+
json-structural-diff = {version = "0.1", features = ["colorize"] }
4748

4849
[dev-dependencies]
4950
assert_cli = "0.6"

node/src/bin/manager.rs

+110-32
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,38 @@
1-
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};
2-
31
use config::PoolSize;
42
use git_testament::{git_testament, render_testament};
53
use graph::{data::graphql::effort::LoadManager, prelude::chrono, prometheus::Registry};
6-
use graph_core::MetricsRegistry;
7-
use graph_graphql::prelude::GraphQlRunner;
8-
use lazy_static::lazy_static;
9-
use structopt::StructOpt;
10-
114
use graph::{
125
log::logger,
13-
prelude::{info, o, slog, tokio, Logger, NodeId, ENV_VARS},
6+
prelude::{
7+
anyhow::{self, Context as AnyhowContextTrait},
8+
info, o, slog, tokio, Logger, NodeId, ENV_VARS,
9+
},
1410
url::Url,
1511
};
12+
use graph_chain_ethereum::{EthereumAdapter, EthereumNetworks};
13+
use graph_core::MetricsRegistry;
14+
use graph_graphql::prelude::GraphQlRunner;
15+
use graph_node::config::{self, Config as Cfg};
16+
use graph_node::manager::commands;
1617
use graph_node::{
18+
chain::create_ethereum_networks,
1719
manager::{deployment::DeploymentSearch, PanicSubscriptionManager},
1820
store_builder::StoreBuilder,
1921
MetricsContext,
2022
};
23+
use graph_store_postgres::ChainStore;
2124
use graph_store_postgres::{
2225
connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore,
2326
SubscriptionManager, PRIMARY_SHARD,
2427
};
25-
26-
use graph_node::config::{self, Config as Cfg};
27-
use graph_node::manager::commands;
28+
use lazy_static::lazy_static;
29+
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};
30+
use structopt::StructOpt;
2831

2932
const VERSION_LABEL_KEY: &str = "version";
3033

3134
git_testament!(TESTAMENT);
3235

33-
macro_rules! die {
34-
($fmt:expr, $($arg:tt)*) => {{
35-
use std::io::Write;
36-
writeln!(&mut ::std::io::stderr(), $fmt, $($arg)*).unwrap();
37-
::std::process::exit(1)
38-
}}
39-
}
40-
4136
lazy_static! {
4237
static ref RENDERED_TESTAMENT: String = render_testament!(TESTAMENT);
4338
}
@@ -346,6 +341,25 @@ pub enum ChainCommand {
346341
/// There must be no deployments using that chain. If there are, the
347342
/// subgraphs and/or deployments using the chain must first be removed
348343
Remove { name: String },
344+
345+
/// Compares cached blocks with fresh ones and clears the block cache when they differ.
346+
CheckBlocks {
347+
#[structopt(subcommand)] // Note that we mark a field as a subcommand
348+
method: CheckBlockMethod,
349+
350+
/// Chain name (must be an existing chain, see 'chain list')
351+
#[structopt(empty_values = false)]
352+
chain_name: String,
353+
},
354+
/// Truncates the whole block cache for the given chain.
355+
Truncate {
356+
/// Chain name (must be an existing chain, see 'chain list')
357+
#[structopt(empty_values = false)]
358+
chain_name: String,
359+
/// Skips confirmation prompt
360+
#[structopt(long, short)]
361+
force: bool,
362+
},
349363
}
350364

351365
#[derive(Clone, Debug, StructOpt)]
@@ -445,6 +459,23 @@ pub enum IndexCommand {
445459
},
446460
}
447461

462+
#[derive(Clone, Debug, StructOpt)]
463+
pub enum CheckBlockMethod {
464+
/// The number of the target block
465+
ByHash { hash: String },
466+
467+
/// The hash of the target block
468+
ByNumber { number: i32 },
469+
470+
/// A block number range, inclusive on both ends.
471+
ByRange {
472+
#[structopt(long, short)]
473+
from: Option<i32>,
474+
#[structopt(long, short)]
475+
to: Option<i32>,
476+
},
477+
}
478+
448479
impl From<Opt> for config::Opt {
449480
fn from(opt: Opt) -> Self {
450481
let mut config_opt = config::Opt::default();
@@ -525,7 +556,7 @@ impl Context {
525556
&self.node_id,
526557
PRIMARY_SHARD.as_str(),
527558
primary,
528-
self.registry,
559+
self.metrics_registry(),
529560
Arc::new(vec![]),
530561
);
531562
pool.skip_setup();
@@ -628,10 +659,42 @@ impl Context {
628659
registry,
629660
))
630661
}
662+
663+
async fn ethereum_networks(&self) -> anyhow::Result<EthereumNetworks> {
664+
let logger = self.logger.clone();
665+
let registry = self.metrics_registry();
666+
create_ethereum_networks(logger, registry, &self.config).await
667+
}
668+
669+
fn chain_store(self, chain_name: &str) -> anyhow::Result<Arc<ChainStore>> {
670+
use graph::components::store::BlockStore;
671+
self.store()
672+
.block_store()
673+
.chain_store(&chain_name)
674+
.ok_or_else(|| anyhow::anyhow!("Could not find a network named '{}'", chain_name))
675+
}
676+
677+
async fn chain_store_and_adapter(
678+
self,
679+
chain_name: &str,
680+
) -> anyhow::Result<(Arc<ChainStore>, Arc<EthereumAdapter>)> {
681+
let ethereum_networks = self.ethereum_networks().await?;
682+
let chain_store = self.chain_store(chain_name)?;
683+
let ethereum_adapter = ethereum_networks
684+
.networks
685+
.get(chain_name)
686+
.map(|adapters| adapters.cheapest())
687+
.flatten()
688+
.ok_or(anyhow::anyhow!(
689+
"Failed to obtain an Ethereum adapter for chain '{}'",
690+
chain_name
691+
))?;
692+
Ok((chain_store, ethereum_adapter))
693+
}
631694
}
632695

633696
#[tokio::main]
634-
async fn main() {
697+
async fn main() -> anyhow::Result<()> {
635698
let opt = Opt::from_args();
636699

637700
let version_label = opt.version_label.clone();
@@ -648,13 +711,8 @@ async fn main() {
648711
render_testament!(TESTAMENT)
649712
);
650713

651-
let mut config = match Cfg::load(&logger, &opt.clone().into()) {
652-
Err(e) => {
653-
eprintln!("configuration error: {}", e);
654-
std::process::exit(1);
655-
}
656-
Ok(config) => config,
657-
};
714+
let mut config = Cfg::load(&logger, &opt.clone().into()).context("Configuration error")?;
715+
658716
if opt.pool_size > 0 && !opt.cmd.use_configured_pool_size() {
659717
// Override pool size from configuration
660718
for shard in config.stores.values_mut() {
@@ -705,7 +763,7 @@ async fn main() {
705763
);
706764

707765
use Command::*;
708-
let result = match opt.cmd {
766+
match opt.cmd {
709767
TxnSpeed { delay } => commands::txn_speed::run(ctx.primary_pool(), delay),
710768
Info {
711769
deployment,
@@ -869,6 +927,29 @@ async fn main() {
869927
let (block_store, primary) = ctx.block_store_and_primary_pool();
870928
commands::chain::remove(primary, block_store, name)
871929
}
930+
CheckBlocks { method, chain_name } => {
931+
use commands::check_blocks::{by_hash, by_number, by_range};
932+
use CheckBlockMethod::*;
933+
let logger = ctx.logger.clone();
934+
let (chain_store, ethereum_adapter) =
935+
ctx.chain_store_and_adapter(&chain_name).await?;
936+
match method {
937+
ByHash { hash } => {
938+
by_hash(&hash, chain_store, &ethereum_adapter, &logger).await
939+
}
940+
ByNumber { number } => {
941+
by_number(number, chain_store, &ethereum_adapter, &logger).await
942+
}
943+
ByRange { from, to } => {
944+
by_range(chain_store, &ethereum_adapter, from, to, &logger).await
945+
}
946+
}
947+
}
948+
Truncate { chain_name, force } => {
949+
use commands::check_blocks::truncate;
950+
let chain_store = ctx.chain_store(&chain_name)?;
951+
truncate(chain_store, force)
952+
}
872953
}
873954
}
874955
Stats(cmd) => {
@@ -923,9 +1004,6 @@ async fn main() {
9231004
}
9241005
}
9251006
}
926-
};
927-
if let Err(e) = result {
928-
die!("error: {}", e)
9291007
}
9301008
}
9311009

0 commit comments

Comments
 (0)