Add runtime support for offchain data sources & templates #3791

Merged
33 commits merged on Aug 25, 2022

Changes from 1 commit (of 33)
a6b33f5
Refactor manifest data sources
Theodus Jul 28, 2022
48ccdd3
Refactor manifest data source templates
Theodus Jul 29, 2022
55f8847
Start offchain monitors for static sources
Theodus Jul 29, 2022
ca94702
Run offchain handlers
Theodus Jul 30, 2022
c6338c6
offchain: dont expect `manifest_idx` in the manifest
leoyvens Aug 10, 2022
3f51dc0
offchain: add `match_and_decode`, persist normally, require source
leoyvens Aug 10, 2022
4def210
trigger processor: take block ptr from the trigger
leoyvens Aug 10, 2022
f237fea
offchain: Return cid to dataSource.address host fn
leoyvens Aug 10, 2022
d43a2fa
runner: transact modifications of offchain events
leoyvens Aug 10, 2022
b1b224a
ethereum: fix test build
leoyvens Aug 11, 2022
d5a5174
ipfs: Set a default maximum file size
leoyvens Aug 11, 2022
7347307
ipfs: Add env var for max concurrent requests
leoyvens Aug 11, 2022
e17fd10
ipfs: Share ipfs service across subgraphs
leoyvens Aug 11, 2022
516e104
offchain: move `ready_offchain_events` to `OffchainMonitor`
leoyvens Aug 11, 2022
3f940c0
runner: Clarify comments
leoyvens Aug 12, 2022
c3a0855
core: Remove unecessary params from `add_dynamic_data_source`
leoyvens Aug 12, 2022
6936376
core: Move poi_version out of the instance
leoyvens Aug 12, 2022
801eb36
core: Move `mod instance` under `mod context`
leoyvens Aug 15, 2022
8930c5c
core: Refactor OffchainMonitor::add_data_source
leoyvens Aug 15, 2022
9338787
offchain: Better handling of duplicates
leoyvens Aug 15, 2022
acdc5a7
offchain: Bump max ipfs concurrent requests to 100
leoyvens Aug 15, 2022
ea0311e
refactor: Expose RuntimeHost data source
leoyvens Aug 15, 2022
a95454f
offchain: Remove dses that have been processed
leoyvens Aug 15, 2022
f30b210
refactor: Extract ReadStore out of WritableStore
leoyvens Aug 18, 2022
ad6264d
test: Add graphql queries to end-to-end tests
leoyvens Aug 19, 2022
f59d9c9
feat(file ds): Bump max spec version to 0.0.7
leoyvens Aug 21, 2022
7e99135
test: Add basic file data sources e2e test
leoyvens Aug 21, 2022
d22b51f
runner: Isolate offchain data sources
leoyvens Aug 22, 2022
824957b
offchain: Forbid static file data sources
leoyvens Aug 22, 2022
717b1d0
store: Assign separate causality region for offchain dses
leoyvens Aug 22, 2022
ae8e2d7
graph: Fix release build
leoyvens Aug 23, 2022
11633ef
tests: yarn upgrade, add file ds to the workspace
leoyvens Aug 23, 2022
149a14d
fix: Update comments
leoyvens Aug 23, 2022
offchain: add match_and_decode, persist normally, require source
leoyvens committed Aug 24, 2022
commit 3f51dc01e023aff9603889d4b04dff1b02d15b5b
6 changes: 1 addition & 5 deletions core/src/subgraph/context.rs

@@ -67,11 +67,7 @@ impl OffchainMonitor {
     }
 
     pub fn add_data_source(&mut self, ds: offchain::DataSource) -> Result<(), Error> {
-        let source = match &ds.source {
-            Some(source) => source,
-            None => anyhow::bail!("Failed to add offchain data source (missing source)"),
-        };
-        match source {
+        match ds.source {
             offchain::Source::Ipfs(cid) => self.ipfs_monitor.monitor(cid.clone()),
         };
         self.data_sources.push(ds);
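For orientation, the `OffchainMonitor` fields touched here are not shown in the diff. A minimal sketch of what they presumably look like, inferred only from the calls above (`self.ipfs_monitor.monitor(...)`, `self.data_sources.push(ds)`); the field and type names are assumptions:

// Assumed shape, reconstructed from this hunk; the polling-monitor type is a guess.
pub struct OffchainMonitor {
    ipfs_monitor: PollingMonitor<Cid>,       // polls IPFS for each monitored CID
    data_sources: Vec<offchain::DataSource>, // offchain data sources registered so far
}
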
4 changes: 1 addition & 3 deletions core/src/subgraph/instance.rs

@@ -76,9 +76,7 @@ where
 
             // Create services for static offchain data sources
             if let DataSource::Offchain(ds) = &ds {
-                if ds.source.is_some() {
-                    offchain_monitor.add_data_source(ds.clone())?;
-                }
+                offchain_monitor.add_data_source(ds.clone())?;
             }
 
             let host = this.new_host(
50 changes: 15 additions & 35 deletions core/src/subgraph/runner.rs

@@ -16,7 +16,7 @@ use graph::data::subgraph::{
     schema::{SubgraphError, SubgraphHealth, POI_OBJECT},
     SubgraphFeature,
 };
-use graph::data_source::{self, offchain, DataSource, TriggerData};
+use graph::data_source::{offchain, DataSource, TriggerData};
 use graph::prelude::*;
 use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
 use std::convert::TryFrom;

@@ -223,18 +223,15 @@ where
         let (data_sources, runtime_hosts) =
             self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;
 
-        let mut onchain_data_sources = Vec::new();
-        for ds in data_sources {
-            match ds {
-                DataSource::Onchain(ds) => onchain_data_sources.push(ds),
-                DataSource::Offchain(ds) => {
-                    self.ctx.offchain_monitor.add_data_source(ds.clone())?
-                }
+        for ds in &data_sources {
+            if let DataSource::Offchain(ds) = ds {
+                self.ctx.offchain_monitor.add_data_source(ds.clone())?
             }
         }
-        let data_sources = onchain_data_sources;
 
-        let filter = C::TriggerFilter::from_data_sources(data_sources.iter());
+        let filter = C::TriggerFilter::from_data_sources(
+            data_sources.iter().filter_map(DataSource::as_onchain),
+        );
 
         // Reprocess the triggers from this block that match the new data sources
         let block_with_triggers = self

@@ -260,10 +257,7 @@
 
         // Add entity operations for the new data sources to the block state
        // and add runtimes for the data sources to the subgraph instance.
-        self.persist_dynamic_data_sources(
-            &mut block_state.entity_cache,
-            data_sources.into_iter().map(DataSource::Onchain).collect(),
-        );
+        self.persist_dynamic_data_sources(&mut block_state.entity_cache, data_sources);
 
         // Process the triggers in each host in the same order the
         // corresponding data sources have been created.

@@ -586,30 +580,16 @@ where
         source: offchain::Source,
         data: bytes::Bytes,
     ) -> Result<(), Error> {
-        let matching_data_sources = self
-            .ctx
-            .offchain_monitor
-            .data_sources
-            .iter()
-            .filter(|ds| ds.source.as_ref() == Some(&source));
-
-        let mut block_state =
-            BlockState::<C>::new(self.inputs.store.cheap_clone(), LfuCache::new());
+        let block_state = BlockState::<C>::new(self.inputs.store.cheap_clone(), LfuCache::new());
         let entity_cache = std::mem::take(&mut self.state.entity_lfu_cache);
 
-        self.persist_dynamic_data_sources(
-            &mut block_state.entity_cache,
-            matching_data_sources
-                .cloned()
-                .map(data_source::DataSource::Offchain)
-                .collect(),
-        );
+        // TODO: Set per-file causality region
+        // let causality_region = match &source {
+        //     offchain::Source::Ipfs(cid) => format!("ipfs/{}", cid.to_string()),
+        // };
 
-        let causality_region = match &source {
-            offchain::Source::Ipfs(cid) => format!("ipfs/{}", cid.to_string()),
-        };
         let trigger = TriggerData::Offchain(offchain::TriggerData {
-            source: source.clone(),
+            source,
             data: Arc::new(data),
         });
 

@@ -622,7 +602,7 @@ where
                 &trigger,
                 block_state,
                 &None,
-                &causality_region,
+                "IPFS TODO",
                 &self.inputs.debug_fork,
                 &self.metrics.subgraph,
             )
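The new `TriggerFilter` construction in this file calls a `DataSource::as_onchain` accessor that is not part of this commit's diff. A minimal sketch of what that accessor presumably looks like on the onchain/offchain wrapper enum, mirroring the `TriggerData::as_onchain` helper removed below (an assumption, included only to make the `filter_map` call readable):

impl<C: Blockchain> DataSource<C> {
    // Presumed helper: return the onchain data source if this is the onchain
    // variant, so `filter_map(DataSource::as_onchain)` keeps only onchain sources.
    pub fn as_onchain(&self) -> Option<&C::DataSource> {
        match self {
            Self::Onchain(ds) => Some(ds),
            Self::Offchain(_) => None,
        }
    }
}
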
29 changes: 8 additions & 21 deletions graph/src/data_source/mod.rs

@@ -103,14 +103,15 @@ impl<C: Blockchain> DataSource<C> {
         block: &Arc<C::Block>,
         logger: &Logger,
     ) -> Result<Option<TriggerWithHandler<MappingTrigger<C>>>, Error> {
-        match self {
-            Self::Onchain(ds) => ds
-                .match_and_decode(trigger.as_onchain().unwrap().clone(), block, logger)
+        match (self, trigger) {
+            (Self::Onchain(ds), TriggerData::Onchain(trigger)) => ds
+                .match_and_decode(trigger, block, logger)
                 .map(|t| t.map(|t| t.map(MappingTrigger::Onchain))),
-            Self::Offchain(ds) => Ok(Some(TriggerWithHandler::new(
-                MappingTrigger::Offchain(trigger.as_offchain().unwrap().clone()),
-                ds.mapping.handler.clone(),
-            ))),
+            (Self::Offchain(ds), TriggerData::Offchain(trigger)) => {
+                Ok(ds.match_and_decode(trigger))
+            }
+            (Self::Onchain(_), TriggerData::Offchain(_))
+            | (Self::Offchain(_), TriggerData::Onchain(_)) => Ok(None),
         }
     }
 

@@ -323,20 +324,6 @@ pub enum TriggerData<C: Blockchain> {
 }
 
 impl<C: Blockchain> TriggerData<C> {
-    fn as_onchain(&self) -> Option<&C::TriggerData> {
-        match self {
-            Self::Onchain(trigger) => Some(trigger),
-            Self::Offchain(_) => None,
-        }
-    }
-
-    fn as_offchain(&self) -> Option<&offchain::TriggerData> {
-        match self {
-            Self::Onchain(_) => None,
-            Self::Offchain(trigger) => Some(trigger),
-        }
-    }
-
     pub fn error_context(&self) -> String {
         match self {
             Self::Onchain(trigger) => trigger.error_context(),
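With the tuple match above, a trigger whose kind does not match the data source now falls through to `Ok(None)` instead of hitting an `unwrap`. A hedged sketch of how a caller can rely on that behaviour; the surrounding names (`data_sources`, `run_handler`, `block`, `logger`, `trigger`) are illustrative, not taken from this diff:

// Illustrative only: dispatch one trigger across a mix of onchain and offchain
// data sources; mismatched kinds simply yield None and are skipped.
for ds in &data_sources {
    match ds.match_and_decode(&trigger, &block, &logger)? {
        Some(trigger_with_handler) => run_handler(trigger_with_handler)?,
        None => continue,
    }
}
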
43 changes: 26 additions & 17 deletions graph/src/data_source/offchain.rs

@@ -9,20 +9,22 @@ use crate::{
     data_source,
     prelude::{DataSourceContext, Link},
 };
-use anyhow::{self, Error};
+use anyhow::{self, Context, Error};
 use cid::Cid;
 use serde::Deserialize;
 use slog::{info, Logger};
 use std::{fmt, sync::Arc};
 
+use super::TriggerWithHandler;
+
 pub const OFFCHAIN_KINDS: &'static [&'static str] = &["file/ipfs"];
 
 #[derive(Clone, Debug)]
 pub struct DataSource {
     pub kind: String,
     pub name: String,
     pub manifest_idx: u32,
-    pub source: Option<Source>,
+    pub source: Source,
     pub mapping: Mapping,
     pub context: Arc<Option<DataSourceContext>>,
     pub creation_block: Option<BlockNumber>,

@@ -46,7 +48,7 @@ impl<C: Blockchain> TryFrom<DataSourceTemplateInfo<C>> for DataSource {
             kind: template.kind.clone(),
             name: template.name.clone(),
             manifest_idx: template.manifest_idx,
-            source: Some(Source::Ipfs(source.parse()?)),
+            source: Source::Ipfs(source.parse()?),
             mapping: template.mapping.clone(),
             context: Arc::new(info.context),
             creation_block: Some(info.creation_block),

@@ -55,18 +57,32 @@
 }
 
 impl DataSource {
+    pub fn match_and_decode<C: Blockchain>(
+        &self,
+        trigger: &TriggerData,
+    ) -> Option<TriggerWithHandler<super::MappingTrigger<C>>> {
+        if self.source != trigger.source {
+            return None;
+        }
+
+        Some(TriggerWithHandler::new(
+            data_source::MappingTrigger::Offchain(trigger.clone()),
+            self.mapping.handler.clone(),
+        ))
+    }
+
     pub fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource {
-        let param = self.source.as_ref().map(|source| match source {
+        let param = match self.source {
             Source::Ipfs(link) => Bytes::from(link.to_bytes()),
-        });
+        };
         let context = self
             .context
             .as_ref()
             .as_ref()
             .map(|ctx| serde_json::to_value(&ctx).unwrap());
         StoredDynamicDataSource {
             manifest_idx: self.manifest_idx,
-            param,
+            param: Some(param),
             context,
             creation_block: self.creation_block,
         }

@@ -76,11 +92,8 @@ impl DataSource {
         template: &DataSourceTemplate,
         stored: StoredDynamicDataSource,
     ) -> Result<Self, Error> {
-        let source = stored.param.and_then(|bytes| {
-            Cid::try_from(bytes.as_slice().to_vec())
-                .ok()
-                .map(Source::Ipfs)
-        });
+        let param = stored.param.context("no param on stored data source")?;
+        let source = Source::Ipfs(Cid::try_from(param.as_slice().to_vec())?);
         let context = Arc::new(stored.context.map(serde_json::from_value).transpose()?);
         Ok(Self {
             kind: template.kind.clone(),

@@ -113,7 +126,7 @@ pub struct Mapping {
 pub struct UnresolvedDataSource {
     pub kind: String,
     pub name: String,
-    pub source: Option<UnresolvedSource>,
+    pub source: UnresolvedSource,
     pub mapping: UnresolvedMapping,
 }
 

@@ -145,10 +158,7 @@ impl UnresolvedDataSource {
             "source" => format_args!("{:?}", &self.source),
         );
         let source = match self.kind.as_str() {
-            "file/ipfs" => self
-                .source
-                .map(|src| src.file.link.parse().map(Source::Ipfs))
-                .transpose()?,
+            "file/ipfs" => Source::Ipfs(self.source.file.link.parse()?),
             _ => {
                 anyhow::bail!(
                     "offchain data source has invalid `kind`, expected `file/ipfs` but found {}",

@@ -194,7 +204,6 @@ pub struct UnresolvedDataSourceTemplate {
     pub mapping: UnresolvedMapping,
 }
 
-
 #[derive(Clone, Debug)]
 pub struct DataSourceTemplate {
     pub kind: String,
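The equality check in the new `DataSource::match_and_decode` above (`self.source != trigger.source`) presumes that `Source` implements `PartialEq`. The enum itself is not part of this diff; a sketch of what it presumably looks like, assuming a single IPFS variant keyed by CID:

// Assumed definition, not shown in this commit; PartialEq is needed so a data
// source can be matched against the source recorded on an offchain trigger.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Source {
    Ipfs(Cid),
}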