Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime support for offchain data sources & templates #3791

Merged
merged 33 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a6b33f5
Refactor manifest data sources
Theodus Jul 28, 2022
48ccdd3
Refactor manifest data source templates
Theodus Jul 29, 2022
55f8847
Start offchain monitors for static sources
Theodus Jul 29, 2022
ca94702
Run offchain handlers
Theodus Jul 30, 2022
c6338c6
offchain: dont expect `manifest_idx` in the manifest
leoyvens Aug 10, 2022
3f51dc0
offchain: add `match_and_decode`, persist normally, require source
leoyvens Aug 10, 2022
4def210
trigger processor: take block ptr from the trigger
leoyvens Aug 10, 2022
f237fea
offchain: Return cid to dataSource.address host fn
leoyvens Aug 10, 2022
d43a2fa
runner: transact modifications of offchain events
leoyvens Aug 10, 2022
b1b224a
ethereum: fix test build
leoyvens Aug 11, 2022
d5a5174
ipfs: Set a default maximum file size
leoyvens Aug 11, 2022
7347307
ipfs: Add env var for max concurrent requests
leoyvens Aug 11, 2022
e17fd10
ipfs: Share ipfs service across subgraphs
leoyvens Aug 11, 2022
516e104
offchain: move `ready_offchain_events` to `OffchainMonitor`
leoyvens Aug 11, 2022
3f940c0
runner: Clarify comments
leoyvens Aug 12, 2022
c3a0855
core: Remove unecessary params from `add_dynamic_data_source`
leoyvens Aug 12, 2022
6936376
core: Move poi_version out of the instance
leoyvens Aug 12, 2022
801eb36
core: Move `mod instance` under `mod context`
leoyvens Aug 15, 2022
8930c5c
core: Refactor OffchainMonitor::add_data_source
leoyvens Aug 15, 2022
9338787
offchain: Better handling of duplicates
leoyvens Aug 15, 2022
acdc5a7
offchain: Bump max ipfs concurrent requests to 100
leoyvens Aug 15, 2022
ea0311e
refactor: Expose RuntimeHost data source
leoyvens Aug 15, 2022
a95454f
offchain: Remove dses that have been processed
leoyvens Aug 15, 2022
f30b210
refactor: Extract ReadStore out of WritableStore
leoyvens Aug 18, 2022
ad6264d
test: Add graphql queries to end-to-end tests
leoyvens Aug 19, 2022
f59d9c9
feat(file ds): Bump max spec version to 0.0.7
leoyvens Aug 21, 2022
7e99135
test: Add basic file data sources e2e test
leoyvens Aug 21, 2022
d22b51f
runner: Isolate offchain data sources
leoyvens Aug 22, 2022
824957b
offchain: Forbid static file data sources
leoyvens Aug 22, 2022
717b1d0
store: Assign separate causality region for offchain dses
leoyvens Aug 22, 2022
ae8e2d7
graph: Fix release build
leoyvens Aug 23, 2022
11633ef
tests: yarn upgrade, add file ds to the workspace
leoyvens Aug 23, 2022
149a14d
fix: Update comments
leoyvens Aug 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store: Assign separate causality region for offchain dses
  • Loading branch information
leoyvens committed Aug 24, 2022
commit 717b1d0c9d100cfbd422de9dc26daec861814761
7 changes: 7 additions & 0 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl blockchain::DataSource<Chain> for DataSource {
.as_ref()
.map(|ctx| serde_json::to_value(&ctx).unwrap()),
creation_block: self.creation_block,
is_offchain: false,
}
}

Expand All @@ -143,8 +144,14 @@ impl blockchain::DataSource<Chain> for DataSource {
param,
context,
creation_block,
is_offchain,
} = stored;

ensure!(
!is_offchain,
"attempted to convert offchain data source to ethereum data source"
);

let context = context.map(serde_json::from_value).transpose()?;

let contract_abi = template.mapping.find_abi(&template.source.abi)?;
Expand Down
1 change: 1 addition & 0 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ pub struct StoredDynamicDataSource {
pub param: Option<Bytes>,
pub context: Option<serde_json::Value>,
pub creation_block: Option<BlockNumber>,
pub is_offchain: bool,
}

/// An internal identifier for the specific instance of a deployment. The
Expand Down
1 change: 1 addition & 0 deletions graph/src/data_source/offchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl DataSource {
param: Some(param),
context,
creation_block: self.creation_block,
is_offchain: true,
}
}

Expand Down
85 changes: 56 additions & 29 deletions store/postgres/src/dynds/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) struct DataSourcesTable {
table: DynTable,
vid: DynColumn<Integer>,
block_range: DynColumn<sql_types::Range<Integer>>,
_causality_region: DynColumn<Integer>,
causality_region: DynColumn<Integer>,
manifest_idx: DynColumn<Integer>,
param: DynColumn<Nullable<Binary>>,
context: DynColumn<Nullable<Jsonb>>,
Expand All @@ -43,7 +43,7 @@ impl DataSourcesTable {
namespace,
vid: table.column("vid"),
block_range: table.column("block_range"),
_causality_region: table.column("causality_region"),
causality_region: table.column("causality_region"),
manifest_idx: table.column("manifest_idx"),
param: table.column("param"),
context: table.column("context"),
Expand Down Expand Up @@ -85,6 +85,7 @@ impl DataSourcesTable {
i32,
Option<Vec<u8>>,
Option<serde_json::Value>,
i32,
);
let tuples = self
.table
Expand All @@ -95,26 +96,34 @@ impl DataSourcesTable {
&self.manifest_idx,
&self.param,
&self.context,
&self.causality_region,
))
.order_by(&self.vid)
.load::<Tuple>(conn)?;

let mut dses: Vec<_> = tuples
.into_iter()
.map(|(block_range, manifest_idx, param, context)| {
let creation_block = match block_range.0 {
Bound::Included(block) => Some(block),

// Should never happen.
Bound::Excluded(_) | Bound::Unbounded => unreachable!("dds with open creation"),
};
StoredDynamicDataSource {
manifest_idx: manifest_idx as u32,
param: param.map(|p| p.into()),
context,
creation_block,
}
})
.map(
|(block_range, manifest_idx, param, context, causality_region)| {
let creation_block = match block_range.0 {
Bound::Included(block) => Some(block),

// Should never happen.
Bound::Excluded(_) | Bound::Unbounded => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're dealing with integers, wouldn't Bound::Excluded(block) be the same as Bound::Included(block+1)? It probably won't happen but isn't really fatal.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we don't use or plan to use excluded initial bounds, so we can't know what they would mean. And our usage of lower(block_range) assumes an included bound, so it's best to blow up if an excluded bound is ever found.

unreachable!("dds with open creation")
}
};

let is_offchain = causality_region > 0;
StoredDynamicDataSource {
manifest_idx: manifest_idx as u32,
param: param.map(|p| p.into()),
context,
creation_block,
is_offchain,
}
},
)
.collect();

// This sort is stable and `tuples` was ordered by vid, so `dses` will be ordered by `(creation_block, vid)`.
Expand All @@ -129,9 +138,6 @@ impl DataSourcesTable {
data_sources: &[StoredDynamicDataSource],
block: BlockNumber,
) -> Result<usize, StoreError> {
// Currently all data sources share the same causality region.
let causality_region = 0;

let mut inserted_total = 0;

for ds in data_sources {
Expand All @@ -140,6 +146,7 @@ impl DataSourcesTable {
param,
context,
creation_block,
is_offchain,
} = ds;

if creation_block != &Some(block) {
Expand All @@ -150,19 +157,32 @@ impl DataSourcesTable {
));
}

let query = format!(
"insert into {}(block_range, manifest_idx, causality_region, param, context) \
values (int4range($1, null), $2, $3, $4, $5)",
self.qname
);

inserted_total += sql_query(query)
// Onchain data sources always have the causality region explicitly set to 0, while
// offchain data sources are each assigned a unique causality region from the sequence.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment would match the code better if was something like

// Offchain data sources have a unique causality region assigned from a sequence in the database,
// while onchain data sources always have causality region 0

let query = match is_offchain {
false => format!(
"insert into {}(block_range, manifest_idx, param, context, causality_region) \
values (int4range($1, null), $2, $3, $4, $5)",
self.qname
),

true => format!(
"insert into {}(block_range, manifest_idx, param, context) \
values (int4range($1, null), $2, $3, $4)",
self.qname
),
};

let query = sql_query(query)
.bind::<Nullable<Integer>, _>(creation_block)
.bind::<Integer, _>(*manifest_idx as i32)
.bind::<Integer, _>(causality_region)
.bind::<Nullable<Binary>, _>(param.as_ref().map(|p| &**p))
.bind::<Nullable<Jsonb>, _>(context)
.execute(conn)?;
.bind::<Nullable<Jsonb>, _>(context);

inserted_total += match is_offchain {
false => query.bind::<Integer, _>(0).execute(conn)?,
true => query.execute(conn)?,
};
}

Ok(inserted_total)
Expand Down Expand Up @@ -233,8 +253,15 @@ impl DataSourcesTable {
param,
context,
creation_block,
is_offchain,
} = ds;

if !is_offchain {
return Err(constraint_violation!(
"called remove_offchain with onchain data sources"
));
}

let query = format!(
"update {} set block_range = 'empty'::int4range \
where manifest_idx = $1
Expand Down
12 changes: 12 additions & 0 deletions store/postgres/src/dynds/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ pub(super) fn load(
param: Some(address.into()),
context: context.map(|ctx| serde_json::from_str(&ctx)).transpose()?,
creation_block,

// The shared schema is only used for legacy deployments, and therefore not used for
// subgraphs that use file data sources.
is_offchain: false,
};

if data_sources.last().and_then(|d| d.creation_block) > data_source.creation_block {
Expand Down Expand Up @@ -116,7 +120,15 @@ pub(super) fn insert(
param,
context,
creation_block: _,
is_offchain,
} = ds;

if *is_offchain {
return Err(constraint_violation!(
"using shared data source schema with file data sources"
));
}

let address = match param {
Some(param) => param,
None => {
Expand Down
10 changes: 8 additions & 2 deletions tests/tests/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ async fn file_data_sources() {
let blocks = {
let block_0 = genesis();
let block_1 = empty_block(block_0.ptr(), test_ptr(1));
vec![block_0, block_1]
let block_2 = empty_block(block_1.ptr(), test_ptr(2));
vec![block_0, block_1, block_2]
};
let stop_block = blocks.last().unwrap().block.ptr();
let stop_block = test_ptr(1);
let chain = Arc::new(chain(blocks, &stores).await);
let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, None).await;
ctx.start_and_sync_to(stop_block).await;
Expand All @@ -144,4 +145,9 @@ async fn file_data_sources() {
query_res,
Some(object! { ipfsFile: object!{ id: id , content: "[]" } })
);

// Test loading offchain data sources from DB.
ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
let stop_block = test_ptr(2);
ctx.start_and_sync_to(stop_block).await;
}