Skip to content

Commit

Permalink
store: Remove guard from MetadataOperation::Update
Browse files Browse the repository at this point in the history
It was only used in `update_ethereum_block_pointer_operations`,
which in turn is only used in reverts or transact block.

For transact block, the guard was made totally redundant
as we now fetch the block in the same transaction.

For reverts, I was more conservative and simply added
an assert to replace the guard.
  • Loading branch information
leoyvens committed Oct 29, 2019
1 parent d192e72 commit b7e5424
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 78 deletions.
1 change: 0 additions & 1 deletion graph/src/components/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ pub enum MetadataOperation {
entity: String,
id: String,
data: Entity,
guard: Option<(EntityFilter, String)>,
},
}

Expand Down
20 changes: 2 additions & 18 deletions graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl SubgraphEntity {
let mut entity = Entity::new();
entity.set("currentVersion", version_id_opt);

vec![update_metadata_operation(Self::TYPENAME, id, entity, None)]
vec![update_metadata_operation(Self::TYPENAME, id, entity)]
}

pub fn update_pending_version_operations(
Expand All @@ -167,7 +167,7 @@ impl SubgraphEntity {
let mut entity = Entity::new();
entity.set("pendingVersion", version_id_opt);

vec![update_metadata_operation(Self::TYPENAME, id, entity, None)]
vec![update_metadata_operation(Self::TYPENAME, id, entity)]
}
}

Expand Down Expand Up @@ -318,27 +318,16 @@ impl SubgraphDeploymentEntity {

pub fn update_ethereum_block_pointer_operations(
id: &SubgraphDeploymentId,
block_ptr_from: EthereumBlockPointer,
block_ptr_to: EthereumBlockPointer,
reason: &str,
) -> Vec<MetadataOperation> {
let mut entity = Entity::new();
entity.set("latestEthereumBlockHash", block_ptr_to.hash_hex());
entity.set("latestEthereumBlockNumber", block_ptr_to.number);

let guard = EntityFilter::And(vec![
EntityFilter::new_equal("latestEthereumBlockHash", block_ptr_from.hash_hex()),
EntityFilter::new_equal("latestEthereumBlockNumber", block_ptr_from.number),
]);
let msg = format!(
"advance block pointer ({}) from {} to {}",
reason, block_ptr_from.number, block_ptr_to.number
);
vec![update_metadata_operation(
Self::TYPENAME,
id.to_string(),
entity,
Some((guard, msg)),
)]
}

Expand All @@ -355,7 +344,6 @@ impl SubgraphDeploymentEntity {
Self::TYPENAME,
id.to_string(),
entity,
None,
)]
}

Expand All @@ -370,7 +358,6 @@ impl SubgraphDeploymentEntity {
Self::TYPENAME,
id.as_str(),
entity,
None,
)]
}

Expand All @@ -385,7 +372,6 @@ impl SubgraphDeploymentEntity {
Self::TYPENAME,
id.as_str(),
entity,
None,
)]
}
}
Expand Down Expand Up @@ -1230,13 +1216,11 @@ fn update_metadata_operation(
entity_type_name: impl Into<String>,
entity_id: impl Into<String>,
data: impl Into<Entity>,
guard: Option<(EntityFilter, String)>,
) -> MetadataOperation {
MetadataOperation::Update {
entity: entity_type_name.into(),
id: entity_id.into(),
data: data.into(),
guard,
}
}

Expand Down
19 changes: 1 addition & 18 deletions mock/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,7 @@ impl Store for MockStore {
.push(EntityChange::from_key(key, EntityChangeOperation::Set));
}
}
MetadataOperation::Update {
entity,
id,
data,
guard,
} => {
MetadataOperation::Update { entity, id, data } => {
let key = MetadataOperation::entity_key(entity, id);
let entities_of_type = entities
.entry(key.subgraph_id.clone())
Expand All @@ -238,18 +233,6 @@ impl Store for MockStore {

if entities_of_type.contains_key(&key.entity_id) {
let existing_entity = entities_of_type.get_mut(&key.entity_id).unwrap();
if let Some((filter, _)) = guard {
if !entity_matches_filter(existing_entity, &filter) {
return Err(TransactionAbortError::AbortUnless {
expected_entity_ids: vec![key.entity_id],
actual_entity_ids: vec![],
description:
"update failed because entity does not match guard"
.to_owned(),
}
.into());
}
}
existing_entity.merge(data);

entity_changes
Expand Down
19 changes: 3 additions & 16 deletions store/postgres/src/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use graph::prelude::{
debug, format_err, info, serde_json, warn, AttributeIndexDefinition, Entity, EntityChange,
EntityChangeOperation, EntityFilter, EntityKey, EntityModification, Error,
EthereumBlockPointer, Logger, QueryExecutionError, StoreError, StoreEvent,
SubgraphDeploymentId, SubgraphDeploymentStore, TransactionAbortError, ValueType,
SubgraphDeploymentId, SubgraphDeploymentStore, ValueType,
};

use crate::block_range::{block_number, BlockNumber};
Expand Down Expand Up @@ -485,10 +485,9 @@ impl Connection {
&self,
key: &EntityKey,
entity: &Entity,
guard: Option<EntityFilter>,
) -> Result<usize, StoreError> {
match &*self.metadata {
Storage::Json(json) => json.update_metadata(&self.conn, key, entity, guard),
Storage::Json(json) => json.update_metadata(&self.conn, key, entity),
Storage::Relational(_) => unreachable!("relational storeage is not used for metadata"),
}
}
Expand Down Expand Up @@ -1048,7 +1047,6 @@ impl JsonStorage {
conn: &PgConnection,
key: &EntityKey,
entity: &Entity,
guard: Option<EntityFilter>,
) -> Result<usize, StoreError> {
const NONE: &str = "none";
assert_eq!(
Expand All @@ -1067,18 +1065,7 @@ impl JsonStorage {
entities::event_source.eq(NONE),
));

match guard {
Some(filter) => {
let filter = build_filter(filter).map_err(|e| {
TransactionAbortError::Other(format!(
"invalid filter '{}' for value '{}'",
e.filter, e.value
))
})?;
Ok(query.filter(filter).execute(conn)?)
}
None => Ok(query.execute(conn)?),
}
Ok(query.execute(conn)?)
}

fn delete(
Expand Down
55 changes: 30 additions & 25 deletions store/postgres/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,34 +453,26 @@ impl Store {
.into()
})
}
MetadataOperation::Update {
entity,
id,
data,
guard,
} => {
MetadataOperation::Update { entity, id, data } => {
let key = MetadataOperation::entity_key(entity, id);

self.check_interface_entity_uniqueness(conn, &key)?;

// Update the entity in Postgres
let (filter, msg) = guard
.map(|(filter, msg)| (Some(filter), msg))
.unwrap_or((None, "".to_owned()));
match conn.update_metadata(&key, &data, filter)? {
match conn.update_metadata(&key, &data)? {
0 => Err(TransactionAbortError::AbortUnless {
expected_entity_ids: vec![key.entity_id.clone()],
actual_entity_ids: vec![],
description: format!("{}: update did not change any rows", msg),
description: format!("update for entity {:?} with data {:?} did not change any rows", key, data),
}
.into()),
1 => Ok(0),
res => Err(TransactionAbortError::AbortUnless {
expected_entity_ids: vec![key.entity_id.clone()],
actual_entity_ids: vec![],
description: format!(
"{}: update changed {} rows instead of just one",
msg, res
"update for entity {:?} with data {:?} changed {} rows instead of just one",
key, data, res
),
}
.into()),
Expand Down Expand Up @@ -728,12 +720,15 @@ impl Store {

Ok(cache.get(&subgraph_id).unwrap().clone())
}
}

impl StoreTrait for Store {
fn block_ptr(&self, subgraph_id: SubgraphDeploymentId) -> Result<EthereumBlockPointer, Error> {
fn block_ptr_with_conn(
&self,
subgraph_id: SubgraphDeploymentId,
conn: &e::Connection,
) -> Result<EthereumBlockPointer, Error> {
let key = SubgraphDeploymentEntity::key(subgraph_id.clone());
let subgraph_entity = self
.get(SubgraphDeploymentEntity::key(subgraph_id.clone()))
.get_entity(&conn, &key.subgraph_id, &key.entity_type, &key.entity_id)
.map_err(|e| format_err!("error reading subgraph entity: {}", e))?
.ok_or_else(|| {
format_err!(
Expand Down Expand Up @@ -765,6 +760,17 @@ impl StoreTrait for Store {

Ok(EthereumBlockPointer { hash, number })
}
}

impl StoreTrait for Store {
fn block_ptr(&self, subgraph_id: SubgraphDeploymentId) -> Result<EthereumBlockPointer, Error> {
self.block_ptr_with_conn(
subgraph_id,
&self
.get_entity_conn(&*SUBGRAPHS_ID)
.map_err(|e| QueryExecutionError::StoreError(e.into()))?,
)
}

fn get(&self, key: EntityKey) -> Result<Option<Entity>, QueryExecutionError> {
let conn = self
Expand Down Expand Up @@ -821,10 +827,6 @@ impl StoreTrait for Store {
block_ptr_to: EthereumBlockPointer,
mods: Vec<EntityModification>,
) -> Result<bool, StoreError> {
// Improvement: Move this inside the transaction.
let block_ptr_from = self.block_ptr(subgraph_id.clone())?;
assert!(block_ptr_from.number < block_ptr_to.number);

// All operations should apply only to entities in this subgraph or
// the subgraph of subgraphs
if mods
Expand All @@ -842,6 +844,9 @@ impl StoreTrait for Store {

let (event, metadata_event, should_migrate) =
econn.transaction(|| -> Result<_, StoreError> {
let block_ptr_from = self.block_ptr(subgraph_id.clone())?;
assert!(block_ptr_from.number < block_ptr_to.number);

// Ensure the history event exists in the database
let history_event = econn.create_history_event(block_ptr_to, &mods)?;

Expand All @@ -861,9 +866,7 @@ impl StoreTrait for Store {
let block_ptr_ops =
SubgraphDeploymentEntity::update_ethereum_block_pointer_operations(
&subgraph_id,
block_ptr_from,
block_ptr_to,
"transact block operations",
);
let metadata_event =
self.apply_metadata_operations_with_conn(&econn, block_ptr_ops)?;
Expand Down Expand Up @@ -915,11 +918,13 @@ impl StoreTrait for Store {

let econn = self.get_entity_conn(&subgraph_id)?;
let (event, metadata_event) = econn.transaction(|| -> Result<_, StoreError> {
assert_eq!(
block_ptr_from,
self.block_ptr_with_conn(subgraph_id.clone(), &econn)?
);
let ops = SubgraphDeploymentEntity::update_ethereum_block_pointer_operations(
&subgraph_id,
block_ptr_from,
block_ptr_to,
"revert block",
);
let metadata_event = self.apply_metadata_operations_with_conn(&econn, ops)?;

Expand Down

0 comments on commit b7e5424

Please sign in to comment.