diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 455edaa55fb..510e9d195d1 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -6,7 +6,7 @@ use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, DataSource as _, TriggerFilter as _};
-use graph::components::store::{EmptyStore, EntityKey, StoredDynamicDataSource};
+use graph::components::store::{EmptyStore, EntityKey, GetScope, StoredDynamicDataSource};
use graph::components::{
store::ModificationsAndCache,
subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing},
@@ -1034,14 +1034,13 @@ async fn update_proof_of_indexing(
};
// Grab the current digest attribute on this entity
- let prev_poi =
- entity_cache
- .get(&entity_key)
- .map_err(Error::from)?
- .map(|entity| match entity.get("digest") {
- Some(Value::Bytes(b)) => b.clone(),
- _ => panic!("Expected POI entity to have a digest and for it to be bytes"),
- });
+ let prev_poi = entity_cache
+ .get(&entity_key, GetScope::Store)
+ .map_err(Error::from)?
+ .map(|entity| match entity.get("digest") {
+ Some(Value::Bytes(b)) => b.clone(),
+ _ => panic!("Expected POI entity to have a digest and for it to be bytes"),
+ });
// Finish the POI stream, getting the new POI value.
let updated_proof_of_indexing = stream.pause(prev_poi.as_deref());
diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs
index af618bb8aad..3535f8dd107 100644
--- a/graph/src/components/store/entity_cache.rs
+++ b/graph/src/components/store/entity_cache.rs
@@ -9,6 +9,14 @@ use crate::util::lfu_cache::LfuCache;
use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest};
+/// The scope in which the `EntityCache` should perform a `get` operation
+pub enum GetScope {
+ /// Get from all previously stored entities in the store
+ Store,
+ /// Get from the entities that have been stored during this block
+ InBlock,
+}
+
/// A cache for entities from the store that provides the basic functionality
/// needed for the store interactions in the host exports. This struct tracks
/// how entities are modified, and caches all entities looked up from the
@@ -98,18 +106,29 @@ impl EntityCache {
self.handler_updates.clear();
}
- pub fn get(&mut self, eref: &EntityKey) -> Result, s::QueryExecutionError> {
+ pub fn get(
+ &mut self,
+ key: &EntityKey,
+ scope: GetScope,
+ ) -> Result , s::QueryExecutionError> {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
- let mut entity = self.current.get_entity(&*self.store, eref)?;
+ let mut entity = match scope {
+ GetScope::Store => self.current.get_entity(&*self.store, key)?,
+ GetScope::InBlock => None,
+ };
- // Always test the cache consistency in debug mode.
- debug_assert!(entity == self.store.get(eref).unwrap());
+ // Always test the cache consistency in debug mode. The test only
+ // makes sense when we were actually asked to read from the store
+ debug_assert!(match scope {
+ GetScope::Store => entity == self.store.get(key).unwrap(),
+ GetScope::InBlock => true,
+ });
- if let Some(op) = self.updates.get(eref).cloned() {
+ if let Some(op) = self.updates.get(key).cloned() {
entity = op.apply_to(entity)
}
- if let Some(op) = self.handler_updates.get(eref).cloned() {
+ if let Some(op) = self.handler_updates.get(key).cloned() {
entity = op.apply_to(entity)
}
Ok(entity)
@@ -183,7 +202,7 @@ impl EntityCache {
// lookup in the database and check again with an entity that merges
// the existing entity with the changes
if !is_valid {
- let entity = self.get(&key)?.ok_or_else(|| {
+ let entity = self.get(&key, GetScope::Store)?.ok_or_else(|| {
anyhow!(
"Failed to read entity {}[{}] back from cache",
key.entity_type,
diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs
index a1199ac22ae..17bb533b419 100644
--- a/graph/src/components/store/mod.rs
+++ b/graph/src/components/store/mod.rs
@@ -2,7 +2,7 @@ mod entity_cache;
mod err;
mod traits;
-pub use entity_cache::{EntityCache, ModificationsAndCache};
+pub use entity_cache::{EntityCache, GetScope, ModificationsAndCache};
use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs
index e943749b332..ef2ac1c09c5 100644
--- a/runtime/wasm/src/host_exports.rs
+++ b/runtime/wasm/src/host_exports.rs
@@ -9,7 +9,7 @@ use wasmtime::Trap;
use web3::types::H160;
use graph::blockchain::Blockchain;
-use graph::components::store::{EnsLookup, LoadRelatedRequest};
+use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest};
use graph::components::store::{EntityKey, EntityType};
use graph::components::subgraph::{
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
@@ -225,6 +225,7 @@ impl HostExports {
entity_type: String,
entity_id: String,
gas: &GasCounter,
+ scope: GetScope,
) -> Result, anyhow::Error> {
let store_key = EntityKey {
entity_type: EntityType::new(entity_type),
@@ -233,7 +234,7 @@ impl HostExports {
};
self.check_entity_type_access(&store_key.entity_type)?;
- let result = state.entity_cache.get(&store_key)?;
+ let result = state.entity_cache.get(&store_key, scope)?;
gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?;
Ok(result)
diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs
index 94eda62d807..cbcc4eea3e8 100644
--- a/runtime/wasm/src/module/mod.rs
+++ b/runtime/wasm/src/module/mod.rs
@@ -9,6 +9,7 @@ use std::time::Instant;
use anyhow::anyhow;
use anyhow::Error;
+use graph::components::store::GetScope;
use graph::slog::SendSyncRefUnwindSafeKV;
use never::Never;
use semver::Version;
@@ -535,6 +536,13 @@ impl WasmInstance {
id,
field
);
+ link!(
+ "store.get_in_block",
+ store_get_in_block,
+ "host_export_store_get_in_block",
+ entity,
+ id
+ );
link!(
"store.set",
store_set,
@@ -910,6 +918,71 @@ impl WasmInstanceContext {
experimental_features,
})
}
+
+ fn store_get_scoped(
+ &mut self,
+ gas: &GasCounter,
+ entity_ptr: AscPtr,
+ id_ptr: AscPtr,
+ scope: GetScope,
+ ) -> Result, HostExportError> {
+ let _timer = self
+ .host_metrics
+ .cheap_clone()
+ .time_host_fn_execution_region("store_get");
+
+ let entity_type: String = asc_get(self, entity_ptr, gas)?;
+ let id: String = asc_get(self, id_ptr, gas)?;
+ let entity_option = self.ctx.host_exports.store_get(
+ &mut self.ctx.state,
+ entity_type.clone(),
+ id.clone(),
+ gas,
+ scope,
+ )?;
+
+ if self.ctx.instrument {
+ debug!(self.ctx.logger, "store_get";
+ "type" => &entity_type,
+ "id" => &id,
+ "found" => entity_option.is_some());
+ }
+
+ let ret = match entity_option {
+ Some(entity) => {
+ let _section = self
+ .host_metrics
+ .stopwatch
+ .start_section("store_get_asc_new");
+ asc_new(self, &entity.sorted(), gas)?
+ }
+ None => match &self.ctx.debug_fork {
+ Some(fork) => {
+ let entity_option = fork.fetch(entity_type, id).map_err(|e| {
+ HostExportError::Unknown(anyhow!(
+ "store_get: failed to fetch entity from the debug fork: {}",
+ e
+ ))
+ })?;
+ match entity_option {
+ Some(entity) => {
+ let _section = self
+ .host_metrics
+ .stopwatch
+ .start_section("store_get_asc_new");
+ let entity = asc_new(self, &entity.sorted(), gas)?;
+ self.store_set(gas, entity_ptr, id_ptr, entity)?;
+ entity
+ }
+ None => AscPtr::null(),
+ }
+ }
+ None => AscPtr::null(),
+ },
+ };
+
+ Ok(ret)
+ }
}
// Implementation of externals.
@@ -1012,59 +1085,17 @@ impl WasmInstanceContext {
entity_ptr: AscPtr,
id_ptr: AscPtr,
) -> Result, HostExportError> {
- let _timer = self
- .host_metrics
- .cheap_clone()
- .time_host_fn_execution_region("store_get");
-
- let entity_type: String = asc_get(self, entity_ptr, gas)?;
- let id: String = asc_get(self, id_ptr, gas)?;
- let entity_option = self.ctx.host_exports.store_get(
- &mut self.ctx.state,
- entity_type.clone(),
- id.clone(),
- gas,
- )?;
- if self.ctx.instrument {
- debug!(self.ctx.logger, "store_get";
- "type" => &entity_type,
- "id" => &id,
- "found" => entity_option.is_some());
- }
- let ret = match entity_option {
- Some(entity) => {
- let _section = self
- .host_metrics
- .stopwatch
- .start_section("store_get_asc_new");
- asc_new(self, &entity.sorted(), gas)?
- }
- None => match &self.ctx.debug_fork {
- Some(fork) => {
- let entity_option = fork.fetch(entity_type, id).map_err(|e| {
- HostExportError::Unknown(anyhow!(
- "store_get: failed to fetch entity from the debug fork: {}",
- e
- ))
- })?;
- match entity_option {
- Some(entity) => {
- let _section = self
- .host_metrics
- .stopwatch
- .start_section("store_get_asc_new");
- let entity = asc_new(self, &entity.sorted(), gas)?;
- self.store_set(gas, entity_ptr, id_ptr, entity)?;
- entity
- }
- None => AscPtr::null(),
- }
- }
- None => AscPtr::null(),
- },
- };
+ self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::Store)
+ }
- Ok(ret)
+ /// function store.get_in_block(entity: string, id: string): Entity | null
+ pub fn store_get_in_block(
+ &mut self,
+ gas: &GasCounter,
+ entity_ptr: AscPtr,
+ id_ptr: AscPtr,
+ ) -> Result, HostExportError> {
+ self.store_get_scoped(gas, entity_ptr, id_ptr, GetScope::InBlock)
}
/// function store.loadRelated(entity_type: string, id: string, field: string): Array
diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs
index d0fc7dd3b26..d284a98d107 100644
--- a/store/test-store/tests/graph/entity_cache.rs
+++ b/store/test-store/tests/graph/entity_cache.rs
@@ -1,7 +1,7 @@
use graph::blockchain::block_stream::FirehoseCursor;
use graph::components::store::{
- DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest,
- ReadStore, StoredDynamicDataSource, WritableStore,
+ DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, GetScope,
+ LoadRelatedRequest, ReadStore, StoredDynamicDataSource, WritableStore,
};
use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth};
use graph::data_source::CausalityRegion;
@@ -752,3 +752,42 @@ fn check_for_delete_async_related() {
assert_eq!(result, expeted_vec);
});
}
+
+#[test]
+fn scoped_get() {
+ run_store_test(|mut cache, _store, _deployment, _writable| async move {
+ // Key for an existing entity that is in the store
+ let key1 = EntityKey::data(WALLET.to_owned(), "1".to_owned());
+ let wallet1 = create_wallet_entity("1", "1", 67);
+
+ // Create a new entity that is not in the store
+ let wallet5 = create_wallet_entity("5", "5", 100);
+ let key5 = EntityKey::data(WALLET.to_owned(), "5".to_owned());
+ cache.set(key5.clone(), wallet5.clone()).unwrap();
+
+ // For the new entity, we can retrieve it with either scope
+ let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
+ assert_eq!(Some(&wallet5), act5.as_ref());
+ let act5 = cache.get(&key5, GetScope::Store).unwrap();
+ assert_eq!(Some(&wallet5), act5.as_ref());
+
+ // For an entity in the store, we can not get it `InBlock` but with
+ // `Store`
+ let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
+ assert_eq!(None, act1);
+ let act1 = cache.get(&key1, GetScope::Store).unwrap();
+ assert_eq!(Some(&wallet1), act1.as_ref());
+ // Even after reading from the store, the entity is not visible with
+ // `InBlock`
+ let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
+ assert_eq!(None, act1);
+ // But if it gets updated, it becomes visible with either scope
+ let mut wallet1 = wallet1;
+ wallet1.set("balance", 70);
+ cache.set(key1.clone(), wallet1.clone()).unwrap();
+ let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
+ assert_eq!(Some(&wallet1), act1.as_ref());
+ let act1 = cache.get(&key1, GetScope::Store).unwrap();
+ assert_eq!(Some(&wallet1), act1.as_ref());
+ });
+}