Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Reduce the amount of data getting mirrored from the primary #4015

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
store: Reduce the amount of data getting mirrored from the primary
  • Loading branch information
lutter committed Oct 13, 2022
commit 000a33cee37ddca7e36f4ea9593fa96a59572069
54 changes: 38 additions & 16 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,19 +1583,25 @@ impl Mirror {
"subgraph_version",
];

fn run_query(conn: &PgConnection, query: String) -> Result<(), StoreError> {
conn.batch_execute(&query).map_err(StoreError::from)
}

fn copy_table(
conn: &PgConnection,
src_nsp: &str,
dst_nsp: &str,
table_name: &str,
) -> Result<(), StoreError> {
let query = format!(
"insert into {dst_nsp}.{table_name} select * from {src_nsp}.{table_name};",
src_nsp = src_nsp,
dst_nsp = dst_nsp,
table_name = table_name
);
conn.batch_execute(&query).map_err(StoreError::from)
run_query(
conn,
format!(
"insert into {dst_nsp}.{table_name} select * from {src_nsp}.{table_name};",
src_nsp = src_nsp,
dst_nsp = dst_nsp,
table_name = table_name
),
)
}

let check_cancel = || {
Expand All @@ -1622,6 +1628,7 @@ impl Mirror {
conn.batch_execute(&query)?;
check_cancel()?;

// Repopulate `PUBLIC_TABLES` by copying their data wholesale
for table_name in PUBLIC_TABLES {
copy_table(
conn,
Expand All @@ -1631,15 +1638,30 @@ impl Mirror {
)?;
check_cancel()?;
}
for table_name in SUBGRAPHS_TABLES {
copy_table(
conn,
&ForeignServer::metadata_schema(&*PRIMARY_SHARD),
NAMESPACE_SUBGRAPHS,
table_name,
)?;
check_cancel()?;
}

// Repopulate `SUBGRAPHS_TABLES` but only copy the data we actually
// need to respond to queries when the primary is down
let src_nsp = ForeignServer::metadata_schema(&*PRIMARY_SHARD);
let dst_nsp = NAMESPACE_SUBGRAPHS;

run_query(
conn,
format!(
"insert into {dst_nsp}.subgraph \
select * from {src_nsp}.subgraph
where current_version is not null;"
),
)?;
run_query(
conn,
format!(
"insert into {dst_nsp}.subgraph_version \
select v.* from {src_nsp}.subgraph_version v, {src_nsp}.subgraph s
where v.id = s.current_version;"
),
)?;
copy_table(conn, &src_nsp, dst_nsp, "subgraph_deployment_assignment")?;

Ok(())
}

Expand Down