diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs index 3f5289ff139..0b5b86cf22f 100644 --- a/graph/src/components/store.rs +++ b/graph/src/components/store.rs @@ -500,7 +500,6 @@ pub enum MetadataOperation { entity: String, id: String, data: Entity, - guard: Option<(EntityFilter, String)>, }, } diff --git a/graph/src/data/subgraph/schema.rs b/graph/src/data/subgraph/schema.rs index 5f2dcd8f32d..ad6b4eb04f6 100644 --- a/graph/src/data/subgraph/schema.rs +++ b/graph/src/data/subgraph/schema.rs @@ -157,7 +157,7 @@ impl SubgraphEntity { let mut entity = Entity::new(); entity.set("currentVersion", version_id_opt); - vec![update_metadata_operation(Self::TYPENAME, id, entity, None)] + vec![update_metadata_operation(Self::TYPENAME, id, entity)] } pub fn update_pending_version_operations( @@ -167,7 +167,7 @@ impl SubgraphEntity { let mut entity = Entity::new(); entity.set("pendingVersion", version_id_opt); - vec![update_metadata_operation(Self::TYPENAME, id, entity, None)] + vec![update_metadata_operation(Self::TYPENAME, id, entity)] } } @@ -318,27 +318,16 @@ impl SubgraphDeploymentEntity { pub fn update_ethereum_block_pointer_operations( id: &SubgraphDeploymentId, - block_ptr_from: EthereumBlockPointer, block_ptr_to: EthereumBlockPointer, - reason: &str, ) -> Vec { let mut entity = Entity::new(); entity.set("latestEthereumBlockHash", block_ptr_to.hash_hex()); entity.set("latestEthereumBlockNumber", block_ptr_to.number); - let guard = EntityFilter::And(vec![ - EntityFilter::new_equal("latestEthereumBlockHash", block_ptr_from.hash_hex()), - EntityFilter::new_equal("latestEthereumBlockNumber", block_ptr_from.number), - ]); - let msg = format!( - "advance block pointer ({}) from {} to {}", - reason, block_ptr_from.number, block_ptr_to.number - ); vec![update_metadata_operation( Self::TYPENAME, id.to_string(), entity, - Some((guard, msg)), )] } @@ -355,7 +344,6 @@ impl SubgraphDeploymentEntity { Self::TYPENAME, id.to_string(), entity, - None, )] } @@ -370,7 +358,6 @@ impl SubgraphDeploymentEntity { Self::TYPENAME, id.as_str(), entity, - None, )] } @@ -385,7 +372,6 @@ impl SubgraphDeploymentEntity { Self::TYPENAME, id.as_str(), entity, - None, )] } } @@ -1230,13 +1216,11 @@ fn update_metadata_operation( entity_type_name: impl Into, entity_id: impl Into, data: impl Into, - guard: Option<(EntityFilter, String)>, ) -> MetadataOperation { MetadataOperation::Update { entity: entity_type_name.into(), id: entity_id.into(), data: data.into(), - guard, } } diff --git a/mock/src/store.rs b/mock/src/store.rs index 9404f0172a3..3cbbfe801a0 100644 --- a/mock/src/store.rs +++ b/mock/src/store.rs @@ -223,12 +223,7 @@ impl Store for MockStore { .push(EntityChange::from_key(key, EntityChangeOperation::Set)); } } - MetadataOperation::Update { - entity, - id, - data, - guard, - } => { + MetadataOperation::Update { entity, id, data } => { let key = MetadataOperation::entity_key(entity, id); let entities_of_type = entities .entry(key.subgraph_id.clone()) @@ -238,18 +233,6 @@ impl Store for MockStore { if entities_of_type.contains_key(&key.entity_id) { let existing_entity = entities_of_type.get_mut(&key.entity_id).unwrap(); - if let Some((filter, _)) = guard { - if !entity_matches_filter(existing_entity, &filter) { - return Err(TransactionAbortError::AbortUnless { - expected_entity_ids: vec![key.entity_id], - actual_entity_ids: vec![], - description: - "update failed because entity does not match guard" - .to_owned(), - } - .into()); - } - } existing_entity.merge(data); entity_changes diff --git a/store/postgres/src/entities.rs b/store/postgres/src/entities.rs index ee701dd8340..e7a76978725 100644 --- a/store/postgres/src/entities.rs +++ b/store/postgres/src/entities.rs @@ -45,7 +45,7 @@ use graph::prelude::{ debug, format_err, info, serde_json, warn, AttributeIndexDefinition, Entity, EntityChange, EntityChangeOperation, EntityFilter, EntityKey, EntityModification, Error, EthereumBlockPointer, Logger, QueryExecutionError, StoreError, StoreEvent, - SubgraphDeploymentId, SubgraphDeploymentStore, TransactionAbortError, ValueType, + SubgraphDeploymentId, SubgraphDeploymentStore, ValueType, }; use crate::block_range::{block_number, BlockNumber}; @@ -485,10 +485,9 @@ impl Connection { &self, key: &EntityKey, entity: &Entity, - guard: Option, ) -> Result { match &*self.metadata { - Storage::Json(json) => json.update_metadata(&self.conn, key, entity, guard), + Storage::Json(json) => json.update_metadata(&self.conn, key, entity), Storage::Relational(_) => unreachable!("relational storeage is not used for metadata"), } } @@ -1048,7 +1047,6 @@ impl JsonStorage { conn: &PgConnection, key: &EntityKey, entity: &Entity, - guard: Option, ) -> Result { const NONE: &str = "none"; assert_eq!( @@ -1067,18 +1065,7 @@ impl JsonStorage { entities::event_source.eq(NONE), )); - match guard { - Some(filter) => { - let filter = build_filter(filter).map_err(|e| { - TransactionAbortError::Other(format!( - "invalid filter '{}' for value '{}'", - e.filter, e.value - )) - })?; - Ok(query.filter(filter).execute(conn)?) - } - None => Ok(query.execute(conn)?), - } + Ok(query.execute(conn)?) } fn delete( diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 067c05e8cc5..d0eac77d907 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -453,25 +453,17 @@ impl Store { .into() }) } - MetadataOperation::Update { - entity, - id, - data, - guard, - } => { + MetadataOperation::Update { entity, id, data } => { let key = MetadataOperation::entity_key(entity, id); self.check_interface_entity_uniqueness(conn, &key)?; // Update the entity in Postgres - let (filter, msg) = guard - .map(|(filter, msg)| (Some(filter), msg)) - .unwrap_or((None, "".to_owned())); - match conn.update_metadata(&key, &data, filter)? { + match conn.update_metadata(&key, &data)? { 0 => Err(TransactionAbortError::AbortUnless { expected_entity_ids: vec![key.entity_id.clone()], actual_entity_ids: vec![], - description: format!("{}: update did not change any rows", msg), + description: format!("update for entity {:?} with data {:?} did not change any rows", key, data), } .into()), 1 => Ok(0), @@ -479,8 +471,8 @@ impl Store { expected_entity_ids: vec![key.entity_id.clone()], actual_entity_ids: vec![], description: format!( - "{}: update changed {} rows instead of just one", - msg, res + "update for entity {:?} with data {:?} changed {} rows instead of just one", + key, data, res ), } .into()), @@ -728,12 +720,15 @@ impl Store { Ok(cache.get(&subgraph_id).unwrap().clone()) } -} -impl StoreTrait for Store { - fn block_ptr(&self, subgraph_id: SubgraphDeploymentId) -> Result { + fn block_ptr_with_conn( + &self, + subgraph_id: SubgraphDeploymentId, + conn: &e::Connection, + ) -> Result { + let key = SubgraphDeploymentEntity::key(subgraph_id.clone()); let subgraph_entity = self - .get(SubgraphDeploymentEntity::key(subgraph_id.clone())) + .get_entity(&conn, &key.subgraph_id, &key.entity_type, &key.entity_id) .map_err(|e| format_err!("error reading subgraph entity: {}", e))? .ok_or_else(|| { format_err!( @@ -765,6 +760,17 @@ impl StoreTrait for Store { Ok(EthereumBlockPointer { hash, number }) } +} + +impl StoreTrait for Store { + fn block_ptr(&self, subgraph_id: SubgraphDeploymentId) -> Result { + self.block_ptr_with_conn( + subgraph_id, + &self + .get_entity_conn(&*SUBGRAPHS_ID) + .map_err(|e| QueryExecutionError::StoreError(e.into()))?, + ) + } fn get(&self, key: EntityKey) -> Result, QueryExecutionError> { let conn = self @@ -821,10 +827,6 @@ impl StoreTrait for Store { block_ptr_to: EthereumBlockPointer, mods: Vec, ) -> Result { - // Improvement: Move this inside the transaction. - let block_ptr_from = self.block_ptr(subgraph_id.clone())?; - assert!(block_ptr_from.number < block_ptr_to.number); - // All operations should apply only to entities in this subgraph or // the subgraph of subgraphs if mods @@ -842,6 +844,9 @@ impl StoreTrait for Store { let (event, metadata_event, should_migrate) = econn.transaction(|| -> Result<_, StoreError> { + let block_ptr_from = self.block_ptr(subgraph_id.clone())?; + assert!(block_ptr_from.number < block_ptr_to.number); + // Ensure the history event exists in the database let history_event = econn.create_history_event(block_ptr_to, &mods)?; @@ -861,9 +866,7 @@ impl StoreTrait for Store { let block_ptr_ops = SubgraphDeploymentEntity::update_ethereum_block_pointer_operations( &subgraph_id, - block_ptr_from, block_ptr_to, - "transact block operations", ); let metadata_event = self.apply_metadata_operations_with_conn(&econn, block_ptr_ops)?; @@ -915,11 +918,13 @@ impl StoreTrait for Store { let econn = self.get_entity_conn(&subgraph_id)?; let (event, metadata_event) = econn.transaction(|| -> Result<_, StoreError> { + assert_eq!( + block_ptr_from, + self.block_ptr_with_conn(subgraph_id.clone(), &econn)? + ); let ops = SubgraphDeploymentEntity::update_ethereum_block_pointer_operations( &subgraph_id, - block_ptr_from, block_ptr_to, - "revert block", ); let metadata_event = self.apply_metadata_operations_with_conn(&econn, ops)?;