diff --git a/core/src/subgraph/instance.rs b/core/src/subgraph/instance.rs index b26ef7cd91a..d111e63631f 100644 --- a/core/src/subgraph/instance.rs +++ b/core/src/subgraph/instance.rs @@ -97,16 +97,13 @@ where ) } - fn process_trigger_in_runtime_hosts( + fn process_trigger_in_runtime_hosts( logger: &Logger, - hosts: I, + hosts: impl Iterator>, block: Arc, trigger: EthereumTrigger, state: BlockState, - ) -> Box + Send> - where - I: IntoIterator>, - { + ) -> Box + Send> { let logger = logger.to_owned(); match trigger { EthereumTrigger::Log(log) => { @@ -114,10 +111,7 @@ where .transaction_for_log(&log) .map(Arc::new) .ok_or_else(|| format_err!("Found no transaction for event")); - let matching_hosts: Vec<_> = hosts - .into_iter() - .filter(|host| host.matches_log(&log)) - .collect(); + let matching_hosts: Vec<_> = hosts.filter(|host| host.matches_log(&log)).collect(); let log = Arc::new(log); // Process the log in each host in the same order the corresponding data diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 937d6c6b8f9..0f44f5e4ee7 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -66,7 +66,7 @@ impl SubgraphInstanceManager { where S: Store + ChainStore, T: RuntimeHostBuilder, - B: BlockStreamBuilder + 'static, + B: BlockStreamBuilder, { let logger = logger_factory.component_logger("SubgraphInstanceManager", None); let logger_factory = logger_factory.with_parent(logger.clone()); @@ -99,7 +99,7 @@ impl SubgraphInstanceManager { ) where S: Store + ChainStore, T: RuntimeHostBuilder, - B: BlockStreamBuilder + 'static, + B: BlockStreamBuilder, { // Subgraph instance shutdown senders let instances: SharedInstanceKeepAliveMap = Default::default(); @@ -173,7 +173,7 @@ impl SubgraphInstanceManager { ) -> Result<(), Error> where T: RuntimeHostBuilder, - B: BlockStreamBuilder + 'static, + B: BlockStreamBuilder, S: Store + ChainStore, { // Clear the 'failed' state of the subgraph. We were told explicitly @@ -424,6 +424,7 @@ where "block_number" => format!("{:?}", block.block.number.unwrap()), "block_hash" => format!("{:?}", block.block.hash.unwrap()) )); + let logger1 = logger.clone(); if triggers.len() == 1 { info!(logger, "1 trigger found in this block for this subgraph"); @@ -441,12 +442,6 @@ where let block_ptr_after = EthereumBlockPointer::from(&*block); let block_ptr_for_new_data_sources = block_ptr_after.clone(); - // Clone a few things to pass into futures - let logger1 = logger.clone(); - let logger2 = logger.clone(); - let logger3 = logger.clone(); - let logger4 = logger.clone(); - // Process events one after the other, passing in entity operations // collected previously to every new event being processed process_triggers( @@ -456,83 +451,101 @@ where block.clone(), triggers, ) - .and_then(|(ctx, block_state)| { - // Instantiate dynamic data sources - create_dynamic_data_sources(logger1, ctx, block_state).from_err() - }) - .and_then(move |(ctx, block_state, data_sources, runtime_hosts)| { - // Reprocess the triggers from this block that match the new data sources - - let created_ds_count = block_state.created_data_sources.len(); - let block_with_calls = EthereumBlockWithCalls { - ethereum_block: block.deref().clone(), - calls, - }; - - future::result(::Stream::parse_triggers( - EthereumLogFilter::from_data_sources(data_sources.iter()), - EthereumCallFilter::from_data_sources(data_sources.iter()), - EthereumBlockFilter::from_data_sources(data_sources.iter()), - false, - block_with_calls, - )) - .from_err() - .and_then(move |block_with_triggers| { - let triggers = block_with_triggers.triggers; + .and_then(move |(ctx, block_state)| { + // If new data sources have been created, restart the subgraph after this block. + let needs_restart = !block_state.created_data_sources.is_empty(); + + // This loop will: + // 1. Instantiate created data sources. + // 2. Process those data sources for the current block. + // Until no data sources are created or MAX_DATA_SOURCES is hit. + + // Note that this algorithm processes data sources spawned on the same block _breadth + // first_ on the tree implied by the parent-child relationship between data sources. Only a + // very contrived subgraph would be able to observe this. + loop_fn( + (ctx, block_state), + move |(mut ctx, mut block_state)| -> Box + Send> { + if block_state.created_data_sources.is_empty() { + // No new data sources, nothing to do. + return Box::new(future::ok(Loop::Break((ctx, block_state)))); + } - if triggers.len() == 1 { - info!( - logger, - "1 trigger found in this block for the new data sources" - ); - } else if triggers.len() > 1 { - info!( - logger, - "{} triggers found in this block for the new data sources", - triggers.len() - ); - } + // Instantiate dynamic data sources, removing them from the block state. + let (data_sources, runtime_hosts) = match create_dynamic_data_sources( + logger.clone(), + &ctx.inputs, + block_state.created_data_sources.drain(..), + ) { + Ok(ok) => ok, + Err(err) => return Box::new(future::err(err.into())), + }; + + // Reprocess the triggers from this block that match the new data sources + + let block_with_calls = EthereumBlockWithCalls { + ethereum_block: block.deref().clone(), + calls: calls.clone(), + }; + + let block_with_triggers = match ::Stream::parse_triggers( + EthereumLogFilter::from_data_sources(data_sources.iter()), + EthereumCallFilter::from_data_sources(data_sources.iter()), + EthereumBlockFilter::from_data_sources(data_sources.iter()), + false, + block_with_calls, + ) { + Ok(block_with_triggers) => block_with_triggers, + Err(err) => return Box::new(future::err(err.into())), + }; + let triggers = block_with_triggers.triggers; + + if triggers.len() == 1 { + info!( + logger, + "1 trigger found in this block for the new data sources" + ); + } else if triggers.len() > 1 { + info!( + logger, + "{} triggers found in this block for the new data sources", + triggers.len() + ); + } - process_triggers_in_runtime_hosts::( - logger2.clone(), - runtime_hosts.clone(), - block_state, - block.clone(), - triggers, - ) - .and_then(move |block_state| { - match block_state.created_data_sources.len() > created_ds_count { - false => Ok((ctx, block_state, data_sources, runtime_hosts)), - true => Err(err_msg( - "A dynamic data source, in the same block that it was created, - attempted to create another dynamic data source. - To let us know that you are affected by this bug, please comment in - https://github.com/graphprotocol/graph-node/issues/1105", - ) - .into()), + // Add entity operations for the new data sources to the block state + // and add runtimes for the data sources to the subgraph instance. + if let Err(err) = persist_dynamic_data_sources( + logger.clone(), + &mut ctx, + &mut block_state.entity_cache, + data_sources, + runtime_hosts.clone(), + block_ptr_for_new_data_sources, + ) { + return Box::new(future::err(err.into())); } - }) - }) - }) - .and_then(move |(ctx, block_state, data_sources, runtime_hosts)| { - // Add entity operations for the new data sources to the block state - // and add runtimes for the data sources to the subgraph instance - future::result( - persist_dynamic_data_sources( - logger3, - ctx, - block_state, - data_sources, - runtime_hosts, - block_ptr_for_new_data_sources, - ) - .map(move |(ctx, block_state, data_sources_created)| { - // If new data sources have been added, indicate that the subgraph - // needs to be restarted after this block - (ctx, block_state, data_sources_created) - }), + + let logger = logger.clone(); + let block = block.clone(); + Box::new( + stream::iter_ok(triggers) + .fold(block_state, move |block_state, trigger| { + // Process the triggers in each host in the same order the + // corresponding data sources have been created. + SubgraphInstance::::process_trigger_in_runtime_hosts( + &logger, + runtime_hosts.iter().cloned(), + block.clone(), + trigger, + block_state, + ) + }) + .and_then(|block_state| future::ok(Loop::Continue((ctx, block_state)))), + ) + }, ) - .from_err() + .map(move |(ctx, block_state)| (ctx, block_state, needs_restart)) }) // Apply entity operations and advance the stream .and_then(move |(ctx, block_state, needs_restart)| { @@ -551,7 +564,7 @@ where )) })?; if !mods.is_empty() { - info!(logger4, "Applying {} entity operation(s)", mods.len()); + info!(logger1, "Applying {} entity operation(s)", mods.len()); } // Transact entity operations into the store and update the @@ -567,7 +580,7 @@ where .map(|should_migrate| { if should_migrate { ctx.inputs.store.migrate_subgraph_deployment( - &logger4, + &logger1, &ctx.inputs.deployment_id, &block_ptr_after, ); @@ -606,115 +619,47 @@ where }) } -fn process_triggers_in_runtime_hosts( - logger: Logger, - runtime_hosts: Vec>, - block_state: BlockState, - block: Arc, - triggers: Vec, -) -> impl Future> -where - T: RuntimeHostBuilder, -{ - stream::iter_ok::<_, CancelableError>(triggers) - // Process events from the block stream - .fold(block_state, move |block_state, trigger| { - let logger = logger.clone(); - let block = block.clone(); - let runtime_hosts = runtime_hosts.clone(); - - // Process the log in each host in the same order the corresponding - // data sources have been created - SubgraphInstance::::process_trigger_in_runtime_hosts( - &logger, - runtime_hosts.iter().map(|host| host.clone()), - block.clone(), - trigger, - block_state, - ) - }) -} - fn create_dynamic_data_sources( logger: Logger, - ctx: IndexingContext, - block_state: BlockState, -) -> impl Future< - Item = ( - IndexingContext, - BlockState, - Vec, - Vec>, - ), - Error = Error, -> + inputs: &IndexingInputs, + created_data_sources: impl Iterator, +) -> Result<(Vec, Vec>), Error> where B: BlockStreamBuilder, S: ChainStore + Store, T: RuntimeHostBuilder, { - struct State - where - B: BlockStreamBuilder, - S: ChainStore + Store, - T: RuntimeHostBuilder, - { - ctx: IndexingContext, - block_state: BlockState, - data_sources: Vec, - runtime_hosts: Vec>, - }; - - let initial_state = State { - ctx, - block_state, - data_sources: vec![], - runtime_hosts: vec![], - }; - - stream::iter_ok(initial_state.block_state.created_data_sources.clone()) - .fold(initial_state, move |mut state, info| { - // Try to instantiate a data source from the template - let data_source = match DataSource::try_from_template(info.template, &info.params) { - Ok(data_source) => data_source, - Err(e) => return future::err(e), - }; - - // Try to create a runtime host for the data source - let host = match state.ctx.inputs.host_builder.build( - &logger, - state.ctx.inputs.network_name.clone(), - state.ctx.inputs.deployment_id.clone(), - data_source.clone(), - state.ctx.inputs.top_level_templates.clone(), - ) { - Ok(host) => Arc::new(host), - Err(e) => return future::err(e), - }; - - state.data_sources.push(data_source); - state.runtime_hosts.push(host); + let mut data_sources = vec![]; + let mut runtime_hosts = vec![]; + + for info in created_data_sources { + // Try to instantiate a data source from the template + let data_source = DataSource::try_from_template(info.template, &info.params)?; + + // Try to create a runtime host for the data source + let host = inputs.host_builder.build( + &logger, + inputs.network_name.clone(), + inputs.deployment_id.clone(), + data_source.clone(), + inputs.top_level_templates.clone(), + )?; + + data_sources.push(data_source); + runtime_hosts.push(Arc::new(host)); + } - future::ok(state) - }) - .map(|final_state| { - ( - final_state.ctx, - final_state.block_state, - final_state.data_sources, - final_state.runtime_hosts, - ) - }) + Ok((data_sources, runtime_hosts)) } fn persist_dynamic_data_sources( logger: Logger, - mut ctx: IndexingContext, - block_state: BlockState, + ctx: &mut IndexingContext, + entity_cache: &mut EntityCache, data_sources: Vec, runtime_hosts: Vec>, block_ptr: EthereumBlockPointer, -) -> Result<(IndexingContext, BlockState, bool), Error> +) -> Result<(), Error> where B: BlockStreamBuilder, S: ChainStore + Store, @@ -728,13 +673,8 @@ where ); } - // If there are any new dynamic data sources, we'll have to restart - // the subgraph after this block - let needs_restart = !data_sources.is_empty(); - // Add entity operations to the block state in order to persist // the dynamic data sources - let mut block_state = block_state; for data_source in data_sources.iter() { let entity = DynamicEthereumContractDataSourceEntity::from(( &ctx.inputs.deployment_id, @@ -743,7 +683,7 @@ where )); let id = format!("{}-dynamic", Uuid::new_v4().to_simple()); let operations = entity.write_entity_operations(id.as_ref()); - block_state.entity_cache.append(operations); + entity_cache.append(operations); } // Merge log filters from data sources into the block stream builder @@ -762,8 +702,5 @@ where .extend(EthereumBlockFilter::from_data_sources(&data_sources)); // Add the new data sources to the subgraph instance - ctx.state - .instance - .add_dynamic_data_sources(runtime_hosts) - .map(move |_| (ctx, block_state, needs_restart)) + ctx.state.instance.add_dynamic_data_sources(runtime_hosts) } diff --git a/graph/src/components/ethereum/stream.rs b/graph/src/components/ethereum/stream.rs index e0223db1120..144c0ee9a19 100644 --- a/graph/src/components/ethereum/stream.rs +++ b/graph/src/components/ethereum/stream.rs @@ -13,7 +13,7 @@ pub trait BlockStream: Stream { ) -> Result; } -pub trait BlockStreamBuilder: Clone + Send + Sync { +pub trait BlockStreamBuilder: Clone + Send + Sync + 'static { type Stream: BlockStream + Send + 'static; fn build( diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index cbd9de12ed2..c70cf219836 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -39,15 +39,13 @@ where ) -> Box + Send>; /// Like `process_trigger` but processes an Ethereum event in a given list of hosts. - fn process_trigger_in_runtime_hosts( + fn process_trigger_in_runtime_hosts( logger: &Logger, - hosts: I, + hosts: impl Iterator>, block: Arc, trigger: EthereumTrigger, state: BlockState, - ) -> Box + Send> - where - I: IntoIterator>; + ) -> Box + Send>; /// Adds dynamic data sources to the subgraph. fn add_dynamic_data_sources(&mut self, runtime_hosts: Vec>) -> Result<(), Error>;