Add runtime support for offchain data sources & templates #3791

Merged: 33 commits, Aug 25, 2022

Changes shown below are from 1 commit. All 33 commits:
a6b33f5  Refactor manifest data sources  (Theodus, Jul 28, 2022)
48ccdd3  Refactor manifest data source templates  (Theodus, Jul 29, 2022)
55f8847  Start offchain monitors for static sources  (Theodus, Jul 29, 2022)
ca94702  Run offchain handlers  (Theodus, Jul 30, 2022)
c6338c6  offchain: dont expect `manifest_idx` in the manifest  (leoyvens, Aug 10, 2022)
3f51dc0  offchain: add `match_and_decode`, persist normally, require source  (leoyvens, Aug 10, 2022)
4def210  trigger processor: take block ptr from the trigger  (leoyvens, Aug 10, 2022)
f237fea  offchain: Return cid to dataSource.address host fn  (leoyvens, Aug 10, 2022)
d43a2fa  runner: transact modifications of offchain events  (leoyvens, Aug 10, 2022)
b1b224a  ethereum: fix test build  (leoyvens, Aug 11, 2022)
d5a5174  ipfs: Set a default maximum file size  (leoyvens, Aug 11, 2022)
7347307  ipfs: Add env var for max concurrent requests  (leoyvens, Aug 11, 2022)
e17fd10  ipfs: Share ipfs service across subgraphs  (leoyvens, Aug 11, 2022)
516e104  offchain: move `ready_offchain_events` to `OffchainMonitor`  (leoyvens, Aug 11, 2022)
3f940c0  runner: Clarify comments  (leoyvens, Aug 12, 2022)
c3a0855  core: Remove unecessary params from `add_dynamic_data_source`  (leoyvens, Aug 12, 2022)
6936376  core: Move poi_version out of the instance  (leoyvens, Aug 12, 2022)
801eb36  core: Move `mod instance` under `mod context`  (leoyvens, Aug 15, 2022)
8930c5c  core: Refactor OffchainMonitor::add_data_source  (leoyvens, Aug 15, 2022)
9338787  offchain: Better handling of duplicates  (leoyvens, Aug 15, 2022)
acdc5a7  offchain: Bump max ipfs concurrent requests to 100  (leoyvens, Aug 15, 2022)
ea0311e  refactor: Expose RuntimeHost data source  (leoyvens, Aug 15, 2022)
a95454f  offchain: Remove dses that have been processed  (leoyvens, Aug 15, 2022)
f30b210  refactor: Extract ReadStore out of WritableStore  (leoyvens, Aug 18, 2022)
ad6264d  test: Add graphql queries to end-to-end tests  (leoyvens, Aug 19, 2022)
f59d9c9  feat(file ds): Bump max spec version to 0.0.7  (leoyvens, Aug 21, 2022)
7e99135  test: Add basic file data sources e2e test  (leoyvens, Aug 21, 2022)
d22b51f  runner: Isolate offchain data sources  (leoyvens, Aug 22, 2022)
824957b  offchain: Forbid static file data sources  (leoyvens, Aug 22, 2022)
717b1d0  store: Assign separate causality region for offchain dses  (leoyvens, Aug 22, 2022)
ae8e2d7  graph: Fix release build  (leoyvens, Aug 23, 2022)
11633ef  tests: yarn upgrade, add file ds to the workspace  (leoyvens, Aug 23, 2022)
149a14d  fix: Update comments  (leoyvens, Aug 23, 2022)
Start offchain monitors for static sources
Theodus authored and leoyvens committed Aug 24, 2022
commit 55f8847720b7cfa7d84d9680cd9afde0b68bf163
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

19 changes: 11 additions & 8 deletions chain/ethereum/tests/manifest.rs
@@ -4,8 +4,7 @@ use std::time::Duration;

use graph::data::subgraph::schema::SubgraphError;
use graph::data::subgraph::{SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7};
use graph::data_source::DataSource;
use graph::offchain;
use graph::data_source::{offchain, DataSource};
use graph::prelude::{
anyhow, async_trait, serde_yaml, tokio, DeploymentHash, Entity, Link, Logger, SubgraphManifest,
SubgraphManifestValidationError, UnvalidatedSubgraphManifest,
@@ -28,6 +27,7 @@ const GQL_SCHEMA_FULLTEXT: &str = include_str!("full-text.graphql");
const MAPPING_WITH_IPFS_FUNC_WASM: &[u8] = include_bytes!("ipfs-on-ethereum-contracts.wasm");
const ABI: &str = "[{\"type\":\"function\", \"inputs\": [{\"name\": \"i\",\"type\": \"uint256\"}],\"name\":\"get\",\"outputs\": [{\"type\": \"address\",\"name\": \"o\"}]}]";
const FILE: &str = "{}";
const FILE_CID: &str = "bafkreigkhuldxkyfkoaye4rgcqcwr45667vkygd45plwq6hawy7j4rbdky";

#[derive(Default, Debug, Clone)]
struct TextResolver {
@@ -84,7 +84,7 @@ async fn resolve_manifest(
resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
resolver.add("/ipfs/Qmabi", &ABI);
resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM);
resolver.add("/ipfs/Qmfile", &FILE);
resolver.add(FILE_CID, &FILE);

let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);

Expand Down Expand Up @@ -130,7 +130,8 @@ specVersion: 0.0.2

#[tokio::test]
async fn ipfs_manifest() {
const YAML: &str = "
let yaml = format!(
"
schema:
file:
/: /ipfs/Qmschema
@@ -139,7 +140,7 @@ dataSources:
kind: file/ipfs
source:
file:
/: /ipfs/Qmfile
/: {}
mapping:
apiVersion: 0.0.6
language: wasm/assemblyscript
@@ -149,9 +150,11 @@ dataSources:
/: /ipfs/Qmmapping
handler: handleFile
specVersion: 0.0.7
";
",
FILE_CID
);

let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_7).await;
let manifest = resolve_manifest(&yaml, SPEC_VERSION_0_0_7).await;

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(manifest.data_sources.len(), 1);
@@ -161,7 +164,7 @@ specVersion: 0.0.7
};
assert_eq!(
data_source.source,
Some(offchain::Source::Ipfs(Link::from("/ipfs/Qmfile")))
Some(offchain::Source::Ipfs(FILE_CID.parse().unwrap()))
);
}

2 changes: 1 addition & 1 deletion core/src/polling_monitor/mod.rs
@@ -14,7 +14,7 @@ use graph::util::monitored::MonitoredVecDeque as VecDeque;
use tokio::sync::{mpsc, watch};
use tower::{Service, ServiceExt};

use self::metrics::PollingMonitorMetrics;
pub use self::metrics::PollingMonitorMetrics;

/// Spawn a monitor that actively polls a service. Whenever the service has capacity, the monitor
/// pulls object ids from the queue and polls the service. If the object is not present or in case
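The doc comment above describes the pattern this module implements: a monitor task pulls object ids off a queue whenever the underlying service has capacity and reports results on a channel. A heavily simplified, self-contained illustration of that loop (not this crate's implementation, which lives in this file; the capacity handling and the re-queueing of not-found objects are left out):

```rust
use std::collections::VecDeque;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (response_tx, mut response_rx) = mpsc::channel::<(u32, String)>(10);

    // Ids waiting to be polled; the real monitor re-queues ids that were not
    // found yet and stops once the requester goes away.
    let mut queue: VecDeque<u32> = (1..=3).collect();

    tokio::spawn(async move {
        while let Some(id) = queue.pop_front() {
            // Stand-in for one async service call (e.g. an IPFS lookup).
            let object = format!("object-{id}");
            if response_tx.send((id, object)).await.is_err() {
                break; // receiver dropped, stop polling
            }
        }
    });

    while let Some((id, object)) = response_rx.recv().await {
        println!("found {id}: {object}");
    }
}
```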
72 changes: 69 additions & 3 deletions core/src/subgraph/context.rs
@@ -1,10 +1,21 @@
use crate::subgraph::SubgraphInstance;
use crate::{
polling_monitor::{
ipfs_service::IpfsService, spawn_monitor, PollingMonitor, PollingMonitorMetrics,
},
subgraph::SubgraphInstance,
};
use bytes::Bytes;
use cid::Cid;
use graph::{
blockchain::Blockchain,
components::store::DeploymentId,
prelude::{CancelGuard, RuntimeHostBuilder},
data_source::offchain,
env::ENV_VARS,
ipfs_client::IpfsClient,
prelude::{CancelGuard, DeploymentHash, MetricsRegistry, RuntimeHostBuilder},
slog::Logger,
tokio::{select, spawn, sync::mpsc},
};

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

@@ -18,4 +29,59 @@ where
    pub instance: SubgraphInstance<C, T>,
    pub instances: SharedInstanceKeepAliveMap,
    pub filter: C::TriggerFilter,
    pub offchain_monitor: OffchainMonitor,
    pub offchain_monitor_rx: mpsc::Receiver<(offchain::Source, Bytes)>,
}

pub struct OffchainMonitor {
    ipfs_monitor: PollingMonitor<Cid>,
}

impl OffchainMonitor {
    pub fn new(
        logger: Logger,
        registry: Arc<dyn MetricsRegistry>,
        subgraph_hash: &DeploymentHash,
        client: IpfsClient,
    ) -> (Self, mpsc::Receiver<(offchain::Source, Bytes)>) {
        let (ipfs_monitor_tx, mut ipfs_monitor_rx) = mpsc::channel(10);
        let ipfs_service = IpfsService::new(
            client,
            ENV_VARS.mappings.max_ipfs_file_bytes.unwrap_or(1 << 20) as u64,
            ENV_VARS.mappings.ipfs_timeout,
            10,
        );
        let ipfs_monitor = spawn_monitor(
            ipfs_service,
            ipfs_monitor_tx,
            logger,
            PollingMonitorMetrics::new(registry, subgraph_hash),
        );
        let (tx, rx) = mpsc::channel(10);
        spawn(async move {
            loop {
                select! {
                    msg = ipfs_monitor_rx.recv() => {
                        match msg {
                            Some((cid, bytes)) => {
                                if let Err(_) = tx.send((offchain::Source::Ipfs(cid), bytes)).await {
                                    break;
                                }
                            },
                            None => break,
                        }
                    }
                }
            }
        });
        (Self { ipfs_monitor }, rx)
    }

    pub fn monitor(&self, source: offchain::Source) {
        match source {
            offchain::Source::Ipfs(cid) => {
                self.ipfs_monitor.monitor(cid);
            }
        }
    }
}
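For orientation, this is roughly how a caller is expected to drive `OffchainMonitor`. It is a sketch, not code from this PR: the free function `drive_offchain` is invented here, it assumes `OffchainMonitor` is in scope and keeps the constructor and `monitor` signatures shown in the hunk above, and it assumes a direct dependency on the `cid` crate.

```rust
use std::sync::Arc;

use graph::{
    data_source::offchain,
    ipfs_client::IpfsClient,
    prelude::{DeploymentHash, MetricsRegistry},
    slog::Logger,
};

// Hypothetical caller, written against the OffchainMonitor API added above.
async fn drive_offchain(
    logger: Logger,
    registry: Arc<dyn MetricsRegistry>,
    deployment: DeploymentHash,
    client: IpfsClient,
    file_cid: cid::Cid,
) {
    // One monitor (and one receiver) per subgraph instance.
    let (monitor, mut rx) = OffchainMonitor::new(logger, registry, &deployment, client);

    // Register a file data source; the IPFS polling monitor starts fetching it.
    monitor.monitor(offchain::Source::Ipfs(file_cid));

    // The subgraph runner polls this receiver in its main loop; each message is
    // the source plus the fetched bytes, ready to become an offchain trigger.
    while let Some((source, bytes)) = rx.recv().await {
        println!("fetched {} bytes for {:?}", bytes.len(), source);
    }
}
```

Keeping the receiver on the per-subgraph `IndexingContext` is what lets the runner treat fetched files like any other trigger source.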
12 changes: 11 additions & 1 deletion core/src/subgraph/instance.rs
@@ -11,6 +11,8 @@ use graph::{
};
use std::collections::HashMap;

use super::context::OffchainMonitor;

pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
subgraph_id: DeploymentHash,
network: String,
@@ -40,6 +42,7 @@
host_builder: T,
trigger_processor: Box<dyn TriggerProcessor<C, T>>,
host_metrics: Arc<HostMetrics>,
offchain_monitor: &OffchainMonitor,
) -> Result<Self, Error> {
let subgraph_id = manifest.id.clone();
let network = manifest.network_name();
@@ -71,14 +74,21 @@
Some(ref module_bytes) => module_bytes,
};

// Create services for static offchain data sources
if let DataSource::Offchain(ds) = &ds {
if let Some(source) = &ds.source {
offchain_monitor.monitor(source.clone());
}
}

let host = this.new_host(
logger.cheap_clone(),
ds,
module_bytes,
templates.cheap_clone(),
host_metrics.cheap_clone(),
)?;
this.hosts.push(Arc::new(host))
this.hosts.push(Arc::new(host));
}

Ok(this)
15 changes: 15 additions & 0 deletions core/src/subgraph/instance_manager.rs
@@ -7,11 +7,13 @@ use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::Blockchain;
use graph::blockchain::NodeCapabilities;
use graph::blockchain::{BlockchainKind, TriggerFilter};
use graph::ipfs_client::IpfsClient;
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
use graph_runtime_wasm::RuntimeHostBuilder;
use tokio::task;

use super::context::OffchainMonitor;
use super::SubgraphTriggerProcessor;

pub struct SubgraphInstanceManager<S: SubgraphStore> {
@@ -22,6 +24,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
manager_metrics: SubgraphInstanceManagerMetrics,
instances: SharedInstanceKeepAliveMap,
link_resolver: Arc<dyn LinkResolver>,
ipfs_client: IpfsClient,
static_filters: bool,
}

@@ -133,6 +136,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
chains: Arc<BlockchainMap>,
metrics_registry: Arc<dyn MetricsRegistry>,
link_resolver: Arc<dyn LinkResolver>,
ipfs_client: IpfsClient,
static_filters: bool,
) -> Self {
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
@@ -146,6 +150,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
metrics_registry,
instances: SharedInstanceKeepAliveMap::default(),
link_resolver,
ipfs_client,
static_filters,
}
}
@@ -290,6 +295,13 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
stopwatch_metrics,
));

let (offchain_monitor, offchain_monitor_rx) = OffchainMonitor::new(
logger.cheap_clone(),
registry.cheap_clone(),
&manifest.id,
self.ipfs_client.cheap_clone(),
);

// Initialize deployment_head with current deployment head. Any sort of trouble in
// getting the deployment head ptr leads to initializing with 0
let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64;
@@ -309,6 +321,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
host_builder,
tp,
host_metrics.clone(),
&offchain_monitor,
)?;

let inputs = IndexingInputs {
@@ -331,6 +344,8 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
instance,
instances: self.instances.cheap_clone(),
filter,
offchain_monitor,
offchain_monitor_rx,
};

let metrics = RunnerMetrics {
3 changes: 2 additions & 1 deletion graph/Cargo.toml
@@ -10,6 +10,7 @@ async-stream = "0.3"
atomic_refcell = "0.1.8"
bigdecimal = { version = "0.1.0", features = ["serde"] }
bytes = "1.0.1"
cid = "0.8.3"
diesel = { version = "1.4.8", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono"] }
diesel_derives = "1.4"
chrono = "0.4.22"
@@ -69,4 +70,4 @@ maplit = "1.0.2"
structopt = { version = "0.3" }

[build-dependencies]
tonic-build = { version = "0.7.2", features = ["prost"] }
tonic-build = { version = "0.7.2", features = ["prost"] }
4 changes: 2 additions & 2 deletions graph/src/data/subgraph/mod.rs
@@ -38,9 +38,9 @@ use crate::{
subgraph::features::validate_subgraph_features,
},
data_source::{
DataSource, DataSourceTemplate, UnresolvedDataSource, UnresolvedDataSourceTemplate,
offchain::OFFCHAIN_KINDS, DataSource, DataSourceTemplate, UnresolvedDataSource,
UnresolvedDataSourceTemplate,
},
offchain::OFFCHAIN_KINDS,
prelude::{r, CheapClone, ENV_VARS},
};

4 changes: 3 additions & 1 deletion graph/src/data_source.rs → graph/src/data_source/mod.rs
@@ -1,3 +1,5 @@
pub mod offchain;

use crate::{
blockchain::{
Blockchain, DataSource as _, DataSourceTemplate as _, UnresolvedDataSource as _,
@@ -8,7 +10,7 @@ use crate::{
store::{BlockNumber, StoredDynamicDataSource},
subgraph::DataSourceTemplateInfo,
},
offchain::{self, OFFCHAIN_KINDS},
data_source::offchain::OFFCHAIN_KINDS,
prelude::{CheapClone as _, DataSourceContext},
};
use anyhow::Error;
17 changes: 10 additions & 7 deletions graph/src/offchain/mod.rs → graph/src/data_source/offchain.rs
@@ -9,8 +9,8 @@ use crate::{
data_source,
prelude::{DataSourceContext, Link},
};

use anyhow::{self, Error};
use cid::Cid;
use serde::Deserialize;
use slog::{info, Logger};
use std::sync::Arc;
@@ -46,7 +46,7 @@ impl<C: Blockchain> TryFrom<DataSourceTemplateInfo<C>> for DataSource {
kind: template.kind.clone(),
name: template.name.clone(),
manifest_idx: template.manifest_idx,
source: Some(Source::Ipfs(Link::from(source))),
source: Some(Source::Ipfs(source.parse()?)),
mapping: template.mapping.clone(),
context: Arc::new(info.context),
creation_block: Some(info.creation_block),
@@ -57,7 +57,7 @@
impl DataSource {
pub fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource {
let param = self.source.as_ref().map(|source| match source {
Source::Ipfs(link) => Bytes::from(link.link.as_bytes()),
Source::Ipfs(link) => Bytes::from(link.to_bytes()),
});
let context = self
.context
@@ -77,9 +77,9 @@ impl DataSource {
stored: StoredDynamicDataSource,
) -> Result<Self, Error> {
let source = stored.param.and_then(|bytes| {
String::from_utf8(bytes.as_slice().to_vec())
Cid::try_from(bytes.as_slice().to_vec())
.ok()
.map(|link| Source::Ipfs(Link::from(link)))
.map(Source::Ipfs)
});
let context = Arc::new(stored.context.map(serde_json::from_value).transpose()?);
Ok(Self {
@@ -96,7 +96,7 @@

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Source {
Ipfs(Link),
Ipfs(Cid),
}
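With `Source::Ipfs` now holding a `Cid` instead of a `Link`, the stored-parameter conversion in the hunks above reduces to the `cid` crate's byte encoding. A standalone sketch of that round trip (not part of this PR; it reuses the FILE_CID constant from the manifest test and assumes a direct dependency on the `cid` crate):

```rust
use cid::Cid;

fn main() {
    // Same CID string used as FILE_CID in chain/ethereum/tests/manifest.rs.
    let cid: Cid = "bafkreigkhuldxkyfkoaye4rgcqcwr45667vkygd45plwq6hawy7j4rbdky"
        .parse()
        .expect("valid CID");

    // `as_stored_dynamic_data_source` persists the source as these raw bytes...
    let stored: Vec<u8> = cid.to_bytes();

    // ...and `from_stored_dynamic_data_source` recovers the same CID from them.
    let restored = Cid::try_from(stored).expect("round-trips");
    assert_eq!(cid, restored);
}
```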

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -145,7 +145,10 @@ impl UnresolvedDataSource {
"source" => format_args!("{:?}", &self.source),
);
let source = match self.kind.as_str() {
"file/ipfs" => self.source.map(|src| Source::Ipfs(src.file)),
"file/ipfs" => self
.source
.map(|src| src.file.link.parse().map(Source::Ipfs))
.transpose()?,
_ => {
anyhow::bail!(
"offchain data source has invalid `kind`, expected `file/ipfs` but found {}",
2 changes: 0 additions & 2 deletions graph/src/lib.rs
@@ -22,8 +22,6 @@ pub mod data_source;

pub mod blockchain;

pub mod offchain;

pub mod runtime;

pub mod firehose;