Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pausing and unpausing of requests #62

Merged
merged 10 commits into from
Apr 21, 2020
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,29 @@ module github.com/ipfs/go-graphsync
go 1.12

require (
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.2
github.com/ipfs/go-merkledag v0.2.4
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-ipld-prime v0.0.2-0.20200229094926-eb71617f4aeb
github.com/ipld/go-ipld-prime-proto v0.0.0-20200409003434-8cf97d9cb362
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-libp2p v0.6.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multihash v0.0.13
github.com/stretchr/testify v1.4.0
Expand Down
118 changes: 118 additions & 0 deletions go.sum

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ type ResponseData interface {
Extension(name ExtensionName) ([]byte, bool)
}

// BlockData gives information about a block included in a graphsync response
type BlockData interface {
// Link is the link/cid for the block
Link() ipld.Link

// BlockSize specifies the size of the block
BlockSize() uint64

// BlockSize specifies the amount of data actually transmitted over the network
BlockSizeOnWire() uint64
}

// IncomingRequestHookActions are actions that a request hook can take to change
// behavior for the response
type IncomingRequestHookActions interface {
Expand All @@ -152,6 +164,14 @@ type OutgoingRequestHookActions interface {
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of this request
type OutgoingBlockHookActions interface {
SendExtensionData(ExtensionData)
TerminateWithError(error)
PauseResponse()
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand All @@ -167,6 +187,13 @@ type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error
// It receives an interface for customizing how we handle executing this request
type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions OutgoingRequestHookActions)

// OnOutgoingBlockHook is a hook that runs immediately after a requestor sends a new block
// on a response
// It receives the peer we're sending a request to, all the data aobut the request, a link for the block sent,
// and the size of the block sent
// It receives an interface for taking further action on the response
type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, hookActions OutgoingBlockHookActions)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand All @@ -186,4 +213,10 @@ type GraphExchange interface {

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc

// RegisterOutgoingBlockHook adds a hook that runs every time a block is sent from a responder
RegisterOutgoingBlockHook(hook OnOutgoingBlockHook) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
}
30 changes: 26 additions & 4 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/blockhooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/requesthooks"
"github.com/ipfs/go-graphsync/selectorvalidator"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
Expand All @@ -35,6 +38,9 @@ type GraphSync struct {
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *requesthooks.IncomingRequestHooks
outgoingBlockHooks *blockhooks.OutgoingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
Expand Down Expand Up @@ -69,15 +75,21 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue)
unregisterDefaultValidator := responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(maxRecursionDepth))
persistenceOptions := persistenceoptions.New()
incomingRequestHooks := requesthooks.New(persistenceOptions)
outgoingBlockHooks := blockhooks.New()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
Expand Down Expand Up @@ -108,7 +120,7 @@ func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, sel
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
return gs.responseManager.RegisterRequestHook(hook)
return gs.incomingRequestHooks.Register(hook)
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
Expand All @@ -127,7 +139,17 @@ func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader,
if err != nil {
return err
}
return gs.responseManager.RegisterPersistenceOption(name, loader)
return gs.persistenceOptions.Register(name, loader)
}

// RegisterOutgoingBlockHook registers a hook that runs after each block is sent in a response
func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
return gs.outgoingBlockHooks.Register(hook)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
}

type graphSyncReceiver GraphSync
Expand Down
54 changes: 53 additions & 1 deletion impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,58 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
}

func TestPauseResume(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

stopPoint := 50
blocksSent := 0
requestIDChan := make(chan graphsync.RequestID, 1)
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
select {
case requestIDChan <- requestData.ID():
default:
}
blocksSent++
if blocksSent == stopPoint {
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down Expand Up @@ -273,7 +325,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector())
testutil.VerifyEmptyResponse(ctx, t, progressChan)
testutil.VerifySingleTerminalError(ctx, t, errChan)
testutil.VerifyHasErrors(ctx, t, errChan)

progressChan, errChan = requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)

Expand Down
9 changes: 6 additions & 3 deletions ipldutil/ipldutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ func ErrDoNotFollow() error {
}

var (
defaultChooser traversal.NodeBuilderChooser = dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) ipld.NodeBuilder {
return free.NodeBuilder()
defaultChooser traversal.NodeBuilderChooser = dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) (ipld.NodeBuilder, error) {
return free.NodeBuilder(), nil
})
)

func Traverse(ctx context.Context, loader ipld.Loader, chooser traversal.NodeBuilderChooser, root ipld.Link, s selector.Selector, fn traversal.AdvVisitFn) error {
if chooser == nil {
chooser = defaultChooser
}
builder := chooser(root, ipld.LinkContext{})
builder, err := chooser(root, ipld.LinkContext{})
if err != nil {
return err
}
node, err := root.Load(ctx, ipld.LinkContext{}, builder, loader)
if err != nil {
return err
Expand Down
Loading