-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sql: add SplitAndScatter processor #51057
Conversation
3dddddd
to
9a0b24c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I know this was an early idea that you moved away from after talking to DistSQL folks, but I was wondering if you could reiterate the reason we don't like it for me: The RangeRouter has a mapping of spans to streams, and can be configured to look in a specific column for the key that it will then use to lookup in those spans the correct stream, right? If we have N nodes, couldn't we just construct N roachpb.Keys e.g. as /n1, /n2, /n3 in a nodeKeys map from ID to key, install routings for each to its stream, and then just stick nodeKeys[entry.node]
in the designated column of the row that the router will look at, instead of installing a new mapping for every individual row?
Reviewed 2 of 2 files at r1, 6 of 9 files at r2.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @dt, and @pbardea)
pkg/sql/rowflow/routers.go, line 459 at r2 (raw file):
// ControlledRouter allows the processor to edit the router for dynamic routing // of rows. type ControlledRouter interface {
see below, but I might want another method in this API to clear the route mappings and/or to replace them with a passed set (which could be empty), instead of just being able to append.
pkg/sql/rowflow/routers.go, line 469 at r2 (raw file):
// Direct implements the ControlledRouter interface. func (rr *rangeRouter) Direct(span execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) { rr.spans = append(rr.spans, span)
just
pkg/sql/rowflow/routers.go, line 470 at r2 (raw file):
func (rr *rangeRouter) Direct(span execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) { rr.spans = append(rr.spans, span) // TODO: Insert in the correct place rather than sort every time.
even with O(logn) to find insertion position, we'd need o(n) to shift the remainder over, no?
sounds like a good place to use a tree (my kingdom for generic containers).
That said, I think in our usage, we're literally writing before every single push, so I think we could actually just replace the entire routes slice with our single entry each time and not worry about maintaining the sort of a multi-item slice?
b37ab46
to
ed95714
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial idea was to have the route determined by a field in the metadata field - but then we when evaluating it we wanted use the range router + route each span to the destination processor. Putting in a marker key rather than the span key and using that as the routing metric for the range router didn't occur to me, but it is a simpler solution - it removes the need to update the mapping entirely. Updated.
It looks like the router expects the bytes from the encoded key (rather than the underlying byte stream itself), so I added some helpers to help with that, rather than a hash-map.
I also added a test case such that the router doesn't have a mapping to one of the nodes to test the router's default stream.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @dt)
pkg/sql/rowflow/routers.go, line 459 at r2 (raw file):
Previously, dt (David Taylor) wrote…
see below, but I might want another method in this API to clear the route mappings and/or to replace them with a passed set (which could be empty), instead of just being able to append.
Removed this API to opt for just dynamically choosing the key.
pkg/sql/rowflow/routers.go, line 470 at r2 (raw file):
Previously, dt (David Taylor) wrote…
even with O(logn) to find insertion position, we'd need o(n) to shift the remainder over, no?
sounds like a good place to use a tree (my kingdom for generic containers).That said, I think in our usage, we're literally writing before every single push, so I think we could actually just replace the entire routes slice with our single entry each time and not worry about maintaining the sort of a multi-item slice?
Removed.
fe44dfe
to
2cb5bf3
Compare
2cb5bf3
to
1d9ca26
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @dt, and @pbardea)
pkg/ccl/backupccl/split_and_scatter_processor.go, line 45 at r3 (raw file):
// splitAndScatterKey implements the splitAndScatterer interface. func (s *dbSplitAndScatterer) splitAndScatterKey(
nit: doesn't seem like this method is stateful, consider using a value receiver instead
pkg/ccl/backupccl/split_and_scatter_processor.go, line 169 at r3 (raw file):
// point sending this error through the stream. Something bad has // happened. // TODO: Consider if we just want to panic here.
Better not to panic, we've made an effort to move away from anything that would crash a server during execution.
pkg/ccl/backupccl/split_and_scatter_processor.go, line 209 at r3 (raw file):
defer close(importSpanChunksCh) for _, importSpanChunk := range spec.Chunks { _, err := (*scatterer).splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key)
Why pass the scatterer by pointer?
1d9ca26
to
5da469a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @dt)
pkg/ccl/backupccl/split_and_scatter_processor.go, line 45 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
nit: doesn't seem like this method is stateful, consider using a value receiver instead
Done.
pkg/ccl/backupccl/split_and_scatter_processor.go, line 169 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Better not to panic, we've made an effort to move away from anything that would crash a server during execution.
Done.
pkg/ccl/backupccl/split_and_scatter_processor.go, line 209 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Why pass the scatterer by pointer?
The mock implementation maintains state. I went back and forth between this method and persisting state outside the struct in the test, which makes the testing code a little messier but cleans up the implementation. I ended up opting preferring this method over storing the state externally in the mock - but am not too attached with this approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @dt, and @pbardea)
pkg/ccl/backupccl/split_and_scatter_processor.go, line 209 at r3 (raw file):
Previously, pbardea (Paul Bardea) wrote…
The mock implementation maintains state. I went back and forth between this method and persisting state outside the struct in the test, which makes the testing code a little messier but cleans up the implementation. I ended up opting preferring this method over storing the state externally in the mock - but am not too attached with this approach.
splitAndScatterer
is an interface though, right? Unless I'm missing something, you can implement the interface using a value receiver for the production code and a pointer receiver and pass in either (https://play.golang.org/p/bdnR7bAYVBO)
b51d29f
to
4dfec75
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @dt)
pkg/ccl/backupccl/split_and_scatter_processor.go, line 209 at r3 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
splitAndScatterer
is an interface though, right? Unless I'm missing something, you can implement the interface using a value receiver for the production code and a pointer receiver and pass in either (https://play.golang.org/p/bdnR7bAYVBO)
Ah, right. Done.
c1a0563
to
dd6ded1
Compare
@asubiotto Just wanted to check if you had any comments to add? |
Just realized Alfonso was OOO - @yuzefovich could you take a quick look to make sure that this is sane from the distsql perspective? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Reviewed 2 of 5 files at r3, 2 of 3 files at r4, 2 of 2 files at r5.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @dt)
TFTRs |
Build failed (retrying...) |
Build failed (retrying...) |
Build failed (retrying...) |
Build failed |
Looks like you got merge-skew'ed with @andreimatei's #51168 well done bors, you actually caught some actual merge skew -- have gold star. I hope the hundreds of hours we've wasted on bors flakes since the last time it caught something was worth the minor irritation it would have been to catch this one after the fact (lol, jk, I know it will never come close). |
Release note: None
dd6ded1
to
175781e
Compare
bors r+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @dt, @pbardea, and @yuzefovich)
pkg/ccl/backupccl/split_and_scatter_processor.go, line 66 at r6 (raw file):
log.VEventf(ctx, 1, "scattering new key %+v", newSpanKey) var ba roachpb.BatchRequest ba.Header.ReturnRangeInfo = true
Please don't do this. As I was saying in #51172 (review), I'm in the process of getting rid of this ReturnRangeInfo
field. And also Im making br.RangeInfos
not available above the DistSender
. It's imminent.
Look at, for example, #51378 where I move another guy that was doing what you're doing here.
bors r- |
Canceled |
The SplitAndScatter processor will be used by restore to distribute the spans we're restoring across the cluster. This enables all nodes to be used when restoring the data, as each node will import the data it will become the leaseholder of. Release note: None
175781e
to
920e363
Compare
Sorry I missed your other comment. Since this code path isn't actually used yet, I removed it from this PR and will add it separately. |
Merging the processor for now as I will address getting the lease node in a separate PR. |
Build succeeded |
The SplitAndScatter processor will be used by restore to distribute the
spans we're restoring across the cluster. This enables all nodes to be
used when restoring the data, as each node will import the data it will
become the leaseholder of.
Release note: None