Ongoing pruning #4429
Conversation
Force-pushed from e62d491 to 970224b
I've run this in the integration cluster without pruning any subgraphs yet, and at least for that, it does not cause POI differences. Will do another run with pruned subgraphs, but at least as long as we don't prune, this code is ok.
I've now run this in the integration cluster with all subgraphs pruned to 10,000 blocks. The initial pruning took almost 24hrs, but it was doing one subgraph at a time. Most subgraphs took under 5 minutes for the initial pruning, but there were a handful (< 10) where the initial pruning took 30-90 minutes. So far, there have been no POI differences except in cases where
Just from reading the PR description, my biggest question would be: why block indexing rather than doing this in a concurrent task that could be spawned by the indexing process? Another thing, could
There are a few reasons: (1) it's simpler in terms of behavior and implementation; (2) doing this in a background task will mean that indexing a subgraph sometimes needs 2 connections, which can lead to bursty connection requirements; (3) we will run even more threads, and I am a little concerned that we are already using a very large number of threads. None of this means we should never do this, but I would first like to see how things work out with the simpler processing model of non-concurrent prunes. The main factor for how much this will slow down indexing is the setting of
For 'as tight as possible', users can just set that to the
I think once we have a different notion of finality, it will be much easier and clearer to adapt the pruning behavior to that rather than trying to anticipate what that might look like.
The advantage of not blocking indexing seems stronger than these downsides: (1) the possibility of a concurrent prune through graphman already exists; (2) the total time holding a connection should be the same, so no change in total load, even if it's maybe burstier, and the threshold can be calculated in the main task so the overhead is only taken when pruning will really happen; (3) these are more tasks, which doesn't change the maximum number of threads. If we agree that the ideal design is to do it concurrently, then imo we should do it now, so we can already start discussing and gaining experience with the intended design rather than a provisional one.
Agreed on (1) and (2); for (3): we ultimately have to run the db interaction on the blocking pool, which means another thread.
Another concern is error propagation: right now, if pruning fails, errors are handled just like any other error that happens during
@leoyvens The change you are asking for would be easy enough: it would just require replacing this line with something like this:

async fn run(
    store: Arc<DeploymentStore>,
    reporter: Box<OngoingPruneReporter>,
    site: Arc<Site>,
    req: PruneRequest,
) -> Result<Box<dyn PruneReporter>, StoreError> {
    store.prune(reporter, site, req).await
}

let _handle = graph::spawn(run(this, reporter, site, req));

For error propagation, we would need to do something smart with
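One possible shape for surfacing failures from such a spawned task, as a rough sketch extending the snippet above (the logging is only illustrative, not a settled design; run is the helper from the snippet, and logger is assumed to be a slog::Logger that is available at this point):

let _handle = graph::spawn(async move {
    match run(this, reporter, site, req).await {
        // Nothing to do; pruning succeeded.
        Ok(_reporter) => (),
        // An ongoing prune failing should not abort indexing, but it also
        // should not disappear without a trace.
        Err(e) => error!(logger, "ongoing prune failed"; "error" => e.to_string()),
    }
});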
Force-pushed from b00891f to 57ab779
Amazing work! I tried it out and it worked very well. I like the polish, such as the reporter trait for ongoing vs. CLI pruning. Left some comments.
graph/src/env/store.rs
Outdated
@@ -160,4 +180,42 @@ pub struct InnerStore {
    write_queue_size: usize,
    #[envconfig(from = "GRAPH_STORE_BATCH_TARGET_DURATION", default = "180")]
    batch_target_duration_in_secs: u64,
    #[envconfig(from = "GRAPH_STORE_HISTORY_COPY_THRESHOLD", default = "0.5")]
    copy_threshold: ZeroToOneF64,
    #[envconfig(from = "GRAPH_STORE_HISTORY_COPY_THRESHOLD", default = "0.05")]
Suggested change:
-    #[envconfig(from = "GRAPH_STORE_HISTORY_COPY_THRESHOLD", default = "0.05")]
+    #[envconfig(from = "GRAPH_STORE_HISTORY_DELETE_THRESHOLD", default = "0.05")]
Woops .. fixed
fn from_str(s: &str) -> Result<Self, Self::Err> {
    let f = s.parse::<f64>()?;
    if f < 1.01 {
        bail!("invalid value: {s} must be bigger than 1.01");
Why can't the slack factor be set to 1? If that can cause numerical instability somewhere, imo we should fix that there and allow this to be set to 1.
It's mostly to avoid doing a pruning run on every block - with a slack factor of 1, you will try to prune on every block, which is just very wasteful. The lower bound of allowing 1% of slack is somewhat arbitrary though.
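To make that trade-off concrete, here is an illustrative sketch of the trigger check (the function and its arguments are hypothetical, not the actual implementation):

/// Illustrative only: decide whether a prune run should be kicked off.
/// With slack_factor == 1.0 this fires on essentially every block once the
/// deployment reaches its history limit; a slack factor of e.g. 1.2 lets 20%
/// of extra history accumulate between prune runs.
fn should_prune(latest_block: i32, earliest_block: i32, history_blocks: i32, slack_factor: f64) -> bool {
    let kept_blocks = latest_block - earliest_block;
    (kept_blocks as f64) > (history_blocks as f64) * slack_factor
}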
@@ -170,14 +197,16 @@ impl TablePair {
    // The conditions on `block_range` are expressed redundantly
    // to make more indexes useable
    sql_query(format!(
        "insert into {dst}({column_list}) \
        "/* controller=prune,phase=nonfinal,start_vid={next_vid},next_vid={batch_size} */ \
Here and in a previous query there is next_vid=batch_size, which doesn't make sense to me.
I fixed the names in the comments to make this less confusing.
graph/src/components/store/mod.rs
Outdated
        "the delete threshold must be between 0 and 1 but is {delete_threshold}"
    ));
}
if history_blocks < reorg_threshold {
I believe this should be <=; I tried it with equal values and nothing gets pruned, and then history_blocks fails to set because it checks for <=.
Good catch - fixed.
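For reference, a sketch of the corrected bound (a standalone illustration, not the exact code in the PR; it uses anyhow's bail! for brevity):

use anyhow::{bail, Result};

/// Illustrative only: history_blocks == reorg_threshold would leave nothing
/// prunable outside the unprunable window, and the later history_blocks
/// setter rejects it with its own `<=` check, so reject equality here too.
fn check_history(history_blocks: i32, reorg_threshold: i32) -> Result<()> {
    if history_blocks <= reorg_threshold {
        bail!("history_blocks ({history_blocks}) must be bigger than the reorg threshold ({reorg_threshold})");
    }
    Ok(())
}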
let mut req = PruneRequest::new(
    &deployment,
    history,
    ENV_VARS.reorg_threshold,
A minor inconsistency is that this env value might be different between the index node and graphman, but improving this will require a general rethink of finality configuration.
Yeah, the global reorg_threshold is something that I'd love for us to improve on. I feel that is something that should be settable for each chain individually (e.g., in graph-node.toml).
    ));
}

if history_blocks <= 0 {
The preceding if makes this dead code, assuming reorg_threshold >= 0.
Removed
    .unwrap()
    .remove(&site.id)
    .unwrap();
match graph::block_on(reap(handle)) {
Since we know the task is finished, we can use now_or_never (https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.now_or_never) here instead of block_on.
I had no idea that existed. Changed.
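For anyone else unfamiliar with it, a self-contained sketch of the difference (plain tokio and futures, not the PR's actual code):

use futures::future::FutureExt; // brings now_or_never into scope

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async { 2 + 2 });

    // Stand-in for "we already know the task has finished".
    tokio::task::yield_now().await;

    // now_or_never polls the future exactly once without blocking the
    // runtime; it returns Some(output) if the future is already complete.
    match handle.now_or_never() {
        Some(Ok(n)) => println!("task finished with {n}"),
        Some(Err(e)) => eprintln!("task panicked: {e}"),
        None => println!("task has not finished yet"),
    }
}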
const COPY: Scope = Scope { id: 1 };
const WRITE: Scope = Scope { id: 2 };
const PRUNE: Scope = Scope { id: 3 };
Is there no need for PRUNE and COPY to be mutually exclusive operations?
Good question; I think it's ok if the source subgraph gets pruned simultaneously:
- we might end up with data that's been pruned from the source, but that's harmless since the earliest_block of the destination will be set to that from the source after data copying has finished and will make that data unreachable
- we might also try to copy data that has been pruned after we determined which data to copy, which is fine for the same reason
If the destination subgraph gets pruned while the copy is still in progress, things are ok for the raw data, but I think we might get into trouble with these steps:
- copy process starts and copies data
- pruning does its thing and sets earliest_block
- copying data finishes and sets metadata, including earliest_block from the source, which might be before what was pruned
We could fix this by never allowing earliest_block to go back, changing deployment::earliest_block to

pub fn set_earliest_block(
    conn: &PgConnection,
    site: &Site,
    earliest_block: BlockNumber,
) -> Result<(), StoreError> {
    use subgraph_deployment as d;

    update(d::table.filter(d::id.eq(site.id)))
        .set(d::earliest_block_number.eq(earliest_block))
        .filter(d::earliest_block_number.lt(earliest_block)) // this is the change
        .execute(conn)?;
    Ok(())
}
There's one more interaction between copying and pruning that I am not clear on: as things are right now, when you copy/graft a deployment that has history_blocks set, that setting gets reset to the default, i.e., pruning is disabled for the copy. I think copying/grafting should retain that setting.
I just added a commit that does this.
Nice, thanks for thinking through this; it's worth documenting some of that analysis somewhere in this file.
- we might end up with data that's been pruned from the source, but that's harmless since the earliest_block of the destination will be set to that from the source after data copying has finished and will make that data unreachable
I just looked through the code again, and that's actually not true. I filed #4496 to capture this. I'll merge this PR as-is and work on another PR to address this problem.
Addressed/replied to all review comments
Rebased to latest master
Pass in how many blocks of history to keep instead of the earliest block; use the subgraph's setting if the caller doesn't specify history
The store also needs access to it, and this avoids making the store dependent on graph::chain::ethereum
We used to perform pruning in two stages, by first copying final entities for all tables, then copying nonfinal entities and switching for all tables. It is better to do this loop the other way around: we now go table-by-table, and for each of them do the nonfinal copy, then the final copy. This makes an ongoing prune operation less visible, since the subgraph writer can write in between the final copying for each table.
Change the PruneReporter trait to produce reasonable output for the new pruning flow
Prune the subgraph periodically while transacting blocks. Select the right strategy (copying or deleting) depending on how much history we are removing.
We used to determine the total number of blocks in a subgraph based on its latest and earliest blocks. With ongoing pruning, the earliest block is updated every time we prune, even though the logic in PruneRequest.strategy might have us actually not do anything. That leads to a situation where we think the subgraph contains far fewer blocks than it really does, and we therefore underestimate how much of its data is historical. We now remember for each table the block at which we actually pruned, which might be long before the subgraph's earliest block, and use that to determine how many blocks are present. As an example, assume we want to keep 100 blocks of history in a subgraph that is at block 1000 with earliest block 800, and a table that was last pruned at block 500. Previously, we would have estimated that 50% of the table is historical, when in reality 80% is historical.
Also, remove a check that could never fail
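As a rough illustration of the estimate described above, using the numbers from that example (a hypothetical helper, not the code in this PR):

/// Illustrative only: fraction of a table's data that is estimated to be
/// historical, based on the block at which the table was actually last
/// pruned rather than the deployment's earliest block.
fn historical_ratio(latest_block: i32, last_pruned_block: i32, history_blocks: i32) -> f64 {
    let blocks_present = (latest_block - last_pruned_block).max(1);
    let historical = (blocks_present - history_blocks).max(0);
    historical as f64 / blocks_present as f64
}

fn main() {
    // Keep 100 blocks, deployment at block 1000, table last pruned at block
    // 500: 80% of the table is historical.
    assert_eq!(historical_ratio(1000, 500, 100), 0.8);
    // Using the deployment's earliest block (800) instead would have
    // underestimated this as 50%.
    assert_eq!(historical_ratio(1000, 800, 100), 0.5);
}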
Rebased to latest master. I've also completed a run in the integration cluster with this branch; that did not cause any PoI differences. There were some differences reported, but they were all because
This is still work in progress, and I am still testing correctness and performance, but I am opening a PR already for greater visibility.
This PR adds the ability to perform pruning on an ongoing basis; each deployment now has an optional flag history_blocks that indicates how much history the deployment should retain. While the deployment is processing blocks, it checks whether it currently has more history than configured and prunes itself if that is the case. Pruning happens in line with normal block processing, so processing can get blocked by pruning, though the details depend on how full the write queue gets. The frequency with which pruning is performed is set with GRAPH_STORE_HISTORY_SLACK_FACTOR: once a deployment has more than history_blocks * GRAPH_STORE_HISTORY_SLACK_FACTOR blocks of history, pruning is kicked off.

Pruning can now use two different strategies, copying and deleting, and selects the strategy based on how much of each table is historical and therefore has to be removed. For large removals, copying is used; for smaller removals, we delete (configured with GRAPH_STORE_HISTORY_COPY_THRESHOLD and GRAPH_STORE_HISTORY_DELETE_THRESHOLD).

The amount of data we are likely to remove is estimated, since getting precise numbers would be too slow as it involves counting the entries in each table. The estimate is based on how much history the table retains and the ratio of entities to entity versions. When that ratio is high, it's likely that pruning will remove only a few rows, and when it is low, it's likely that pruning will remove many rows. Similarly, if the ratio of history_blocks to the number of blocks for which a table has data is high, we expect to remove only a few blocks since a lot of rows are still current. The number of blocks for which a table has data is derived from table_stats.last_pruned_block - since a table will not necessarily have data removed on every pruning run, that block can be substantially before the deployment's earliest block.

Pruning is controlled with graphman prune; besides performing an initial prune, it now also sets the deployment's history_blocks by default, which can be turned off by passing --once.
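As a rough sketch of how the two thresholds drive the choice between the strategies described above (the names and the enum are illustrative; the actual decision is made per table in PruneRequest.strategy):

/// Illustrative only: pick a pruning strategy for one table from the
/// estimated fraction of its rows that is historical.
enum Strategy {
    /// Copy the non-historical rows to a new table and swap it in.
    Copy,
    /// Delete the historical rows in place.
    Delete,
    /// Not enough removable data to be worth touching the table right now.
    Skip,
}

fn choose_strategy(historical_ratio: f64, copy_threshold: f64, delete_threshold: f64) -> Strategy {
    if historical_ratio >= copy_threshold {
        // Removing a lot: rewriting the table is cheaper than deleting in place.
        Strategy::Copy
    } else if historical_ratio >= delete_threshold {
        // Removing a little: delete in place.
        Strategy::Delete
    } else {
        Strategy::Skip
    }
}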