Skip to content

Commit e7e4149

Browse files
committed
WIP
1 parent 0ea5fc7 commit e7e4149

File tree

15 files changed

+106
-25
lines changed

15 files changed

+106
-25
lines changed

core/src/polling_monitor/ipfs_service.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,33 @@ use anyhow::{anyhow, Error};
22
use bytes::Bytes;
33
use futures::future::BoxFuture;
44
use graph::{
5+
data_source::CausalityRegion,
56
ipfs_client::{CidFile, IpfsClient, StatApi},
67
prelude::CheapClone,
78
};
8-
use std::time::Duration;
9+
use std::{fmt::Display, time::Duration};
910
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1011

1112
const CLOUDFLARE_TIMEOUT: u16 = 524;
1213
const GATEWAY_TIMEOUT: u16 = 504;
1314

14-
pub type IpfsService = Buffer<CidFile, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
15+
#[derive(Clone, Default, PartialEq, Eq, Hash)]
16+
pub struct IpfsItem {
17+
pub item: CidFile,
18+
pub causality_region: CausalityRegion,
19+
}
20+
21+
impl Display for IpfsItem {
22+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23+
let str = match self.item.path {
24+
Some(ref f) => format!("{}/{}", self.item.cid.to_string(), f),
25+
None => self.item.cid.to_string(),
26+
};
27+
f.write_str(&str)
28+
}
29+
}
30+
31+
pub type IpfsService = Buffer<IpfsItem, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
1532

1633
pub fn ipfs_service(
1734
client: IpfsClient,
@@ -28,7 +45,7 @@ pub fn ipfs_service(
2845
let svc = ServiceBuilder::new()
2946
.rate_limit(concurrency_and_rate_limit.into(), Duration::from_secs(1))
3047
.concurrency_limit(concurrency_and_rate_limit as usize)
31-
.service_fn(move |req| ipfs.cheap_clone().call_inner(req))
48+
.service_fn(move |req: IpfsItem| ipfs.cheap_clone().call_inner(req.item))
3249
.boxed();
3350

3451
// The `Buffer` makes it so the rate and concurrency limit are shared among clones.

core/src/polling_monitor/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tower::util::rng::HasherRng;
2323
use tower::{Service, ServiceExt};
2424

2525
pub use self::metrics::PollingMonitorMetrics;
26-
pub use ipfs_service::{ipfs_service, IpfsService};
26+
pub use ipfs_service::{ipfs_service, IpfsItem, IpfsService};
2727

2828
const MIN_BACKOFF: Duration = Duration::from_secs(5);
2929

core/src/subgraph/context.rs

+17-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
pub mod instance;
22

3-
use crate::polling_monitor::{spawn_monitor, IpfsService, PollingMonitor, PollingMonitorMetrics};
3+
use crate::polling_monitor::{
4+
spawn_monitor, IpfsItem, IpfsService, PollingMonitor, PollingMonitorMetrics,
5+
};
46
use anyhow::{self, Error};
57
use bytes::Bytes;
68
use graph::{
@@ -10,7 +12,6 @@ use graph::{
1012
subgraph::{MappingError, SharedProofOfIndexing},
1113
},
1214
data_source::{offchain, CausalityRegion, DataSource, TriggerData},
13-
ipfs_client::CidFile,
1415
prelude::{
1516
BlockNumber, BlockState, CancelGuard, DeploymentHash, MetricsRegistry, RuntimeHostBuilder,
1617
SubgraphInstanceMetrics, TriggerProcessor,
@@ -134,12 +135,14 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
134135
logger: &Logger,
135136
data_source: DataSource<C>,
136137
) -> Result<Option<Arc<T::Host>>, Error> {
137-
let source = data_source.as_offchain().map(|ds| ds.source.clone());
138-
let host = self.instance.add_dynamic_data_source(logger, data_source)?;
138+
let host = self
139+
.instance
140+
.add_dynamic_data_source(logger, data_source.clone())?;
139141

140142
if host.is_some() {
141-
if let Some(source) = source {
142-
self.offchain_monitor.add_source(source)?;
143+
if let Some(data_source) = data_source.as_offchain() {
144+
self.offchain_monitor
145+
.add_source(data_source.source.clone())?;
143146
}
144147
}
145148

@@ -157,8 +160,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
157160
}
158161

159162
pub struct OffchainMonitor {
160-
ipfs_monitor: PollingMonitor<CidFile>,
161-
ipfs_monitor_rx: mpsc::Receiver<(CidFile, Bytes)>,
163+
ipfs_monitor: PollingMonitor<IpfsItem>,
164+
ipfs_monitor_rx: mpsc::Receiver<(IpfsItem, Bytes)>,
162165
}
163166

164167
impl OffchainMonitor {
@@ -183,7 +186,10 @@ impl OffchainMonitor {
183186

184187
fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
185188
match source {
186-
offchain::Source::Ipfs(cid_file) => self.ipfs_monitor.monitor(cid_file),
189+
offchain::Source::Ipfs(item, causality_region) => self.ipfs_monitor.monitor(IpfsItem {
190+
item,
191+
causality_region,
192+
}),
187193
};
188194
Ok(())
189195
}
@@ -194,8 +200,8 @@ impl OffchainMonitor {
194200
let mut triggers = vec![];
195201
loop {
196202
match self.ipfs_monitor_rx.try_recv() {
197-
Ok((cid_file, data)) => triggers.push(offchain::TriggerData {
198-
source: offchain::Source::Ipfs(cid_file),
203+
Ok((item, data)) => triggers.push(offchain::TriggerData {
204+
source: offchain::Source::Ipfs(item.item, item.causality_region),
199205
data: Arc::new(data),
200206
}),
201207
Err(TryRecvError::Disconnected) => {

core/src/subgraph/runner.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::data::subgraph::{
1717
SubgraphFeature,
1818
};
1919
use graph::data_source::{
20-
offchain, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
20+
offchain, CausalityRegion, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
2121
};
2222
use graph::env::EnvVars;
2323
use graph::prelude::*;
@@ -999,6 +999,7 @@ async fn update_proof_of_indexing(
999999
let entity_key = EntityKey {
10001000
entity_type: POI_OBJECT.to_owned(),
10011001
entity_id: causality_region.into(),
1002+
causality_region: CausalityRegion::ONCHAIN,
10021003
};
10031004

10041005
// Grab the current digest attribute on this entity

graph/src/components/store/entity_cache.rs

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use crate::components::store::{
77
self as s, Entity, EntityKey, EntityOp, EntityOperation, EntityType,
88
};
9+
use crate::data_source::CausalityRegion;
910
use crate::prelude::{Schema, ENV_VARS};
1011
use crate::util::lfu_cache::LfuCache;
1112

@@ -246,6 +247,8 @@ impl EntityCache {
246247
let key = EntityKey {
247248
entity_type: entity_type.clone(),
248249
entity_id: entity.id().unwrap().into(),
250+
// TODO: Figure this one out
251+
causality_region: CausalityRegion::ONCHAIN,
249252
};
250253
self.current.insert(key, Some(entity));
251254
}

graph/src/components/store/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,25 @@ pub struct EntityKey {
128128

129129
/// ID of the individual entity.
130130
pub entity_id: Word,
131+
132+
/// Causality region associated with this Entity id.
133+
pub causality_region: CausalityRegion,
131134
}
132135

133136
impl EntityKey {
134137
pub fn data(entity_type: String, entity_id: String) -> Self {
135138
Self {
136139
entity_type: EntityType::new(entity_type),
137140
entity_id: entity_id.into(),
141+
causality_region: CausalityRegion::ONCHAIN,
138142
}
139143
}
144+
145+
pub fn with_causality_region(mut self, cr: CausalityRegion) -> Self {
146+
self.causality_region = cr;
147+
148+
self
149+
}
140150
}
141151

142152
#[derive(Clone, Debug, PartialEq)]

graph/src/data_source/causality_region.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::io;
1919
/// This necessary for determinism because offchain data sources don't have a deterministic order of
2020
/// execution, for example an IPFS file may become available at any point in time. The isolation
2121
/// rules make the indexing result reproducible, given a set of available files.
22-
#[derive(Debug, Copy, Clone, PartialEq, Eq, FromSqlRow)]
22+
#[derive(Debug, Copy, Clone, PartialEq, Eq, FromSqlRow, PartialOrd, Ord, Hash)]
2323
pub struct CausalityRegion(i32);
2424

2525
impl fmt::Display for CausalityRegion {
@@ -40,6 +40,12 @@ impl ToSql<Integer, Pg> for CausalityRegion {
4040
}
4141
}
4242

43+
impl Default for CausalityRegion {
44+
fn default() -> Self {
45+
Self::ONCHAIN
46+
}
47+
}
48+
4349
impl CausalityRegion {
4450
/// The causality region of all onchain data sources.
4551
pub const ONCHAIN: CausalityRegion = CausalityRegion(0);

graph/src/data_source/mod.rs

+11
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,17 @@ pub enum MappingTrigger<C: Blockchain> {
408408
Offchain(offchain::TriggerData),
409409
}
410410

411+
impl<C: Blockchain> MappingTrigger<C> {
412+
pub fn causality_region(&self) -> CausalityRegion {
413+
match self {
414+
MappingTrigger::Onchain(_) => CausalityRegion::ONCHAIN,
415+
MappingTrigger::Offchain(td) => match td.source {
416+
offchain::Source::Ipfs(_, cr) => cr,
417+
},
418+
}
419+
}
420+
}
421+
411422
macro_rules! clone_data_source {
412423
($t:ident) => {
413424
impl<C: Blockchain> Clone for $t<C> {

graph/src/data_source/offchain.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl DataSource {
105105
))?;
106106

107107
let source = match source.parse() {
108-
Ok(source) => Source::Ipfs(source),
108+
Ok(source) => Source::Ipfs(source, causality_region),
109109

110110
// Ignore data sources created with an invalid CID.
111111
Err(e) => return Err(DataSourceCreationError::Ignore(source, e)),
@@ -140,7 +140,7 @@ impl DataSource {
140140

141141
pub fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource {
142142
let param = match self.source {
143-
Source::Ipfs(ref link) => Bytes::from(link.to_bytes()),
143+
Source::Ipfs(ref link, _) => Bytes::from(link.to_bytes()),
144144
};
145145

146146
let done_at = self.done_at.load(std::sync::atomic::Ordering::SeqCst);
@@ -182,7 +182,7 @@ impl DataSource {
182182
let param = param.context("no param on stored data source")?;
183183
let cid_file = CidFile::try_from(param)?;
184184

185-
let source = Source::Ipfs(cid_file);
185+
let source = Source::Ipfs(cid_file, causality_region);
186186
let context = Arc::new(context.map(serde_json::from_value).transpose()?);
187187

188188
Ok(Self {
@@ -202,7 +202,7 @@ impl DataSource {
202202
/// used as the value to be returned to mappings from the `dataSource.address()` host function.
203203
pub fn address(&self) -> Option<Vec<u8>> {
204204
match self.source {
205-
Source::Ipfs(ref cid) => Some(cid.to_bytes()),
205+
Source::Ipfs(ref cid, _) => Some(cid.to_bytes()),
206206
}
207207
}
208208

@@ -238,7 +238,7 @@ impl DataSource {
238238

239239
#[derive(Clone, Debug, Eq, PartialEq)]
240240
pub enum Source {
241-
Ipfs(CidFile),
241+
Ipfs(CidFile, CausalityRegion),
242242
}
243243

244244
#[derive(Clone, Debug)]
@@ -289,7 +289,7 @@ impl UnresolvedDataSource {
289289
"source" => format_args!("{:?}", &self.source),
290290
);
291291
let source = match self.kind.as_str() {
292-
"file/ipfs" => Source::Ipfs(self.source.file.link.parse()?),
292+
"file/ipfs" => Source::Ipfs(self.source.file.link.parse()?, causality_region),
293293
_ => {
294294
anyhow::bail!(
295295
"offchain data source has invalid `kind`, expected `file/ipfs` but found {}",

runtime/wasm/src/host.rs

+1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ where
185185
proof_of_indexing,
186186
host_fns: self.host_fns.cheap_clone(),
187187
debug_fork: debug_fork.cheap_clone(),
188+
causality_region: trigger.trigger.causality_region(),
188189
},
189190
trigger,
190191
result_sender,

runtime/wasm/src/host_exports.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use graph::components::subgraph::{
1515
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
1616
};
1717
use graph::data::store;
18-
use graph::data_source::{DataSource, DataSourceTemplate, EntityTypeAccess};
18+
use graph::data_source::{CausalityRegion, DataSource, DataSourceTemplate, EntityTypeAccess};
1919
use graph::ensure;
2020
use graph::prelude::ethabi::param_type::Reader;
2121
use graph::prelude::ethabi::{decode, encode, Token};
@@ -156,6 +156,7 @@ impl<C: Blockchain> HostExports<C> {
156156
data: HashMap<String, Value>,
157157
stopwatch: &StopwatchMetrics,
158158
gas: &GasCounter,
159+
causality_region: &CausalityRegion,
159160
) -> Result<(), HostExportError> {
160161
let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing");
161162
write_poi_event(
@@ -173,6 +174,7 @@ impl<C: Blockchain> HostExports<C> {
173174
let key = EntityKey {
174175
entity_type: EntityType::new(entity_type),
175176
entity_id: entity_id.into(),
177+
causality_region: causality_region.clone(),
176178
};
177179
self.check_entity_type_access(&key.entity_type)?;
178180

@@ -192,6 +194,7 @@ impl<C: Blockchain> HostExports<C> {
192194
entity_type: String,
193195
entity_id: String,
194196
gas: &GasCounter,
197+
causality_region: &CausalityRegion,
195198
) -> Result<(), HostExportError> {
196199
write_poi_event(
197200
proof_of_indexing,
@@ -205,6 +208,7 @@ impl<C: Blockchain> HostExports<C> {
205208
let key = EntityKey {
206209
entity_type: EntityType::new(entity_type),
207210
entity_id: entity_id.into(),
211+
causality_region: causality_region.clone(),
208212
};
209213
self.check_entity_type_access(&key.entity_type)?;
210214

@@ -221,10 +225,12 @@ impl<C: Blockchain> HostExports<C> {
221225
entity_type: String,
222226
entity_id: String,
223227
gas: &GasCounter,
228+
causality_region: &CausalityRegion,
224229
) -> Result<Option<Entity>, anyhow::Error> {
225230
let store_key = EntityKey {
226231
entity_type: EntityType::new(entity_type),
227232
entity_id: entity_id.into(),
233+
causality_region: causality_region.clone(),
228234
};
229235
self.check_entity_type_access(&store_key.entity_type)?;
230236

runtime/wasm/src/mapping.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures03::channel::oneshot::Sender;
55
use graph::blockchain::{Blockchain, HostFn};
66
use graph::components::store::SubgraphFork;
77
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
8-
use graph::data_source::{MappingTrigger, TriggerWithHandler};
8+
use graph::data_source::{CausalityRegion, MappingTrigger, TriggerWithHandler};
99
use graph::prelude::*;
1010
use graph::runtime::gas::Gas;
1111
use std::collections::BTreeMap;
@@ -124,6 +124,7 @@ pub struct MappingContext<C: Blockchain> {
124124
pub proof_of_indexing: SharedProofOfIndexing,
125125
pub host_fns: Arc<Vec<HostFn>>,
126126
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
127+
pub causality_region: CausalityRegion,
127128
}
128129

129130
impl<C: Blockchain> MappingContext<C> {
@@ -136,6 +137,7 @@ impl<C: Blockchain> MappingContext<C> {
136137
proof_of_indexing: self.proof_of_indexing.cheap_clone(),
137138
host_fns: self.host_fns.cheap_clone(),
138139
debug_fork: self.debug_fork.cheap_clone(),
140+
causality_region: self.causality_region,
139141
}
140142
}
141143
}

runtime/wasm/src/module/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,7 @@ impl<C: Blockchain> WasmInstanceContext<C> {
934934
data,
935935
stopwatch,
936936
gas,
937+
&self.ctx.causality_region,
937938
)?;
938939

939940
Ok(())
@@ -955,6 +956,7 @@ impl<C: Blockchain> WasmInstanceContext<C> {
955956
entity,
956957
id,
957958
gas,
959+
&self.ctx.causality_region,
958960
)
959961
}
960962

@@ -977,6 +979,7 @@ impl<C: Blockchain> WasmInstanceContext<C> {
977979
entity_type.clone(),
978980
id.clone(),
979981
gas,
982+
&self.ctx.causality_region,
980983
)?;
981984

982985
let ret = match entity_option {

0 commit comments

Comments
 (0)