jobs: add mechanism to communicate job completion in-memory locally #72297
Conversation
@miretskiy I think this is what you were looking for in #71909.
Best reviewed commit-by-commit. More fallout from #71800.
Force-pushed from ae23b5e to e715df7.
I still need to think this through a bit more; but just initial set of mostly nits.
pkg/jobs/wait.go
Outdated
// populate the crdb_internal.jobs vtable.
query := fmt.Sprintf(
	`SELECT count(*) FROM system.jobs WHERE id IN (%s)
       AND (status != $1 AND status != $2 AND status != $3 AND status != $4)`,
paused need to be considered too?
		len(jobs), jobs, timeutil.Since(start))
}()
for i, id := range jobs {
	j, err := r.LoadJob(ctx, id)
i wonder: at what point would just starting a rangefeed and filtering for things we care about be faster?
Certainly would bypass any locking issues, and presumably if the job set cardinality is sufficiently large, running the above count(*) query plus a load for each job might be pretty expensive.
pkg/jobs/wait.go
Outdated
}

func (r *Registry) waitForJobs(
	ctx context.Context, ex sqlutil.InternalExecutor, jobs []jobspb.JobID, done <-chan struct{},
nit: (maybe silly one) ... I always associate done with something that this func closes when it's done. But it appears that in this case done is a signal to wait to stop waiting. perhaps rename to abortWait or some such?
pkg/jobs/registry.go
Outdated
// That may not have lasted to completion. Separately a goroutine will be
// passively polling for these jobs to complete. If they complete locally,
// the waitingSet will be updated appropriately.
waiting map[jobspb.JobID]map[*waitingSet]struct{}
nothing wrong w/ this.. but I wonder if we need this map of map of pointer complexity?
Wouldn't just a slice do? I understand we have to iterate or whatnot, but we're not expecting thousands of entries in there, do we?
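For readers following the thread, here is a minimal, hypothetical sketch of the "map of map of pointer" shape being debated; it is not the actual registry code, and jobID stands in for jobspb.JobID so the example compiles on its own. The alternative the comment suggests would be roughly a slice of *waitingSet per job ID.

package main

import "fmt"

// jobID stands in for jobspb.JobID so the sketch is self-contained.
type jobID int64

// waitingSet mirrors the structure quoted above: one waiter, the jobs it
// is still waiting on, and a channel closed once the set drains.
type waitingSet struct {
	jobDoneCh chan struct{}
	set       map[jobID]struct{}
}

// waiting maps each job ID to every waiter interested in it; this is the
// "map of map of pointer" shape under discussion. The suggested
// alternative would be roughly map[jobID][]*waitingSet.
type waiting map[jobID]map[*waitingSet]struct{}

func main() {
	ws := &waitingSet{
		jobDoneCh: make(chan struct{}),
		set:       map[jobID]struct{}{1: {}, 2: {}},
	}
	w := waiting{}
	for id := range ws.set {
		w[id] = map[*waitingSet]struct{}{ws: {}}
	}
	fmt.Println(len(w)) // 2: both job IDs point back at the same waiter
}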
Force-pushed from e715df7 to 7f497a7.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @stevendanna)
pkg/jobs/registry.go, line 147 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nothing wrong w/ this.. but I wonder if we need this map of map of pointer complexity?
Wouldn't just a slice do? I understand we have to iterate or whatnot, but we're not expecting thousands of entries in there, do we?
why do something worse when there's already something better? I'll make it a type. I disagree that it's that complicated.
pkg/jobs/wait.go, line 66 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nit: (maybe silly one) ... I always associate done with something that this func closes when it's done. But it appears that in this case done is a signal to wait to stop waiting. perhaps rename to abortWait or some such?
consider ctx.Done()
... but okay
pkg/jobs/wait.go, line 84 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
paused need to be considered too?
nice, thanks, done
pkg/jobs/wait.go, line 128 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
i wonder: at what point would just starting a rangefeed and filtering for things we care about be faster?
Certainly would bypass any locking issues, and presumably if the job set cardinality is sufficiently large, running the above count(*) query plus a load for each job might be pretty expensive.
I thought about it and I agree with you but it'd be a lot of complexity for this change. This code is just moved, it's not new. I don't worry about these locking issues on the point lookups really at all. Don't look at anything in this file as new code in the first commit.
Reviewed 2 of 5 files at r1, 1 of 7 files at r3, 5 of 6 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @stevendanna)
pkg/jobs/registry.go, line 147 at r2 (raw file):
Previously, ajwerner wrote…
why do something worse when there's already something better? I'll make it a type. I disagree that it's that complicated.
I guess in the eye of a beholder. A small map is less efficient than the small vector or slice. At least that was true in c++; pretty sure it's true here as well. I also find vectors easier to reason about than a map of int to map of pointer (containing channel and a map of int to struct) to struct.
pkg/jobs/registry.go, line 1168 at r4 (raw file):
case StatusPaused:
	return errors.NewAssertionErrorWithWrappedErrf(jobErr,
		"job %d: unexpected status %s provided to state machine", job.ID(), status)
Does the StatusPaused branch need r.removeFromWaitingSet?
In general, I'm worried about the brittleness of this solution. Forgetting to remove from sets is probably okay -- though I wonder if we'd be leaking resources and how much.
Do we need to remove when, say, filterAlreadyRunningAndCancelFromPreviousSessions runs?
Do you think it would make sense to take the existing adoptedJobs map, along w/ the waiting set, and make a small struct with methods on it? I can't think of a reason we wouldn't want to remove from the waiting set when delete(r.mu.adoptedJobs, id) runs. I would replace all uses of the delete with method calls. And similarly put methods on management of the waiting set...
pkg/jobs/wait.go, line 86 at r4 (raw file):
// populate the crdb_internal.jobs vtable.
query := fmt.Sprintf(
	`SELECT count(*) FROM system.jobs WHERE id IN (%s)
probably could do w/ out fmt.Sprintf since you built buf anyway?
pkg/jobs/wait.go, line 226 at r4 (raw file):
r.ac.AnnotateCtx(context.Background()),
	"corruption detected in waiting set for id %d", id,
)
log.Fatal makes me sad here. I sort of understand it.. but also, this feels so harsh... And also removeFromWaitingSet below doesn't do that when deleting from ws.set.
pkg/jobs/wait.go, line 244 at r4 (raw file):
delete(ws.set, id)
if len(ws.set) == 0 {
	close(ws.jobDoneCh)
I have to say: processing this data structure is giving me a hard time.
There are multiple reasons for this: for one, it's a very complex data structure. So complex, in fact, that it probably needs its own tests imo.
Secondly, this data structure gets mutated all over the place, so it's hard to isolate cause/effect. Seeing code like this (close(ws.jobDoneCh)) makes me worried.
removeFromWaitingSets is called from many places. Could remove be called for the same id? Would we panic when we close the done channel twice?
I can understand the argument that it doesn't happen now, but I hope you can see my concern about brittleness here. Could an accidental call to removeFromWaitingSets be added where it shouldn't be, causing a hard-to-trigger race condition that our tests will almost certainly not pick up?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @stevendanna)
pkg/jobs/wait.go, line 244 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I have to say: processing this data structure is giving me a hard time.
There are multiple reasons for this: for one, it's a very complex data structure. So complex, in fact, that it probably needs its own tests imo.
Secondly, this data structure gets mutated all over the place, so it's hard to isolate cause/effect. Seeing code like this (close(ws.jobDoneCh)) makes me worried.
removeFromWaitingSets is called from many places. Could remove be called for the same id? Would we panic when we close the done channel twice?
I can understand the argument that it doesn't happen now, but I hope you can see my concern about brittleness here. Could an accidental call to removeFromWaitingSets be added where it shouldn't be, causing a hard-to-trigger race condition that our tests will almost certainly not pick up?
NM: re panicking; the ws.set delete above ensures that doesn't happen... Still, literally every line of code requires thinking what's happening here.
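To make the idempotence concrete, here is a simplified, hypothetical sketch of the guard referred to above: the entry for the ID is removed from the outer waiting map (and from ws.set) before the channel is ever closed, so a repeated call for the same ID finds nothing to do and cannot close jobDoneCh twice. Names are illustrative, and the real code holds the registry mutex around all of this.

package main

import "fmt"

type waitingSet struct {
	jobDoneCh chan struct{}
	set       map[int64]struct{}
}

type registry struct {
	waiting map[int64]map[*waitingSet]struct{}
}

func (r *registry) removeFromWaitingSets(id int64) {
	sets, ok := r.waiting[id]
	if !ok {
		return // second call for the same id: nothing to do
	}
	delete(r.waiting, id)
	for ws := range sets {
		delete(ws.set, id)
		if len(ws.set) == 0 {
			close(ws.jobDoneCh) // reached at most once per waitingSet
		}
	}
}

func main() {
	ws := &waitingSet{jobDoneCh: make(chan struct{}), set: map[int64]struct{}{7: {}}}
	r := &registry{waiting: map[int64]map[*waitingSet]struct{}{7: {ws: {}}}}
	r.removeFromWaitingSets(7)
	r.removeFromWaitingSets(7) // idempotent: no double close
	_, open := <-ws.jobDoneCh
	fmt.Println(open) // false: closed exactly once
}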
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @stevendanna)
pkg/jobs/wait.go, line 240 at r4 (raw file):
r.mu.Lock()
defer r.mu.Unlock()
sets := r.mu.waiting[id]
can it be nil? I guess that's fine -- range works okay; but is that a bug?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, @miretskiy, and @stevendanna)
pkg/jobs/registry.go, line 147 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I guess in the eye of a beholder. A small map is less efficient than the small vector or slice. At least that was true in c++; pretty sure it's true here as well. I also find vectors easier to reason about than a map of int to map of pointer (containing channel and a map of int to struct) to struct.
If we had the new slices package coming in go1.18, I might agree with you. Right now they're such a pain to deal with.
pkg/jobs/registry.go, line 1168 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Does the StatusPaused branch need r.removeFromWaitingSet?
In general, I'm worried about the brittleness of this solution. Forgetting to remove from sets is probably okay -- though I wonder if we'd be leaking resources and how much.
Do we need to remove when, say, filterAlreadyRunningAndCancelFromPreviousSessions runs?
Do you think it would make sense to take the existing adoptedJobs map, along w/ the waiting set, and make a small struct with methods on it? I can't think of a reason we wouldn't want to remove from the waiting set when delete(r.mu.adoptedJobs, id) runs. I would replace all uses of the delete with method calls. And similarly put methods on management of the waiting set...
The thing I feel like you're missing here and below when you worry about leaks, is that we have the fallback loop and we remove the state when the client who is waiting goes away. If we never notified, the code would still be correct, and, in fact, not worse than what was there before this commit. Notifying is an optimization.
pkg/jobs/wait.go, line 86 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
probably could do w/ out fmt.Sprintf since you built buf anyway?
🤷
pkg/jobs/wait.go, line 226 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
log.Fatal makes me sad here. I sort of understand it.. but also, this feels so harsh... And also removeFromWaitingSet below doesn't do that when deleting from ws.set.
Sure, I'll make it less harsh.
pkg/jobs/wait.go, line 244 at r4 (raw file):
Still, literally every line of code requires thinking what's happening here.
🤔 Is it really that subtle? On some level, doesn't every line of code require thinking? Would more commentary put you at ease? There's one place where the channel is closed and before it's closed, it's removed from the data structure, all of that happens under a mutex.
Reviewed 1 of 6 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, @miretskiy, and @stevendanna)
pkg/jobs/registry.go, line 1168 at r4 (raw file):
Previously, ajwerner wrote…
The thing I feel like you're missing here and below when you worry about leaks, is that we have the fallback loop and we remove the state when the client who is waiting goes away. If we never notified, the code would still be correct, and, in fact, not worse than what was there before this commit. Notifying is an optimization.
I see... Which loop also removes from this map as a cleanup?
pkg/jobs/wait.go, line 130 at r4 (raw file):
defer func() {
	log.Infof(ctx, "waited for %d %v queued jobs to complete %v",
		len(jobs), jobs, timeutil.Since(start))
presumably this doesn't run frequently?
pkg/jobs/wait.go, line 244 at r4 (raw file):
Previously, ajwerner wrote…
Still, literally every line of code requires thinking what's happening here.
🤔 Is it really that subtle? On some level, doesn't every line of code require thinking? Would more commentary put you at ease? There's one place where the channel is closed and before it's closed, it's removed from the data structure, all of that happens under a mutex.
Very true -- code requires thinking. But simpler code requires a lot less of that. More commentary is "more better" :)
Perhaps not on deletes or whatnot; but on this function and the lifetime of the waiting sets as a whole. That comment you left on top (re this being an optimization) is an important one; who does the cleanup, when things get removed, etc. This type of commentary would put me more at ease.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, @miretskiy, and @stevendanna)
pkg/jobs/registry.go, line 1168 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I see... Which loop also removes from this map as a cleanup?
The r.Run method, where we call installWaitingSet; after that we defer the removal. Then, underneath that, we call r.wait, which polls the status. If the goroutine watching either sees the status change or exits for whatever reason, the state will be cleaned up.
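For readers, here is a rough, hypothetical sketch of that lifecycle: install a waiting set, defer its removal, then wait on either an in-memory notification or a slow polling fallback. The names installWaitingSet, removeFromWaitingSets, and wait follow the discussion above, but the signatures and shapes here are illustrative, not the real registry's.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type waiter struct{ doneCh chan struct{} }

type miniRegistry struct {
	mu      sync.Mutex
	waiting map[int64]*waiter
}

func (r *miniRegistry) installWaitingSet(id int64) *waiter {
	r.mu.Lock()
	defer r.mu.Unlock()
	w := &waiter{doneCh: make(chan struct{})}
	r.waiting[id] = w
	return w
}

// removeFromWaitingSets is the cleanup that runs whether or not anyone ever
// notified; it is what keeps a missed notification from leaking state.
func (r *miniRegistry) removeFromWaitingSets(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.waiting, id)
}

// notifyDone is the in-memory fast path: a local resumer reports completion.
func (r *miniRegistry) notifyDone(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if w, ok := r.waiting[id]; ok {
		delete(r.waiting, id)
		close(w.doneCh)
	}
}

// wait returns when notified in memory or when the polling fallback
// (stubbed here as a callback) observes the job as finished elsewhere.
func (r *miniRegistry) wait(ctx context.Context, w *waiter, poll func() bool) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-w.doneCh:
			return nil // fast path: in-memory notification
		case <-ticker.C:
			if poll() {
				return nil // fallback: the job may have finished on another node
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	r := &miniRegistry{waiting: map[int64]*waiter{}}
	w := r.installWaitingSet(1)
	defer r.removeFromWaitingSets(1)
	go r.notifyDone(1)
	fmt.Println(r.wait(context.Background(), w, func() bool { return false }))
}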
Force-pushed from c774720 to 86d02a1.
Okay, I added ample commentary, did a bit of cleanup, and removed the fatal.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @stevendanna)
pkg/jobs/wait.go, line 86 at r4 (raw file):
Previously, ajwerner wrote…
🤷
Refactored this to be better.
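In case it helps readers, one possible shape of such a refactor (an illustration only, not necessarily what the PR landed on): build the whole query with the same builder that collects the IDs, instead of formatting the ID list and then Sprintf-ing it into the query. The status placeholders mirror the snippet quoted earlier; a fifth would be added if paused is also excluded.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// makeWaitQuery sketches building the polling query without a separate
// fmt.Sprintf: the ID list and the surrounding SQL go into one builder.
func makeWaitQuery(ids []int64) string {
	var buf strings.Builder
	buf.WriteString(`SELECT count(*) FROM system.jobs WHERE id IN (`)
	for i, id := range ids {
		if i > 0 {
			buf.WriteString(", ")
		}
		buf.WriteString(strconv.FormatInt(id, 10))
	}
	buf.WriteString(`) AND (status != $1 AND status != $2 AND status != $3 AND status != $4)`)
	return buf.String()
}

func main() {
	fmt.Println(makeWaitQuery([]int64{42, 43}))
}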
pkg/jobs/wait.go, line 130 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
presumably this doesn't run frequently?
It runs every time a job gets run, but we log so much for that that it didn't seem like a big deal.
pkg/jobs/wait.go, line 226 at r4 (raw file):
Previously, ajwerner wrote…
Sure, I'll make it less harsh.
Done.
pkg/jobs/wait.go, line 240 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
can it be nil? I guess that's fine -- range works okay; but is that a bug ?
It's not at all a bug if it's nil. It'll commonly be nil. I am relying on nil semantics working here. I typed a comment but it seemed better to just check the bool and avoid ambiguity.
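For anyone unsure which nil semantics are being relied on here: in Go, reading from and ranging over a nil map are safe no-ops; only writes panic. A tiny standalone illustration:

package main

import "fmt"

func main() {
	// A nil map, like r.mu.waiting[id] for an id nobody is waiting on.
	var sets map[int64]struct{}
	for range sets {
		fmt.Println("never reached") // ranging over a nil map is a no-op
	}
	_, ok := sets[1] // reading from a nil map returns the zero value
	fmt.Println(ok)  // false
}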
pkg/jobs/wait.go, line 244 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Very true -- code requires thinking. But simpler code requires a lot less of that. More commentary is "more better" :)
Perhaps not on deletes or whatnot; but on this function and the lifetime of the waiting sets as a whole. That comment you left on top (re this being an optimization) is an important one; who does the cleanup, when things get removed, etc. This type of commentary would put me more at ease.
More commentary here and elsewhere.
Force-pushed from 86d02a1 to 5301a3e.
Reviewed 1 of 6 files at r6.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, @miretskiy, and @stevendanna)
pkg/jobs/wait.go, line 195 at r6 (raw file):
// set is an optimization, the caller still polls the job state to wait for it
// to transition to a terminal status (or paused). This is unavoidable: the job
// may end up running elsewhere.
Love it. Thanks for writing this up.
Force-pushed from 5301a3e to eb31fd4.
bors r+
Build succeeded.
The first commit just moves some code out into a new file to make the second
commit more obvious.
This change does two things. Firstly, when local transactions create jobs
which are pre-claimed by the current registry, there's no need for the
registry to go search to find these job IDs. Instead, it can just directly
attempt to resume them. This nicely avoids a bunch of contention. Then,
when we wait for the jobs to complete, we can avoid polling the status
of the jobs table in the common case and instead wait for an in-memory
notification. This is beneficial because it reduces contention on the
job record of the running job.
Epic: CRDB-10719
Release note (performance improvement): Improved job performance in the
face of concurrent schema changes by reducing contention.