From 019abf2dbac84c5f07c9bedfa83c066307fd2bf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Fri, 29 Nov 2019 23:17:01 +0100 Subject: [PATCH] pin: implement pin/ls with only CoreApi --- core/commands/pin.go | 126 +++++++++++++------------------------ core/coreapi/pin.go | 144 ++++++++++++++++++++++++++++++++----------- go.mod | 2 + pin/pin.go | 2 +- 4 files changed, 152 insertions(+), 122 deletions(-) diff --git a/core/commands/pin.go b/core/commands/pin.go index c6e5ca6b547a..40f5a0f5b1ba 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -8,11 +8,6 @@ import ( "os" "time" - core "github.com/ipfs/go-ipfs/core" - cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" - e "github.com/ipfs/go-ipfs/core/commands/e" - pin "github.com/ipfs/go-ipfs/pin" - bserv "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" cidenc "github.com/ipfs/go-cidutil/cidenc" @@ -23,6 +18,10 @@ import ( coreiface "github.com/ipfs/interface-go-ipfs-core" options "github.com/ipfs/interface-go-ipfs-core/options" "github.com/ipfs/interface-go-ipfs-core/path" + + core "github.com/ipfs/go-ipfs/core" + cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + e "github.com/ipfs/go-ipfs/core/commands/e" ) var PinCmd = &cmds.Command{ @@ -318,11 +317,6 @@ Example: cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) - if err != nil { - return err - } - api, err := cmdenv.GetApi(env, req) if err != nil { return err @@ -350,9 +344,9 @@ Example: } if len(req.Arguments) > 0 { - err = pinLsKeys(req, typeStr, n, api, emit) + err = pinLsKeys(req, typeStr, api, emit) } else { - err = pinLsAll(req, typeStr, n, emit) + err = pinLsAll(req, typeStr, api, emit) } if err != nil { return err @@ -429,24 +423,30 @@ type PinLsObject struct { Type string `json:",omitempty"` } -func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error { - mode, ok := pin.StringToMode(typeStr) - if !ok { - return fmt.Errorf("invalid pin mode '%s'", typeStr) - } - +func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err } + switch typeStr { + case "all", "direct", "indirect", "recursive": + default: + return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr) + } + + opt, err := options.Pin.IsPinned.Type(typeStr) + if err != nil { + panic("unhandled pin type") + } + for _, p := range req.Arguments { - c, err := api.ResolvePath(req.Context, path.New(p)) + rp, err := api.ResolvePath(req.Context, path.New(p)) if err != nil { return err } - pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode) + pinType, pinned, err := api.Pin().IsPinned(req.Context, rp, opt) if err != nil { return err } @@ -464,7 +464,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac err = emit(&PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: pinType, - Cid: enc.Encode(c.Cid()), + Cid: enc.Encode(rp.Cid()), }, }) if err != nil { @@ -475,81 +475,39 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac return nil } -func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error { +func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err } - keys := cid.NewSet() - - AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { - for _, c := range keyList { - if keys.Visit(c) { - err := emit(&PinLsOutputWrapper{ - PinLsObject: PinLsObject{ - Type: typeStr, - Cid: enc.Encode(c), - }, - }) - if err != nil { - return err - } - } - } - return nil + switch typeStr { + case "all", "direct", "indirect", "recursive": + default: + err = fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr) + return err } - if typeStr == "direct" || typeStr == "all" { - dkeys, err := n.Pinning.DirectKeys(req.Context) - if err != nil { - return err - } - err = AddToResultKeys(dkeys, "direct") - if err != nil { - return err - } + opt, err := options.Pin.Ls.Type(typeStr) + if err != nil { + panic("unhandled pin type") } - if typeStr == "recursive" || typeStr == "all" { - rkeys, err := n.Pinning.RecursiveKeys(req.Context) - if err != nil { - return err - } - err = AddToResultKeys(rkeys, "recursive") - if err != nil { - return err - } + + pins, err := api.Pin().Ls(req.Context, opt) + if err != nil { + return err } - if typeStr == "indirect" || typeStr == "all" { - rkeys, err := n.Pinning.RecursiveKeys(req.Context) + + for p := range pins { + err = emit(&PinLsOutputWrapper{ + PinLsObject: PinLsObject{ + Type: p.Type(), + Cid: enc.Encode(p.Path().Cid()), + }, + }) if err != nil { return err } - for _, k := range rkeys { - var visitErr error - err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool { - r := keys.Visit(c) - if r { - err := emit(&PinLsOutputWrapper{ - PinLsObject: PinLsObject{ - Type: "indirect", - Cid: enc.Encode(c), - }, - }) - if err != nil { - visitErr = err - } - } - return r - }, dag.SkipRoot(), dag.Concurrent()) - - if visitErr != nil { - return visitErr - } - if err != nil { - return err - } - } } return nil diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 2447ec8a0ed2..aa37966a6672 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -11,6 +11,8 @@ import ( coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" path "github.com/ipfs/interface-go-ipfs-core/path" + + "github.com/ipfs/go-ipfs/pin" ) type PinAPI CoreAPI @@ -40,7 +42,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return api.pinning.Flush(ctx) } -func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) { +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) { settings, err := caopts.PinLsOptions(opts...) if err != nil { return nil, err @@ -52,7 +54,26 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreif return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) } - return api.pinLsAll(settings.Type, ctx) + return api.pinLsAll(settings.Type, ctx), nil +} + +func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { + dagNode, err := api.core().ResolveNode(ctx, p) + if err != nil { + return "", false, fmt.Errorf("pin: %s", err) + } + + settings, err := caopts.PinIsPinnedOptions(opts...) + if err != nil { + return "", false, err + } + + mode, ok := pin.StringToMode(settings.WithType) + if !ok { + return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType) + } + + return api.pinning.IsPinnedWithType(ctx, dagNode.Cid(), mode) } // Rm pin rm api @@ -183,6 +204,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro type pinInfo struct { pinType string path path.Resolved + err error } func (p *pinInfo) Path() path.Resolved { @@ -193,58 +215,106 @@ func (p *pinInfo) Type() string { return p.pinType } -func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) { +func (p *pinInfo) Err() error { + return p.err +} - keys := make(map[cid.Cid]*pinInfo) +func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreiface.Pin { + out := make(chan coreiface.Pin) + + keys := cid.NewSet() AddToResultKeys := func(keyList []cid.Cid, typeStr string) { for _, c := range keyList { - keys[c] = &pinInfo{ - pinType: typeStr, - path: path.IpldPath(c), + if keys.Visit(c) { + out <- &pinInfo{ + pinType: typeStr, + path: path.IpldPath(c), + } } } } - if typeStr == "direct" || typeStr == "all" { - dkeys, err := api.pinning.DirectKeys(ctx) - if err != nil { - return nil, err + VisitKeys := func(keyList []cid.Cid) { + for _, c := range keyList { + keys.Visit(c) } - AddToResultKeys(dkeys, "direct") } - if typeStr == "indirect" || typeStr == "all" { - set := cid.NewSet() - rkeys, err := api.pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err + + go func() { + defer close(out) + + if typeStr == "recursive" || typeStr == "all" { + rkeys, err := api.pinning.RecursiveKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return + } + AddToResultKeys(rkeys, "recursive") } - for _, k := range rkeys { - err := merkledag.Walk( - ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, - merkledag.SkipRoot(), merkledag.Concurrent(), - ) + if typeStr == "direct" || typeStr == "all" { + dkeys, err := api.pinning.DirectKeys(ctx) if err != nil { - return nil, err + out <- &pinInfo{err: err} + return } + AddToResultKeys(dkeys, "direct") } - AddToResultKeys(set.Keys(), "indirect") - } - if typeStr == "recursive" || typeStr == "all" { - rkeys, err := api.pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err + if typeStr == "all" { + set := cid.NewSet() + rkeys, err := api.pinning.RecursiveKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return + } + for _, k := range rkeys { + err := merkledag.Walk( + ctx, merkledag.GetLinksWithDAG(api.dag), k, + set.Visit, + merkledag.SkipRoot(), merkledag.Concurrent(), + ) + if err != nil { + out <- &pinInfo{err: err} + return + } + } + AddToResultKeys(set.Keys(), "indirect") } - AddToResultKeys(rkeys, "recursive") - } + if typeStr == "indirect" { + // We need to first visit the direct pins that have priority + // without emitting them - out := make([]coreiface.Pin, 0, len(keys)) - for _, v := range keys { - out = append(out, v) - } + dkeys, err := api.pinning.DirectKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return + } + VisitKeys(dkeys) - return out, nil + rkeys, err := api.pinning.RecursiveKeys(ctx) + if err != nil { + out <- &pinInfo{err: err} + return + } + VisitKeys(rkeys) + + set := cid.NewSet() + for _, k := range rkeys { + err := merkledag.Walk( + ctx, merkledag.GetLinksWithDAG(api.dag), k, + set.Visit, + merkledag.SkipRoot(), merkledag.Concurrent(), + ) + if err != nil { + out <- &pinInfo{err: err} + return + } + } + AddToResultKeys(set.Keys(), "indirect") + } + }() + + return out } func (api *PinAPI) core() coreiface.CoreAPI { diff --git a/go.mod b/go.mod index c7ba2ed6d760..abc0117510ea 100644 --- a/go.mod +++ b/go.mod @@ -107,4 +107,6 @@ require ( gopkg.in/cheggaaa/pb.v1 v1.0.28 ) +replace github.com/ipfs/interface-go-ipfs-core => github.com/MichaelMure/interface-go-ipfs-core v0.2.6-0.20191129215829-6e1e501b463b + go 1.13 diff --git a/pin/pin.go b/pin/pin.go index 15b2396b5529..bd27bc174bca 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -287,7 +287,7 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) } // IsPinnedWithType returns whether or not the given cid is pinned with the -// given pin type, as well as returning the type of pin its pinned with. +// given pin type, and an explanation of why its pinned func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) { p.lock.RLock() defer p.lock.RUnlock()