Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FetchGraphWithDepthLimit to specify depth-limited graph fetching. #2

Merged
merged 1 commit into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 101 additions & 19 deletions merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,57 @@ func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
return FetchGraphWithDepthLimit(ctx, root, -1, serv)
}

// FetchGraphWithDepthLimit fetches all nodes that are children to the given
// node down to the given depth. maxDetph=0 means "only fetch root",
// maxDepth=1 means "fetch root and its direct children" and so on...
// maxDepth=-1 means unlimited.
func FetchGraphWithDepthLimit(ctx context.Context, root *cid.Cid, depthLim int, serv ipld.DAGService) error {
var ng ipld.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
}

set := make(map[string]int)

// Visit function returns true when:
// * The element is not in the set and we're not over depthLim
// * The element is in the set but recorded depth is deeper
// than currently seen (if we find it higher in the tree we'll need
// to explore deeper than before).
// depthLim = -1 means we only return true if the element is not in the
// set.
visit := func(c *cid.Cid, depth int) bool {
key := string(c.Bytes())
oldDepth, ok := set[key]

if (ok && depthLim < 0) || (depthLim >= 0 && depth > depthLim) {
return false
}

if !ok || oldDepth > depth {
set[key] = depth
return true
}
return false
}

v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
if set.Visit(c) {

visitProgress := func(c *cid.Cid, depth int) bool {
if visit(c, depth) {
v.Increment()
return true
}
return false
}
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visitProgress)
}

// GetMany gets many nodes from the DAG at once.
Expand Down Expand Up @@ -254,14 +286,26 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
visitDepth := func(c *cid.Cid, depth int) bool {
return visit(c)
}

return EnumerateChildrenDepth(ctx, getLinks, root, 0, visitDepth)
}

// EnumerateChildrenDepth walks the dag below the given root and passes the
// current depth to a given visit function. The visit function can be used to
// limit DAG exploration.
func EnumerateChildrenDepth(ctx context.Context, getLinks GetLinks, root *cid.Cid, depth int, visit func(*cid.Cid, int) bool) error {
links, err := getLinks(ctx, root)
if err != nil {
return err
}

for _, lnk := range links {
c := lnk.Cid
if visit(c) {
err = EnumerateChildren(ctx, getLinks, c, visit)
if visit(c, depth+1) {
err = EnumerateChildrenDepth(ctx, getLinks, c, depth+1, visit)
if err != nil {
return err
}
Expand Down Expand Up @@ -305,8 +349,30 @@ var FetchGraphConcurrency = 8
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan []*ipld.Link)
visitDepth := func(c *cid.Cid, depth int) bool {
return visit(c)
}

return EnumerateChildrenAsyncDepth(ctx, getLinks, c, 0, visitDepth)
}

// EnumerateChildrenAsyncDepth is equivalent to EnumerateChildrenDepth *except*
// that it fetches children in parallel (down to a maximum depth in the graph).
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c *cid.Cid, startDepth int, visit func(*cid.Cid, int) bool) error {
type cidDepth struct {
cid *cid.Cid
depth int
}

type linksDepth struct {
links []*ipld.Link
depth int
}

feed := make(chan *cidDepth)
out := make(chan *linksDepth)
done := make(chan struct{})

var setlk sync.Mutex
Expand All @@ -318,20 +384,28 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,

for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for ic := range feed {
for cdepth := range feed {
ci := cdepth.cid
depth := cdepth.depth

setlk.Lock()
shouldVisit := visit(ic)
shouldVisit := visit(ci, depth)
setlk.Unlock()

if shouldVisit {
links, err := getLinks(ctx, ic)
links, err := getLinks(ctx, ci)
if err != nil {
errChan <- err
return
}

outLinks := &linksDepth{
links: links,
depth: depth + 1,
}

select {
case out <- links:
case out <- outLinks:
case <-fetchersCtx.Done():
return
}
Expand All @@ -346,10 +420,13 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
defer close(feed)

send := feed
var todobuffer []*cid.Cid
var todobuffer []*cidDepth
var inProgress int

next := c
next := &cidDepth{
cid: c,
depth: startDepth,
}
for {
select {
case send <- next:
Expand All @@ -366,13 +443,18 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
if inProgress == 0 && next == nil {
return nil
}
case links := <-out:
for _, lnk := range links {
case linksDepth := <-out:
for _, lnk := range linksDepth.links {
cd := &cidDepth{
cid: lnk.Cid,
depth: linksDepth.depth,
}

if next == nil {
next = lnk.Cid
next = cd
send = feed
} else {
todobuffer = append(todobuffer, lnk.Cid)
todobuffer = append(todobuffer, cd)
}
}
case err := <-errChan:
Expand Down
124 changes: 107 additions & 17 deletions merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,52 @@ import (
ipld "github.com/ipfs/go-ipld-format"
)

// makeDepthTestingGraph makes a small DAG with two levels. The level-two
// nodes are both children of the root and of one of the level 1 nodes.
// This is meant to test the EnumerateChildren*Depth functions.
func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node {
root := NodeWithData(nil)
l11 := NodeWithData([]byte("leve1_node1"))
l12 := NodeWithData([]byte("leve1_node2"))
l21 := NodeWithData([]byte("leve2_node1"))
l22 := NodeWithData([]byte("leve2_node2"))
l23 := NodeWithData([]byte("leve2_node3"))

l11.AddNodeLink(l21.Cid().String(), l21)
l11.AddNodeLink(l22.Cid().String(), l22)
l11.AddNodeLink(l23.Cid().String(), l23)

root.AddNodeLink(l11.Cid().String(), l11)
root.AddNodeLink(l12.Cid().String(), l12)
root.AddNodeLink(l23.Cid().String(), l23)

ctx := context.Background()
for _, n := range []ipld.Node{l23, l22, l21, l12, l11, root} {
err := ds.Add(ctx, n)
if err != nil {
t.Fatal(err)
}
}

return root
}

// Check that all children of root are in the given set and in the datastore
func traverseAndCheck(t *testing.T, root ipld.Node, ds ipld.DAGService, hasF func(c *cid.Cid) bool) {
// traverse dag and check
for _, lnk := range root.Links() {
c := lnk.Cid
if !hasF(c) {
t.Fatal("missing key in set! ", lnk.Cid.String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
t.Fatal(err)
}
traverseAndCheck(t, child, ds, hasF)
}
}

func TestNode(t *testing.T) {

n1 := NodeWithData([]byte("beep"))
Expand Down Expand Up @@ -293,6 +339,66 @@ func TestFetchGraph(t *testing.T) {
}
}

func TestFetchGraphWithDepthLimit(t *testing.T) {
type testcase struct {
depthLim int
setLen int
}

tests := []testcase{
testcase{1, 3},
testcase{0, 0},
testcase{-1, 5},
testcase{2, 5},
testcase{3, 5},
}

testF := func(t *testing.T, tc testcase) {
var dservs []ipld.DAGService
bsis := bstest.Mocks(2)
for _, bsi := range bsis {
dservs = append(dservs, NewDAGService(bsi))
}

root := makeDepthTestingGraph(t, dservs[0])

err := FetchGraphWithDepthLimit(context.TODO(), root.Cid(), tc.depthLim, dservs[1])
if err != nil {
t.Fatal(err)
}

// create an offline dagstore and ensure all blocks were fetched
bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore()))

offlineDS := NewDAGService(bs)

set := make(map[string]int)
visitF := func(c *cid.Cid, depth int) bool {
if tc.depthLim < 0 || depth <= tc.depthLim {
set[string(c.Bytes())] = depth
return true
}
return false

}

err = EnumerateChildrenDepth(context.Background(), offlineDS.GetLinks, root.Cid(), 0, visitF)
if err != nil {
t.Fatal(err)
}

if len(set) != tc.setLen {
t.Fatalf("expected %d nodes but visited %d", tc.setLen, len(set))
}
}

for _, tc := range tests {
t.Run(fmt.Sprintf("depth limit %d", tc.depthLim), func(t *testing.T) {
testF(t, tc)
})
}
}

func TestEnumerateChildren(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])
Expand All @@ -307,23 +413,7 @@ func TestEnumerateChildren(t *testing.T) {
t.Fatal(err)
}

var traverse func(n ipld.Node)
traverse = func(n ipld.Node) {
// traverse dag and check
for _, lnk := range n.Links() {
c := lnk.Cid
if !set.Has(c) {
t.Fatal("missing key in set! ", lnk.Cid.String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
t.Fatal(err)
}
traverse(child)
}
}

traverse(root)
traverseAndCheck(t, root, ds, set.Has)
}

func TestFetchFailure(t *testing.T) {
Expand Down