Skip to content

Commit 2848b44

Browse files
committed
code review changes
1 parent 4f97651 commit 2848b44

File tree

15 files changed

+89
-83
lines changed

15 files changed

+89
-83
lines changed

chain/arweave/src/chain.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use graph::blockchain::{
66
EmptyNodeCapabilities,
77
};
88
use graph::cheap_clone::CheapClone;
9-
use graph::components::store::WritableStore;
9+
use graph::components::store::StoreStateTracker;
1010
use graph::data::subgraph::UnifiedMappingApiVersion;
1111
use graph::firehose::FirehoseEndpoint;
1212
use graph::prelude::MetricsRegistry;
@@ -110,7 +110,7 @@ impl Blockchain for Chain {
110110
async fn new_block_stream(
111111
&self,
112112
deployment: DeploymentLocator,
113-
store: Arc<dyn WritableStore>,
113+
store: impl StoreStateTracker,
114114
start_blocks: Vec<BlockNumber>,
115115
filter: Arc<Self::TriggerFilter>,
116116
unified_api_version: UnifiedMappingApiVersion,

chain/cosmos/src/chain.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use graph::blockchain::client::ChainClient;
55
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
66
use graph::blockchain::{BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder};
77
use graph::cheap_clone::CheapClone;
8-
use graph::components::store::WritableStore;
8+
use graph::components::store::StoreStateTracker;
99
use graph::data::subgraph::UnifiedMappingApiVersion;
1010
use graph::prelude::MetricsRegistry;
1111
use graph::{
@@ -104,7 +104,7 @@ impl Blockchain for Chain {
104104
async fn new_block_stream(
105105
&self,
106106
deployment: DeploymentLocator,
107-
store: Arc<dyn WritableStore>,
107+
store: impl StoreStateTracker,
108108
start_blocks: Vec<BlockNumber>,
109109
filter: Arc<Self::TriggerFilter>,
110110
unified_api_version: UnifiedMappingApiVersion,

chain/ethereum/src/chain.rs

+29-25
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use anyhow::{Context, Error};
33
use graph::blockchain::client::ChainClient;
44
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
55
use graph::blockchain::{BlockIngestor, BlockchainKind, TriggersAdapterSelector};
6-
use graph::components::store::WritableStore;
6+
use graph::components::store::StoreStateTracker;
77
use graph::data::subgraph::UnifiedMappingApiVersion;
88
use graph::firehose::{FirehoseEndpoint, ForkStep};
99
use graph::prelude::{
@@ -349,39 +349,43 @@ impl Blockchain for Chain {
349349
self.adapter_selector
350350
.triggers_adapter(loc, capabilities, unified_api_version)
351351
}
352+
352353
async fn new_block_stream(
353354
&self,
354355
deployment: DeploymentLocator,
355-
store: Arc<dyn WritableStore>,
356+
store: impl StoreStateTracker,
356357
start_blocks: Vec<BlockNumber>,
357358
filter: Arc<Self::TriggerFilter>,
358359
unified_api_version: UnifiedMappingApiVersion,
359360
) -> Result<Box<dyn BlockStream<Self>>, Error> {
360361
let current_ptr = store.block_ptr();
361-
if !self.chain_client().is_firehose() {
362-
return self
363-
.block_stream_builder
364-
.build_polling(
365-
self,
366-
deployment,
367-
start_blocks,
368-
current_ptr,
369-
filter,
370-
unified_api_version,
371-
)
372-
.await;
362+
match self.chain_client().as_ref() {
363+
ChainClient::Rpc(_) => {
364+
self.block_stream_builder
365+
.build_polling(
366+
self,
367+
deployment,
368+
start_blocks,
369+
current_ptr,
370+
filter,
371+
unified_api_version,
372+
)
373+
.await
374+
}
375+
ChainClient::Firehose(_) => {
376+
self.block_stream_builder
377+
.build_firehose(
378+
self,
379+
deployment,
380+
store.block_cursor(),
381+
start_blocks,
382+
current_ptr,
383+
filter,
384+
unified_api_version,
385+
)
386+
.await
387+
}
373388
}
374-
self.block_stream_builder
375-
.build_firehose(
376-
self,
377-
deployment,
378-
store.block_cursor(),
379-
start_blocks,
380-
current_ptr,
381-
filter,
382-
unified_api_version,
383-
)
384-
.await
385389
}
386390

387391
fn chain_store(&self) -> Arc<dyn ChainStore> {

chain/near/src/chain.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use graph::blockchain::client::ChainClient;
33
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
44
use graph::blockchain::{BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind};
55
use graph::cheap_clone::CheapClone;
6-
use graph::components::store::WritableStore;
6+
use graph::components::store::StoreStateTracker;
77
use graph::data::subgraph::UnifiedMappingApiVersion;
88
use graph::firehose::FirehoseEndpoint;
99
use graph::prelude::{MetricsRegistry, TryFutureExt};
@@ -155,7 +155,7 @@ impl Blockchain for Chain {
155155
async fn new_block_stream(
156156
&self,
157157
deployment: DeploymentLocator,
158-
store: Arc<dyn WritableStore>,
158+
store: impl StoreStateTracker,
159159
start_blocks: Vec<BlockNumber>,
160160
filter: Arc<Self::TriggerFilter>,
161161
unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/chain.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{data_source::*, EntityChanges, TriggerData, TriggerFilter, TriggersA
22
use anyhow::Error;
33
use graph::blockchain::client::ChainClient;
44
use graph::blockchain::{BlockIngestor, EmptyNodeCapabilities};
5-
use graph::components::store::WritableStore;
5+
use graph::components::store::StoreStateTracker;
66
use graph::firehose::FirehoseEndpoints;
77
use graph::prelude::{BlockHash, LoggerFactory, MetricsRegistry};
88
use graph::{
@@ -107,7 +107,7 @@ impl Blockchain for Chain {
107107
async fn new_block_stream(
108108
&self,
109109
deployment: DeploymentLocator,
110-
store: Arc<dyn WritableStore>,
110+
store: impl StoreStateTracker,
111111
start_blocks: Vec<BlockNumber>,
112112
filter: Arc<Self::TriggerFilter>,
113113
unified_api_version: UnifiedMappingApiVersion,
@@ -163,7 +163,7 @@ impl Blockchain for Chain {
163163
}
164164

165165
fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
166-
panic!("Substreams rely on the block ingestor from the network they are processing")
166+
unreachable!("Substreams rely on the block ingestor from the network they are processing")
167167
}
168168
}
169169

graph/src/blockchain/firehose_block_ingestor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ where
187187

188188
info!(
189189
logger,
190-
"Blockstream disconnected, connecting"; "endpoint uri" => format_args!("{}", endpoint), "cursor" => format_args!("{}", latest_cursor),
190+
"Trying to reconnect the Blockstream after disconnect"; "endpoint uri" => format_args!("{}", endpoint), "cursor" => format_args!("{}", latest_cursor),
191191
);
192192

193193
let result = endpoint

graph/src/blockchain/firehose_block_stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ where
121121
F: FirehoseMapper<C> + 'static,
122122
{
123123
if !client.is_firehose() {
124-
panic!("Firehose block stream called with rpc endpoint");
124+
unreachable!("Firehose block stream called with rpc endpoint");
125125
}
126126

127127
let manifest_start_block_num = start_blocks

graph/src/blockchain/mock.rs

+2-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
components::{
33
link_resolver::LinkResolver,
4-
store::{BlockNumber, DeploymentLocator, WritableStore},
4+
store::{BlockNumber, DeploymentLocator, StoreStateTracker},
55
},
66
data::subgraph::UnifiedMappingApiVersion,
77
prelude::DataSourceTemplateInfo,
@@ -272,19 +272,6 @@ impl<C: Blockchain> RuntimeAdapter<C> for MockRuntimeAdapter {
272272
}
273273
}
274274

275-
pub struct MockBlockIngestor {}
276-
277-
#[async_trait]
278-
impl BlockIngestor for MockBlockIngestor {
279-
async fn run(self: Box<Self>) {
280-
todo!()
281-
}
282-
283-
fn network_name(&self) -> String {
284-
todo!()
285-
}
286-
}
287-
288275
#[async_trait]
289276
impl Blockchain for MockBlockchain {
290277
const KIND: BlockchainKind = BlockchainKind::Ethereum;
@@ -320,7 +307,7 @@ impl Blockchain for MockBlockchain {
320307
async fn new_block_stream(
321308
&self,
322309
_deployment: DeploymentLocator,
323-
_store: Arc<dyn WritableStore>,
310+
_store: impl StoreStateTracker,
324311
_start_blocks: Vec<BlockNumber>,
325312
_filter: Arc<Self::TriggerFilter>,
326313
_unified_api_version: UnifiedMappingApiVersion,

graph/src/blockchain/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod types;
1616
// Try to reexport most of the necessary types
1717
use crate::{
1818
cheap_clone::CheapClone,
19-
components::store::{DeploymentLocator, StoredDynamicDataSource, WritableStore},
19+
components::store::{DeploymentLocator, StoreStateTracker, StoredDynamicDataSource},
2020
data::subgraph::UnifiedMappingApiVersion,
2121
data_source,
2222
prelude::DataSourceContext,
@@ -182,7 +182,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
182182
async fn new_block_stream(
183183
&self,
184184
deployment: DeploymentLocator,
185-
store: Arc<dyn WritableStore>,
185+
store: impl StoreStateTracker,
186186
start_blocks: Vec<BlockNumber>,
187187
filter: Arc<Self::TriggerFilter>,
188188
unified_api_version: UnifiedMappingApiVersion,

graph/src/components/store/traits.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,32 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
202202
}
203203
}
204204

205-
/// A view of the store for indexing. All indexing-related operations need
206-
/// to go through this trait. Methods in this trait will never return a
207-
/// `StoreError::DatabaseUnavailable`. Instead, they will retry the
208-
/// operation indefinitely until it succeeds.
209-
#[async_trait]
210-
pub trait WritableStore: ReadStore {
205+
pub trait StoreStateTracker: Sync + Send + 'static {
211206
/// Get a pointer to the most recently processed block in the subgraph.
212207
fn block_ptr(&self) -> Option<BlockPtr>;
213208

214209
/// Returns the Firehose `cursor` this deployment is currently at in the block stream of events. This
215210
/// is used when re-connecting a Firehose stream to start back exactly where we left off.
216211
fn block_cursor(&self) -> FirehoseCursor;
212+
}
213+
214+
// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
215+
impl<T: ?Sized + StoreStateTracker> StoreStateTracker for Arc<T> {
216+
fn block_ptr(&self) -> Option<BlockPtr> {
217+
(**self).block_ptr()
218+
}
217219

220+
fn block_cursor(&self) -> FirehoseCursor {
221+
(**self).block_cursor()
222+
}
223+
}
224+
225+
/// A view of the store for indexing. All indexing-related operations need
226+
/// to go through this trait. Methods in this trait will never return a
227+
/// `StoreError::DatabaseUnavailable`. Instead, they will retry the
228+
/// operation indefinitely until it succeeds.
229+
#[async_trait]
230+
pub trait WritableStore: ReadStore + StoreStateTracker {
218231
/// Start an existing subgraph deployment.
219232
async fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>;
220233

graph/tests/entity_cache.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::collections::{BTreeMap, BTreeSet};
1010
use std::sync::Arc;
1111

1212
use graph::components::store::{
13-
EntityKey, EntityType, ReadStore, StoredDynamicDataSource, WritableStore,
13+
EntityKey, EntityType, ReadStore, StoreStateTracker, StoredDynamicDataSource, WritableStore,
1414
};
1515
use graph::{
1616
components::store::{DeploymentId, DeploymentLocator},
@@ -63,17 +63,18 @@ impl ReadStore for MockStore {
6363
SCHEMA.clone()
6464
}
6565
}
66-
67-
#[async_trait]
68-
impl WritableStore for MockStore {
66+
impl StoreStateTracker for MockStore {
6967
fn block_ptr(&self) -> Option<BlockPtr> {
7068
unimplemented!()
7169
}
7270

7371
fn block_cursor(&self) -> FirehoseCursor {
7472
unimplemented!()
7573
}
74+
}
7675

76+
#[async_trait]
77+
impl WritableStore for MockStore {
7778
async fn start_subgraph_deployment(&self, _: &Logger) -> Result<(), StoreError> {
7879
unimplemented!()
7980
}

node/src/bin/manager.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -991,7 +991,7 @@ async fn main() -> anyhow::Result<()> {
991991
(ctx.primary_pool(), None)
992992
};
993993

994-
Ok(match deployment {
994+
match deployment {
995995
Some(deployment) => {
996996
commands::info::run(primary, store, deployment, current, pending, used).err();
997997
}
@@ -1004,7 +1004,8 @@ async fn main() -> anyhow::Result<()> {
10041004
bail!("Please specify a deployment or use --all to list all deployments");
10051005
}
10061006
}
1007-
})
1007+
};
1008+
Ok(())
10081009
}
10091010
Unused(cmd) => {
10101011
let store = ctx.subgraph_store();

node/src/main.rs

+11-13
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,18 @@ fn read_expensive_queries(
8787
}
8888

8989
macro_rules! collect_ingestors {
90-
($acc:ident, $($chain:ident),+) => {
90+
($acc:ident, $logger:ident, $($chain:ident),+) => {
9191
$(
9292
$chain.iter().for_each(|(network_name, chain)| {
93-
let ingestor = chain.block_ingestor().expect(&format!(
94-
"Failed to create ingestor for network {}",
95-
network_name
96-
));
97-
98-
$acc.push(ingestor);
93+
let logger = $logger.new(o!("network_name" => network_name.clone()));
94+
match chain.block_ingestor() {
95+
Ok(ingestor) =>{
96+
info!(logger, "Started block ingestor");
97+
$acc.push(ingestor);
98+
}
99+
Err(err) => error!(&logger,
100+
"Failed to create block ingestor for network {}",err),
101+
}
99102
});
100103
)+
101104
};
@@ -379,16 +382,11 @@ async fn main() {
379382
);
380383

381384
if !opt.disable_block_ingestor {
382-
fn chains_to_ingestor(
383-
chains: Vec<Arc<impl Blockchain>>,
384-
) -> anyhow::Result<Vec<Box<dyn BlockIngestor>>> {
385-
chains.iter().map(|c| c.block_ingestor()).collect()
386-
}
387-
388385
let logger = logger.clone();
389386
let mut ingestors: Vec<Box<dyn BlockIngestor>> = vec![];
390387
collect_ingestors!(
391388
ingestors,
389+
logger,
392390
ethereum_chains,
393391
arweave_chains,
394392
near_chains,

store/postgres/src/writable.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use std::time::Duration;
55
use std::{collections::BTreeMap, sync::Arc};
66

77
use graph::blockchain::block_stream::FirehoseCursor;
8-
use graph::components::store::EntityKey;
98
use graph::components::store::ReadStore;
9+
use graph::components::store::{EntityKey, StoreStateTracker};
1010
use graph::data::subgraph::schema;
1111
use graph::data_source::CausalityRegion;
1212
use graph::prelude::{
@@ -1033,16 +1033,18 @@ impl ReadStore for WritableStore {
10331033
}
10341034
}
10351035

1036-
#[async_trait::async_trait]
1037-
impl WritableStoreTrait for WritableStore {
1036+
impl StoreStateTracker for WritableStore {
10381037
fn block_ptr(&self) -> Option<BlockPtr> {
10391038
self.block_ptr.lock().unwrap().clone()
10401039
}
10411040

10421041
fn block_cursor(&self) -> FirehoseCursor {
10431042
self.block_cursor.lock().unwrap().clone()
10441043
}
1044+
}
10451045

1046+
#[async_trait::async_trait]
1047+
impl WritableStoreTrait for WritableStore {
10461048
async fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
10471049
let store = self.store.cheap_clone();
10481050
let logger = logger.cheap_clone();

0 commit comments

Comments
 (0)