diff --git a/requestmanager/client.go b/requestmanager/client.go index 25655c2f..b7c953ea 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -66,6 +66,7 @@ type inProgressRequestStatus struct { inProgressChan chan graphsync.ResponseProgress inProgressErr chan error traverser ipldutil.Traverser + traverserCancel context.CancelFunc } // PeerHandler is an interface that can send requests to peers diff --git a/requestmanager/server.go b/requestmanager/server.go index d9f7ba7c..a7647b4e 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -109,12 +109,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re LinkBudget: int64(rm.maxLinksPerRequest), } } + // the traverser has its own context because we want to fail on block boundaries, in the executor, + // and make sure all blocks included up to the termination message + // are processed and passed in the response channel + ctx, cancel := context.WithCancel(rm.ctx) + ipr.traverserCancel = cancel ipr.traverser = ipldutil.TraversalBuilder{ Root: cidlink.Link{Cid: ipr.request.Root()}, Selector: ipr.request.Selector(), Visitor: func(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error { select { - case <-ipr.ctx.Done(): + case <-ctx.Done(): case ipr.inProgressChan <- graphsync.ResponseProgress{ Node: node, Path: tp.Path, @@ -126,7 +131,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re Chooser: ipr.nodeStyleChooser, LinkSystem: rm.linkSystem, Budget: budget, - }.Start(ipr.ctx) + }.Start(ctx) } ipr.state = running @@ -165,6 +170,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i ipr.cancelFn() rm.asyncLoader.CleanupRequest(requestID) if ipr.traverser != nil { + ipr.traverserCancel() ipr.traverser.Shutdown(rm.ctx) } // make sure context is not closed before closing channels (could cause send @@ -236,6 +242,10 @@ func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.Gr filteredResponses = rm.filterResponsesForPeer(filteredResponses, p) rm.updateLastResponses(filteredResponses) responseMetadata := metadataForResponses(filteredResponses) + fmt.Println(responseMetadata) + if len(filteredResponses) > 0 { + fmt.Println(filteredResponses[0].Status()) + } rm.asyncLoader.ProcessResponse(responseMetadata, blks) rm.processTerminations(filteredResponses) log.Debugf("end processing message for peer %s", p) diff --git a/testutil/testutil.go b/testutil/testutil.go index fd8b3149..6fdb7005 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -152,10 +152,13 @@ func ReadNResponses(ctx context.Context, t TestingT, responseChan <-chan graphsy var returnedBlocks []graphsync.ResponseProgress for i := 0; i < count; i++ { select { - case blk := <-responseChan: + case blk, ok := <-responseChan: + if !ok { + require.FailNowf(t, "Channel closed early", "expected %d messages, got %d", count, len(returnedBlocks)) + } returnedBlocks = append(returnedBlocks, blk) case <-ctx.Done(): - t.Fatal("Unable to read enough responses") + require.FailNow(t, "Unable to read enough responses") } } return returnedBlocks