Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Converting to datastore-based pins requires loading all dag-storage p…
Browse files Browse the repository at this point in the history
…ins into memory, in addition to indirect pins. This is unnecessary for conversion, and much memory can be saved by avoiding loading all pins.
  • Loading branch information
gammazero committed Jan 14, 2021
1 parent 4c92071 commit 5d98075
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 27 deletions.
35 changes: 35 additions & 0 deletions ipldpinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error)
}, nil
}

// LoadKeys streams pinned CIDs onto keyChan so that callers can consume pins
// one at a time instead of materializing the whole set in memory.
//
// The pin root CID is read from dstore under pinDatastoreKey and resolved
// through the internal DAG service. When recursive is true the recursive pin
// set is walked, otherwise the direct pin set is walked. If no pin root has
// been stored yet, nothing is sent and nil is returned.
//
// LoadKeys does not close keyChan; the caller is responsible for closing it
// once LoadKeys returns. The dserv argument is not used by this function.
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
	rawRoot, err := dstore.Get(pinDatastoreKey)
	switch {
	case err == ds.ErrNotFound:
		// No pin root stored: there is nothing to report.
		return nil
	case err != nil:
		return err
	}

	rootCid, err := cid.Cast(rawRoot)
	if err != nil {
		return err
	}

	rootNode, err := internal.Get(ctx, rootCid)
	if err != nil {
		return fmt.Errorf("cannot find pinning root object: %v", err)
	}

	pbRoot, isProto := rootNode.(*mdag.ProtoNode)
	if !isProto {
		return mdag.ErrNotProtobuf
	}

	// Select which named link of the root node holds the requested pin set.
	setLink := linkDirect
	if recursive {
		setLink = linkRecursive
	}

	return loadSetChan(ctx, internal, pbRoot, setLink, keyChan)
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
Expand Down
23 changes: 23 additions & 0 deletions ipldpinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ func TestPinnerBasic(t *testing.T) {

// Test recursively pinned
assertPinned(t, np, bk, "could not find recursively pinned node")

// Test that LoadKeys returns the expected CIDs.
keyChan := make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan)
close(keyChan)
}()
keys := map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
recKeys, _ := np.RecursiveKeys(ctx)
if len(keys) != len(recKeys) {
t.Fatal("wrong number of keys from LoadKeys")
}
for _, k := range recKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct keys")
}
}
}

func TestIsPinnedLookup(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions ipldpinner/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
// readHdr guarantees fanout is a safe value
fanout := hdr.GetFanout()
for i, l := range n.Links()[fanout:] {
if err := fn(i, l); err != nil {
if err = fn(i, l); err != nil {
return err
}
}
for _, l := range n.Links()[:fanout] {
c := l.Cid
children(c)
if children != nil {
children(c)
}
if c.Equals(emptyKey) {
continue
}
Expand All @@ -239,7 +241,7 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
return merkledag.ErrNotProtobuf
}

if err := walkItems(ctx, dag, stpb, fn, children); err != nil {
if err = walkItems(ctx, dag, stpb, fn, children); err != nil {
return err
}
}
Expand Down Expand Up @@ -277,6 +279,33 @@ func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode
return res, nil
}

// loadSetChan resolves the link called name on root, fetches the node it
// points to from dag, and walks that set, sending the CID of every item link
// on keyChan.
//
// Each send also watches ctx: if ctx is done before the receiver accepts the
// next CID, the walk stops and ctx.Err() is returned. Without this, a receiver
// that stops draining keyChan would leave the caller blocked forever.
//
// keyChan is never closed here; the caller owns it. An error is returned if
// the link is missing, the node cannot be fetched, the node is not a
// *merkledag.ProtoNode (merkledag.ErrNotProtobuf), or the walk fails.
func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error {
	l, err := root.GetNodeLink(name)
	if err != nil {
		return err
	}

	n, err := l.GetNode(ctx, dag)
	if err != nil {
		return err
	}

	pbn, ok := n.(*merkledag.ProtoNode)
	if !ok {
		return merkledag.ErrNotProtobuf
	}

	walk := func(idx int, link *ipld.Link) error {
		select {
		case keyChan <- link.Cid:
			return nil
		case <-ctx.Done():
			// Receiver is gone or the operation was cancelled; abort the walk
			// instead of blocking on the send.
			return ctx.Err()
		}
	}

	// No children callback is needed: only the item links are reported.
	return walkItems(ctx, dag, pbn, walk, nil)
}

func getCidListIterator(cids []cid.Cid) itemIterator {
return func() (c cid.Cid, ok bool) {
if len(cids) == 0 {
Expand Down
47 changes: 23 additions & 24 deletions pinconv/pinconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"fmt"

"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
Expand All @@ -24,39 +24,38 @@ import (
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
const ipldPinPath = "/local/pins"

ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
if err != nil {
return nil, 0, err
}

dsPinner, err := dspinner.New(ctx, dstore, dserv)
if err != nil {
return nil, 0, err
}

seen := cid.NewSet()
cids, err := ipldPinner.RecursiveKeys(ctx)
if err != nil {
return nil, 0, err
var convCount int
keyChan := make(chan cid.Cid)

go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Recursive)
convCount++
}
for i := range cids {
seen.Add(cids[i])
dsPinner.PinWithMode(cids[i], ipfspinner.Recursive)
if err != nil {
return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err)
}
convCount := len(cids)

cids, err = ipldPinner.DirectKeys(ctx)
if err != nil {
return nil, 0, err
keyChan = make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Direct)
convCount++
}
for i := range cids {
if seen.Has(cids[i]) {
// Pin was already pinned recursively
continue
}
dsPinner.PinWithMode(cids[i], ipfspinner.Direct)
if err != nil {
return nil, 0, fmt.Errorf("cannot load direct keys: %s", err)
}
convCount += len(cids)

err = dsPinner.Flush(ctx)
if err != nil {
Expand Down

0 comments on commit 5d98075

Please sign in to comment.