Skip to content

Commit

Permalink
fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by R…
Browse files Browse the repository at this point in the history
…equestID
  • Loading branch information
rvagg committed Feb 14, 2022
1 parent f5e9fa1 commit cd80a9b
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 36 deletions.
8 changes: 4 additions & 4 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (rm *ResponseManager) synchronize() {
}

// StartTask starts the given task from the peer task queue
func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskChan chan<- queryexecutor.ResponseTask) {
rm.send(&startTaskRequest{task, responseTaskChan}, nil)
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
}

// GetUpdates is called to read pending updates for a task and clear them
Expand All @@ -210,9 +210,9 @@ func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan
}

// FinishTask marks a task from the task queue as done
func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
done := make(chan struct{}, 1)
rm.send(&finishTaskRequest{task, err, done}, nil)
rm.send(&finishTaskRequest{task, p, err, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand Down
6 changes: 4 additions & 2 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ func (rur *responseUpdateRequest) handle(rm *ResponseManager) {

type finishTaskRequest struct {
task *peertask.Task
p peer.ID
err error
done chan struct{}
}

func (ftr *finishTaskRequest) handle(rm *ResponseManager) {
rm.finishTask(ftr.task, ftr.err)
rm.finishTask(ftr.task, ftr.p, ftr.err)
select {
case <-rm.ctx.Done():
case ftr.done <- struct{}{}:
Expand All @@ -100,11 +101,12 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) {

type startTaskRequest struct {
task *peertask.Task
p peer.ID
taskDataChan chan<- queryexecutor.ResponseTask
}

func (str *startTaskRequest) handle(rm *ResponseManager) {
taskData := rm.startTask(str.task)
taskData := rm.startTask(str.task, str.p)

select {
case <-rm.ctx.Done():
Expand Down
8 changes: 4 additions & 4 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peert
// StartTask lets us block until this task is at the top of the execution stack
responseTaskChan := make(chan ResponseTask)
var rt ResponseTask
qe.manager.StartTask(task, responseTaskChan)
qe.manager.StartTask(task, pid, responseTaskChan)
select {
case rt = <-responseTaskChan:
case <-qe.ctx.Done():
Expand All @@ -109,7 +109,7 @@ func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peert
span.SetStatus(codes.Error, err.Error())
}
}
qe.manager.FinishTask(task, err)
qe.manager.FinishTask(task, pid, err)
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
return false
}
Expand Down Expand Up @@ -279,9 +279,9 @@ func (qe *QueryExecutor) sendResponse(ctx context.Context, p peer.ID, taskData R

// Manager providers an interface to the response manager
type Manager interface {
StartTask(task *peertask.Task, responseTaskChan chan<- ResponseTask)
StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- ResponseTask)
GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest)
FinishTask(task *peertask.Task, err error)
FinishTask(task *peertask.Task, p peer.ID, err error)
}

// BlockHooks is an interface for processing block hooks
Expand Down
11 changes: 7 additions & 4 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,11 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td := &testData{}
td.t = t
td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second)
td.peer = testutil.GeneratePeers(1)[0]
td.blockStore = make(map[ipld.Link][]byte)
td.persistence = testutil.NewTestStore(td.blockStore)
td.task = &peertask.Task{}
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task}
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task, expectedPeer: td.peer}
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.requestID = graphsync.NewRequestID()
Expand All @@ -280,7 +281,6 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.extensionData = basicnode.NewBytes(testutil.RandomBytes(100))
td.extensionName = graphsync.ExtensionName("AppleSauce/McGee")
td.responseCode = graphsync.ResponseStatusCode(101)
td.peer = testutil.GeneratePeers(1)[0]

td.extension = graphsync.ExtensionData{
Name: td.extensionName,
Expand Down Expand Up @@ -367,10 +367,12 @@ type fauxManager struct {
t *testing.T
responseTask ResponseTask
expectedStartTask *peertask.Task
expectedPeer peer.ID
}

func (fm *fauxManager) StartTask(task *peertask.Task, responseTaskChan chan<- ResponseTask) {
func (fm *fauxManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- ResponseTask) {
require.Same(fm.t, fm.expectedStartTask, task)
require.Equal(fm.t, fm.expectedPeer, p)
go func() {
select {
case <-fm.ctx.Done():
Expand All @@ -382,7 +384,8 @@ func (fm *fauxManager) StartTask(task *peertask.Task, responseTaskChan chan<- Re
func (fm *fauxManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
}

func (fm *fauxManager) FinishTask(task *peertask.Task, err error) {
func (fm *fauxManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
require.Equal(fm.t, fm.expectedPeer, p)
}

type fauxResponseStream struct {
Expand Down
39 changes: 17 additions & 22 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ import (
// The code in this file implements the internal thread for the response manager.
// These functions can modify the internal state of the ResponseManager

type queueTopic struct {
p peer.ID
requestID graphsync.RequestID
}

func (rm *ResponseManager) cleanupInProcessResponses() {
for _, response := range rm.inProgressResponses {
response.cancelFn()
Expand Down Expand Up @@ -137,14 +132,14 @@ func (rm *ResponseManager) unpauseRequest(requestID graphsync.RequestID, extensi
return nil
})
}
rm.responseQueue.PushTask(inProgressResponse.peer, peertask.Task{Topic: queueTopic{inProgressResponse.peer, requestID}, Priority: math.MaxInt32, Work: 1})
rm.responseQueue.PushTask(inProgressResponse.peer, peertask.Task{Topic: requestID, Priority: math.MaxInt32, Work: 1})
return nil
}

func (rm *ResponseManager) abortRequest(ctx context.Context, requestID graphsync.RequestID, err error) error {
response, ok := rm.inProgressResponses[requestID]
if ok {
rm.responseQueue.Remove(queueTopic{response.peer, requestID}, response.peer)
rm.responseQueue.Remove(requestID, response.peer)
}
if !ok || response.state == graphsync.CompletingSend {
return graphsync.RequestNotFoundErr{}
Expand Down Expand Up @@ -258,7 +253,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
}
// TODO: Use a better work estimation metric.

rm.responseQueue.PushTask(p, peertask.Task{Topic: queueTopic{p, request.ID()}, Priority: int(request.Priority()), Work: 1})
rm.responseQueue.PushTask(p, peertask.Task{Topic: request.ID(), Priority: int(request.Priority()), Work: 1})
}
}

Expand Down Expand Up @@ -297,28 +292,28 @@ func (rm *ResponseManager) taskDataForKey(requestID graphsync.RequestID) queryex
}
}

func (rm *ResponseManager) startTask(task *peertask.Task) queryexecutor.ResponseTask {
key := task.Topic.(queueTopic)
taskData := rm.taskDataForKey(key.requestID)
func (rm *ResponseManager) startTask(task *peertask.Task, p peer.ID) queryexecutor.ResponseTask {
requestID := task.Topic.(graphsync.RequestID)
taskData := rm.taskDataForKey(requestID)
if taskData.Empty {
rm.responseQueue.TaskDone(key.p, task)
rm.responseQueue.TaskDone(p, task)
}

return taskData
}

func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
key := task.Topic.(queueTopic)
rm.responseQueue.TaskDone(key.p, task)
response, ok := rm.inProgressResponses[key.requestID]
func (rm *ResponseManager) finishTask(task *peertask.Task, p peer.ID, err error) {
requestID := task.Topic.(graphsync.RequestID)
rm.responseQueue.TaskDone(p, task)
response, ok := rm.inProgressResponses[requestID]
if !ok {
return
}
if _, ok := err.(hooks.ErrPaused); ok {
response.state = graphsync.Paused
return
}
log.Infow("graphsync response processing complete (messages stil sending)", "request id", key.requestID.String(), "peer", key.p, "total time", time.Since(response.startTime))
log.Infow("graphsync response processing complete (messages stil sending)", "request id", requestID.String(), "peer", p, "total time", time.Since(response.startTime))

if err != nil {
response.span.RecordError(err)
Expand All @@ -327,13 +322,13 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
}

if ipldutil.IsContextCancelErr(err) {
rm.cancelledListeners.NotifyCancelledListeners(key.p, response.request)
rm.terminateRequest(key.requestID)
rm.cancelledListeners.NotifyCancelledListeners(p, response.request)
rm.terminateRequest(requestID)
return
}

if err == queryexecutor.ErrNetworkError {
rm.terminateRequest(key.requestID)
rm.terminateRequest(requestID)
return
}

Expand Down Expand Up @@ -385,11 +380,11 @@ func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState
}
active := make([]graphsync.RequestID, 0, len(pt.Active))
for _, topic := range pt.Active {
active = append(active, topic.(queueTopic).requestID)
active = append(active, topic.(graphsync.RequestID))
}
pending := make([]graphsync.RequestID, 0, len(pt.Pending))
for _, topic := range pt.Pending {
pending = append(pending, topic.(queueTopic).requestID)
pending = append(pending, topic.(graphsync.RequestID))
}
return peerstate.TaskQueueState{
Active: active,
Expand Down

0 comments on commit cd80a9b

Please sign in to comment.