Skip to content

Commit

Permalink
feat: remove peer.ID from asyncloader & loadattempter
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Feb 10, 2022
1 parent fde604d commit 8c1008c
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 119 deletions.
9 changes: 4 additions & 5 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -140,9 +139,9 @@ func (al *AsyncLoader) ProcessResponse(

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult {
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := loadattemptqueue.NewLoadRequest(requestID, link, linkContext, resultChan)
al.stateLk.Lock()
defer al.stateLk.Unlock()
_, retry := al.activeRequests[requestID]
Expand All @@ -165,7 +164,7 @@ func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) {
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
al.stateLk.Lock()
defer al.stateLk.Unlock()
responseCache := al.responseCache
Expand Down Expand Up @@ -194,7 +193,7 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult {
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link, linkContext)
if err != nil {
Expand Down
46 changes: 17 additions & 29 deletions requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) {
link := st.Store(t, block)
withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) {
requestID := graphsync.NewRequestID()
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
Expand All @@ -45,9 +44,8 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
Action: graphsync.LinkActionPresent,
}}),
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)
Expand All @@ -68,10 +66,9 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
Action: graphsync.LinkActionMissing,
}}),
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(context.Background(), responses, nil)

resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)
})
Expand All @@ -82,8 +79,7 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T)
withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) {
link := testutil.NewTestLink()
requestID := graphsync.NewRequestID()
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
Expand All @@ -100,8 +96,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
requestID := graphsync.NewRequestID()
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})

st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)

Expand All @@ -127,8 +122,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
requestID := graphsync.NewRequestID()
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})

st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)

Expand All @@ -152,8 +146,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
requestID := graphsync.NewRequestID()
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})
st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)
asyncLoader.CompleteResponsesFor(requestID)
assertFailResponse(ctx, t, resultChan)
Expand All @@ -175,14 +168,13 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
Action: graphsync.LinkActionPresent,
}}),
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan := asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)

resultChan = asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
resultChan = asyncLoader.AsyncLoad(requestID, link, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)

Expand All @@ -206,13 +198,12 @@ func TestRegisterUnregister(t *testing.T) {
requestID2 := graphsync.NewRequestID()
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1, ipld.LinkContext{})
resultChan1 := asyncLoader.AsyncLoad(requestID2, link1, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan1)
err = asyncLoader.UnregisterPersistenceOption("other")
require.EqualError(t, err, "cannot unregister while requests are in progress")
asyncLoader.CompleteResponsesFor(requestID2)
asyncLoader.CleanupRequest(p, requestID2)
asyncLoader.CleanupRequest(requestID2)
err = asyncLoader.UnregisterPersistenceOption("other")
require.NoError(t, err)

Expand All @@ -230,13 +221,12 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) {
err := asyncLoader.RegisterPersistenceOption("other", otherSt.lsys)
require.NoError(t, err)
requestID1 := graphsync.NewRequestID()
p := testutil.GeneratePeers(1)[0]

resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
resultChan1 := asyncLoader.AsyncLoad(requestID1, link, ipld.LinkContext{})
requestID2 := graphsync.NewRequestID()
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(requestID2, link, ipld.LinkContext{})

assertFailResponse(ctx, t, resultChan1)
assertSuccessResponse(ctx, t, resultChan2)
Expand All @@ -259,9 +249,8 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) {
require.NoError(t, err)
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})
resultChan1 := asyncLoader.AsyncLoad(requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(requestID2, link, ipld.LinkContext{})
responses := map[graphsync.RequestID]graphsync.LinkMetadata{
requestID1: message.NewLinkMetadata(
[]message.GraphSyncLinkMetadatum{{
Expand Down Expand Up @@ -298,9 +287,8 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) {
require.NoError(t, err)
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})
resultChan1 := asyncLoader.AsyncLoad(requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(requestID2, link, ipld.LinkContext{})
responses := map[graphsync.RequestID]graphsync.LinkMetadata{
requestID2: message.NewLinkMetadata(
[]message.GraphSyncLinkMetadatum{{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"

"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/requestmanager/types"
Expand All @@ -13,7 +12,6 @@ import (
// LoadRequest is a request to load the given link for the given request id,
// with results returned to the given channel
type LoadRequest struct {
p peer.ID
requestID graphsync.RequestID
link ipld.Link
linkContext ipld.LinkContext
Expand All @@ -23,17 +21,16 @@ type LoadRequest struct {
// NewLoadRequest returns a new LoadRequest for the given request id, link,
// and results channel
func NewLoadRequest(
p peer.ID,
requestID graphsync.RequestID,
link ipld.Link,
linkContext ipld.LinkContext,
resultChan chan types.AsyncLoadResult) LoadRequest {
return LoadRequest{p, requestID, link, linkContext, resultChan}
return LoadRequest{requestID, link, linkContext, resultChan}
}

// LoadAttempter attempts to load a link to an array of bytes
// and returns an async load result
type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult
type LoadAttempter func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult

// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue
Expand All @@ -52,7 +49,7 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
// AttemptLoad attempts to loads the given load request, and if retry is true
// it saves the loadrequest for retrying later
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
response := laq.loadAttempter(lr.p, lr.requestID, lr.link, lr.linkContext)
response := laq.loadAttempter(lr.requestID, lr.link, lr.linkContext)
if response.Err != nil || response.Data != nil {
lr.resultChan <- response
close(lr.resultChan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
Expand All @@ -20,7 +19,7 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
loadAttempter := func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
callCount++
return types.AsyncLoadResult{
Data: testutil.RandomBytes(100),
Expand All @@ -31,10 +30,9 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.NewRequestID()
p := testutil.GeneratePeers(1)[0]

resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := NewLoadRequest(requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -50,7 +48,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
loadAttempter := func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
callCount++
return types.AsyncLoadResult{
Err: fmt.Errorf("something went wrong"),
Expand All @@ -62,9 +60,8 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
linkContext := ipld.LinkContext{}
requestID := graphsync.NewRequestID()
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]

lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := NewLoadRequest(requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -79,7 +76,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
loadAttempter := func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
Expand All @@ -95,10 +92,9 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.NewRequestID()
p := testutil.GeneratePeers(1)[0]

resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := NewLoadRequest(requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -114,7 +110,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
loadAttempter := func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
Expand All @@ -131,8 +127,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
linkContext := ipld.LinkContext{}
requestID := graphsync.NewRequestID()
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := NewLoadRequest(requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)

testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done())
Expand All @@ -151,7 +146,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
loadAttempter := func(graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
Expand All @@ -168,8 +163,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
linkContext := ipld.LinkContext{}
requestID := graphsync.NewRequestID()
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
lr := NewLoadRequest(requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)

testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done())
Expand Down
4 changes: 2 additions & 2 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ type PeerHandler interface {
type AsyncLoader interface {
StartRequest(graphsync.RequestID, string) error
ProcessResponse(ctx context.Context, responses map[graphsync.RequestID]graphsync.LinkMetadata, blks []blocks.Block)
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
AsyncLoad(requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID graphsync.RequestID)
CleanupRequest(p peer.ID, requestID graphsync.RequestID)
CleanupRequest(requestID graphsync.RequestID)
}

// RequestManager tracks outgoing requests and processes incoming reponses
Expand Down
4 changes: 2 additions & 2 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BlockHooks interface {

// AsyncLoadFn is a function which given a request id and an ipld.Link, returns
// a channel which will eventually return data for the link or an err
type AsyncLoadFn func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) <-chan types.AsyncLoadResult
type AsyncLoadFn func(graphsync.RequestID, ipld.Link, ipld.LinkContext) <-chan types.AsyncLoadResult

// Executor handles actually executing graphsync requests and verifying them.
// It has control of requests when they are in the "running" state, while
Expand Down Expand Up @@ -131,7 +131,7 @@ func (e *Executor) traverse(rt RequestTask) error {
lnk, linkContext := rt.Traverser.CurrentRequest()
// attempt to load
log.Debugf("will load link=%s", lnk)
resultChan := e.loader(rt.P, rt.Request.ID(), lnk, linkContext)
resultChan := e.loader(rt.Request.ID(), lnk, linkContext)
var result types.AsyncLoadResult
// check for immediate result
select {
Expand Down
Loading

0 comments on commit 8c1008c

Please sign in to comment.