Refactor to use new protos and persistence #4194
Conversation
func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock {
	wallclock := time.Now().UnixMilli()
	// Ensure time does not move backwards
	if wallclock < clock.GetWallClock() {
		wallclock = clock.GetWallClock()
	}
	// Ensure timestamp is monotonically increasing
	if wallclock == clock.GetWallClock() {
		clock.Version = clock.GetVersion() + 1
	} else {
		clock.Version = 0
		clock.WallClock = wallclock
	}

	return clockpb.HybridLogicalClock{WallClock: wallclock, Version: clock.Version, ClusterId: clock.ClusterId}
}

func zeroHLC(clusterID int64) clockpb.HybridLogicalClock {
	return clockpb.HybridLogicalClock{WallClock: 0, Version: 0, ClusterId: clusterID}
}
I'm going to move these into common and document them in the next PR.
except make a new package, common is way too crowded. or maybe common/clock
next PR.
	if errors.Is(err, errUserDataNotPresentOnPartition) {
		return nil
	}
	return err

case *serviceerror.NotFound:
should we call getUserDataLocked in this branch? suppose the tq metadata was collected by the scavenger but we still have user data?
Hmm.. I didn't think of that. Do you know why we didn't load the versioning data in the original code?
Hmm.. might just be oversight.
Yeah, I'll add it.
Okay, disregard, I get why now. Adding it. Thanks!
I'm not 100% sure about the behavior, but it does seem like it's needed.
you could also do it first, before the GetTaskQueue, to avoid duplication
Oh true
Decided against it because it complicates the error handling code
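For context, a rough self-contained sketch of the shape this settles on: GetTaskQueue runs first, and the *serviceerror.NotFound branch (metadata already collected by the scavenger) also loads user data, tolerating errUserDataNotPresentOnPartition. loadMetadata and loadUserData are hypothetical stand-ins for the persistence GetTaskQueue call and getUserDataLocked; only serviceerror is the real go.temporal.io/api/serviceerror package.

package main

import (
	"errors"
	"fmt"

	"go.temporal.io/api/serviceerror"
)

// Placeholder mirroring the sentinel error seen in the diff above.
var errUserDataNotPresentOnPartition = errors.New("user data not present on partition")

// Hypothetical stand-ins for the persistence GetTaskQueue call and getUserDataLocked.
func loadMetadata() error { return serviceerror.NewNotFound("task queue not found") }
func loadUserData() error { return errUserDataNotPresentOnPartition }

func initTaskQueue() error {
	switch err := loadMetadata(); err.(type) {
	case nil:
		// Metadata exists; the usual path loads user data elsewhere.
	case *serviceerror.NotFound:
		// Metadata was collected by the scavenger, but user data may still exist,
		// so load it in this branch as well.
		if uerr := loadUserData(); uerr != nil && !errors.Is(uerr, errUserDataNotPresentOnPartition) {
			return uerr
		}
	default:
		return err
	}
	return nil
}

func main() {
	fmt.Println(initTaskQueue()) // <nil>: both "not found" cases are tolerated
}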
service/matching/db.go
Outdated
	return nil, err
response, err := db.store.GetTaskQueueUserData(ctx, &persistence.GetTaskQueueUserDataRequest{
	NamespaceID: db.namespaceID.String(),
	TaskQueue:   db.taskQueue.FullName(),
Suggested change:
-	TaskQueue: db.taskQueue.FullName(),
+	TaskQueue: db.taskQueue.BaseNameString(),
use BaseNameString here (and I'll clarify the comment). FullName is the key for the metadata table and routing. they will be equal here, but semantically it shouldn't be FullName
Okay, thanks, I wasn't aware of the difference.
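Purely to illustrate the distinction above (the naming scheme below is made up, not Temporal's real key format): a partitioned task queue has a user-facing base name and a separate internal full name used as the metadata/routing key; user data lookups should use the base name, and the two only coincide on the root partition.

package main

import "fmt"

// taskQueueName is a toy model of the idea; the "/_internal/..." format is
// invented here for illustration only.
type taskQueueName struct {
	baseName  string
	partition int
}

// BaseNameString is the user-facing task queue name, the key for user data.
func (n taskQueueName) BaseNameString() string { return n.baseName }

// FullName is the partition-specific key used for the metadata table and routing.
func (n taskQueueName) FullName() string {
	if n.partition == 0 {
		return n.baseName // root partition: full name and base name coincide
	}
	return fmt.Sprintf("/_internal/%s/%d", n.baseName, n.partition)
}

func main() {
	root := taskQueueName{baseName: "orders", partition: 0}
	leaf := taskQueueName{baseName: "orders", partition: 3}
	fmt.Println(root.BaseNameString(), root.FullName()) // orders orders
	fmt.Println(leaf.BaseNameString(), leaf.FullName()) // orders /_internal/orders/3
}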
service/matching/matchingEngine.go
Outdated
}
if userData.Version < version {
	e.logger.Error("Non-root partition requested task queue user data for version greater than known version")
	// TODO: When would this happen? Likely due to ownership transfer of the root partition.
hmm. this shouldn't really be possible except with a very unlikely series of events. e.g. A had the root and loaded user data with version 3. ownership changed, B had the root, some client changed the data to version 4. C (a leaf of that root) refreshed from B. A happened to not notice it lost ownership in all this time. ownership changed again, back to A. now C refreshes from A.
if all the refresh intervals are the same, A should have refreshed in between the two refreshes of C, so that couldn't happen as written. but perhaps it got a timeout that time.
I suppose A could use it as a signal that it has out of date info. but it may be annoying to trigger a reload from this point. I think it would be ok to just return an error and let the periodic refresh fix it.
SG
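A minimal sketch of the agreed behavior, with placeholder names: return an error when the requested user data version is ahead of what this node knows, instead of only logging, and let the periodic refresh converge. The use of serviceerror.NewFailedPrecondition is an assumption, not the PR's actual error type.

package main

import (
	"fmt"

	"go.temporal.io/api/serviceerror"
)

// checkUserDataVersion is a hypothetical helper: callers asking for a version newer
// than the known one get an error back and are expected to retry after the periodic
// refresh catches the root partition up.
func checkUserDataVersion(knownVersion, requestedVersion int64) error {
	if requestedVersion > knownVersion {
		return serviceerror.NewFailedPrecondition(
			"requested task queue user data for a version greater than the known version")
	}
	return nil
}

func main() {
	fmt.Println(checkUserDataVersion(3, 4)) // error: this node is behind
	fmt.Println(checkUserDataVersion(4, 4)) // <nil>
}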
service/matching/version_sets.go
Outdated
	numSets = maxSets
}
versionSets := make([]*taskqueue.CompatibleVersionSet, numSets)
for i := 0; i < numSets; i++ {
Suggested change:
-	for i := 0; i < numSets; i++ {
+	for i := range versionSets {
Hmm... yeah, don't know what I was thinking, thanks.
	}
}

func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock {
"generate" doesn't really get across the meaning to me, maybe "hlcNow" or something like that? and call the argument "previousClock"?
I'm refactoring this in my next PR, please review there.
-	existingData.VersionSets = append(existingData.GetVersionSets(), &taskqueuepb.CompatibleVersionSet{
-		BuildIds: []string{targetedVersion},
+	modifiedData.VersionSets = append(modifiedData.VersionSets, &persistence.CompatibleVersionSet{
+		SetIds: []string{targetedVersion},
use a base64 or hex encoded hash of the version here
Why?
If it's just for the character limitation when creating the internal queue name, let's handle that there and keep this structure more readable.
it is for that, and I agree it makes sense to do that closer to the use, but I would still rather do it here for a few reasons:
- if we do it here, that allows changing the hash function later. otherwise we'd have to ensure we keep the same function forever.
- for debugging purposes, I want build ids and set ids to be clearly visually distinct, since they're separate types and should not be mixed. if I see something that looks like a build id where I expect a set id that's confusing. (i.e. I believe it's more readable to use a hash here.)
- if someone uses very long build ids, we do have to store them, but I'd rather store them once and store a fixed-length thing as the set id. if we use it literally we have to store it twice.
> it is for that, and I agree it makes sense to do that closer to the use, but I would still rather do it here for a few reasons:
> - if we do it here, that allows changing the hash function later. otherwise we'd have to ensure we keep the same function forever.

Fair point, it would be not impossible but highly annoying to change later with this format. If you think that's an important capability, let's do that.

> - for debugging purposes, I want build ids and set ids to be clearly visually distinct, since they're separate types and should not be mixed. if I see something that looks like a build id where I expect a set id that's confusing. (i.e. I believe it's more readable to use a hash here.)

I'd argue otherwise, that it's easier to debug which build ID the set ID correlates to, and it might help debug replication and task queue user data scavenger issues.

> - if someone uses very long build ids, we do have to store them, but I'd rather store them once and store a fixed-length thing as the set id. if we use it literally we have to store it twice.

Since we limit to 10 sets per queue, I'd say this is a non-issue, and if we really cared about this, we could optimize to omit the set ID when it's the same as the first build ID, but I'd rather avoid that.

Overall I'd be okay with hashing here. If you don't mind, I'll merge this and track it so we don't forget.
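A minimal sketch of the hashing approach settled on above; the specific hash function and encoding here are assumptions, and only the idea of a fixed-length, visually distinct, later-swappable set id comes from this thread.

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// hashBuildID returns a fixed-length, URL-safe identifier for use as a set id.
func hashBuildID(buildID string) string {
	sum := sha256.Sum256([]byte(buildID))
	return base64.RawURLEncoding.EncodeToString(sum[:])
}

func main() {
	// A long build id still yields a short set id that cannot be mistaken for a build id.
	fmt.Println(hashBuildID("my-worker-build-2023-05-01T12:00:00Z"))
}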
@@ -85,215 +97,422 @@ func mkPromoteInSet(id string) *workflowservice.UpdateWorkerBuildIdCompatibility
	}
}

func assertClockGreater(t *testing.T, clock1 clockpb.HybridLogicalClock, clock2 clockpb.HybridLogicalClock) {
	if clock1.Version == clock2.Version {
this doesn't work? it fails on [101, 2] > [100, 3] (using [WallClock, Version])
Refactored in the next PR.
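A sketch of a comparison that avoids the failure pointed out above ([101, 2] vs [100, 3]): compare WallClock first and only break ties with Version. The hlc struct is a local stand-in for clockpb.HybridLogicalClock, not the real proto type.

package main

import "fmt"

type hlc struct {
	WallClock int64
	Version   int32
}

// compareHLC returns a negative, zero, or positive value as a is before, equal to,
// or after b. WallClock dominates; Version only breaks ties.
func compareHLC(a, b hlc) int {
	if a.WallClock != b.WallClock {
		if a.WallClock < b.WallClock {
			return -1
		}
		return 1
	}
	if a.Version != b.Version {
		if a.Version < b.Version {
			return -1
		}
		return 1
	}
	return 0
}

func main() {
	fmt.Println(compareHLC(hlc{101, 2}, hlc{100, 3}) > 0) // true: the later wall clock wins despite the lower Version
}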
}

func generateHLCTimestamp(clock clockpb.HybridLogicalClock) clockpb.HybridLogicalClock {
	wallclock := time.Now().UnixMilli()
we usually use a TimeSource (from common/clock) instead of raw time.Now(). unfortunately TimeSource isn't powerful enough to be a true fake time, but maybe we should still use it for consistency, and to make it easier to improve in the future.
I'm refactoring this in my next PR, please review there.
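A small sketch of the TimeSource suggestion, assuming the clock.TimeSource interface and clock.NewRealTimeSource from go.temporal.io/server/common/clock; the function and parameter names below are illustrative, not the PR's final code.

package main

import (
	"fmt"

	"go.temporal.io/server/common/clock"
)

// hlcWallClock takes the wall-clock reading from an injected TimeSource rather than
// calling time.Now() directly, so tests can substitute their own source later.
func hlcWallClock(timeSource clock.TimeSource) int64 {
	return timeSource.Now().UnixMilli()
}

func main() {
	fmt.Println(hlcWallClock(clock.NewRealTimeSource()))
}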
In the next PR I'll implement the actual replication mechanism and merging of the version sets.
- Renamed / added dynamic config variables for limiting sets and build IDs
- Added schema migration scripts
- All persistence types implemented
Tested locally with all persistence types.
Note: This commit came from a feature branch and is not expected to build.