Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime support for offchain data sources & templates #3791

Merged
merged 33 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a6b33f5
Refactor manifest data sources
Theodus Jul 28, 2022
48ccdd3
Refactor manifest data source templates
Theodus Jul 29, 2022
55f8847
Start offchain monitors for static sources
Theodus Jul 29, 2022
ca94702
Run offchain handlers
Theodus Jul 30, 2022
c6338c6
offchain: dont expect `manifest_idx` in the manifest
leoyvens Aug 10, 2022
3f51dc0
offchain: add `match_and_decode`, persist normally, require source
leoyvens Aug 10, 2022
4def210
trigger processor: take block ptr from the trigger
leoyvens Aug 10, 2022
f237fea
offchain: Return cid to dataSource.address host fn
leoyvens Aug 10, 2022
d43a2fa
runner: transact modifications of offchain events
leoyvens Aug 10, 2022
b1b224a
ethereum: fix test build
leoyvens Aug 11, 2022
d5a5174
ipfs: Set a default maximum file size
leoyvens Aug 11, 2022
7347307
ipfs: Add env var for max concurrent requests
leoyvens Aug 11, 2022
e17fd10
ipfs: Share ipfs service across subgraphs
leoyvens Aug 11, 2022
516e104
offchain: move `ready_offchain_events` to `OffchainMonitor`
leoyvens Aug 11, 2022
3f940c0
runner: Clarify comments
leoyvens Aug 12, 2022
c3a0855
core: Remove unecessary params from `add_dynamic_data_source`
leoyvens Aug 12, 2022
6936376
core: Move poi_version out of the instance
leoyvens Aug 12, 2022
801eb36
core: Move `mod instance` under `mod context`
leoyvens Aug 15, 2022
8930c5c
core: Refactor OffchainMonitor::add_data_source
leoyvens Aug 15, 2022
9338787
offchain: Better handling of duplicates
leoyvens Aug 15, 2022
acdc5a7
offchain: Bump max ipfs concurrent requests to 100
leoyvens Aug 15, 2022
ea0311e
refactor: Expose RuntimeHost data source
leoyvens Aug 15, 2022
a95454f
offchain: Remove dses that have been processed
leoyvens Aug 15, 2022
f30b210
refactor: Extract ReadStore out of WritableStore
leoyvens Aug 18, 2022
ad6264d
test: Add graphql queries to end-to-end tests
leoyvens Aug 19, 2022
f59d9c9
feat(file ds): Bump max spec version to 0.0.7
leoyvens Aug 21, 2022
7e99135
test: Add basic file data sources e2e test
leoyvens Aug 21, 2022
d22b51f
runner: Isolate offchain data sources
leoyvens Aug 22, 2022
824957b
offchain: Forbid static file data sources
leoyvens Aug 22, 2022
717b1d0
store: Assign separate causality region for offchain dses
leoyvens Aug 22, 2022
ae8e2d7
graph: Fix release build
leoyvens Aug 23, 2022
11633ef
tests: yarn upgrade, add file ds to the workspace
leoyvens Aug 23, 2022
149a14d
fix: Update comments
leoyvens Aug 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refactor manifest data sources
  • Loading branch information
Theodus authored and leoyvens committed Aug 24, 2022
commit a6b33f5d8925810625543dce8b015096e87ae59a
1 change: 1 addition & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl Chain {
#[async_trait]
impl Blockchain for Chain {
const KIND: BlockchainKind = BlockchainKind::Ethereum;
const ALIASES: &'static [&'static str] = &["ethereum/contract"];

type Block = BlockFinality;

Expand Down
20 changes: 17 additions & 3 deletions chain/ethereum/tests/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::time::Duration;

use graph::data::subgraph::schema::SubgraphError;
use graph::data::subgraph::{SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7};
use graph::data_source::DataSource;
use graph::offchain;
use graph::prelude::{
anyhow, async_trait, serde_yaml, tokio, DeploymentHash, Entity, Link, Logger, SubgraphManifest,
SubgraphManifestValidationError, UnvalidatedSubgraphManifest,
Expand Down Expand Up @@ -152,8 +154,15 @@ specVersion: 0.0.7
let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_7).await;

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(manifest.offchain_data_sources.len(), 1);
assert_eq!(manifest.data_sources.len(), 0);
assert_eq!(manifest.data_sources.len(), 1);
let data_source = match &manifest.data_sources[0] {
DataSource::Offchain(ds) => ds,
DataSource::Onchain(_) => unreachable!(),
};
assert_eq!(
data_source.source,
Some(offchain::Source::Ipfs(Link::from("/ipfs/Qmfile")))
);
}

#[tokio::test]
Expand Down Expand Up @@ -392,7 +401,12 @@ specVersion: 0.0.2
";

let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_4).await;
let required_capabilities = NodeCapabilities::from_data_sources(&manifest.data_sources);
let onchain_data_sources = manifest
.data_sources
.iter()
.filter_map(|ds| ds.as_onchain().cloned())
.collect::<Vec<_>>();
let required_capabilities = NodeCapabilities::from_data_sources(&onchain_data_sources);

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(true, required_capabilities.traces);
Expand Down
17 changes: 7 additions & 10 deletions core/src/subgraph/instance.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use futures01::sync::mpsc::Sender;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::SPEC_VERSION_0_0_6;

use std::collections::HashMap;

use graph::{
blockchain::Blockchain,
components::{
store::SubgraphFork,
subgraph::{MappingError, SharedProofOfIndexing},
subgraph::{MappingError, ProofOfIndexingVersion, SharedProofOfIndexing},
},
prelude::ENV_VARS,
data::subgraph::SPEC_VERSION_0_0_6,
data_source::DataSource,
prelude::*,
};
use graph::{blockchain::DataSource, prelude::*};
use std::collections::HashMap;

pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
subgraph_id: DeploymentHash,
Expand Down Expand Up @@ -92,7 +89,7 @@ where
fn new_host(
&mut self,
logger: Logger,
data_source: C::DataSource,
data_source: DataSource<C>,
module_bytes: &Arc<Vec<u8>>,
templates: Arc<Vec<C::DataSourceTemplate>>,
host_metrics: Arc<HostMetrics>,
Expand Down Expand Up @@ -151,7 +148,7 @@ where
pub(crate) fn add_dynamic_data_source(
&mut self,
logger: &Logger,
data_source: C::DataSource,
data_source: DataSource<C>,
templates: Arc<Vec<C::DataSourceTemplate>>,
metrics: Arc<HostMetrics>,
) -> Result<Option<Arc<T::Host>>, Error> {
Expand Down
9 changes: 7 additions & 2 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
(manifest, manifest_idx_and_name)
};

let required_capabilities = C::NodeCapabilities::from_data_sources(&manifest.data_sources);
let onchain_data_sources = manifest
.data_sources
.iter()
.filter_map(|d| d.as_onchain().cloned())
.collect::<Vec<_>>();
let required_capabilities = C::NodeCapabilities::from_data_sources(&onchain_data_sources);
let network = manifest.network_name();

let chain = self
Expand All @@ -230,7 +235,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
.clone();

// Obtain filters from the manifest
let mut filter = C::TriggerFilter::from_data_sources(manifest.data_sources.iter());
let mut filter = C::TriggerFilter::from_data_sources(onchain_data_sources.iter());

if self.static_filters {
filter.extend_with_template(manifest.templates.clone().into_iter());
Expand Down
9 changes: 5 additions & 4 deletions core/src/subgraph/loader.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::time::Instant;

use graph::blockchain::{Blockchain, DataSource, DataSourceTemplate as _};
use graph::blockchain::{Blockchain, DataSource as _, DataSourceTemplate as _};
use graph::components::store::WritableStore;
use graph::data_source::DataSource;
use graph::prelude::*;

pub async fn load_dynamic_data_sources<C: Blockchain>(
store: Arc<dyn WritableStore>,
logger: Logger,
manifest: &SubgraphManifest<C>,
manifest_idx_and_name: Vec<(u32, String)>,
) -> Result<Vec<C::DataSource>, Error> {
) -> Result<Vec<DataSource<C>>, Error> {
let start_time = Instant::now();

let mut data_sources: Vec<C::DataSource> = vec![];
let mut data_sources: Vec<DataSource<C>> = vec![];

for stored in store
.load_dynamic_data_sources(manifest_idx_and_name)
Expand All @@ -33,7 +34,7 @@ pub async fn load_dynamic_data_sources<C: Blockchain>(
"Assertion failure: new data source has lower creation block than existing ones"
);

data_sources.push(ds);
data_sources.push(DataSource::Onchain(ds));
}

trace!(
Expand Down
5 changes: 3 additions & 2 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::subgraph::state::IndexingState;
use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, DataSource, TriggerFilter as _};
use graph::blockchain::{Block, Blockchain, DataSource as _, TriggerFilter as _};
use graph::components::store::EntityKey;
use graph::components::{
store::ModificationsAndCache,
Expand All @@ -16,6 +16,7 @@ use graph::data::subgraph::{
schema::{SubgraphError, SubgraphHealth, POI_OBJECT},
SubgraphFeature,
};
use graph::data_source::DataSource;
use graph::prelude::*;
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
use std::convert::TryFrom;
Expand Down Expand Up @@ -465,7 +466,7 @@ where
// Try to create a runtime host for the data source
let host = self.ctx.instance.add_dynamic_data_source(
&self.logger,
data_source.clone(),
DataSource::Onchain(data_source.clone()),
self.inputs.templates.clone(),
self.metrics.host.clone(),
)?;
Expand Down
1 change: 1 addition & 0 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub trait Block: Send + Sync {
// This is only `Debug` because some tests require that
pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
const KIND: BlockchainKind;
const ALIASES: &'static [&'static str] = &[];

// The `Clone` bound is used when reprocessing a block, because `triggers_in_block` requires an
// owned `Block`. It would be good to come up with a way to remove this bound.
Expand Down
3 changes: 2 additions & 1 deletion graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::sync::mpsc;

use crate::blockchain::TriggerWithHandler;
use crate::components::store::SubgraphFork;
use crate::data_source::DataSource;
use crate::prelude::*;
use crate::{blockchain::Blockchain, components::subgraph::SharedProofOfIndexing};
use crate::{components::metrics::HistogramVec, runtime::DeterministicHostError};
Expand Down Expand Up @@ -151,7 +152,7 @@ pub trait RuntimeHostBuilder<C: Blockchain>: Clone + Send + Sync + 'static {
&self,
network_name: String,
subgraph_id: DeploymentHash,
data_source: C::DataSource,
data_source: DataSource<C>,
top_level_templates: Arc<Vec<C::DataSourceTemplate>>,
mapping_request_sender: mpsc::Sender<Self::Req>,
metrics: Arc<HostMetrics>,
Expand Down
Loading