Skip to content

Commit

Permalink
pin: implement pin/ls with only CoreApi
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Nov 29, 2019
1 parent 14605f9 commit 019abf2
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 122 deletions.
126 changes: 42 additions & 84 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"os"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
pin "github.com/ipfs/go-ipfs/pin"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
Expand All @@ -23,6 +18,10 @@ import (
coreiface "github.com/ipfs/interface-go-ipfs-core"
options "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
)

var PinCmd = &cmds.Command{
Expand Down Expand Up @@ -318,11 +317,6 @@ Example:
cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
Expand Down Expand Up @@ -350,9 +344,9 @@ Example:
}

if len(req.Arguments) > 0 {
err = pinLsKeys(req, typeStr, n, api, emit)
err = pinLsKeys(req, typeStr, api, emit)
} else {
err = pinLsAll(req, typeStr, n, emit)
err = pinLsAll(req, typeStr, api, emit)
}
if err != nil {
return err
Expand Down Expand Up @@ -429,24 +423,30 @@ type PinLsObject struct {
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

switch typeStr {
case "all", "direct", "indirect", "recursive":
default:
return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr)
}

opt, err := options.Pin.IsPinned.Type(typeStr)
if err != nil {
panic("unhandled pin type")
}

for _, p := range req.Arguments {
c, err := api.ResolvePath(req.Context, path.New(p))
rp, err := api.ResolvePath(req.Context, path.New(p))
if err != nil {
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode)
pinType, pinned, err := api.Pin().IsPinned(req.Context, rp, opt)
if err != nil {
return err
}
Expand All @@ -464,7 +464,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
err = emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(c.Cid()),
Cid: enc.Encode(rp.Cid()),
},
})
if err != nil {
Expand All @@ -475,81 +475,39 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
}
}
return nil
switch typeStr {
case "all", "direct", "indirect", "recursive":
default:
err = fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr)
return err
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
opt, err := options.Pin.Ls.Type(typeStr)
if err != nil {
panic("unhandled pin type")
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}

pins, err := api.Pin().Ls(req.Context, opt)
if err != nil {
return err
}
if typeStr == "indirect" || typeStr == "all" {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)

for p := range pins {
err = emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
},
})
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
}, dag.SkipRoot(), dag.Concurrent())

if visitErr != nil {
return visitErr
}
if err != nil {
return err
}
}
}

return nil
Expand Down
144 changes: 107 additions & 37 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"

"github.com/ipfs/go-ipfs/pin"
)

type PinAPI CoreAPI
Expand Down Expand Up @@ -40,7 +42,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return api.pinning.Flush(ctx)
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) {
settings, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
Expand All @@ -52,7 +54,26 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreif
return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
}

return api.pinLsAll(settings.Type, ctx)
return api.pinLsAll(settings.Type, ctx), nil
}

func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
dagNode, err := api.core().ResolveNode(ctx, p)
if err != nil {
return "", false, fmt.Errorf("pin: %s", err)
}

settings, err := caopts.PinIsPinnedOptions(opts...)
if err != nil {
return "", false, err
}

mode, ok := pin.StringToMode(settings.WithType)
if !ok {
return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType)
}

return api.pinning.IsPinnedWithType(ctx, dagNode.Cid(), mode)
}

// Rm pin rm api
Expand Down Expand Up @@ -183,6 +204,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
type pinInfo struct {
pinType string
path path.Resolved
err error
}

func (p *pinInfo) Path() path.Resolved {
Expand All @@ -193,58 +215,106 @@ func (p *pinInfo) Type() string {
return p.pinType
}

func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) {
func (p *pinInfo) Err() error {
return p.err
}

keys := make(map[cid.Cid]*pinInfo)
func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreiface.Pin {
out := make(chan coreiface.Pin)

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c] = &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
if keys.Visit(c) {
out <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}
}
}
}

if typeStr == "direct" || typeStr == "all" {
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
VisitKeys := func(keyList []cid.Cid) {
for _, c := range keyList {
keys.Visit(c)
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err

go func() {
defer close(out)

if typeStr == "recursive" || typeStr == "all" {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
AddToResultKeys(rkeys, "recursive")
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if typeStr == "direct" || typeStr == "all" {
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
out <- &pinInfo{err: err}
return
}
AddToResultKeys(dkeys, "direct")
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
if typeStr == "all" {
set := cid.NewSet()
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
AddToResultKeys(set.Keys(), "indirect")
}
AddToResultKeys(rkeys, "recursive")
}
if typeStr == "indirect" {
// We need to first visit the direct pins that have priority
// without emitting them

out := make([]coreiface.Pin, 0, len(keys))
for _, v := range keys {
out = append(out, v)
}
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
VisitKeys(dkeys)

return out, nil
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
VisitKeys(rkeys)

set := cid.NewSet()
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
AddToResultKeys(set.Keys(), "indirect")
}
}()

return out
}

func (api *PinAPI) core() coreiface.CoreAPI {
Expand Down
Loading

0 comments on commit 019abf2

Please sign in to comment.