Deduplicate triggers #4055

Merged · 6 commits · Nov 8, 2022
9 changes: 8 additions & 1 deletion NEWS.md
@@ -2,6 +2,13 @@

## Unreleased

#### Upgrade notes

- This release includes a **determinism fix** that affects very few subgraphs on the network (currently only two). Previously, if a subgraph manifest had a data source with no contract address listening to the same events or calls as another data source with a specified address, the handlers for those triggers were called twice. With this fix, each handler is called exactly once, as it should be.
- The two affected deployments are `Qmccst5mbV5a6vT6VvJMLPKMAA1VRgT6NGbxkLL8eDRsE7` and `Qmd9nZKCH8UZU1pBzk7G8ECJr3jX3a2vAf3vowuTwFvrQg`.
- Here's an example [manifest](https://ipfs.io/ipfs/Qmd9nZKCH8UZU1pBzk7G8ECJr3jX3a2vAf3vowuTwFvrQg): the data sources named `ERC721` and `CryptoKitties` both listen to the `Transfer(...)` event. In a block with only one occurrence of this event, `graph-node` would duplicate the trigger and call `handleTransfer` twice. This is now fixed, and the handler is called only once per event/call that happened on chain (see the sketch after this file's diff).
**Collaborator** commented:
Since this mentions the network impact, it's important to note any impact on indexers and any actions they should take.

**Contributor (author)** commented:
Awesome, thanks for the feedback, I'll address it 🙂

**Contributor (author)** commented:

I just pushed what they should do, what do you think? @leoyvens

**Collaborator** commented:
lgtm

- If you're indexing one of those deployments, first upgrade your `graph-node` version, then rewind the affected subgraphs to the smallest `startBlock` in their subgraph manifest. The `graphman rewind` CLI command can be used for this.

## v0.28.2

**Indexers are advised to migrate to `v0.28.2`** and entirely bypass `v0.28.0` and `v0.28.1`.
@@ -17,7 +24,7 @@ Yanked. Please migrate to `v0.28.2`.
#### Upgrade notes

- **New DB table for dynamic data sources.**
For new subgraph deployments, dynamic data sources will be recorded under the `sgd*.data_sources$` table, rather than `subgraphs.dynamic_ethereum_contract_data_source`. As a consequence new deployments will not work correctly on earlier graph node versions, so _downgrading to an earlier graph node version is not supported_.
See issue [#3405](https://github.com/graphprotocol/graph-node/issues/3405) for more details.

### What's new
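To make the upgrade note above concrete, here is a minimal, self-contained sketch of the failure mode and of the fix's approach. The tuple type is a hypothetical stand-in for real trigger data (graph-node's actual types are chain-specific, e.g. `EthereumTrigger`):

```rust
// Hypothetical stand-in for a trigger: (transaction index, log index, event name).
type Trigger = (u64, u64, &'static str);

fn main() {
    // Two data sources (think `ERC721` and `CryptoKitties`) both match the
    // single `Transfer` event in a block, so the trigger list ends up with
    // the same trigger twice -- previously, the handler then ran twice.
    let mut triggers: Vec<Trigger> = vec![(1, 0, "Transfer"), (1, 0, "Transfer")];

    // The fix: sort, then drop duplicates before handlers run.
    triggers.sort();
    triggers.dedup();

    assert_eq!(triggers, vec![(1, 0, "Transfer")]);
    println!("handler runs {} time(s)", triggers.len()); // handler runs 1 time(s)
}
```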
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
@@ -180,7 +180,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

async fn triggers_in_block(
&self,
_logger: &Logger,
logger: &Logger,
block: codec::Block,
filter: &TriggerFilter,
) -> Result<BlockWithTriggers<Chain>, Error> {
@@ -212,7 +212,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
trigger_data.push(ArweaveTrigger::Block(shared_block.cheap_clone()));
}

Ok(BlockWithTriggers::new(block, trigger_data))
Ok(BlockWithTriggers::new(block, trigger_data, logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
@@ -188,7 +188,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

async fn triggers_in_block(
&self,
_logger: &Logger,
logger: &Logger,
block: codec::Block,
filter: &TriggerFilter,
) -> Result<BlockWithTriggers<Chain>, Error> {
@@ -268,7 +268,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
triggers.push(CosmosTrigger::Block(shared_block.cheap_clone()));
}

Ok(BlockWithTriggers::new(block, triggers))
Ok(BlockWithTriggers::new(block, triggers, logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
2 changes: 1 addition & 1 deletion chain/ethereum/src/chain.rs
@@ -528,7 +528,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
));
triggers.append(&mut parse_call_triggers(&filter.call, &full_block)?);
triggers.append(&mut parse_block_triggers(&filter.block, &full_block));
Ok(BlockWithTriggers::new(block, triggers))
Ok(BlockWithTriggers::new(block, triggers, logger))
}
}
}
3 changes: 3 additions & 0 deletions chain/ethereum/src/ethereum_adapter.rs
@@ -1438,13 +1438,16 @@ pub(crate) async fn blocks_with_triggers(
block_hashes.insert(to_hash);
triggers_by_block.entry(to).or_insert(Vec::new());

let logger2 = logger.cheap_clone();

let blocks = adapter
.load_blocks(logger.cheap_clone(), chain_store.clone(), block_hashes)
.and_then(
move |block| match triggers_by_block.remove(&(block.number() as BlockNumber)) {
Some(triggers) => Ok(BlockWithTriggers::new(
BlockFinality::Final(block),
triggers,
&logger2,
)),
None => Err(anyhow!(
"block {} not found in `triggers_by_block`",
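A note on the `logger2` binding above: the `and_then` closure is a `move` closure, so it takes ownership of everything it captures; cloning up front leaves the original `logger` free for the `load_blocks` call. A minimal sketch of the pattern, using `Arc<String>` as a stand-in for `slog::Logger` (whose `cheap_clone()` here amounts to a reference-count bump):

```rust
use std::sync::Arc;

fn main() {
    // Stand-in for a logger handle; cloning an Arc only bumps a refcount.
    let logger = Arc::new(String::from("app logger"));
    let logger2 = Arc::clone(&logger);

    // The `move` closure takes ownership of `logger2`...
    let handle_block = move |number: u64| format!("{logger2}: block {number}");

    // ...while the original `logger` stays usable in this scope,
    // e.g. for another call that also wants a logger.
    println!("{}", handle_block(42));
    println!("{logger} is still accessible here");
}
```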
119 changes: 116 additions & 3 deletions chain/ethereum/src/tests.rs
@@ -4,8 +4,9 @@ use graph::{
blockchain::{block_stream::BlockWithTriggers, BlockPtr},
prelude::{
web3::types::{Address, Bytes, Log, H160, H256, U64},
EthereumCall,
EthereumCall, LightEthereumBlock,
},
slog::{self, o, Logger},
};

use crate::{
@@ -31,6 +32,7 @@ fn test_trigger_ordering() {

let mut call2 = EthereumCall::default();
call2.transaction_index = 2;
call2.input = Bytes(vec![0]);
let call2 = EthereumTrigger::Call(Arc::new(call2));

let mut call3 = EthereumCall::default();
@@ -40,6 +42,8 @@
// Call with the same tx index as call2
let mut call4 = EthereumCall::default();
call4.transaction_index = 2;
// different from call2 so they don't get mistaken for the same call
call4.input = Bytes(vec![1]);
let call4 = EthereumTrigger::Call(Arc::new(call4));

fn create_log(tx_index: u64, log_index: u64) -> Arc<Log> {
@@ -86,12 +90,121 @@
log1.clone(),
];

let logger = Logger::root(slog::Discard, o!());

let mut b: LightEthereumBlock = Default::default();

// This is necessary because BlockWithTriggers::new logs both of
// these fields, and just using Default above would leave them as None.
b.number = Some(Default::default());
b.hash = Some(Default::default());

// Test that `BlockWithTriggers` sorts the triggers.
let block_with_triggers =
BlockWithTriggers::<crate::Chain>::new(BlockFinality::Final(Default::default()), triggers);
let block_with_triggers = BlockWithTriggers::<crate::Chain>::new(
BlockFinality::Final(Arc::new(b)),
triggers,
&logger,
);

assert_eq!(
block_with_triggers.trigger_data,
vec![log1, log2, call1, log3, call2, call4, call3, block2, block1]
);
}

#[test]
fn test_trigger_dedup() {
let block1 = EthereumTrigger::Block(
BlockPtr::from((H256::random(), 1u64)),
EthereumBlockTriggerType::Every,
);

let block2 = EthereumTrigger::Block(
BlockPtr::from((H256::random(), 0u64)),
EthereumBlockTriggerType::WithCallTo(Address::random()),
);

// duplicate block2
let block3 = block2.clone();

let mut call1 = EthereumCall::default();
call1.transaction_index = 1;
let call1 = EthereumTrigger::Call(Arc::new(call1));

let mut call2 = EthereumCall::default();
call2.transaction_index = 2;
let call2 = EthereumTrigger::Call(Arc::new(call2));

let mut call3 = EthereumCall::default();
call3.transaction_index = 3;
let call3 = EthereumTrigger::Call(Arc::new(call3));

// duplicate call2
let mut call4 = EthereumCall::default();
call4.transaction_index = 2;
let call4 = EthereumTrigger::Call(Arc::new(call4));

fn create_log(tx_index: u64, log_index: u64) -> Arc<Log> {
Arc::new(Log {
address: H160::default(),
topics: vec![],
data: Bytes::default(),
block_hash: Some(H256::zero()),
block_number: Some(U64::zero()),
transaction_hash: Some(H256::zero()),
transaction_index: Some(tx_index.into()),
log_index: Some(log_index.into()),
transaction_log_index: Some(log_index.into()),
log_type: Some("".into()),
removed: Some(false),
})
}

let log1 = EthereumTrigger::Log(create_log(1, 0), None);
let log2 = EthereumTrigger::Log(create_log(1, 1), None);
let log3 = EthereumTrigger::Log(create_log(2, 5), None);
// duplicate logs 2 and 3
let log4 = log2.clone();
let log5 = log3.clone();

let triggers = vec![
// Call triggers
call3.clone(),
call1.clone(),
call2.clone(),
call4.clone(),
// Block triggers
block3.clone(),
block2.clone(),
block1.clone(),
// Event triggers
log5.clone(),
log4.clone(),
log3.clone(),
log2.clone(),
log1.clone(),
];

let logger = Logger::root(slog::Discard, o!());

let mut b: LightEthereumBlock = Default::default();

// This is necessary because BlockWithTriggers::new logs both of
// these fields, and just using Default above would leave them as None.
b.number = Some(Default::default());
b.hash = Some(Default::default());

// Test that `BlockWithTriggers` sorts and deduplicates the triggers.
let block_with_triggers = BlockWithTriggers::<crate::Chain>::new(
BlockFinality::Final(Arc::new(b)),
triggers,
&logger,
);

assert_eq!(
block_with_triggers.trigger_data,
vec![log1, log2, call1, log3, call2, call3, block2, block1]
);
}
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
@@ -226,7 +226,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

async fn triggers_in_block(
&self,
_logger: &Logger,
logger: &Logger,
block: codec::Block,
filter: &TriggerFilter,
) -> Result<BlockWithTriggers<Chain>, Error> {
@@ -283,7 +283,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
trigger_data.push(NearTrigger::Block(shared_block.cheap_clone()));
}

Ok(BlockWithTriggers::new(block, trigger_data))
Ok(BlockWithTriggers::new(block, trigger_data, logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
3 changes: 2 additions & 1 deletion chain/substreams/src/mapper.rs
@@ -16,7 +16,7 @@ pub struct Mapper {}
impl SubstreamsMapper<Chain> for Mapper {
async fn to_block_stream_event(
&self,
_logger: &Logger,
logger: &Logger,
block_scoped_data: &BlockScopedData,
) -> Result<Option<BlockStreamEvent<Chain>>, SubstreamsError> {
let BlockScopedData {
@@ -81,6 +81,7 @@ impl SubstreamsMapper<Chain> for Mapper {
changes,
},
vec![TriggerData {}],
logger,
),
FirehoseCursor::from(cursor.clone()),
))),
24 changes: 23 additions & 1 deletion graph/src/blockchain/block_stream.rs
@@ -173,9 +173,31 @@
}

impl<C: Blockchain> BlockWithTriggers<C> {
pub fn new(block: C::Block, mut trigger_data: Vec<C::TriggerData>) -> Self {
/// Creates a BlockWithTriggers structure, which holds
/// the trigger data ordered and without any duplicates.
pub fn new(block: C::Block, mut trigger_data: Vec<C::TriggerData>, logger: &Logger) -> Self {
**Contributor** commented:
This to me sounds like a lot of logic for a `new` function. I'd rather change this to something like `from_trigger_data`, with a comment saying it will dedup etc.

**Contributor (author)** commented:
I just added a doc comment, what do you think?

**Contributor** commented:
I don't feel strongly about it, but reading the code you would not expect `new` to do the sort + dedup etc. The doc helps, though; I'll leave the rest up to you.

// This is where triggers get sorted.
trigger_data.sort();

let old_len = trigger_data.len();

// This removes duplicate triggers that can occur when multiple
// data sources fetch the same event/call/etc.
trigger_data.dedup();

let new_len = trigger_data.len();

if new_len != old_len {
debug!(
logger,
"Trigger data had duplicate triggers";
"block_number" => block.number(),
"block_hash" => block.hash().hash_hex(),
"old_length" => old_len,
"new_length" => new_len,
);
}

Self {
block,
trigger_data,
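A small sketch of why `new` sorts before deduplicating: `Vec::dedup` only removes *consecutive* repeated elements, so duplicates that are not adjacent would survive an unsorted pass. (Plain integers here stand in for trigger data.)

```rust
fn main() {
    // dedup() alone misses non-adjacent duplicates...
    let mut unsorted = vec![3, 1, 3, 2, 1];
    unsorted.dedup();
    assert_eq!(unsorted, vec![3, 1, 3, 2, 1]); // nothing removed

    // ...but after sorting, duplicates become adjacent and collapse.
    let mut triggers = vec![3, 1, 3, 2, 1];
    triggers.sort();  // [1, 1, 2, 3, 3]
    triggers.dedup(); // [1, 2, 3]
    assert_eq!(triggers, vec![1, 2, 3]);

    println!("deduplicated: {triggers:?}");
}
```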
4 changes: 2 additions & 2 deletions tests/src/fixture.rs
@@ -517,14 +517,14 @@ impl<C: Blockchain> TriggersAdapter<C> for NoopTriggersAdapter<C> {

async fn triggers_in_block(
&self,
_logger: &Logger,
logger: &Logger,
block: <C as Blockchain>::Block,
_filter: &<C as Blockchain>::TriggerFilter,
) -> Result<BlockWithTriggers<C>, Error> {
tokio::time::sleep(self.triggers_in_block_sleep).await;

// Return no triggers on data source reprocessing.
Ok(BlockWithTriggers::new(block, Vec::new()))
Ok(BlockWithTriggers::new(block, Vec::new(), logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {