feat: storage: Parallel proving checks #8391

Merged · 3 commits · Mar 29, 2022
2 changes: 1 addition & 1 deletion cmd/lotus-miner/init.go
@@ -467,7 +467,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})

smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
2 changes: 1 addition & 1 deletion cmd/lotus-worker/main.go
@@ -196,7 +196,7 @@ var runCmd = &cli.Command{
&cli.IntFlag{
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 0,
Value: 128,
},
&cli.DurationFlag{
Name: "post-read-timeout",
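For context, the flag above comes from urfave/cli/v2. A minimal standalone sketch of the same flag definition with its new default (the app name and action are illustrative, not part of this PR):

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "flag-demo", // illustrative, not the real lotus-worker app
		Flags: []cli.Flag{
			&cli.IntFlag{
				Name:  "post-parallel-reads",
				Usage: "maximum number of parallel challenge reads (0 = no limit)",
				Value: 128, // new default introduced in this PR
			},
		},
		Action: func(cctx *cli.Context) error {
			// With no flag on the command line, this now prints 128 instead of 0.
			fmt.Println("post-parallel-reads =", cctx.Int("post-parallel-reads"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```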
2 changes: 1 addition & 1 deletion documentation/en/cli-lotus-worker.md
@@ -50,7 +50,7 @@ OPTIONS:
--windowpost enable window post (default: false)
--winningpost enable winning post (default: false)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 0)
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 128)
--post-read-timeout value time limit for reading PoSt challenges (0 = no limit) (default: 0s)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--help, -h show help (default: false)
24 changes: 24 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
@@ -306,6 +306,14 @@
#PurgeCacheOnStart = false


[Proving]
# Maximum number of sector checks to run in parallel. (0 = unlimited)
#
# type: int
# env var: LOTUS_PROVING_PARALLELCHECKLIMIT
#ParallelCheckLimit = 128


[Sealing]
# Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
# If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -484,33 +492,49 @@


[Storage]
# type: int
# env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
#ParallelFetchLimit = 10

# Local worker config
#
# type: bool
# env var: LOTUS_STORAGE_ALLOWADDPIECE
#AllowAddPiece = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT1
#AllowPreCommit1 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT2
#AllowPreCommit2 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWCOMMIT
#AllowCommit = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWUNSEAL
#AllowUnseal = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWREPLICAUPDATE
#AllowReplicaUpdate = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
#AllowProveReplicaUpdate2 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true

# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
#
# type: sectorstorage.ResourceFilteringStrategy
# env var: LOTUS_STORAGE_RESOURCEFILTERING
#ResourceFiltering = "hardware"

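The [Proving] block above documents both a TOML key and an env-var name. A standalone sketch of honoring that env var alongside the documented default (this is not lotus' actual config loader, which handles the override internally; the precedence shown is an assumption):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parallelCheckLimit returns the documented default unless the env var named
// in the config comment is set to a valid integer.
func parallelCheckLimit() int {
	const def = 128 // default from default-lotus-miner-config.toml

	if v, ok := os.LookupEnv("LOTUS_PROVING_PARALLELCHECKLIMIT"); ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return def
}

func main() {
	fmt.Println("ParallelCheckLimit =", parallelCheckLimit()) // 0 means unlimited
}
```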
73 changes: 51 additions & 22 deletions extern/sector-storage/faults.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"sync"
"time"

"golang.org/x/xerrors"
@@ -24,22 +25,55 @@ type FaultTracker interface {

// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if rg == nil {
return nil, xerrors.Errorf("rg is nil")
}

var bad = make(map[abi.SectorID]string)
var badLk sync.Mutex

var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
_, _ = rand.Read(postRand)
postRand[31] &= 0x3f

limit := m.parallelCheckLimit
if limit <= 0 {
limit = len(sectors)
}
throttle := make(chan struct{}, limit)

addBad := func(s abi.SectorID, reason string) {
badLk.Lock()
bad[s] = reason
badLk.Unlock()
}

var wg sync.WaitGroup
wg.Add(len(sectors))

for _, sector := range sectors {
err := func() error {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}

go func(sector storage.SectorRef) {
defer wg.Done()
defer func() {
<-throttle
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

commr, update, err := rg(ctx, sector.ID)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
return
}

toLock := storiface.FTSealed | storiface.FTCache
@@ -49,31 +83,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,

locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
return
}

if !locked {
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
bad[sector.ID] = fmt.Sprint("can't acquire read lock")
return nil
addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
return
}

wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
return err
addBad(sector.ID, fmt.Sprint("can't get proof type"))
return
}

var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
_, _ = rand.Read(pr)
pr[31] &= 0x3f

ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, postRand, []abi.SectorNumber{
sector.ID.Number,
})
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
return
}

vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout)
@@ -88,17 +120,14 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
}, wpp)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
return
}

return nil
}()
if err != nil {
return nil, err
}
}(sector)
}

wg.Wait()

return bad, nil
}

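The hunks above are the heart of this PR: each sector check now runs in its own goroutine, bounded by a buffered channel sized from ParallelCheckLimit, with a WaitGroup joining the workers and a mutex guarding the shared bad-sector map. A self-contained sketch of the same pattern, with a dummy checkSector standing in for the real per-sector work (commR lookup, StorageTryLock, challenge generation, vanilla proof):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// checkSector is a placeholder for the real per-sector work in CheckProvable.
func checkSector(ctx context.Context, id int) error {
	return nil // pretend the sector proves fine
}

// checkAll mirrors the concurrency pattern introduced in faults.go.
func checkAll(ctx context.Context, sectors []int, limit int) (map[int]string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if limit <= 0 {
		limit = len(sectors) // 0 = unlimited, as documented in the config
	}
	throttle := make(chan struct{}, limit) // semaphore bounding parallelism

	bad := map[int]string{}
	var badLk sync.Mutex

	var wg sync.WaitGroup
	wg.Add(len(sectors))

	for _, s := range sectors {
		select {
		case throttle <- struct{}{}: // acquire a slot
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		go func(s int) {
			defer wg.Done()
			defer func() { <-throttle }() // release the slot

			if err := checkSector(ctx, s); err != nil {
				badLk.Lock()
				bad[s] = err.Error()
				badLk.Unlock()
			}
		}(s)
	}

	wg.Wait()
	return bad, nil
}

func main() {
	bad, err := checkAll(context.Background(), []int{1, 2, 3, 4}, 2)
	fmt.Println(bad, err)
}
```

One design change visible in the diff: errors that previously aborted the whole check (for example a StorageTryLock failure) are now recorded per sector via addBad, so one bad sector no longer fails the entire CheckProvable call.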
11 changes: 9 additions & 2 deletions extern/sector-storage/manager.go
@@ -70,6 +70,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore

parallelCheckLimit int

callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
callRes map[storiface.CallID]chan result
@@ -99,7 +101,7 @@ const (
ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)

type SealerConfig struct {
type Config struct {
ParallelFetchLimit int

// Local worker config
@@ -116,14 +118,17 @@ type SealerConfig struct {
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering ResourceFilteringStrategy

// PoSt config
ParallelCheckLimit int
}

type StorageAuth http.Header

type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore

func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -142,6 +147,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.

localProver: prover,

parallelCheckLimit: sc.ParallelCheckLimit,

work: mss,
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},
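For callers, the rename plus the new field look roughly like the cmd/lotus-miner/init.go hunk at the top of this diff. A hedged sketch (import paths as of this PR; the concrete values and the way the dependencies get built are illustrative only):

```go
package example

import (
	"context"

	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)

// newManager shows the caller-side effect of this change: SealerConfig is now
// sectorstorage.Config, and it carries the new ParallelCheckLimit knob.
func newManager(ctx context.Context, lstor *stores.Local, stor stores.Store,
	ls stores.LocalStorage, si stores.SectorIndex,
	wss sectorstorage.WorkerStateStore, mss sectorstorage.ManagerStateStore) (*sectorstorage.Manager, error) {

	cfg := sectorstorage.Config{
		ParallelFetchLimit: 10,
		AllowAddPiece:      true,
		AllowPreCommit1:    true,

		// New PoSt knob: bounds concurrent CheckProvable checks (0 = no limit).
		ParallelCheckLimit: 128,
	}

	return sectorstorage.New(ctx, lstor, stor, ls, si, cfg, wss, mss)
}
```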
6 changes: 3 additions & 3 deletions extern/sector-storage/piece_provider_test.go
@@ -30,7 +30,7 @@ import (
// only uses miner and does NOT use any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// Set up sector storage manager
sealerCfg := SealerConfig{
sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
@@ -89,7 +89,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)

// miner's worker can only add pieces to an unsealed sector.
sealerCfg := SealerConfig{
sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: false,
@@ -198,7 +198,7 @@ func generatePieceData(size uint64) []byte {
return bz
}

func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
ctx := context.Background()
// listen on tcp socket to create an http server later
address := "0.0.0.0:0"
4 changes: 2 additions & 2 deletions itests/kit/ensemble.go
@@ -585,7 +585,7 @@ func (n *Ensemble) Start() *Ensemble {

// disable resource filtering so that local worker gets assigned tasks
// regardless of system pressure.
node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
scfg := config.DefaultStorageMiner()

if noLocal {
Expand All @@ -596,7 +596,7 @@ func (n *Ensemble) Start() *Ensemble {
}

scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
return scfg.Storage
return scfg.StorageManager()
}),

// upgrades
2 changes: 1 addition & 1 deletion node/builder_miner.go
@@ -214,7 +214,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
),

Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(sectorstorage.Config), cfg.StorageManager()),
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
)
}
6 changes: 5 additions & 1 deletion node/config/def.go
@@ -138,7 +138,11 @@ func DefaultStorageMiner() *StorageMiner {
TerminateBatchWait: Duration(5 * time.Minute),
},

Storage: sectorstorage.SealerConfig{
Proving: ProvingConfig{
ParallelCheckLimit: 128,
},

Storage: SealerConfig{
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
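To tie the new DefaultStorageMiner value back to the [Proving] section shown earlier in this diff, a throwaway sketch with local stand-in types (BurntSushi/toml is used only for illustration, not as a claim about how lotus generates its config docs):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/BurntSushi/toml"
)

// Local stand-ins; the real types live in node/config and carry many more fields.
type ProvingConfig struct {
	ParallelCheckLimit int
}

type minerDefaults struct {
	Proving ProvingConfig
}

func main() {
	def := minerDefaults{Proving: ProvingConfig{ParallelCheckLimit: 128}}

	var buf strings.Builder
	if err := toml.NewEncoder(&buf).Encode(def); err != nil {
		panic(err)
	}
	fmt.Print(buf.String()) // prints a [Proving] block with ParallelCheckLimit = 128
}
```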