1
- use std:: { collections:: HashMap , env, num:: ParseIntError , sync:: Arc , time:: Duration } ;
2
-
3
1
use config:: PoolSize ;
4
2
use git_testament:: { git_testament, render_testament} ;
5
3
use graph:: { data:: graphql:: effort:: LoadManager , prelude:: chrono, prometheus:: Registry } ;
6
- use graph_core:: MetricsRegistry ;
7
- use graph_graphql:: prelude:: GraphQlRunner ;
8
- use lazy_static:: lazy_static;
9
- use structopt:: StructOpt ;
10
-
11
4
use graph:: {
12
5
log:: logger,
13
- prelude:: { info, o, slog, tokio, Logger , NodeId , ENV_VARS } ,
6
+ prelude:: {
7
+ anyhow:: { self , Context as AnyhowContextTrait } ,
8
+ info, o, slog, tokio, Logger , NodeId , ENV_VARS ,
9
+ } ,
14
10
url:: Url ,
15
11
} ;
12
+ use graph_chain_ethereum:: EthereumNetworks ;
13
+ use graph_core:: MetricsRegistry ;
14
+ use graph_graphql:: prelude:: GraphQlRunner ;
15
+ use graph_node:: config:: { self , Config as Cfg } ;
16
+ use graph_node:: manager:: commands;
16
17
use graph_node:: {
18
+ chain:: create_ethereum_networks,
17
19
manager:: { deployment:: DeploymentSearch , PanicSubscriptionManager } ,
18
20
store_builder:: StoreBuilder ,
19
21
MetricsContext ,
@@ -22,22 +24,14 @@ use graph_store_postgres::{
22
24
connection_pool:: ConnectionPool , BlockStore , Shard , Store , SubgraphStore , SubscriptionManager ,
23
25
PRIMARY_SHARD ,
24
26
} ;
25
-
26
- use graph_node :: config :: { self , Config as Cfg } ;
27
- use graph_node :: manager :: commands ;
27
+ use lazy_static :: lazy_static ;
28
+ use std :: { collections :: HashMap , env , num :: ParseIntError , sync :: Arc , time :: Duration } ;
29
+ use structopt :: StructOpt ;
28
30
29
31
const VERSION_LABEL_KEY : & str = "version" ;
30
32
31
33
git_testament ! ( TESTAMENT ) ;
32
34
33
- macro_rules! die {
34
- ( $fmt: expr, $( $arg: tt) * ) => { {
35
- use std:: io:: Write ;
36
- writeln!( & mut :: std:: io:: stderr( ) , $fmt, $( $arg) * ) . unwrap( ) ;
37
- :: std:: process:: exit( 1 )
38
- } }
39
- }
40
-
41
35
lazy_static ! {
42
36
static ref RENDERED_TESTAMENT : String = render_testament!( TESTAMENT ) ;
43
37
}
@@ -207,6 +201,9 @@ pub enum Command {
207
201
208
202
/// Manage database indexes
209
203
Index ( IndexCommand ) ,
204
+
205
+ /// Compares cached blocks with fresh ones and report any differences and affected subgraphs.
206
+ FixBlock ( FixBlockCommand ) ,
210
207
}
211
208
212
209
impl Command {
@@ -445,6 +442,36 @@ pub enum IndexCommand {
445
442
} ,
446
443
}
447
444
445
+ #[ derive( Clone , Debug , StructOpt ) ]
446
+ pub struct FixBlockCommand {
447
+ /// Network name (must fit one of the chain)
448
+ #[ structopt( empty_values = false ) ]
449
+ network_name : String ,
450
+ #[ structopt( subcommand) ]
451
+ method : FixBlockSubCommand ,
452
+ }
453
+
454
+ #[ derive( Clone , Debug , StructOpt ) ]
455
+ enum FixBlockSubCommand {
456
+ /// The number of the target block
457
+ ByHash { hash : String } ,
458
+
459
+ /// The hash of the target block
460
+ ByNumber { number : i32 } ,
461
+
462
+ /// A block number range in the form: `start..end`
463
+ ///
464
+ /// All valid [`std::ops::Range`] definitions are accepted.
465
+ ByRange { range : String } ,
466
+
467
+ /// Truncates the whole block cache for the given chain.
468
+ TruncateCache {
469
+ /// Skips prompt for confirming the operation.
470
+ #[ structopt( long) ]
471
+ no_confirm : bool ,
472
+ } ,
473
+ }
474
+
448
475
impl From < Opt > for config:: Opt {
449
476
fn from ( opt : Opt ) -> Self {
450
477
let mut config_opt = config:: Opt :: default ( ) ;
@@ -514,21 +541,21 @@ impl Context {
514
541
self . node_id . clone ( )
515
542
}
516
543
517
- fn primary_pool ( self ) -> ConnectionPool {
544
+ fn primary_pool ( & self ) -> ConnectionPool {
518
545
let primary = self . config . primary_store ( ) ;
519
546
let pool = StoreBuilder :: main_pool (
520
547
& self . logger ,
521
548
& self . node_id ,
522
549
PRIMARY_SHARD . as_str ( ) ,
523
550
primary,
524
- self . registry ,
551
+ self . metrics_registry ( ) ,
525
552
Arc :: new ( vec ! [ ] ) ,
526
553
) ;
527
554
pool. skip_setup ( ) ;
528
555
pool
529
556
}
530
557
531
- fn subgraph_store ( self ) -> Arc < SubgraphStore > {
558
+ fn subgraph_store ( & self ) -> Arc < SubgraphStore > {
532
559
self . store_and_pools ( ) . 0 . subgraph_store ( )
533
560
}
534
561
@@ -549,12 +576,12 @@ impl Context {
549
576
( primary_pool, mgr)
550
577
}
551
578
552
- fn store ( self ) -> Arc < Store > {
579
+ fn store ( & self ) -> Arc < Store > {
553
580
let ( store, _) = self . store_and_pools ( ) ;
554
581
store
555
582
}
556
583
557
- fn pools ( self ) -> HashMap < Shard , ConnectionPool > {
584
+ fn pools ( & self ) -> HashMap < Shard , ConnectionPool > {
558
585
let ( _, pools) = self . store_and_pools ( ) ;
559
586
pools
560
587
}
@@ -570,13 +597,13 @@ impl Context {
570
597
. await
571
598
}
572
599
573
- fn store_and_pools ( self ) -> ( Arc < Store > , HashMap < Shard , ConnectionPool > ) {
600
+ fn store_and_pools ( & self ) -> ( Arc < Store > , HashMap < Shard , ConnectionPool > ) {
574
601
let ( subgraph_store, pools) = StoreBuilder :: make_subgraph_store_and_pools (
575
602
& self . logger ,
576
603
& self . node_id ,
577
604
& self . config ,
578
- self . fork_base ,
579
- self . registry ,
605
+ self . fork_base . clone ( ) ,
606
+ self . registry . clone ( ) ,
580
607
) ;
581
608
582
609
for pool in pools. values ( ) {
@@ -594,20 +621,20 @@ impl Context {
594
621
( store, pools)
595
622
}
596
623
597
- fn store_and_primary ( self ) -> ( Arc < Store > , ConnectionPool ) {
624
+ fn store_and_primary ( & self ) -> ( Arc < Store > , ConnectionPool ) {
598
625
let ( store, pools) = self . store_and_pools ( ) ;
599
626
let primary = pools. get ( & * PRIMARY_SHARD ) . expect ( "there is a primary pool" ) ;
600
627
( store, primary. clone ( ) )
601
628
}
602
629
603
- fn block_store_and_primary_pool ( self ) -> ( Arc < BlockStore > , ConnectionPool ) {
630
+ fn block_store_and_primary_pool ( & self ) -> ( Arc < BlockStore > , ConnectionPool ) {
604
631
let ( store, pools) = self . store_and_pools ( ) ;
605
632
606
633
let primary = pools. get ( & * PRIMARY_SHARD ) . unwrap ( ) ;
607
634
( store. block_store ( ) , primary. clone ( ) )
608
635
}
609
636
610
- fn graphql_runner ( self ) -> Arc < GraphQlRunner < Store , PanicSubscriptionManager > > {
637
+ fn graphql_runner ( & self ) -> Arc < GraphQlRunner < Store , PanicSubscriptionManager > > {
611
638
let logger = self . logger . clone ( ) ;
612
639
let registry = self . registry . clone ( ) ;
613
640
@@ -624,10 +651,16 @@ impl Context {
624
651
registry,
625
652
) )
626
653
}
654
+
655
+ async fn ethereum_networks ( & self ) -> anyhow:: Result < EthereumNetworks > {
656
+ let logger = self . logger . clone ( ) ;
657
+ let registry = self . metrics_registry ( ) ;
658
+ create_ethereum_networks ( logger, registry, & self . config ) . await
659
+ }
627
660
}
628
661
629
662
#[ tokio:: main]
630
- async fn main ( ) {
663
+ async fn main ( ) -> anyhow :: Result < ( ) > {
631
664
let opt = Opt :: from_args ( ) ;
632
665
633
666
let version_label = opt. version_label . clone ( ) ;
@@ -644,13 +677,8 @@ async fn main() {
644
677
render_testament!( TESTAMENT )
645
678
) ;
646
679
647
- let mut config = match Cfg :: load ( & logger, & opt. clone ( ) . into ( ) ) {
648
- Err ( e) => {
649
- eprintln ! ( "configuration error: {}" , e) ;
650
- std:: process:: exit ( 1 ) ;
651
- }
652
- Ok ( config) => config,
653
- } ;
680
+ let mut config = Cfg :: load ( & logger, & opt. clone ( ) . into ( ) ) . context ( "Configuration error" ) ?;
681
+
654
682
if opt. pool_size > 0 && !opt. cmd . use_configured_pool_size ( ) {
655
683
// Override pool size from configuration
656
684
for shard in config. stores . values_mut ( ) {
@@ -701,7 +729,7 @@ async fn main() {
701
729
) ;
702
730
703
731
use Command :: * ;
704
- let result = match opt. cmd {
732
+ match opt. cmd {
705
733
TxnSpeed { delay } => commands:: txn_speed:: run ( ctx. primary_pool ( ) , delay) ,
706
734
Info {
707
735
deployment,
@@ -916,9 +944,42 @@ async fn main() {
916
944
}
917
945
}
918
946
}
919
- } ;
920
- if let Err ( e) = result {
921
- die ! ( "error: {}" , e)
947
+ FixBlock ( cmd) => {
948
+ use commands:: fix_block:: { by_hash, by_number, by_range, truncate} ;
949
+ use graph:: components:: store:: BlockStore as BlockStoreTrait ;
950
+ use FixBlockSubCommand :: * ;
951
+
952
+ if let Some ( chain_store) = ctx. store ( ) . block_store ( ) . chain_store ( & cmd. network_name ) {
953
+ let ethereum_networks = ctx. ethereum_networks ( ) . await ?;
954
+ let ethereum_adapter = ethereum_networks
955
+ . networks
956
+ . get ( & cmd. network_name )
957
+ . map ( |adapters| adapters. cheapest ( ) )
958
+ . flatten ( )
959
+ . ok_or ( anyhow:: anyhow!(
960
+ "Failed to obtain an Ethereum adapter for chain '{}'" ,
961
+ cmd. network_name
962
+ ) ) ?;
963
+
964
+ match cmd. method {
965
+ ByHash { hash } => {
966
+ by_hash ( & hash, chain_store, & ethereum_adapter, & ctx. logger ) . await
967
+ }
968
+ ByNumber { number } => {
969
+ by_number ( number, chain_store, & ethereum_adapter, & ctx. logger ) . await
970
+ }
971
+ ByRange { range } => {
972
+ by_range ( chain_store, & ethereum_adapter, & range, & ctx. logger ) . await
973
+ }
974
+ TruncateCache { no_confirm } => truncate ( chain_store, no_confirm) ,
975
+ }
976
+ } else {
977
+ Err ( anyhow:: anyhow!(
978
+ "Could not find a network named '{}'" ,
979
+ & cmd. network_name
980
+ ) )
981
+ }
982
+ }
922
983
}
923
984
}
924
985
0 commit comments