-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests #284
Conversation
taskqueue/taskqueue.go
Outdated
@@ -104,7 +117,15 @@ func (tq *WorkerTaskQueue) worker(executor Executor) { | |||
} | |||
} | |||
for _, task := range tasks { | |||
tq.activeTasks++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this atomic(-enough) to safely do this for a shared int
field in a struct between go routines, or do I need to put a mutex around these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, apparently not, "DATA RACE", am trying out sync/atomic
for this
Ironic that this is failing on one of the flaky tests I was hoping to be able to deal with with this change. Sadly, as per #273 (comment), I tried wiring this up for that and it didn't work. But it does work for #283. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests
* feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests
* feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Late to the party, but hopefully this is still useful.
select { | ||
case <-tq.ctx.Done(): | ||
return | ||
case <-tq.noTaskSignal: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this case return
too? if you got the signal that there were zero active tasks a moment ago, you should be able to return.
Perhaps what you're trying to do is double-check that with the activeTasks
counter, but then WaitForNoActiveTasks
is blocking for the counter to reach zero on two instances, not just one. I imagine just one is still fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you apply this change, then you can also simplify the code by using an if
rather than a for
on the atomic load.
@@ -43,6 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { | |||
cancelFn: cancelFn, | |||
peerTaskQueue: peertaskqueue.New(), | |||
workSignal: make(chan struct{}, 1), | |||
noTaskSignal: make(chan struct{}, 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My two cents, and this might be why the method isn't working as expected for you:
- the channel only buffers one "no tasks" signal
worker
only tries to signal via the channel when finishing a task results inactiveTasks
being 0WaitForNoActiveTasks
consumes that channel signal
So, for instance, if two WaitForNoActiveTasks
calls come in and the worker performs one task, it will only send one signal to the channel, and only one of the two WaitForNoActiveTasks
calls will grab that signal - the other call will potentially block forever.
You want a "broadcast signal", which unfortunately doesn't have a primitive in Go right now - channels are one-to-one. You can use close
as a way to broadcast to infinite receivers, but that closes the channel forever, not allowing its reuse.
If you really only call WaitForNoActiveTasks
once at a time, then your code isn't buggy, but I'd still argue it's prone to misuse :) So I would adapt its code to make it panic if called while it's already waiting.
If you do want to support concurrent Wait calls, then we'd need a different approach. Happy to bounce some ideas if that's the case.
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284) * feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests fix(responsemanager): fix flaky tests fix(responsemanager): make fix more global feat: add basic OT tracing for incoming requests Closes: #271 docs(tests): document tracing test helper utilities fix(test): increase 1s timeouts to 2s for slow CI (#289) * fix(test): increase 1s timeouts to 2s for slow CI * fixup! fix(test): increase 1s timeouts to 2s for slow CI testutil/chaintypes: simplify maintenance of codegen (#294) "go generate" now updates the generated code for us. The separate directory for a main package was unnecessary; a build-tag-ignored file is enough. Using gofmt on the resulting source is now unnecessary too, as upstream has been using go/format on its output for some time. Finally, re-generate the output source code, as the last time that was done we were on an older ipld-prime. ipldutil: use chooser APIs from dagpb and basicnode (#292) Saves us a bit of extra code, since they were added in summer. Also avoid making defaultVisitor a variable, which makes it clearer that it's never a nil func. While here, replace node/basic with node/basicnode, as the former has been deprecated in favor of the latter. Co-authored-by: Hannah Howard <[email protected]> fix: use sync.Cond to handle no-task blocking wait (#299) Ref: #284 Peer Stats function (#298) * feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer * fix(requestmanager): fix tested method Add a bit of logging (#301) * chore(responsemanager): add a bit of logging * fix(responsemanager): remove code change chore: short-circuit unnecessary message processing Expose task queue diagnostics (#302) * feat(impl): expose task queue diagnostics * refactor(peerstate): put peerstate in its own module * refactor(peerstate): make diagnostics return array
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284) * feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests fix(responsemanager): fix flaky tests fix(responsemanager): make fix more global feat: add basic OT tracing for incoming requests Closes: #271 docs(tests): document tracing test helper utilities fix(test): increase 1s timeouts to 2s for slow CI (#289) * fix(test): increase 1s timeouts to 2s for slow CI * fixup! fix(test): increase 1s timeouts to 2s for slow CI testutil/chaintypes: simplify maintenance of codegen (#294) "go generate" now updates the generated code for us. The separate directory for a main package was unnecessary; a build-tag-ignored file is enough. Using gofmt on the resulting source is now unnecessary too, as upstream has been using go/format on its output for some time. Finally, re-generate the output source code, as the last time that was done we were on an older ipld-prime. ipldutil: use chooser APIs from dagpb and basicnode (#292) Saves us a bit of extra code, since they were added in summer. Also avoid making defaultVisitor a variable, which makes it clearer that it's never a nil func. While here, replace node/basic with node/basicnode, as the former has been deprecated in favor of the latter. Co-authored-by: Hannah Howard <[email protected]> fix: use sync.Cond to handle no-task blocking wait (#299) Ref: ipfs/go-graphsync#284 Peer Stats function (#298) * feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer * fix(requestmanager): fix tested method Add a bit of logging (#301) * chore(responsemanager): add a bit of logging * fix(responsemanager): remove code change chore: short-circuit unnecessary message processing Expose task queue diagnostics (#302) * feat(impl): expose task queue diagnostics * refactor(peerstate): put peerstate in its own module * refactor(peerstate): make diagnostics return array
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284) * feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests fix(responsemanager): fix flaky tests fix(responsemanager): make fix more global feat: add basic OT tracing for incoming requests Closes: #271 docs(tests): document tracing test helper utilities fix(test): increase 1s timeouts to 2s for slow CI (#289) * fix(test): increase 1s timeouts to 2s for slow CI * fixup! fix(test): increase 1s timeouts to 2s for slow CI testutil/chaintypes: simplify maintenance of codegen (#294) "go generate" now updates the generated code for us. The separate directory for a main package was unnecessary; a build-tag-ignored file is enough. Using gofmt on the resulting source is now unnecessary too, as upstream has been using go/format on its output for some time. Finally, re-generate the output source code, as the last time that was done we were on an older ipld-prime. ipldutil: use chooser APIs from dagpb and basicnode (#292) Saves us a bit of extra code, since they were added in summer. Also avoid making defaultVisitor a variable, which makes it clearer that it's never a nil func. While here, replace node/basic with node/basicnode, as the former has been deprecated in favor of the latter. Co-authored-by: Hannah Howard <[email protected]> fix: use sync.Cond to handle no-task blocking wait (#299) Ref: ipfs/go-graphsync#284 Peer Stats function (#298) * feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer * fix(requestmanager): fix tested method Add a bit of logging (#301) * chore(responsemanager): add a bit of logging * fix(responsemanager): remove code change chore: short-circuit unnecessary message processing Expose task queue diagnostics (#302) * feat(impl): expose task queue diagnostics * refactor(peerstate): put peerstate in its own module * refactor(peerstate): make diagnostics return array
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284) * feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests fix(responsemanager): fix flaky tests fix(responsemanager): make fix more global feat: add basic OT tracing for incoming requests Closes: #271 docs(tests): document tracing test helper utilities fix(test): increase 1s timeouts to 2s for slow CI (#289) * fix(test): increase 1s timeouts to 2s for slow CI * fixup! fix(test): increase 1s timeouts to 2s for slow CI testutil/chaintypes: simplify maintenance of codegen (#294) "go generate" now updates the generated code for us. The separate directory for a main package was unnecessary; a build-tag-ignored file is enough. Using gofmt on the resulting source is now unnecessary too, as upstream has been using go/format on its output for some time. Finally, re-generate the output source code, as the last time that was done we were on an older ipld-prime. ipldutil: use chooser APIs from dagpb and basicnode (#292) Saves us a bit of extra code, since they were added in summer. Also avoid making defaultVisitor a variable, which makes it clearer that it's never a nil func. While here, replace node/basic with node/basicnode, as the former has been deprecated in favor of the latter. Co-authored-by: Hannah Howard <[email protected]> fix: use sync.Cond to handle no-task blocking wait (#299) Ref: ipfs/go-graphsync#284 Peer Stats function (#298) * feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer * fix(requestmanager): fix tested method Add a bit of logging (#301) * chore(responsemanager): add a bit of logging * fix(responsemanager): remove code change chore: short-circuit unnecessary message processing Expose task queue diagnostics (#302) * feat(impl): expose task queue diagnostics * refactor(peerstate): put peerstate in its own module * refactor(peerstate): make diagnostics return array
No description provided.