Skip to content

Commit 9858e2d

Browse files
committed
store: Improve predictability of the time for each batch during pruning
1 parent a3cc5b9 commit 9858e2d

File tree

2 files changed

+92
-65
lines changed

2 files changed

+92
-65
lines changed

store/postgres/src/copy.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ impl CopyState {
288288
/// batch, but don't step up the size by more than 2x at once
289289
#[derive(Debug, Queryable)]
290290
pub(crate) struct AdaptiveBatchSize {
291-
size: i64,
291+
pub size: i64,
292292
}
293293

294294
impl AdaptiveBatchSize {

store/postgres/src/relational/prune.rs

+91-64
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ impl TablePair {
6464
}
6565

6666
/// Copy all entity versions visible between `earliest_block` and
67-
/// `final_block` in batches, where each batch is a separate transaction
67+
/// `final_block` in batches, where each batch is a separate
68+
/// transaction. Write activity for nonfinal blocks can happen
69+
/// concurrently to this copy
6870
fn copy_final_entities(
6971
&self,
7072
conn: &PgConnection,
@@ -73,62 +75,74 @@ impl TablePair {
7375
final_block: BlockNumber,
7476
cancel: &CancelHandle,
7577
) -> Result<usize, CancelableError<StoreError>> {
76-
#[derive(QueryableByName)]
77-
struct LastVid {
78-
#[sql_type = "BigInt"]
79-
rows: i64,
80-
#[sql_type = "BigInt"]
81-
last_vid: i64,
82-
}
83-
8478
let column_list = self.column_list();
8579

80+
// Determine the last vid that we need to copy
81+
let VidRange { min_vid, max_vid } = sql_query(&format!(
82+
"select coalesce(min(vid), 0) as min_vid, \
83+
coalesce(max(vid), -1) as max_vid from {src} \
84+
where lower(block_range) <= $2 \
85+
and coalesce(upper(block_range), 2147483647) > $1 \
86+
and coalesce(upper(block_range), 2147483647) <= $2 \
87+
and block_range && int4range($1, $2, '[]')",
88+
src = self.src.qualified_name,
89+
))
90+
.bind::<Integer, _>(earliest_block)
91+
.bind::<Integer, _>(final_block)
92+
.get_result::<VidRange>(conn)?;
93+
8694
let mut batch_size = AdaptiveBatchSize::new(&self.src);
87-
// The first vid we still need to copy. When we start, we start with
88-
// 0 so that we don't constrain the set of rows based on their vid
89-
let mut next_vid = 0;
95+
// The first vid we still need to copy
96+
let mut next_vid = min_vid;
9097
let mut total_rows: usize = 0;
91-
loop {
98+
while next_vid <= max_vid {
9299
let start = Instant::now();
93-
let LastVid { last_vid, rows } = conn.transaction(|| {
100+
let rows = conn.transaction(|| {
101+
// Page through all rows in `src` in batches of `batch_size`
102+
// and copy the ones that are visible to queries at block
103+
// heights between `earliest_block` and `final_block`, but
104+
// whose block_range does not extend past `final_block`
105+
// since they could still be reverted while we copy.
106+
// The conditions on `block_range` are expressed redundantly
107+
// to make more indexes useable
94108
sql_query(&format!(
95-
"with cp as (insert into {dst}({column_list}) \
96-
select {column_list} from {src} \
97-
where lower(block_range) <= $2 \
98-
and coalesce(upper(block_range), 2147483647) > $1 \
99-
and coalesce(upper(block_range), 2147483647) <= $2 \
100-
and block_range && int4range($1, $2, '[]') \
101-
and vid >= $3 \
102-
order by vid \
103-
limit $4 \
104-
returning vid) \
105-
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
109+
"insert into {dst}({column_list}) \
110+
select {column_list} from {src} \
111+
where lower(block_range) <= $2 \
112+
and coalesce(upper(block_range), 2147483647) > $1 \
113+
and coalesce(upper(block_range), 2147483647) <= $2 \
114+
and block_range && int4range($1, $2, '[]') \
115+
and vid >= $3 and vid < $3 + $4 \
116+
order by vid",
106117
src = self.src.qualified_name,
107118
dst = self.dst.qualified_name
108119
))
109120
.bind::<Integer, _>(earliest_block)
110121
.bind::<Integer, _>(final_block)
111122
.bind::<BigInt, _>(next_vid)
112123
.bind::<BigInt, _>(&batch_size)
113-
.get_result::<LastVid>(conn)
124+
.execute(conn)
114125
})?;
115126
cancel.check_cancel()?;
116127

117-
total_rows += rows as usize;
118-
let done = rows == 0;
119-
reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done);
120-
121-
if done {
122-
return Ok(total_rows);
123-
}
128+
total_rows += rows;
129+
next_vid += batch_size.size;
124130

125131
batch_size.adapt(start.elapsed());
126-
next_vid = last_vid + 1;
132+
133+
reporter.copy_final_batch(
134+
self.src.name.as_str(),
135+
rows as usize,
136+
total_rows,
137+
next_vid > max_vid,
138+
);
127139
}
140+
Ok(total_rows)
128141
}
129142

130143
/// Copy all entity versions visible after `final_block` in batches,
131-
/// where each batch is a separate transaction
144+
/// where each batch is a separate transaction. This assumes that all
145+
/// other write activity to the source table is blocked while we copy
132146
fn copy_nonfinal_entities(
133147
&self,
134148
conn: &PgConnection,
@@ -137,54 +151,59 @@ impl TablePair {
137151
) -> Result<usize, StoreError> {
138152
let column_list = self.column_list();
139153

140-
#[derive(QueryableByName)]
141-
struct LastVid {
142-
#[sql_type = "BigInt"]
143-
rows: i64,
144-
#[sql_type = "BigInt"]
145-
last_vid: i64,
146-
}
154+
// Determine the last vid that we need to copy
155+
let VidRange { min_vid, max_vid } = sql_query(&format!(
156+
"select coalesce(min(vid), 0) as min_vid, \
157+
coalesce(max(vid), -1) as max_vid from {src} \
158+
where coalesce(upper(block_range), 2147483647) > $1 \
159+
and block_range && int4range($1, null)",
160+
src = self.src.qualified_name,
161+
))
162+
.bind::<Integer, _>(final_block)
163+
.get_result::<VidRange>(conn)?;
147164

148165
let mut batch_size = AdaptiveBatchSize::new(&self.src);
149-
// The first vid we still need to copy. When we start, we start with
150-
// 0 so that we don't constrain the set of rows based on their vid
151-
let mut next_vid = 0;
166+
// The first vid we still need to copy
167+
let mut next_vid = min_vid;
152168
let mut total_rows = 0;
153-
loop {
169+
while next_vid <= max_vid {
154170
let start = Instant::now();
155-
let LastVid { rows, last_vid } = conn.transaction(|| {
171+
let rows = conn.transaction(|| {
172+
// Page through all the rows in `src` in batches of
173+
// `batch_size` that are visible to queries at block heights
174+
// starting right after `final_block`.
175+
// The conditions on `block_range` are expressed redundantly
176+
// to make more indexes useable
156177
sql_query(&format!(
157-
"with cp as (insert into {dst}({column_list}) \
158-
select {column_list} from {src} \
159-
where coalesce(upper(block_range), 2147483647) > $1 \
160-
and block_range && int4range($1, null) \
161-
and vid >= $2 \
162-
order by vid \
163-
limit $3
164-
returning vid) \
165-
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
178+
"insert into {dst}({column_list}) \
179+
select {column_list} from {src} \
180+
where coalesce(upper(block_range), 2147483647) > $1 \
181+
and block_range && int4range($1, null) \
182+
and vid >= $2 and vid < $2 + $3 \
183+
order by vid",
166184
dst = self.dst.qualified_name,
167185
src = self.src.qualified_name,
168186
))
169187
.bind::<Integer, _>(final_block)
170188
.bind::<BigInt, _>(next_vid)
171189
.bind::<BigInt, _>(&batch_size)
172-
.get_result::<LastVid>(conn)
190+
.execute(conn)
173191
.map_err(StoreError::from)
174192
})?;
175-
total_rows += rows as usize;
193+
194+
total_rows += rows;
195+
next_vid += batch_size.size;
196+
197+
batch_size.adapt(start.elapsed());
198+
176199
reporter.copy_nonfinal_batch(
177200
self.src.name.as_str(),
178201
rows as usize,
179202
total_rows,
180-
rows == 0,
203+
next_vid > max_vid,
181204
);
182-
if rows == 0 {
183-
return Ok(total_rows);
184-
}
185-
batch_size.adapt(start.elapsed());
186-
next_vid = last_vid + 1;
187205
}
206+
Ok(total_rows)
188207
}
189208

190209
/// Replace the `src` table with the `dst` table
@@ -368,3 +387,11 @@ impl Layout {
368387
Ok(())
369388
}
370389
}
390+
391+
#[derive(QueryableByName)]
392+
struct VidRange {
393+
#[sql_type = "BigInt"]
394+
min_vid: i64,
395+
#[sql_type = "BigInt"]
396+
max_vid: i64,
397+
}

0 commit comments

Comments
 (0)