From 7379cc1adec72a99140be06207ab1f80189089f3 Mon Sep 17 00:00:00 2001 From: Iaroslav Gridin Date: Sat, 14 Jan 2017 19:54:52 +0200 Subject: [PATCH 1/2] Pass cids instead of nodes around in EnumerateChildrenAsync License: MIT Signed-off-by: Iaroslav Gridin --- merkledag.go | 75 ++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/merkledag.go b/merkledag.go index ae8d71cfe28c..f6bc8f9f50ee 100644 --- a/merkledag.go +++ b/merkledag.go @@ -394,45 +394,40 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit var FetchGraphConcurrency = 8 func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error { - if !visit(c) { - return nil - } - - root, err := ds.Get(ctx, c) - if err != nil { - return err - } - - feed := make(chan node.Node) - out := make(chan *NodeOption) + feed := make(chan *cid.Cid) + out := make(chan node.Node) done := make(chan struct{}) var setlk sync.Mutex - + + errChan := make(chan error) + fetchersCtx, cancel := context.WithCancel(ctx) + + defer cancel() + for i := 0; i < FetchGraphConcurrency; i++ { go func() { - for n := range feed { - links := n.Links() - cids := make([]*cid.Cid, 0, len(links)) - for _, l := range links { - setlk.Lock() - unseen := visit(l.Cid) - setlk.Unlock() - if unseen { - cids = append(cids, l.Cid) - } + for ic := range feed { + n, err := ds.Get(ctx, ic) + if err != nil { + errChan <- err + return } - - for nopt := range ds.GetMany(ctx, cids) { + + setlk.Lock() + unseen := visit(ic) + setlk.Unlock() + + if unseen { select { - case out <- nopt: - case <-ctx.Done(): + case out <- n: + case <-fetchersCtx.Done(): return } } select { case done <- struct{}{}: - case <-ctx.Done(): + case <-fetchersCtx.Done(): } } }() @@ -440,10 +435,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi defer close(feed) send := feed - var todobuffer []node.Node + var todobuffer []*cid.Cid var inProgress int - next := root + next := c for { select { case send <- next: @@ -460,18 +455,18 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi if inProgress == 0 && next == nil { return nil } - case nc := <-out: - if nc.Err != nil { - return nc.Err - } - - if next == nil { - next = nc.Node - send = feed - } else { - todobuffer = append(todobuffer, nc.Node) + case nd := <-out: + for _, lnk := range nd.Links() { + if next == nil { + next = lnk.Cid + send = feed + } else { + todobuffer = append(todobuffer, lnk.Cid) + } } - + case err := <-errChan: + return err + case <-ctx.Done(): return ctx.Err() } From 03782baff57dcc6716275ac1b0eb7e69cc5f0826 Mon Sep 17 00:00:00 2001 From: Iaroslav Gridin Date: Thu, 19 Jan 2017 13:51:55 +0200 Subject: [PATCH 2/2] Re-enable async children enumerating in FetchGraph License: MIT Signed-off-by: Iaroslav Gridin --- merkledag.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merkledag.go b/merkledag.go index f6bc8f9f50ee..faff47796c6d 100644 --- a/merkledag.go +++ b/merkledag.go @@ -140,7 +140,7 @@ func (n *dagService) Remove(nd node.Node) error { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error { - return EnumerateChildren(ctx, serv, c, cid.NewSet().Visit, false) + return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit) } // FindLinks searches this nodes links for the given key,