Skip to content

Commit

Permalink
instance_manager: Inline process_triggers_in_runtime_hosts
Browse files Browse the repository at this point in the history
The function is too small to justify existing on its own, so inline it at its single call site.

Also add an explanatory note about processing order to the data-source retrigger loop.
  • Loading branch information
leoyvens committed Sep 19, 2019
1 parent 0e4f464 commit 256cc4a
Showing 1 changed file with 19 additions and 33 deletions.
52 changes: 19 additions & 33 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ where
// 1. Instantiate created data sources.
// 2. Process those data sources for the current block.
// Until no data sources are created or MAX_DATA_SOURCES is hit.

// Note that this algorithm processes data sources spawned on the same block _breadth
// first_ on the tree implied by the parent-child relationship between data sources. Only a
// very contrived subgraph would be able to observe this.
loop_fn(
(ctx, block_state),
move |(mut ctx, mut block_state)| -> Box<dyn Future<Item = _, Error = _> + Send> {
Expand Down Expand Up @@ -522,15 +526,22 @@ where
return Box::new(future::err(err.into()));
}

let logger = logger.clone();
let block = block.clone();
Box::new(
process_triggers_in_runtime_hosts::<T>(
logger.clone(),
runtime_hosts,
block_state,
block.clone(),
triggers,
)
.and_then(|block_state| future::ok(Loop::Continue((ctx, block_state)))),
stream::iter_ok(triggers)
.fold(block_state, move |block_state, trigger| {
// Process the triggers in each host in the same order the
// corresponding data sources have been created.
SubgraphInstance::<T>::process_trigger_in_runtime_hosts(
&logger,
runtime_hosts.iter().cloned(),
block.clone(),
trigger,
block_state,
)
})
.and_then(|block_state| future::ok(Loop::Continue((ctx, block_state)))),
)
},
)
Expand Down Expand Up @@ -608,31 +619,6 @@ where
})
}

/// Runs every trigger of one block through the subgraph's runtime hosts,
/// threading the accumulated `BlockState` through each step.
///
/// Triggers are folded over in the order given; the resulting future resolves
/// to the final `BlockState` once all triggers have been processed, or fails
/// with the first (possibly cancellation) error.
///
/// NOTE(review): `fold` on a futures-0.1 stream drives items strictly
/// sequentially, so no two triggers are ever processed concurrently here.
fn process_triggers_in_runtime_hosts<T>(
logger: Logger,
runtime_hosts: Vec<Arc<T::Host>>,
block_state: BlockState,
block: Arc<EthereumBlock>,
triggers: Vec<EthereumTrigger>,
) -> impl Future<Item = BlockState, Error = CancelableError<Error>>
where
T: RuntimeHostBuilder,
{
// Process events from the block stream, one trigger at a time.
stream::iter_ok::<_, CancelableError<Error>>(triggers)
.fold(block_state, move |block_state, trigger| {
// Offer each trigger to every host in the same order the
// corresponding data sources have been created; the hosts
// themselves decide whether the trigger matches them.
SubgraphInstance::<T>::process_trigger_in_runtime_hosts(
&logger,
runtime_hosts.iter().cloned(),
block.clone(),
trigger,
block_state,
)
})
}

fn create_dynamic_data_sources<B, S, T>(
logger: Logger,
inputs: &IndexingInputs<B, S, T>,
Expand Down

0 comments on commit 256cc4a

Please sign in to comment.