Skip to content

Commit

Permalink
Fix broken AnswerQueue behavior with chained pipeline calls
Browse files Browse the repository at this point in the history
AnswerQueue does not handle chained pipeline calls correctly.
I'm assuming this is because the code for it might be incomplete. It doesn't assign basis recv functions for any bases other than 0 and uses the wrong value for the basis itself on subsequent chained calls.

This fixes those bugs and adds more documentation about how AnswerQueue is meant to function because I had to stare at it for 2 hours to figure it out.
  • Loading branch information
ObsidianMinor committed Feb 19, 2025
1 parent 3adab7c commit ddd72bb
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions answerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,30 @@ type AnswerQueue struct {
method Method
draining chan struct{} // closed while exiting queueing state

mu sync.Mutex
q []qent // non-nil while queueing
bases []base // set when drain starts. len(bases) >= 1
mu sync.Mutex
// The entries of the queue. The queue refers to method calls
// after an earlier call finishes. But not all entries need to refer
// to the same call. Entries can refer to *earlier* entries in
// the queue as their "basis". The basis for the initial fulfilled
// value is always 0. Take for instance the call chain:
//
// A() -> a
// a.B() -> b
// b.C() -> c
// a.D() -> d
//
// Call A is pipelined to B which is pipelined to C. This would
// produce a chain of 2 qents with the bases 0 and 1, where 0 is
// the fulfilled result of A, and 1 is the result of B. If we add
// in a qent to the end for call D pipelined from A, it would have
// a basis 0 like with call B.
//
// This field is set to nil when draining starts.
q []qent
// Message targets derived from applying a qent. Its length always
// at least 1, since the 0 index is used for the initial fulfilled result.
// This is set when draining starts.
bases []base
}

// qent is a single entry in an AnswerQueue.
Expand Down Expand Up @@ -77,7 +98,10 @@ func (aq *AnswerQueue) Fulfill(ptr Ptr) {
for i := range q {
ent := &q[i]
recv := aq.bases[ent.basis].recv
recv(ent.ctx, ent.path, ent.Recv)
pcall := recv(ent.ctx, ent.path, ent.Recv)
// The basis for our result will always be our index in the queue + 1
// since 0 is used for the initial fulfilled result.
aq.bases[i+1].recv = pcall.PipelineRecv
}
}

Expand Down Expand Up @@ -146,7 +170,9 @@ func (qc queueCaller) PipelineRecv(ctx context.Context, transform []PipelineOp,
path: transform,
Recv: r,
})
basis := len(qc.aq.q) - 1
// The basis for our result will always be our index in the queue + 1
// since 0 is used for the initial fulfilled result.
basis := len(qc.aq.q)
qc.aq.mu.Unlock()
return queueCaller{aq: qc.aq, basis: basis}
}
Expand Down

0 comments on commit ddd72bb

Please sign in to comment.