-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into feat/ui-menu
- Loading branch information
Showing
18 changed files
with
591 additions
and
98 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,274 @@ | ||
package gc | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
logging "github.com/ipfs/go-log/v2" | ||
"github.com/samber/lo" | ||
"golang.org/x/xerrors" | ||
|
||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" | ||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" | ||
"github.com/filecoin-project/lotus/lib/harmony/resources" | ||
"github.com/filecoin-project/lotus/lib/result" | ||
"github.com/filecoin-project/lotus/storage/paths" | ||
"github.com/filecoin-project/lotus/storage/sealer/fsutil" | ||
"github.com/filecoin-project/lotus/storage/sealer/storiface" | ||
) | ||
|
||
var log = logging.Logger("curiogc") | ||
|
||
const StorageEndpointGCInterval = 2 * time.Minute // todo bump post testing | ||
const StorageEndpointDeadTime = 15 * time.Minute | ||
const MaxParallelEndpointChecks = 32 | ||
|
||
type StorageEndpointGC struct { | ||
si *paths.DBIndex | ||
remote *paths.Remote | ||
db *harmonydb.DB | ||
} | ||
|
||
func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC { | ||
return &StorageEndpointGC{ | ||
si: si, | ||
remote: remote, | ||
db: db, | ||
} | ||
} | ||
|
||
func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { | ||
/* | ||
1. Get all storage paths + urls (endpoints) | ||
2. Ping each url, record results | ||
3. Update sector_path_url_liveness with success/failure | ||
4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table | ||
4.2 Remove storage paths with no URLs remaining | ||
4.2.1 in the same transaction remove sector refs to the dead path | ||
*/ | ||
|
||
ctx := context.Background() | ||
|
||
var pathRefs []struct { | ||
StorageID storiface.ID `db:"storage_id"` | ||
Urls string `db:"urls"` | ||
LastHeartbeat *time.Time `db:"last_heartbeat"` | ||
} | ||
|
||
err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`) | ||
if err != nil { | ||
return false, xerrors.Errorf("getting path metadata: %w", err) | ||
} | ||
|
||
type pingResult struct { | ||
storageID storiface.ID | ||
url string | ||
|
||
res result.Result[fsutil.FsStat] | ||
} | ||
|
||
var pingResults []pingResult | ||
var resultLk sync.Mutex | ||
var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks) | ||
|
||
for _, pathRef := range pathRefs { | ||
pathRef := pathRef | ||
urls := strings.Split(pathRef.Urls, paths.URLSeparator) | ||
|
||
for _, url := range urls { | ||
url := url | ||
|
||
select { | ||
case resultThrottle <- struct{}{}: | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
|
||
go func() { | ||
defer func() { | ||
<-resultThrottle | ||
}() | ||
|
||
st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID) | ||
|
||
res := pingResult{ | ||
storageID: pathRef.StorageID, | ||
url: url, | ||
res: result.Wrap(st, err), | ||
} | ||
|
||
resultLk.Lock() | ||
pingResults = append(pingResults, res) | ||
resultLk.Unlock() | ||
}() | ||
} | ||
} | ||
|
||
// Wait for all pings to finish | ||
for i := 0; i < MaxParallelEndpointChecks; i++ { | ||
select { | ||
case resultThrottle <- struct{}{}: | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
} | ||
|
||
// Update the liveness table | ||
|
||
/* | ||
create table sector_path_url_liveness ( | ||
storage_id text, | ||
url text, | ||
last_checked timestamp not null, | ||
last_live timestamp, | ||
last_dead timestamp, | ||
last_dead_reason text, | ||
primary key (storage_id, url), | ||
foreign key (storage_id) references storage_path (storage_id) on delete cascade | ||
) | ||
*/ | ||
|
||
currentTime := time.Now().UTC() | ||
|
||
committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { | ||
for _, pingResult := range pingResults { | ||
var lastLive, lastDead, lastDeadReason interface{} | ||
if pingResult.res.Error == nil { | ||
lastLive = currentTime.UTC() | ||
lastDead = nil | ||
lastDeadReason = nil | ||
} else { | ||
lastLive = nil | ||
lastDead = currentTime.UTC() | ||
lastDeadReason = pingResult.res.Error.Error() | ||
} | ||
|
||
_, err := tx.Exec(` | ||
INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason) | ||
VALUES ($1, $2, $3, $4, $5, $6) | ||
ON CONFLICT (storage_id, url) DO UPDATE | ||
SET last_checked = EXCLUDED.last_checked, | ||
last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live), | ||
last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead), | ||
last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason) | ||
`, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason) | ||
if err != nil { | ||
return false, xerrors.Errorf("updating liveness data: %w", err) | ||
} | ||
} | ||
|
||
return true, nil | ||
}, harmonydb.OptionRetry()) | ||
if err != nil { | ||
return false, xerrors.Errorf("sector_path_url_liveness update: %w", err) | ||
} | ||
if !committed { | ||
return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit") | ||
} | ||
|
||
/////// | ||
// Now we do the actual database cleanup | ||
if !stillOwned() { | ||
return false, xerrors.Errorf("task no longer owned") | ||
} | ||
|
||
committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { | ||
// Identify URLs that are consistently down | ||
var deadURLs []struct { | ||
StorageID storiface.ID | ||
URL string | ||
} | ||
err = tx.Select(&deadURLs, ` | ||
SELECT storage_id, url FROM sector_path_url_liveness | ||
WHERE last_dead > last_live AND last_dead < $1 | ||
`, currentTime.Add(-StorageEndpointDeadTime).UTC()) | ||
if err != nil { | ||
return false, xerrors.Errorf("selecting dead URLs: %w", err) | ||
} | ||
|
||
log.Debugw("dead urls", "dead_urls", deadURLs) | ||
|
||
// Remove dead URLs from storage_path entries and handle path cleanup | ||
for _, du := range deadURLs { | ||
// Fetch the current URLs for the storage path | ||
var URLs string | ||
err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs) | ||
if err != nil { | ||
return false, xerrors.Errorf("fetching storage paths: %w", err) | ||
} | ||
|
||
// Filter out the dead URL using lo.Reject and prepare the updated list | ||
urls := strings.Split(URLs, paths.URLSeparator) | ||
urls = lo.Reject(urls, func(u string, _ int) bool { | ||
return u == du.URL | ||
}) | ||
|
||
log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID) | ||
|
||
if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing | ||
log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", URLs, "dead_urls", deadURLs) | ||
continue | ||
} | ||
|
||
if len(urls) == 0 { | ||
// If no URLs left, remove the storage path entirely | ||
_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID) | ||
if err != nil { | ||
return false, xerrors.Errorf("deleting storage path: %w", err) | ||
} | ||
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID) | ||
if err != nil { | ||
return false, xerrors.Errorf("deleting sector locations: %w", err) | ||
} | ||
} else { | ||
// Update the storage path with the filtered URLs | ||
newURLs := strings.Join(urls, paths.URLSeparator) | ||
_, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID) | ||
if err != nil { | ||
return false, xerrors.Errorf("updating storage path urls: %w", err) | ||
} | ||
} | ||
} | ||
|
||
return true, nil | ||
}, harmonydb.OptionRetry()) | ||
if err != nil { | ||
return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err) | ||
} | ||
if !committed { | ||
return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit") | ||
} | ||
|
||
return true, nil | ||
} | ||
|
||
func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { | ||
id := ids[0] | ||
return &id, nil | ||
} | ||
|
||
func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails { | ||
return harmonytask.TaskTypeDetails{ | ||
Max: 1, | ||
Name: "StorageMetaGC", | ||
Cost: resources.Resources{ | ||
Cpu: 1, | ||
Ram: 64 << 20, | ||
Gpu: 0, | ||
}, | ||
IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s), | ||
} | ||
} | ||
|
||
func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) { | ||
// lazy endpoint, added when bored | ||
return | ||
} | ||
|
||
var _ harmonytask.TaskInterface = &StorageEndpointGC{} |
Oops, something went wrong.