From c99229cdacd5d9681fb9a4f34f60d32c862b5eee Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 8 Apr 2022 18:27:09 -0700 Subject: [PATCH] core, graph, runtime: Add store.get_in_block --- core/src/subgraph/runner.rs | 17 ++- graph/src/components/store/entity_cache.rs | 33 ++++- graph/src/components/store/mod.rs | 2 +- runtime/wasm/src/host_exports.rs | 5 +- runtime/wasm/src/module/mod.rs | 135 ++++++++++++------- store/test-store/tests/graph/entity_cache.rs | 43 +++++- 6 files changed, 162 insertions(+), 73 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 455edaa55fb..510e9d195d1 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -6,7 +6,7 @@ use crate::subgraph::stream::new_block_stream; use atomic_refcell::AtomicRefCell; use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor}; use graph::blockchain::{Block, Blockchain, DataSource as _, TriggerFilter as _}; -use graph::components::store::{EmptyStore, EntityKey, StoredDynamicDataSource}; +use graph::components::store::{EmptyStore, EntityKey, GetScope, StoredDynamicDataSource}; use graph::components::{ store::ModificationsAndCache, subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing}, @@ -1034,14 +1034,13 @@ async fn update_proof_of_indexing( }; // Grab the current digest attribute on this entity - let prev_poi = - entity_cache - .get(&entity_key) - .map_err(Error::from)? - .map(|entity| match entity.get("digest") { - Some(Value::Bytes(b)) => b.clone(), - _ => panic!("Expected POI entity to have a digest and for it to be bytes"), - }); + let prev_poi = entity_cache + .get(&entity_key, GetScope::Store) + .map_err(Error::from)? + .map(|entity| match entity.get("digest") { + Some(Value::Bytes(b)) => b.clone(), + _ => panic!("Expected POI entity to have a digest and for it to be bytes"), + }); // Finish the POI stream, getting the new POI value. let updated_proof_of_indexing = stream.pause(prev_poi.as_deref()); diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index af618bb8aad..3535f8dd107 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -9,6 +9,14 @@ use crate::util::lfu_cache::LfuCache; use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest}; +/// The scope in which the `EntityCache` should perform a `get` operation +pub enum GetScope { + /// Get from all previously stored entities in the store + Store, + /// Get from the entities that have been stored during this block + InBlock, +} + /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. This struct tracks /// how entities are modified, and caches all entities looked up from the @@ -98,18 +106,29 @@ impl EntityCache { self.handler_updates.clear(); } - pub fn get(&mut self, eref: &EntityKey) -> Result, s::QueryExecutionError> { + pub fn get( + &mut self, + key: &EntityKey, + scope: GetScope, + ) -> Result, s::QueryExecutionError> { // Get the current entity, apply any updates from `updates`, then // from `handler_updates`. - let mut entity = self.current.get_entity(&*self.store, eref)?; + let mut entity = match scope { + GetScope::Store => self.current.get_entity(&*self.store, key)?, + GetScope::InBlock => None, + }; - // Always test the cache consistency in debug mode. - debug_assert!(entity == self.store.get(eref).unwrap()); + // Always test the cache consistency in debug mode. The test only + // makes sense when we were actually asked to read from the store + debug_assert!(match scope { + GetScope::Store => entity == self.store.get(key).unwrap(), + GetScope::InBlock => true, + }); - if let Some(op) = self.updates.get(eref).cloned() { + if let Some(op) = self.updates.get(key).cloned() { entity = op.apply_to(entity) } - if let Some(op) = self.handler_updates.get(eref).cloned() { + if let Some(op) = self.handler_updates.get(key).cloned() { entity = op.apply_to(entity) } Ok(entity) @@ -183,7 +202,7 @@ impl EntityCache { // lookup in the database and check again with an entity that merges // the existing entity with the changes if !is_valid { - let entity = self.get(&key)?.ok_or_else(|| { + let entity = self.get(&key, GetScope::Store)?.ok_or_else(|| { anyhow!( "Failed to read entity {}[{}] back from cache", key.entity_type, diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index a1199ac22ae..17bb533b419 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -2,7 +2,7 @@ mod entity_cache; mod err; mod traits; -pub use entity_cache::{EntityCache, ModificationsAndCache}; +pub use entity_cache::{EntityCache, GetScope, ModificationsAndCache}; use diesel::types::{FromSql, ToSql}; pub use err::StoreError; diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index e943749b332..ef2ac1c09c5 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -9,7 +9,7 @@ use wasmtime::Trap; use web3::types::H160; use graph::blockchain::Blockchain; -use graph::components::store::{EnsLookup, LoadRelatedRequest}; +use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest}; use graph::components::store::{EntityKey, EntityType}; use graph::components::subgraph::{ PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, @@ -225,6 +225,7 @@ impl HostExports { entity_type: String, entity_id: String, gas: &GasCounter, + scope: GetScope, ) -> Result, anyhow::Error> { let store_key = EntityKey { entity_type: EntityType::new(entity_type), @@ -233,7 +234,7 @@ impl HostExports { }; self.check_entity_type_access(&store_key.entity_type)?; - let result = state.entity_cache.get(&store_key)?; + let result = state.entity_cache.get(&store_key, scope)?; gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; Ok(result) diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 94eda62d807..cbcc4eea3e8 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -9,6 +9,7 @@ use std::time::Instant; use anyhow::anyhow; use anyhow::Error; +use graph::components::store::GetScope; use graph::slog::SendSyncRefUnwindSafeKV; use never::Never; use semver::Version; @@ -535,6 +536,13 @@ impl WasmInstance { id, field ); + link!( + "store.get_in_block", + store_get_in_block, + "host_export_store_get_in_block", + entity, + id + ); link!( "store.set", store_set, @@ -910,6 +918,71 @@ impl WasmInstanceContext { experimental_features, }) } + + fn store_get_scoped( + &mut self, + gas: &GasCounter, + entity_ptr: AscPtr, + id_ptr: AscPtr, + scope: GetScope, + ) -> Result, HostExportError> { + let _timer = self + .host_metrics + .cheap_clone() + .time_host_fn_execution_region("store_get"); + + let entity_type: String = asc_get(self, entity_ptr, gas)?; + let id: String = asc_get(self, id_ptr, gas)?; + let entity_option = self.ctx.host_exports.store_get( + &mut self.ctx.state, + entity_type.clone(), + id.clone(), + gas, + scope, + )?; + + if self.ctx.instrument { + debug!(self.ctx.logger, "store_get"; + "type" => &entity_type, + "id" => &id, + "found" => entity_option.is_some()); + } + + let ret = match entity_option { + Some(entity) => { + let _section = self + .host_metrics + .stopwatch + .start_section("store_get_asc_new"); + asc_new(self, &entity.sorted(), gas)? + } + None => match &self.ctx.debug_fork { + Some(fork) => { + let entity_option = fork.fetch(entity_type, id).map_err(|e| { + HostExportError::Unknown(anyhow!( + "store_get: failed to fetch entity from the debug fork: {}", + e + )) + })?; + match entity_option { + Some(entity) => { + let _section = self + .host_metrics + .stopwatch + .start_section("store_get_asc_new"); + let entity = asc_new(self, &entity.sorted(), gas)?; + self.store_set(gas, entity_ptr, id_ptr, entity)?; + entity + } + None => AscPtr::null(), + } + } + None => AscPtr::null(), + }, + }; + + Ok(ret) + } } // Implementation of externals. @@ -1012,59 +1085,17 @@ impl WasmInstanceContext { entity_ptr: AscPtr, id_ptr: AscPtr, ) -> Result, HostExportError> { - let _timer = self - .host_metrics - .cheap_clone() - .time_host_fn_execution_region("store_get"); - - let entity_type: String = asc_get(self, entity_ptr, gas)?; - let id: String = asc_get(self, id_ptr, gas)?; - let entity_option = self.ctx.host_exports.store_get( - &mut self.ctx.state, - entity_type.clone(), - id.clone(), - gas, - )?; - if self.ctx.instrument { - debug!(self.ctx.logger, "store_get"; - "type" => &entity_type, - "id" => &id, - "found" => entity_option.is_some()); - } - let ret = match entity_option { - Some(entity) => { - let _section = self - .host_metrics - .stopwatch - .start_section("store_get_asc_new"); - asc_new(self, &entity.sorted(), gas)? - } - None => match &self.ctx.debug_fork { - Some(fork) => { - let entity_option = fork.fetch(entity_type, id).map_err(|e| { - HostExportError::Unknown(anyhow!( - "store_get: failed to fetch entity from the debug fork: {}", - e - )) - })?; - match entity_option { - Some(entity) => { - let _section = self - .host_metrics - .stopwatch - .start_section("store_get_asc_new"); - let entity = asc_new(self, &entity.sorted(), gas)?; - self.store_set(gas, entity_ptr, id_ptr, entity)?; - entity - } - None => AscPtr::null(), - } - } - None => AscPtr::null(), - }, - }; + self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::Store) + } - Ok(ret) + /// function store.get_in_block(entity: string, id: string): Entity | null + pub fn store_get_in_block( + &mut self, + gas: &GasCounter, + entity_ptr: AscPtr, + id_ptr: AscPtr, + ) -> Result, HostExportError> { + self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::InBlock) } /// function store.loadRelated(entity_type: string, id: string, field: string): Array diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index d0fc7dd3b26..d284a98d107 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -1,7 +1,7 @@ use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ - DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest, - ReadStore, StoredDynamicDataSource, WritableStore, + DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, GetScope, + LoadRelatedRequest, ReadStore, StoredDynamicDataSource, WritableStore, }; use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth}; use graph::data_source::CausalityRegion; @@ -752,3 +752,42 @@ fn check_for_delete_async_related() { assert_eq!(result, expeted_vec); }); } + +#[test] +fn scoped_get() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + // Key for an existing entity that is in the store + let key1 = EntityKey::data(WALLET.to_owned(), "1".to_owned()); + let wallet1 = create_wallet_entity("1", "1", 67); + + // Create a new entity that is not in the store + let wallet5 = create_wallet_entity("5", "5", 100); + let key5 = EntityKey::data(WALLET.to_owned(), "5".to_owned()); + cache.set(key5.clone(), wallet5.clone()).unwrap(); + + // For the new entity, we can retrieve it with either scope + let act5 = cache.get(&key5, GetScope::InBlock).unwrap(); + assert_eq!(Some(&wallet5), act5.as_ref()); + let act5 = cache.get(&key5, GetScope::Store).unwrap(); + assert_eq!(Some(&wallet5), act5.as_ref()); + + // For an entity in the store, we can not get it `InBlock` but with + // `Store` + let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); + assert_eq!(None, act1); + let act1 = cache.get(&key1, GetScope::Store).unwrap(); + assert_eq!(Some(&wallet1), act1.as_ref()); + // Even after reading from the store, the entity is not visible with + // `InBlock` + let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); + assert_eq!(None, act1); + // But if it gets updated, it becomes visible with either scope + let mut wallet1 = wallet1; + wallet1.set("balance", 70); + cache.set(key1.clone(), wallet1.clone()).unwrap(); + let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); + assert_eq!(Some(&wallet1), act1.as_ref()); + let act1 = cache.get(&key1, GetScope::Store).unwrap(); + assert_eq!(Some(&wallet1), act1.as_ref()); + }); +}