Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: curio: Drop FKs from pipeline to fix retry loops #11973

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions curiosrc/piece/task_cleanup_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
}

if n == 0 {
_, err = c.db.Exec(ctx, `UPDATE parked_pieces SET cleanup_task_id = NULL WHERE id = $1`, pieceID)
if err != nil {
return false, xerrors.Errorf("marking piece as complete: %w", err)
}

return true, nil
}

Expand Down
2 changes: 1 addition & 1 deletion curiosrc/piece/task_park_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d
}

// Update the piece as complete after a successful write.
_, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID)
_, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE, task_id = NULL WHERE id = $1`, pieceData.PieceID)
if err != nil {
return false, xerrors.Errorf("marking piece as complete: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
}

// set after_finalize
_, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID)
_, err = f.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_finalize = TRUE, task_id_finalize = NULL WHERE task_id_finalize = $1`, taskID)
if err != nil {
return false, xerrors.Errorf("updating task: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_movestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return false, xerrors.Errorf("moving storage: %w", err)
}

_, err = m.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_move_storage = true WHERE task_id_move_storage = $1`, taskID)
_, err = m.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_move_storage = TRUE, task_id_move_storage = NULL WHERE task_id_move_storage = $1`, taskID)
if err != nil {
return false, xerrors.Errorf("updating task: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done

// store success!
n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_porep = TRUE, seed_value = $3, porep_proof = $4
SET after_porep = TRUE, seed_value = $3, porep_proof = $4, task_id_porep = NULL
WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo

// store success!
n, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_sdr = true, ticket_epoch = $3, ticket_value = $4
SET after_sdr = true, ticket_epoch = $3, ticket_value = $4, task_id_sdr = NULL
WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, ticketEpoch, []byte(ticket))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_submit_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return false, xerrors.Errorf("pushing message to mpool: %w", err)
}

_, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber)
_, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE, task_id_commit_msg = NULL WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber)
if err != nil {
return false, xerrors.Errorf("updating commit_msg_cid: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions curiosrc/seal/task_submit_precommit.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo
if abi.ChainEpoch(pieces[0].F05DealStartEpoch) < head.Height() {
// deal start epoch is in the past, can't precommit this sector anymore
_, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET failed = TRUE, failed_at = NOW(), failed_reason = 'past-start-epoch', failed_reason_msg = 'precommit: start epoch is in the past'
SET failed = TRUE, failed_at = NOW(), failed_reason = 'past-start-epoch', failed_reason_msg = 'precommit: start epoch is in the past', task_id_precommit_msg = NULL
WHERE task_id_precommit_msg = $1`, taskID)
if perr != nil {
return false, xerrors.Errorf("persisting precommit start epoch expiry: %w", perr)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo
if err != nil {
if record {
_, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1
SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1, task_id_precommit_msg = NULL
WHERE task_id_precommit_msg = $2`, err.Error(), taskID)
if perr != nil {
return false, xerrors.Errorf("persisting precommit check error: %w", perr)
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo

// set precommit_msg_cid
_, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET precommit_msg_cid = $1, after_precommit_msg = TRUE
SET precommit_msg_cid = $1, after_precommit_msg = TRUE, task_id_precommit_msg = NULL
WHERE task_id_precommit_msg = $2`, mcid, taskID)
if err != nil {
return false, xerrors.Errorf("updating precommit_msg_cid: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_treed.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
}

n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_tree_d = true, tree_d_cid = $3 WHERE sp_id = $1 AND sector_number = $2`,
SET after_tree_d = true, tree_d_cid = $3, task_id_tree_d = NULL WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, commd)
if err != nil {
return false, xerrors.Errorf("store TreeD success: updating pipeline: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion curiosrc/seal/task_treerc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
// todo porep challenge check

n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3
SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3, task_id_tree_r = NULL, task_id_tree_c = NULL
WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, sealed)
if err != nil {
Expand Down
23 changes: 10 additions & 13 deletions lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,16 @@ create table sectors_sdr_pipeline (
failed_reason_msg text not null default '',

-- foreign key
-- note: those foreign keys are a part of the retry mechanism. If a task
-- fails due to retry limit, it will drop the assigned task_id, and the
-- poller will reassign the task to a new node if it deems the task is
-- still valid to be retried.
foreign key (task_id_sdr) references harmony_task (id) on delete set null,
foreign key (task_id_tree_d) references harmony_task (id) on delete set null,
foreign key (task_id_tree_c) references harmony_task (id) on delete set null,
foreign key (task_id_tree_r) references harmony_task (id) on delete set null,
foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null,
foreign key (task_id_porep) references harmony_task (id) on delete set null,
foreign key (task_id_finalize) references harmony_task (id) on delete set null,
foreign key (task_id_move_storage) references harmony_task (id) on delete set null,
foreign key (task_id_commit_msg) references harmony_task (id) on delete set null,
-- NOTE: Following keys were dropped in 20240507-sdr-pipeline-fk-drop.sql
foreign key (task_id_sdr) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_tree_d) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_tree_c) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_tree_r) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_porep) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_finalize) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_move_storage) references harmony_task (id) on delete set null, -- dropped
foreign key (task_id_commit_msg) references harmony_task (id) on delete set null, -- dropped

-- constraints
primary key (sp_id, sector_number)
Expand Down
6 changes: 4 additions & 2 deletions lib/harmony/harmonydb/sql/20240228-piece-park.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ create table parked_pieces (

cleanup_task_id bigint default null,

foreign key (task_id) references harmony_task (id) on delete set null,
foreign key (cleanup_task_id) references harmony_task (id) on delete set null,
-- NOTE: Following keys were dropped in 20240507-sdr-pipeline-fk-drop.sql
foreign key (task_id) references harmony_task (id) on delete set null, -- dropped
foreign key (cleanup_task_id) references harmony_task (id) on delete set null, -- dropped

unique (piece_cid)
);

Expand Down
12 changes: 12 additions & 0 deletions lib/harmony/harmonydb/sql/20240507-sdr-pipeline-fk-drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_commit_msg_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_finalize_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_move_storage_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_porep_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_precommit_msg_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_sdr_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_c_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_d_fkey;
ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_r_fkey;

ALTER TABLE parked_pieces DROP CONSTRAINT parked_pieces_cleanup_task_id_fkey;
ALTER TABLE parked_pieces DROP CONSTRAINT parked_pieces_task_id_fkey;
Loading