From c785e59371f7e8f4194082e0725d33c1ebf70b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Apr 2024 23:34:48 +0200 Subject: [PATCH 1/6] feat: curio: storage index gc task (#11884) * curio storage path gc: lay out the structure * curio gc: Implement storage metadata gc * move bored singleton task impl to harmonytask * curio: run storage gc task on storage node * make gen --- cmd/curio/tasks/tasks.go | 7 + curiosrc/gc/storage_endpoint_gc.go | 276 ++++++++++++++++++ .../sql/20240416-harmony_singleton_task.sql | 8 + .../sql/20240417-sector_index_gc.sql | 13 + lib/harmony/harmonydb/userfuncs.go | 4 + lib/harmony/harmonytask/singleton_task.go | 52 ++++ lib/passcall/every.go | 28 ++ storage/paths/remote.go | 66 +++-- 8 files changed, 424 insertions(+), 30 deletions(-) create mode 100644 curiosrc/gc/storage_endpoint_gc.go create mode 100644 lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql create mode 100644 lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql create mode 100644 lib/harmony/harmonytask/singleton_task.go create mode 100644 lib/passcall/every.go diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 3b262efa8d7..71923018d9e 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -18,6 +18,7 @@ import ( curio "github.com/filecoin-project/lotus/curiosrc" "github.com/filecoin-project/lotus/curiosrc/chainsched" "github.com/filecoin-project/lotus/curiosrc/ffi" + "github.com/filecoin-project/lotus/curiosrc/gc" "github.com/filecoin-project/lotus/curiosrc/message" "github.com/filecoin-project/lotus/curiosrc/piece" "github.com/filecoin-project/lotus/curiosrc/seal" @@ -136,6 +137,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + if hasAnySealingTask { + // Sealing nodes maintain storage index when bored + storageEndpointGcTask := gc.NewStorageEndpointGC(si, stor, db) + activeTasks = append(activeTasks, storageEndpointGcTask) + } + if needProofParams { for spt := range dependencies.ProofTypes { if err := modules.GetParams(true)(spt); err != nil { diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go new file mode 100644 index 00000000000..8c6e6a0bf30 --- /dev/null +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -0,0 +1,276 @@ +package gc + +import ( + "context" + "os" + "strings" + "sync" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/result" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("curiogc") + +const StorageEndpointGCInterval = 2 * time.Minute // todo bump post testing +const StorageEndpointDeadTime = 15 * time.Minute +const MaxParallelEndpointChecks = 32 + +type StorageEndpointGC struct { + si *paths.DBIndex + remote *paths.Remote + db *harmonydb.DB +} + +func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC { + return &StorageEndpointGC{ + si: si, + remote: remote, + db: db, + } +} + +func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + /* + 1. Get all storage paths + urls (endpoints) + 2. Ping each url, record results + 3. Update sector_path_url_liveness with success/failure + 4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table + 4.2 Remove storage paths with no URLs remaining + 4.2.1 in the same transaction remove sector refs to the dead path + */ + + ctx := context.Background() + + var pathRefs []struct { + StorageID storiface.ID `db:"storage_id"` + Urls string `db:"urls"` + LastHeartbeat *time.Time `db:"last_heartbeat"` + } + + err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`) + if err != nil { + return false, xerrors.Errorf("getting path metadata: %w", err) + } + + type pingResult struct { + storageID storiface.ID + url string + + res result.Result[fsutil.FsStat] + } + + var pingResults []pingResult + var resultLk sync.Mutex + var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks) + + for _, pathRef := range pathRefs { + pathRef := pathRef + urls := strings.Split(pathRef.Urls, paths.URLSeparator) + + for _, url := range urls { + url := url + + select { + case resultThrottle <- struct{}{}: + case <-ctx.Done(): + return false, ctx.Err() + } + + go func() { + defer func() { + <-resultThrottle + }() + + st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID) + + res := pingResult{ + storageID: pathRef.StorageID, + url: url, + res: result.Wrap(st, err), + } + + resultLk.Lock() + pingResults = append(pingResults, res) + resultLk.Unlock() + }() + } + } + + // Wait for all pings to finish + for i := 0; i < MaxParallelEndpointChecks; i++ { + select { + case resultThrottle <- struct{}{}: + case <-ctx.Done(): + return false, ctx.Err() + } + } + + // Update the liveness table + + /* + create table sector_path_url_liveness ( + storage_id text, + url text, + + last_checked timestamp not null, + last_live timestamp, + last_dead timestamp, + last_dead_reason text, + + primary key (storage_id, url), + + foreign key (storage_id) references storage_path (storage_id) on delete cascade + ) + */ + + currentTime := time.Now().UTC() + + committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + for _, pingResult := range pingResults { + var lastLive, lastDead, lastDeadReason interface{} + if pingResult.res.Error == nil { + lastLive = currentTime.UTC() + lastDead = nil + lastDeadReason = nil + } else { + lastLive = nil + lastDead = currentTime.UTC() + lastDeadReason = pingResult.res.Error.Error() + } + + _, err := tx.Exec(` + INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (storage_id, url) DO UPDATE + SET last_checked = EXCLUDED.last_checked, + last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live), + last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead), + last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason) + `, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason) + if err != nil { + return false, xerrors.Errorf("updating liveness data: %w", err) + } + } + + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return false, xerrors.Errorf("sector_path_url_liveness update: %w", err) + } + if !committed { + return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit") + } + + /////// + // Now we do the actual database cleanup + if !stillOwned() { + return false, xerrors.Errorf("task no longer owned") + } + + committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Identify URLs that are consistently down + var deadURLs []struct { + StorageID storiface.ID + URL string + } + err = tx.Select(&deadURLs, ` + SELECT storage_id, url FROM sector_path_url_liveness + WHERE last_dead > last_live AND last_dead < $1 + `, currentTime.Add(-StorageEndpointDeadTime).UTC()) + if err != nil { + return false, xerrors.Errorf("selecting dead URLs: %w", err) + } + + log.Debugw("dead urls", "dead_urls", deadURLs) + + // Remove dead URLs from storage_path entries and handle path cleanup + for _, du := range deadURLs { + // Fetch the current URLs for the storage path + var currentPath struct { + URLs string + } + err = tx.Get(¤tPath, "SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("fetching storage paths: %w", err) + } + + // Filter out the dead URL using lo.Reject and prepare the updated list + urls := strings.Split(currentPath.URLs, paths.URLSeparator) + urls = lo.Reject(urls, func(u string, _ int) bool { + return u == du.URL + }) + + log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID) + + if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing + log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", currentPath.URLs, "dead_urls", deadURLs) + continue + } + + if len(urls) == 0 { + // If no URLs left, remove the storage path entirely + _, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("deleting storage path: %w", err) + } + _, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("deleting sector locations: %w", err) + } + } else { + // Update the storage path with the filtered URLs + newURLs := strings.Join(urls, paths.URLSeparator) + _, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID) + if err != nil { + return false, xerrors.Errorf("updating storage path urls: %w", err) + } + } + } + + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err) + } + if !committed { + return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit") + } + + return true, nil +} + +func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := ids[0] + return &id, nil +} + +func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 1, + Name: "StorageMetaGC", + Cost: resources.Resources{ + Cpu: 1, + Ram: 64 << 20, + Gpu: 0, + }, + IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s), + } +} + +func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) { + // lazy endpoint, added when bored + return +} + +var _ harmonytask.TaskInterface = &StorageEndpointGC{} diff --git a/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql b/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql new file mode 100644 index 00000000000..d565cfa4702 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql @@ -0,0 +1,8 @@ +create table harmony_task_singletons ( + task_name varchar(255) not null, + task_id bigint, + last_run_time timestamp, + + primary key (task_name), + foreign key (task_id) references harmony_task (id) on delete set null +); diff --git a/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql b/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql new file mode 100644 index 00000000000..e9771d9f31c --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql @@ -0,0 +1,13 @@ +create table sector_path_url_liveness ( + storage_id text, + url text, + + last_checked timestamp not null, + last_live timestamp, + last_dead timestamp, + last_dead_reason text, + + primary key (storage_id, url), + + foreign key (storage_id) references storage_path (storage_id) on delete cascade +) diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 759cbd322b3..5cca8de57e7 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -236,6 +236,10 @@ func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) e return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...) } +func (t *Tx) Get(s any, sql rawStringOnly, arguments ...any) error { + return pgxscan.Get(t.ctx, t.Tx, s, string(sql), arguments...) +} + func IsErrUniqueContraint(err error) bool { var e2 *pgconn.PgError return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation diff --git a/lib/harmony/harmonytask/singleton_task.go b/lib/harmony/harmonytask/singleton_task.go new file mode 100644 index 00000000000..a7b2d45a26f --- /dev/null +++ b/lib/harmony/harmonytask/singleton_task.go @@ -0,0 +1,52 @@ +package harmonytask + +import ( + "errors" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/passcall" +) + +func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error { + return passcall.Every(minInterval, func(add AddTaskFunc) error { + taskName := task.TypeDetails().Name + + add(func(taskID TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + var existingTaskID *int64 + var lastRunTime time.Time + + // Query to check the existing task entry + err = tx.QueryRow(`SELECT task_id, last_run_time FROM harmony_task_singletons WHERE task_name = $1`, taskName).Scan(&existingTaskID, &lastRunTime) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return false, err // return error if query failed and it's not because of missing row + } + } + + now := time.Now() + // Determine if the task should run based on the absence of a record or outdated last_run_time + shouldRun := err == pgx.ErrNoRows || (existingTaskID == nil && lastRunTime.Add(minInterval).Before(now)) + if !shouldRun { + return false, nil + } + + // Conditionally insert or update the task entry + n, err := tx.Exec(` + INSERT INTO harmony_task_singletons (task_name, task_id, last_run_time) + VALUES ($1, $2, $3) + ON CONFLICT (task_name) DO UPDATE + SET task_id = COALESCE(harmony_task_singletons.task_id, $2), + last_run_time = $3 + WHERE harmony_task_singletons.task_id IS NULL + `, taskName, taskID, now) + if err != nil { + return false, err + } + return n > 0, nil + }) + return nil + }) +} diff --git a/lib/passcall/every.go b/lib/passcall/every.go new file mode 100644 index 00000000000..f39543063dd --- /dev/null +++ b/lib/passcall/every.go @@ -0,0 +1,28 @@ +package passcall + +import ( + "sync" + "time" +) + +// Every is a helper function that will call the provided callback +// function at most once every `passEvery` duration. If the function is called +// more frequently than that, it will return nil and not call the callback. +func Every[P, R any](passInterval time.Duration, cb func(P) R) func(P) R { + var lastCall time.Time + var lk sync.Mutex + + return func(param P) R { + lk.Lock() + defer lk.Unlock() + + if time.Since(lastCall) < passInterval { + return *new(R) + } + + defer func() { + lastCall = time.Now() + }() + return cb(param) + } +} diff --git a/storage/paths/remote.go b/storage/paths/remote.go index abf8622e1c9..ab27548632c 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -416,47 +416,53 @@ func (r *Remote) FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, er } for _, urlStr := range si.URLs { - rl, err := url.Parse(urlStr) + out, err := r.StatUrl(ctx, urlStr, id) if err != nil { - log.Warnw("failed to parse URL", "url", urlStr, "error", err) - continue // Try the next URL + log.Warnw("stat url failed", "url", urlStr, "error", err) + continue } - rl.Path = gopath.Join(rl.Path, "stat", string(id)) + return out, nil + } - req, err := http.NewRequest("GET", rl.String(), nil) - if err != nil { - log.Warnw("creating request failed", "url", rl.String(), "error", err) - continue // Try the next URL - } - req.Header = r.auth - req = req.WithContext(ctx) + return fsutil.FsStat{}, xerrors.Errorf("all endpoints failed for remote storage %s", id) +} - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Warnw("request failed", "url", rl.String(), "error", err) - continue // Try the next URL - } +func (r *Remote) StatUrl(ctx context.Context, urlStr string, id storiface.ID) (fsutil.FsStat, error) { + rl, err := url.Parse(urlStr) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("parsing URL: %w", err) + } - if resp.StatusCode == 200 { - var out fsutil.FsStat - if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { - _ = resp.Body.Close() - log.Warnw("decoding response failed", "url", rl.String(), "error", err) - continue // Try the next URL - } + rl.Path = gopath.Join(rl.Path, "stat", string(id)) + + req, err := http.NewRequest("GET", rl.String(), nil) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("creating request failed: %w", err) + } + req.Header = r.auth + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err) + } + + if resp.StatusCode == 200 { + var out fsutil.FsStat + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { _ = resp.Body.Close() - return out, nil // Successfully decoded, return the result + return fsutil.FsStat{}, xerrors.Errorf("decoding response failed: %w", err) } - - // non-200 status code - b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging - log.Warnw("request to endpoint failed", "url", rl.String(), "statusCode", resp.StatusCode, "response", string(b)) _ = resp.Body.Close() - // Continue to try the next URL, don't return here as we want to try all URLs + return out, nil // Successfully decoded, return the result } - return fsutil.FsStat{}, xerrors.Errorf("all endpoints failed for remote storage %s", id) + // non-200 status code + b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging + _ = resp.Body.Close() + + return fsutil.FsStat{}, xerrors.Errorf("endpoint failed %s: %d %s", rl.String(), resp.StatusCode, string(b)) } func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { From 1cdca19a851f47b2695da0c8ac886d8b5069b019 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 16 Apr 2024 20:20:58 -0500 Subject: [PATCH 2/6] oops --- documentation/en/cli-curio.md | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/documentation/en/cli-curio.md b/documentation/en/cli-curio.md index 385056c33e8..6dced244188 100644 --- a/documentation/en/cli-curio.md +++ b/documentation/en/cli-curio.md @@ -159,10 +159,77 @@ OPTIONS: ### curio cli log ``` +NAME: + curio cli log - Manage logging + +USAGE: + curio cli log command [command options] [arguments...] + +COMMANDS: + list List log systems + set-level Set log level + help, h Shows a list of commands or help for one command + +OPTIONS: + --help, -h show help +``` + +#### curio cli log list +``` +NAME: + curio cli log list - List log systems + +USAGE: + curio cli log list [command options] [arguments...] + +OPTIONS: + --help, -h show help +``` + +#### curio cli log set-level +``` +NAME: + curio cli log set-level - Set log level + +USAGE: + curio cli log set-level [command options] [level] + +DESCRIPTION: + Set the log level for logging systems: + + The system flag can be specified multiple times. + + eg) log set-level --system chain --system chainxchg debug + + Available Levels: + debug + info + warn + error + + Environment Variables: + GOLOG_LOG_LEVEL - Default log level for all log systems + GOLOG_LOG_FMT - Change output log format (json, nocolor) + GOLOG_FILE - Write logs to file + GOLOG_OUTPUT - Specify whether to output to file, stderr, stdout or a combination, i.e. file+stderr + + +OPTIONS: + --system value [ --system value ] limit to log system + --help, -h show help ``` ### curio cli wait-api ``` +NAME: + curio cli wait-api - Wait for Curio api to come online + +USAGE: + curio cli wait-api [command options] [arguments...] + +OPTIONS: + --timeout value duration to wait till fail (default: 30s) + --help, -h show help ``` ## curio run From 52a190aa7bb059ef6d65b93bf8739a74e50cf55f Mon Sep 17 00:00:00 2001 From: writegr <167099595+writegr@users.noreply.github.com> Date: Thu, 18 Apr 2024 16:34:20 +0800 Subject: [PATCH 3/6] chore: fix some typos in comments (#11892) Signed-off-by: writegr --- chain/sub/ratelimit/window.go | 2 +- cmd/tvx/extract_tipset.go | 2 +- node/config/load_test.go | 2 +- storage/pipeline/fsm_events.go | 2 +- tools/stats/sync/sync.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/chain/sub/ratelimit/window.go b/chain/sub/ratelimit/window.go index 0756e8998bb..8350b109eb1 100644 --- a/chain/sub/ratelimit/window.go +++ b/chain/sub/ratelimit/window.go @@ -24,7 +24,7 @@ func NewWindow(capacity int, size time.Duration) *Window { } // Add attempts to append a new timestamp into the current window. Previously -// added values that are not not within `size` difference from the value being +// added values that are not within `size` difference from the value being // added are first removed. Add fails if adding the value would cause the // window to exceed capacity. func (w *Window) Add() error { diff --git a/cmd/tvx/extract_tipset.go b/cmd/tvx/extract_tipset.go index 553961f4491..0be4c04b08d 100644 --- a/cmd/tvx/extract_tipset.go +++ b/cmd/tvx/extract_tipset.go @@ -57,7 +57,7 @@ func doExtractTipset(opts extractOpts) error { return err } - // are are squashing all tipsets into a single multi-tipset vector? + // are squashing all tipsets into a single multi-tipset vector? if opts.squash { vector, err := extractTipsets(ctx, tss...) if err != nil { diff --git a/node/config/load_test.go b/node/config/load_test.go index 2edef259bc6..2eeacb7d5b3 100644 --- a/node/config/load_test.go +++ b/node/config/load_test.go @@ -53,7 +53,7 @@ func TestParitalConfig(t *testing.T) { f, err := os.CreateTemp("", "config-*.toml") fname := f.Name() - assert.NoError(err, "tmp file shold not error") + assert.NoError(err, "tmp file should not error") _, err = f.WriteString(cfgString) assert.NoError(err, "writing to tmp file should not error") err = f.Close() diff --git a/storage/pipeline/fsm_events.go b/storage/pipeline/fsm_events.go index 94cd53e829d..63ebdb9d5b6 100644 --- a/storage/pipeline/fsm_events.go +++ b/storage/pipeline/fsm_events.go @@ -21,7 +21,7 @@ type mutator interface { // globalMutator is an event which can apply in every state type globalMutator interface { - // applyGlobal applies the event to the state. If if returns true, + // applyGlobal applies the event to the state. If it returns true, // event processing should be interrupted applyGlobal(state *SectorInfo) bool } diff --git a/tools/stats/sync/sync.go b/tools/stats/sync/sync.go index c8db1c543be..5a925d4cbcc 100644 --- a/tools/stats/sync/sync.go +++ b/tools/stats/sync/sync.go @@ -91,7 +91,7 @@ type BufferedTipsetChannelApi interface { } // BufferedTipsetChannel returns an unbuffered channel of tipsets. Buffering occurs internally to handle revert -// ChainNotify changes. The returned channel can output tipsets at the same height twice if a reorg larger the the +// ChainNotify changes. The returned channel can output tipsets at the same height twice if a reorg larger the // provided `size` occurs. func BufferedTipsetChannel(ctx context.Context, api BufferedTipsetChannelApi, lastHeight abi.ChainEpoch, size int) (<-chan *types.TipSet, error) { chmain := make(chan *types.TipSet) From b9c06b64d739e1f9a501736c6d1f6c50fe1f6a7f Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Wed, 17 Apr 2024 14:18:10 -0500 Subject: [PATCH 4/6] 1 --- curiosrc/gc/storage_endpoint_gc.go | 10 +++--- go.mod | 7 +++-- go.sum | 8 +++++ lib/harmony/harmonydb/harmonydb.go | 6 ++-- lib/harmony/harmonydb/userfuncs.go | 49 ++++++++++++++++++++++-------- 5 files changed, 56 insertions(+), 24 deletions(-) diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go index 8c6e6a0bf30..1dd26ee037c 100644 --- a/curiosrc/gc/storage_endpoint_gc.go +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -197,16 +197,14 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool // Remove dead URLs from storage_path entries and handle path cleanup for _, du := range deadURLs { // Fetch the current URLs for the storage path - var currentPath struct { - URLs string - } - err = tx.Get(¤tPath, "SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID) + var URLs string + err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs) if err != nil { return false, xerrors.Errorf("fetching storage paths: %w", err) } // Filter out the dead URL using lo.Reject and prepare the updated list - urls := strings.Split(currentPath.URLs, paths.URLSeparator) + urls := strings.Split(URLs, paths.URLSeparator) urls = lo.Reject(urls, func(u string, _ int) bool { return u == du.URL }) @@ -214,7 +212,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID) if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing - log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", currentPath.URLs, "dead_urls", deadURLs) + log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", URLs, "dead_urls", deadURLs) continue } diff --git a/go.mod b/go.mod index 054a20f9ed2..cd92fcac01a 100644 --- a/go.mod +++ b/go.mod @@ -159,7 +159,7 @@ require ( go.uber.org/fx v1.20.1 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.19.0 + golang.org/x/crypto v0.20.0 golang.org/x/exp v0.0.0-20240213143201-ec583247a57a golang.org/x/net v0.21.0 golang.org/x/sync v0.6.0 @@ -249,8 +249,8 @@ require ( github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -318,6 +318,7 @@ require ( github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/yugabyte/pgx/v5 v5.5.3-yb-2 // indirect github.com/zondax/hid v0.9.2 // indirect github.com/zondax/ledger-go v0.14.3 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect diff --git a/go.sum b/go.sum index 4fe1f13df94..6c2bf17cfb5 100644 --- a/go.sum +++ b/go.sum @@ -884,10 +884,14 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgx/v5 v5.4.1 h1:oKfB/FhuVtit1bBM3zNRRsZ925ZkMN3HXL+LgLUM9lE= github.com/jackc/pgx/v5 v5.4.1/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -1711,6 +1715,8 @@ github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542/go.mod h1:7T39/ZM github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/yugabyte/pgx/v5 v5.5.3-yb-2 h1:SDk2waZb2o6dSLYqk+vq0Ur2jnIv+X2A+P+QPR1UThU= +github.com/yugabyte/pgx/v5 v5.5.3-yb-2/go.mod h1:2SxizGfDY7UDCRTtbI/xd98C/oGN7S/3YoGF8l9gx/c= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1840,6 +1846,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= +golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index 3d05f04fb4f..56b5acdfee2 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -15,9 +15,9 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgxpool" + "github.com/yugabyte/pgx/v5" + "github.com/yugabyte/pgx/v5/pgconn" + "github.com/yugabyte/pgx/v5/pgxpool" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 5cca8de57e7..1f39504b81e 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -3,17 +3,18 @@ package harmonydb import ( "context" "errors" + "fmt" "runtime" "time" - "github.com/georgysavva/scany/v2/pgxscan" + "github.com/georgysavva/scany/v2/dbscan" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" "github.com/samber/lo" + "github.com/yugabyte/pgx/v5" + "github.com/yugabyte/pgx/v5/pgconn" ) -var errTx = errors.New("Cannot use a non-transaction func in a transaction") +var errTx = errors.New("cannot use a non-transaction func in a transaction") // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. // In any package, raw strings will satisfy compilation. Ex: @@ -42,7 +43,7 @@ type Qry interface { Values() ([]any, error) } -// Query offers Next/Err/Close/Scan/Values/StructScan +// Query offers Next/Err/Close/Scan/Values type Query struct { Qry } @@ -69,8 +70,12 @@ func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (* q, err := db.pgx.Query(ctx, string(sql), arguments...) return &Query{q}, err } + +// StructScan allows scanning a single row into a struct. +// This improves efficiency of processing large result sets +// by avoiding the need to allocate a slice of structs. func (q *Query) StructScan(s any) error { - return pgxscan.ScanRow(s, q.Qry.(pgx.Rows)) + return dbscan.ScanRow(s, dbscanRows{q.Qry.(pgx.Rows)}) } type Row interface { @@ -95,6 +100,20 @@ func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) return db.pgx.QueryRow(ctx, string(sql), arguments...) } +type dbscanRows struct { + pgx.Rows +} + +func (d dbscanRows) Close() error { + d.Rows.Close() + return nil +} +func (d dbscanRows) Columns() ([]string, error) { + return lo.Map(d.Rows.FieldDescriptions(), func(fd pgconn.FieldDescription, _ int) string { + return fd.Name + }), nil +} + /* Select multiple rows into a slice using name matching Ex: @@ -113,7 +132,12 @@ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnl if db.usedInTransaction() { return errTx } - return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) + rows, err := db.pgx.Query(ctx, string(sql), arguments...) + if err != nil { + return err + } + defer rows.Close() + return dbscan.ScanAll(sliceOfStructPtr, dbscanRows{rows}) } type Tx struct { @@ -233,11 +257,12 @@ func (t *Tx) QueryRow(sql rawStringOnly, arguments ...any) Row { // Select in a transaction. func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { - return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...) -} - -func (t *Tx) Get(s any, sql rawStringOnly, arguments ...any) error { - return pgxscan.Get(t.ctx, t.Tx, s, string(sql), arguments...) + rows, err := t.Query(sql, arguments...) + if err != nil { + return fmt.Errorf("scany: query multiple result rows: %w", err) + } + defer rows.Close() + return dbscan.ScanAll(sliceOfStructPtr, dbscanRows{rows.Qry.(pgx.Rows)}) } func IsErrUniqueContraint(err error) bool { From edd9c82bc1fc5fc19adefd671488236faa8e4b98 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Wed, 17 Apr 2024 14:53:46 -0500 Subject: [PATCH 5/6] mod tidy --- go.mod | 2 +- go.sum | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/go.mod b/go.mod index cd92fcac01a..674148ec3db 100644 --- a/go.mod +++ b/go.mod @@ -148,6 +148,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/xeipuuv/gojsonschema v1.2.0 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 + github.com/yugabyte/pgx/v5 v5.5.3-yb-2 github.com/zondax/ledger-filecoin-go v0.11.1 github.com/zyedidia/generic v1.2.1 go.opencensus.io v0.24.0 @@ -318,7 +319,6 @@ require ( github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - github.com/yugabyte/pgx/v5 v5.5.3-yb-2 // indirect github.com/zondax/hid v0.9.2 // indirect github.com/zondax/ledger-go v0.14.3 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect diff --git a/go.sum b/go.sum index 6c2bf17cfb5..4e6ce0dec0f 100644 --- a/go.sum +++ b/go.sum @@ -882,14 +882,10 @@ github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgx/v5 v5.4.1 h1:oKfB/FhuVtit1bBM3zNRRsZ925ZkMN3HXL+LgLUM9lE= github.com/jackc/pgx/v5 v5.4.1/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= -github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= -github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -1844,8 +1840,6 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= From 6b3e9b109fa5855f856d11cc45b39efcdb38cf08 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 18 Apr 2024 14:57:29 -0500 Subject: [PATCH 6/6] feat: curio: sectors UI (#11869) * cfg edit 1 * jsonschema deps * feat: lp mig - first few steps * lp mig: default tasks * code comments * docs * lp-mig-progress * shared * comments and todos * fix: curio: rename lotus-provider to curio (#11645) * rename provider to curio * install gotext * fix lint errors, mod tidy * fix typo * fix API_INFO and add gotext to circleCI * add back gotext * add gotext after remerge * lp: channels doc * finish easy-migration TODOs * out generate * merging and more renames * avoid make-all * minor doc stuff * cu: make gen * make gen fix * make gen * tryfix * go mod tidy * minor ez migration fixes * ez setup - ui cleanups * better error message * guided setup colors * better path to saveconfigtolayer * loadconfigwithupgrades fix * readMiner oops * guided - homedir * err if miner is running * prompt error should exit * process already running, miner_id sectors in migration * dont prompt for language a second time * check miner stopped * unlock repo * render and sql oops * curio easyMig - some fixes * easyMigration runs successfully * lint * part 2 of last * message * merge addtl * fixing guided setup for myself * warn-on-no-post * EditorLoads * cleanups and styles * create info * fix tests * make gen * sector early bird * sectors v2 * sector termination v1 * terminate2 * mjs * minor things * flag bad sectors * fix errors * add dealweight and deals * change column width * refactor sql, handle sealing sectors * fix estimates --------- Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Co-authored-by: LexLuthr Co-authored-by: LexLuthr --- cli/spcli/sectors.go | 122 +++++---- curiosrc/web/api/config/config.go | 1 - curiosrc/web/api/routes.go | 2 + curiosrc/web/api/sector/sector.go | 375 ++++++++++++++++++++++++++ curiosrc/web/hapi/watch_actor.go | 6 +- curiosrc/web/static/sector/index.html | 129 +++++++++ curiosrc/web/static/ux/curio-ux.mjs | 24 +- storage/paths/local.go | 1 + 8 files changed, 608 insertions(+), 52 deletions(-) create mode 100644 curiosrc/web/api/sector/sector.go create mode 100644 curiosrc/web/static/sector/index.html diff --git a/cli/spcli/sectors.go b/cli/spcli/sectors.go index 1b230ce04ee..95acbcd111e 100644 --- a/cli/spcli/sectors.go +++ b/cli/spcli/sectors.go @@ -2,6 +2,7 @@ package spcli import ( "bufio" + "context" "encoding/csv" "encoding/json" "errors" @@ -16,6 +17,7 @@ import ( "github.com/fatih/color" cbor "github.com/ipfs/go-ipld-cbor" + "github.com/samber/lo" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -1353,44 +1355,20 @@ func TerminateSectorCmd(getActorAddress ActorAddressGetter) *cli.Command { } } - mi, err := nodeApi.StateMinerInfo(ctx, maddr, types.EmptyTSK) - if err != nil { - return err - } - - terminationDeclarationParams := []miner2.TerminationDeclaration{} - - for _, sn := range cctx.Args().Slice() { - sectorNum, err := strconv.ParseUint(sn, 10, 64) + var outerErr error + sectorNumbers := lo.Map(cctx.Args().Slice(), func(sn string, _ int) int { + sectorNum, err := strconv.Atoi(sn) if err != nil { - return fmt.Errorf("could not parse sector number: %w", err) + outerErr = fmt.Errorf("could not parse sector number: %w", err) + return 0 } - - sectorbit := bitfield.New() - sectorbit.Set(sectorNum) - - loca, err := nodeApi.StateSectorPartition(ctx, maddr, abi.SectorNumber(sectorNum), types.EmptyTSK) - if err != nil { - return fmt.Errorf("get state sector partition %s", err) - } - - para := miner2.TerminationDeclaration{ - Deadline: loca.Deadline, - Partition: loca.Partition, - Sectors: sectorbit, - } - - terminationDeclarationParams = append(terminationDeclarationParams, para) - } - - terminateSectorParams := &miner2.TerminateSectorsParams{ - Terminations: terminationDeclarationParams, + return sectorNum + }) + if outerErr != nil { + return outerErr } - sp, err := actors.SerializeParams(terminateSectorParams) - if err != nil { - return xerrors.Errorf("serializing params: %w", err) - } + confidence := uint64(cctx.Int("confidence")) var fromAddr address.Address if from := cctx.String("from"); from != "" { @@ -1400,24 +1378,19 @@ func TerminateSectorCmd(getActorAddress ActorAddressGetter) *cli.Command { return fmt.Errorf("parsing address %s: %w", from, err) } } else { + mi, err := nodeApi.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + fromAddr = mi.Worker } - - smsg, err := nodeApi.MpoolPushMessage(ctx, &types.Message{ - From: fromAddr, - To: maddr, - Method: builtin.MethodsMiner.TerminateSectors, - - Value: big.Zero(), - Params: sp, - }, nil) + smsg, err := TerminateSectors(ctx, nodeApi, maddr, sectorNumbers, fromAddr) if err != nil { - return xerrors.Errorf("mpool push message: %w", err) + return err } - fmt.Println("sent termination message:", smsg.Cid()) - - wait, err := nodeApi.StateWaitMsg(ctx, smsg.Cid(), uint64(cctx.Int("confidence"))) + wait, err := nodeApi.StateWaitMsg(ctx, smsg.Cid(), confidence) if err != nil { return err } @@ -1425,8 +1398,61 @@ func TerminateSectorCmd(getActorAddress ActorAddressGetter) *cli.Command { if wait.Receipt.ExitCode.IsError() { return fmt.Errorf("terminate sectors message returned exit %d", wait.Receipt.ExitCode) } - return nil }, } } + +type TerminatorNode interface { + StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error) + MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) +} + +func TerminateSectors(ctx context.Context, full TerminatorNode, maddr address.Address, sectorNumbers []int, fromAddr address.Address) (*types.SignedMessage, error) { + + terminationDeclarationParams := []miner2.TerminationDeclaration{} + + for _, sectorNum := range sectorNumbers { + + sectorbit := bitfield.New() + sectorbit.Set(uint64(sectorNum)) + + loca, err := full.StateSectorPartition(ctx, maddr, abi.SectorNumber(sectorNum), types.EmptyTSK) + if err != nil { + return nil, fmt.Errorf("get state sector partition %s", err) + } + + para := miner2.TerminationDeclaration{ + Deadline: loca.Deadline, + Partition: loca.Partition, + Sectors: sectorbit, + } + + terminationDeclarationParams = append(terminationDeclarationParams, para) + } + + terminateSectorParams := &miner2.TerminateSectorsParams{ + Terminations: terminationDeclarationParams, + } + + sp, errA := actors.SerializeParams(terminateSectorParams) + if errA != nil { + return nil, xerrors.Errorf("serializing params: %w", errA) + } + + smsg, err := full.MpoolPushMessage(ctx, &types.Message{ + From: fromAddr, + To: maddr, + Method: builtin.MethodsMiner.TerminateSectors, + + Value: big.Zero(), + Params: sp, + }, nil) + if err != nil { + return nil, xerrors.Errorf("mpool push message: %w", err) + } + + fmt.Println("sent termination message:", smsg.Cid()) + + return smsg, nil +} diff --git a/curiosrc/web/api/config/config.go b/curiosrc/web/api/config/config.go index 1e18e792fc5..6f9598f7fad 100644 --- a/curiosrc/web/api/config/config.go +++ b/curiosrc/web/api/config/config.go @@ -46,7 +46,6 @@ func getSch(w http.ResponseWriter, r *http.Request) { }, } sch := ref.Reflect(config.CurioConfig{}) - //sch := jsonschema.Reflect(config.CurioConfig{}) // add comments for k, doc := range config.Doc { item, ok := sch.Definitions[k] diff --git a/curiosrc/web/api/routes.go b/curiosrc/web/api/routes.go index 6b450055e79..cf56257ee92 100644 --- a/curiosrc/web/api/routes.go +++ b/curiosrc/web/api/routes.go @@ -7,9 +7,11 @@ import ( "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/curiosrc/web/api/config" "github.com/filecoin-project/lotus/curiosrc/web/api/debug" + "github.com/filecoin-project/lotus/curiosrc/web/api/sector" ) func Routes(r *mux.Router, deps *deps.Deps) { debug.Routes(r.PathPrefix("/debug").Subrouter(), deps) config.Routes(r.PathPrefix("/config").Subrouter(), deps) + sector.Routes(r.PathPrefix("/sector").Subrouter(), deps) } diff --git a/curiosrc/web/api/sector/sector.go b/curiosrc/web/api/sector/sector.go new file mode 100644 index 00000000000..ba71f0cbe88 --- /dev/null +++ b/curiosrc/web/api/sector/sector.go @@ -0,0 +1,375 @@ +package sector + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/docker/go-units" + "github.com/gorilla/mux" + "github.com/samber/lo" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin/v9/market" + + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/cli/spcli" + "github.com/filecoin-project/lotus/cmd/curio/deps" + "github.com/filecoin-project/lotus/curiosrc/web/api/apihelper" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +const verifiedPowerGainMul = 9 + +type cfg struct { + *deps.Deps +} + +func Routes(r *mux.Router, deps *deps.Deps) { + c := &cfg{deps} + // At menu.html: + r.Methods("GET").Path("/all").HandlerFunc(c.getSectors) + r.Methods("POST").Path("/terminate").HandlerFunc(c.terminateSectors) +} + +func (c *cfg) terminateSectors(w http.ResponseWriter, r *http.Request) { + var in []struct { + MinerID int + Sector int + } + apihelper.OrHTTPFail(w, json.NewDecoder(r.Body).Decode(&in)) + toDel := map[int][]int{} + for _, s := range in { + toDel[s.MinerID] = append(toDel[s.MinerID], s.Sector) + } + + for minerInt, sectors := range toDel { + maddr, err := address.NewIDAddress(uint64(minerInt)) + apihelper.OrHTTPFail(w, err) + mi, err := c.Full.StateMinerInfo(r.Context(), maddr, types.EmptyTSK) + apihelper.OrHTTPFail(w, err) + _, err = spcli.TerminateSectors(r.Context(), c.Full, maddr, sectors, mi.Worker) + apihelper.OrHTTPFail(w, err) + for _, sectorNumber := range sectors { + id := abi.SectorID{Miner: abi.ActorID(minerInt), Number: abi.SectorNumber(sectorNumber)} + apihelper.OrHTTPFail(w, c.Stor.Remove(r.Context(), id, storiface.FTAll, true, nil)) + } + } +} + +func (c *cfg) getSectors(w http.ResponseWriter, r *http.Request) { + // TODO get sector info from chain and from database, then fold them together + // and return the result. + type sector struct { + MinerID int64 `db:"miner_id"` + SectorNum int64 `db:"sector_num"` + SectorFiletype int `db:"sector_filetype" json:"-"` // Useless? + HasSealed bool + HasUnsealed bool + HasSnap bool + ExpiresAt abi.ChainEpoch // map to Duration + IsOnChain bool + IsFilPlus bool + SealInfo string + Proving bool + Flag bool + DealWeight string + Deals string + //StorageID string `db:"storage_id"` // map to serverName + // Activation abi.ChainEpoch // map to time.Time. advanced view only + // DealIDs []abi.DealID // advanced view only + //ExpectedDayReward abi.TokenAmount + //SealProof abi.RegisteredSealProof + } + type piece struct { + Size int64 `db:"piece_size"` + DealID uint64 `db:"f05_deal_id"` + Proposal json.RawMessage `db:"f05_deal_proposal"` + Manifest json.RawMessage `db:"direct_piece_activation_manifest"` + Miner int64 `db:"sp_id"` + Sector int64 `db:"sector_number"` + } + var sectors []sector + var pieces []piece + apihelper.OrHTTPFail(w, c.DB.Select(r.Context(), §ors, `SELECT + miner_id, sector_num, SUM(sector_filetype) as sector_filetype + FROM sector_location WHERE sector_filetype != 32 + GROUP BY miner_id, sector_num + ORDER BY miner_id, sector_num`)) + minerToAddr := map[int64]address.Address{} + head, err := c.Full.ChainHead(r.Context()) + apihelper.OrHTTPFail(w, err) + + type sectorID struct { + mID int64 + sNum uint64 + } + sectorIdx := map[sectorID]int{} + for i, s := range sectors { + sectors[i].HasSealed = s.SectorFiletype&int(storiface.FTSealed) != 0 || s.SectorFiletype&int(storiface.FTUpdate) != 0 + sectors[i].HasUnsealed = s.SectorFiletype&int(storiface.FTUnsealed) != 0 + sectors[i].HasSnap = s.SectorFiletype&int(storiface.FTUpdate) != 0 + sectorIdx[sectorID{s.MinerID, uint64(s.SectorNum)}] = i + if _, ok := minerToAddr[s.MinerID]; !ok { + minerToAddr[s.MinerID], err = address.NewIDAddress(uint64(s.MinerID)) + apihelper.OrHTTPFail(w, err) + } + } + + // Get all pieces + apihelper.OrHTTPFail(w, c.DB.Select(r.Context(), &pieces, `SELECT + sp_id, sector_number, piece_size, f05_deal_id, f05_deal_proposal, direct_piece_activation_manifest + FROM sectors_sdr_initial_pieces + ORDER BY sp_id, sector_number`)) + pieceIndex := map[sectorID][]int{} + for i, piece := range pieces { + piece := piece + cur := pieceIndex[sectorID{mID: piece.Miner, sNum: uint64(piece.Sector)}] + pieceIndex[sectorID{mID: piece.Miner, sNum: uint64(piece.Sector)}] = append(cur, i) + } + + for minerID, maddr := range minerToAddr { + onChainInfo, err := c.getCachedSectorInfo(w, r, maddr, head.Key()) + apihelper.OrHTTPFail(w, err) + for _, chainy := range onChainInfo { + st := chainy.onChain + if i, ok := sectorIdx[sectorID{minerID, uint64(st.SectorNumber)}]; ok { + sectors[i].IsOnChain = true + sectors[i].ExpiresAt = st.Expiration + sectors[i].IsFilPlus = st.VerifiedDealWeight.GreaterThan(st.DealWeight) + if ss, err := st.SealProof.SectorSize(); err == nil { + sectors[i].SealInfo = ss.ShortString() + } + sectors[i].Proving = chainy.active + if st.Expiration < head.Height() { + sectors[i].Flag = true // Flag expired sectors + } + dw, vp := .0, .0 + f05, ddo := 0, 0 + var pi []piece + if j, ok := pieceIndex[sectorID{sectors[i].MinerID, uint64(sectors[i].SectorNum)}]; ok { + for _, k := range j { + pi = append(pi, pieces[k]) + } + } + estimate := st.Expiration-st.Activation <= 0 || sectors[i].HasSnap + if estimate { + for _, p := range pi { + if p.Proposal != nil { + var prop *market.DealProposal + apihelper.OrHTTPFail(w, json.Unmarshal(p.Proposal, &prop)) + dw += float64(prop.PieceSize) + if prop.VerifiedDeal { + vp += float64(prop.PieceSize) * verifiedPowerGainMul + } + f05++ + } + if p.Manifest != nil { + var pam *miner.PieceActivationManifest + apihelper.OrHTTPFail(w, json.Unmarshal(p.Manifest, &pam)) + dw += float64(pam.Size) + if pam.VerifiedAllocationKey != nil { + vp += float64(pam.Size) * verifiedPowerGainMul + } + ddo++ + } + } + } else { + rdw := big.Add(st.DealWeight, st.VerifiedDealWeight) + dw = float64(big.Div(rdw, big.NewInt(int64(st.Expiration-st.Activation))).Uint64()) + vp = float64(big.Div(big.Mul(st.VerifiedDealWeight, big.NewInt(verifiedPowerGainMul)), big.NewInt(int64(st.Expiration-st.Activation))).Uint64()) + for _, deal := range st.DealIDs { + if deal > 0 { + f05++ + } + } + // DDO info is not on chain + for _, piece := range pieces { + if piece.Manifest != nil { + //var pam *miner.PieceActivationManifest + //apihelper.OrHTTPFail(w, json.Unmarshal(piece.Manifest, pam)) + //dw += float64(pam.Size) + //if pam.VerifiedAllocationKey != nil { + // vp += float64(pam.Size) * verifiedPowerGainMul + //} + ddo++ + } + } + } + sectors[i].DealWeight = "CC" + if dw > 0 { + sectors[i].DealWeight = fmt.Sprintf("%s", units.BytesSize(dw)) + } + if vp > 0 { + sectors[i].DealWeight = fmt.Sprintf("%s", units.BytesSize(vp)) + } + sectors[i].Deals = fmt.Sprintf("Market: %d, DDO: %d", f05, ddo) + } else { + // sector is on chain but not in db + s := sector{ + MinerID: minerID, + SectorNum: int64(chainy.onChain.SectorNumber), + IsOnChain: true, + ExpiresAt: chainy.onChain.Expiration, + IsFilPlus: chainy.onChain.VerifiedDealWeight.GreaterThan(chainy.onChain.DealWeight), + Proving: chainy.active, + Flag: true, // All such sectors should be flagged to be terminated + } + if ss, err := chainy.onChain.SealProof.SectorSize(); err == nil { + s.SealInfo = ss.ShortString() + } + sectors = append(sectors, s) + } + /* + info, err := c.Full.StateSectorGetInfo(r.Context(), minerToAddr[s], abi.SectorNumber(uint64(sectors[i].SectorNum)), headKey) + if err != nil { + sectors[i].IsValid = false + continue + }*/ + } + } + + // Add deal details to sectors which are not on chain + for i := range sectors { + if !sectors[i].IsOnChain { + var pi []piece + dw, vp := .0, .0 + f05, ddo := 0, 0 + + // Find if there are any deals for this sector + if j, ok := pieceIndex[sectorID{sectors[i].MinerID, uint64(sectors[i].SectorNum)}]; ok { + for _, k := range j { + pi = append(pi, pieces[k]) + } + } + + if len(pi) > 0 { + for _, piece := range pi { + if piece.Proposal != nil { + var prop *market.DealProposal + apihelper.OrHTTPFail(w, json.Unmarshal(piece.Proposal, &prop)) + dw += float64(prop.PieceSize) + if prop.VerifiedDeal { + vp += float64(prop.PieceSize) * verifiedPowerGainMul + } + f05++ + } + if piece.Manifest != nil { + var pam *miner.PieceActivationManifest + apihelper.OrHTTPFail(w, json.Unmarshal(piece.Manifest, &pam)) + dw += float64(pam.Size) + if pam.VerifiedAllocationKey != nil { + vp += float64(pam.Size) * verifiedPowerGainMul + } + ddo++ + } + } + } + if dw > 0 { + sectors[i].DealWeight = fmt.Sprintf("%s", units.BytesSize(dw)) + } else if vp > 0 { + sectors[i].DealWeight = fmt.Sprintf("%s", units.BytesSize(vp)) + } else { + sectors[i].DealWeight = "CC" + } + sectors[i].Deals = fmt.Sprintf("Market: %d, DDO: %d", f05, ddo) + } + } + apihelper.OrHTTPFail(w, json.NewEncoder(w).Encode(map[string]any{"data": sectors})) +} + +type sectorInfo struct { + onChain *miner.SectorOnChainInfo + active bool +} + +type sectorCacheEntry struct { + sectors []sectorInfo + loading chan struct{} + time.Time +} + +const cacheTimeout = 30 * time.Minute + +var mx sync.Mutex +var sectorInfoCache = map[address.Address]sectorCacheEntry{} + +// getCachedSectorInfo returns the sector info for the given miner address, +// either from the cache or by querying the chain. +// Cache can be invalidated by setting the "sector_refresh" cookie to "true". +// This is thread-safe. +// Parallel requests share the chain's first response. +func (c *cfg) getCachedSectorInfo(w http.ResponseWriter, r *http.Request, maddr address.Address, headKey types.TipSetKey) ([]sectorInfo, error) { + mx.Lock() + v, ok := sectorInfoCache[maddr] + mx.Unlock() + + if ok && v.loading != nil { + <-v.loading + mx.Lock() + v, ok = sectorInfoCache[maddr] + mx.Unlock() + } + + shouldRefreshCookie, found := lo.Find(r.Cookies(), func(item *http.Cookie) bool { return item.Name == "sector_refresh" }) + shouldRefresh := found && shouldRefreshCookie.Value == "true" + w.Header().Set("Set-Cookie", "sector_refresh=; Max-Age=0; Path=/") + + if !ok || time.Since(v.Time) > cacheTimeout || shouldRefresh { + v = sectorCacheEntry{nil, make(chan struct{}), time.Now()} + mx.Lock() + sectorInfoCache[maddr] = v + mx.Unlock() + + // Intentionally not using the context from the request, as this is a cache + onChainInfo, err := c.Full.StateMinerSectors(context.Background(), maddr, nil, headKey) + if err != nil { + mx.Lock() + delete(sectorInfoCache, maddr) + close(v.loading) + mx.Unlock() + return nil, err + } + active, err := c.Full.StateMinerActiveSectors(context.Background(), maddr, headKey) + if err != nil { + mx.Lock() + delete(sectorInfoCache, maddr) + close(v.loading) + mx.Unlock() + return nil, err + } + activebf := bitfield.New() + for i := range active { + activebf.Set(uint64(active[i].SectorNumber)) + } + infos := make([]sectorInfo, len(onChainInfo)) + for i, info := range onChainInfo { + info := info + set, err := activebf.IsSet(uint64(info.SectorNumber)) + if err != nil { + mx.Lock() + delete(sectorInfoCache, maddr) + close(v.loading) + mx.Unlock() + return nil, err + } + infos[i] = sectorInfo{ + onChain: info, + active: set, + } + } + mx.Lock() + sectorInfoCache[maddr] = sectorCacheEntry{infos, nil, time.Now()} + close(v.loading) + mx.Unlock() + return infos, nil + } + return v.sectors, nil +} diff --git a/curiosrc/web/hapi/watch_actor.go b/curiosrc/web/hapi/watch_actor.go index c44dcd7796d..51e1f51e74d 100644 --- a/curiosrc/web/hapi/watch_actor.go +++ b/curiosrc/web/hapi/watch_actor.go @@ -37,6 +37,8 @@ type minimalActorInfo struct { } } +var startedAt = time.Now() + func (a *app) updateActor(ctx context.Context) error { a.rpcInfoLk.Lock() api := a.workingApi @@ -45,7 +47,9 @@ func (a *app) updateActor(ctx context.Context) error { stor := store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(a.workingApi), ChainBlockCache)) if api == nil { - log.Warnw("no working api yet") + if time.Since(startedAt) > time.Second*10 { + log.Warnw("no working api yet") + } return nil } diff --git a/curiosrc/web/static/sector/index.html b/curiosrc/web/static/sector/index.html new file mode 100644 index 00000000000..e3abed9f486 --- /dev/null +++ b/curiosrc/web/static/sector/index.html @@ -0,0 +1,129 @@ + + + + + Sector List + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+ + + + +
Loading...
+
+
+
+
+ + + + \ No newline at end of file diff --git a/curiosrc/web/static/ux/curio-ux.mjs b/curiosrc/web/static/ux/curio-ux.mjs index 157cab5fc38..6f8048a4e84 100644 --- a/curiosrc/web/static/ux/curio-ux.mjs +++ b/curiosrc/web/static/ux/curio-ux.mjs @@ -33,18 +33,38 @@ class CurioUX extends LitElement { document.body.attributes.setNamedItem(cdsText); document.body.style.visibility = 'initial'; + + // how Bootstrap & DataTables expect dark mode declared. + document.documentElement.classList.add('dark'); + + this.messsage = this.getCookieMessage(); } render() { return html` -
+
+ ${this.message? html`
${this.message}
`: html``}
`; } -} + getCookieMessage() { + const name = 'message'; + const cookies = document.cookie.split(';'); + for (let i = 0; i < cookies.length; i++) { + const cookie = cookies[i].trim(); + if (cookie.startsWith(name + '=')) { + var val = cookie.substring(name.length + 1); + document.cookie = name + '=; expires=Thu, 01 Jan 1970 00:00:00 UTC; path=/;'; + return val; + } + } + return null; + } + +}; customElements.define('curio-ux', CurioUX); \ No newline at end of file diff --git a/storage/paths/local.go b/storage/paths/local.go index 006854bbfe0..7e955ccfe51 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -500,6 +500,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif }() for _, fileType := range ft.AllSet() { + fileType := fileType id := storiface.ID(storiface.PathByType(storageIDs, fileType)) p, ok := st.paths[id]