Store explicit TaskList partition data #6591
Conversation
Force-pushed from 3b2163a to 0c97f02
    return result, changed
}

func (a *adaptiveScalerImpl) collectPartitionMetrics(config *types.TaskListPartitionConfig) (*aggregatePartitionMetrics, error) {
This might increase the load significantly. Originally, describeTaskList was only called when the number of write partitions was less than the number of read partitions, but after this change it will be called periodically. Maybe we can add a metric to track how much extra load this causes.
Good point. Failure case behavior is also not clear to me. If one partition fails to respond (e.g. timeout), what do we do?
Maybe we just activate this new logic if the tasklist has isolation groups enabled. This way we can iterate and optimize as needed without impacting the normal behavior of the adaptive scaler.
Failure case is a no-op, which seems reasonable as a starting point. In the future we can explore more sophisticated approaches such as maintaining more state. I've updated this logic to only perform the RPC to all child partitions when isolation is enabled.
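For illustration, here is a minimal Go sketch of that behavior under assumed names (scaler and describePartition are hypothetical, not the actual adaptiveScalerImpl API): metrics are only collected when isolation is enabled, and a partition that fails to respond is skipped as a no-op.

package main

import "fmt"

// partitionMetrics is a simplified stand-in for the real per-partition metrics.
type partitionMetrics struct {
    BacklogCount int64
}

// scaler is a hypothetical, simplified stand-in for adaptiveScalerImpl.
type scaler struct {
    isolationEnabled  func() bool
    describePartition func(partitionID int) (*partitionMetrics, error)
    numPartitions     int
}

// collectPartitionMetrics fans out to child partitions only when isolation is
// enabled; a per-partition failure is treated as a no-op and the partition is skipped.
func (s *scaler) collectPartitionMetrics() map[int]*partitionMetrics {
    if !s.isolationEnabled() {
        return nil
    }
    result := make(map[int]*partitionMetrics, s.numPartitions)
    for id := 0; id < s.numPartitions; id++ {
        m, err := s.describePartition(id)
        if err != nil {
            // Failure (e.g. timeout) is a no-op for this partition.
            continue
        }
        result[id] = m
    }
    return result
}

func main() {
    s := &scaler{
        isolationEnabled: func() bool { return true },
        describePartition: func(id int) (*partitionMetrics, error) {
            if id == 1 {
                return nil, fmt.Errorf("timeout")
            }
            return &partitionMetrics{BacklogCount: int64(id) * 10}, nil
        },
        numPartitions: 3,
    }
    fmt.Println(len(s.collectPartitionMetrics())) // 2: partition 1 timed out and was skipped
}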
    if values == nil {
        return nil
    }
    partitions := values.(map[int]map[string]any)
nit: to avoid future misuse causing panics, let's check the result of the cast:
partitions, ok := values.(map[int]map[string]any)
if !ok { return nil }
Done.
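For completeness, here is a self-contained sketch of the checked assertion the nit suggests; readPartitions is a hypothetical wrapper around the dynamic-config value, not the exact function from the PR.

package main

import "fmt"

// readPartitions defensively unpacks a dynamic-config value: a nil input or a
// failed type assertion returns nil instead of panicking.
func readPartitions(values interface{}) map[int]map[string]any {
    if values == nil {
        return nil
    }
    partitions, ok := values.(map[int]map[string]any)
    if !ok {
        return nil
    }
    return partitions
}

func main() {
    fmt.Println(readPartitions("not a map") == nil)                   // true: wrong type, no panic
    fmt.Println(readPartitions(map[int]map[string]any{0: {"ig": 1}})) // map[0:map[ig:1]]
}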
    // If they're out of sync, go with the value of num_*_partitions. This is necessary only while support for
    // read_partitions and write_partitions rolls out
Until read_partitions/write_partitions are written back to the DB we will be resetting the partition mappings. I guess it doesn't matter for now as the isolation group feature is disabled.
Yeah, that should be fine for now. Once we start populating the mapping it'll be persisted, and we can eventually even remove this logic.
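To make the rollout behavior concrete, here is a hedged sketch with simplified stand-in types (not the real persistence structs): when the explicit map disagrees with num_*_partitions, a contiguous block of partition ids is rebuilt from the numeric field and any per-partition data is dropped.

package main

import "fmt"

type taskListPartition struct {
    IsolationGroups []string
}

// reconcilePartitions rebuilds the explicit partition map from the legacy
// count when the two are out of sync, keeping a contiguous block of ids
// starting at 0. Per-partition data is discarded, which is acceptable while
// isolation groups are not yet populated.
func reconcilePartitions(n int32, m map[int]*taskListPartition) map[int]*taskListPartition {
    if int32(len(m)) == n {
        return m
    }
    rebuilt := make(map[int]*taskListPartition, n)
    for i := 0; i < int(n); i++ {
        rebuilt[i] = &taskListPartition{}
    }
    return rebuilt
}

func main() {
    readPartitions := map[int]*taskListPartition{0: {}, 1: {}} // out of sync with the count below
    readPartitions = reconcilePartitions(3, readPartitions)
    fmt.Println(len(readPartitions)) // 3
}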
@@ -654,3 +630,69 @@ func lockTaskList(ctx context.Context, tx sqlplugin.Tx, shardID int, domainID se
func stickyTaskListExpiry() time.Time {
    return time.Now().Add(stickyTasksListsTTL)
}

func toSerializationTaskListPartitionConfig(c *persistence.TaskListPartitionConfig) *serialization.TaskListPartitionConfig {
thanks for moving these to helper funcs
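Roughly what such a helper looks like, sketched with simplified stand-in structs rather than the real persistence/serialization types: a nil-safe, field-by-field conversion kept in one place so every SQL code path converts identically.

package main

import "fmt"

// Simplified stand-ins for the persistence and serialization config types;
// the real structs live in the Cadence codebase and carry more fields.
type persistencePartitionConfig struct {
    Version            int64
    NumReadPartitions  int32
    NumWritePartitions int32
}

type serializationPartitionConfig struct {
    Version            int64
    NumReadPartitions  int32
    NumWritePartitions int32
}

// toSerializationTaskListPartitionConfig converts nil to nil and otherwise
// copies each field across, mirroring the shape of the helper added in this PR.
func toSerializationTaskListPartitionConfig(c *persistencePartitionConfig) *serializationPartitionConfig {
    if c == nil {
        return nil
    }
    return &serializationPartitionConfig{
        Version:            c.Version,
        NumReadPartitions:  c.NumReadPartitions,
        NumWritePartitions: c.NumWritePartitions,
    }
}

func main() {
    fmt.Println(toSerializationTaskListPartitionConfig(nil)) // <nil>
    fmt.Println(*toSerializationTaskListPartitionConfig(&persistencePartitionConfig{Version: 1, NumReadPartitions: 2, NumWritePartitions: 2}))
}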
    if e != nil {
        a.logger.Warn("failed to get partition metrics", tag.WorkflowTaskListName(a.taskListID.GetPartition(partitionID)), tag.Error(e))
    }
    if result != nil {
If e is not nil we shouldn't care about result.
Done.
@@ -240,7 +240,7 @@ func NewManager(
    partitionConfig := tlMgr.TaskListPartitionConfig()
    r := 1
    if partitionConfig != nil {
-        r = int(partitionConfig.NumReadPartitions)
+        r = len(partitionConfig.ReadPartitions)
This len() logic is reused in multiple places, let's consider creating a helper method.
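One possible shape for that helper, sketched with a simplified type (numReadPartitions is an illustrative name, not code from the PR): it folds the nil-config default of 1 into a single function so callers stop repeating the len() pattern.

package main

import "fmt"

type taskListPartition struct{}

type taskListPartitionConfig struct {
    ReadPartitions  map[int]*taskListPartition
    WritePartitions map[int]*taskListPartition
}

// numReadPartitions centralizes the "nil config means a single partition" rule.
func numReadPartitions(c *taskListPartitionConfig) int {
    if c == nil {
        return 1
    }
    return len(c.ReadPartitions)
}

func main() {
    fmt.Println(numReadPartitions(nil)) // 1
    fmt.Println(numReadPartitions(&taskListPartitionConfig{
        ReadPartitions: map[int]*taskListPartition{0: {}, 1: {}, 2: {}},
    })) // 3
}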
    taskListType := types.TaskListTypeDecision.Ptr()
    if c.taskListID.GetType() == persistence.TaskListTypeActivity {
        taskListType = types.TaskListTypeActivity.Ptr()
    }
    // TODO: Do we want to notify partitions that were removed?
We need to notify partitions that were removed from the write set.
Done.
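For illustration, here is a small sketch (with hypothetical names) of how the partitions removed from the write set could be identified by diffing the old and new write-partition maps before sending notifications.

package main

import "fmt"

type taskListPartition struct{}

// removedPartitionIDs returns the ids present in the old write-partition map
// but absent from the new one; those are the partitions that need to be
// notified that they no longer receive writes.
func removedPartitionIDs(oldPartitions, newPartitions map[int]*taskListPartition) []int {
    var removed []int
    for id := range oldPartitions {
        if _, stillPresent := newPartitions[id]; !stillPresent {
            removed = append(removed, id)
        }
    }
    return removed
}

func main() {
    oldWrite := map[int]*taskListPartition{0: {}, 1: {}, 2: {}}
    newWrite := map[int]*taskListPartition{0: {}, 1: {}}
    fmt.Println(removedPartitionIDs(oldWrite, newWrite)) // [2]
}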
Replace num_read_partitions and num_write_partitions with an explicit map of partition ids to partition configuration. This enables assigning isolation groups to partitions in the future. This change is backwards compatible as it populates both values when writing data to Cassandra and when returning it via the API. When draining partitions we continue to maintain a contiguous block of partition ids.
This reverts commit eebf656.
…" (cadence-workflow#6625) Address issues in GRPC -> types mapper and add additional tets. This reverts commit bf9f526.
…" (cadence-workflow#6625) Address issues in GRPC -> types mapper and add additional tets. Additionally address issues in serialization <-> sqlblobs mapper and add tests. This reverts commit bf9f526.
What changed?
Replace num_read_partitions and num_write_partitions with an explicit map of partition ids to partition configuration. This enables assigning isolation groups to partitions in the future.
This change is backwards compatible, as it populates both the old and new values when writing data to and reading it from persistence, and when mapping to/from thrift/proto. When draining partitions we continue to maintain a contiguous block of partition ids.
Once this change has been deployed broadly we can remove the deprecated fields from the IDL. We could also consider removing them from persistence in the future.
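A hedged sketch of the resulting config shape with illustrative field names (not the exact IDL): explicit per-partition entries keyed by partition id, with the deprecated counts still populated for backwards compatibility during rollout.

package main

import "fmt"

// TaskListPartition holds per-partition data; isolation-group assignment is
// the motivating future use case.
type TaskListPartition struct {
    IsolationGroups []string
}

// TaskListPartitionConfig keeps the explicit partition maps alongside the
// deprecated counts, which remain populated until old readers are gone.
type TaskListPartitionConfig struct {
    Version            int64
    ReadPartitions     map[int]*TaskListPartition
    WritePartitions    map[int]*TaskListPartition
    NumReadPartitions  int32 // deprecated: kept equal to len(ReadPartitions)
    NumWritePartitions int32 // deprecated: kept equal to len(WritePartitions)
}

func main() {
    cfg := TaskListPartitionConfig{
        Version:         1,
        ReadPartitions:  map[int]*TaskListPartition{0: {}, 1: {}},
        WritePartitions: map[int]*TaskListPartition{0: {}}, // partition 1 drained from writes
        // Deprecated counts populated for readers that predate this change.
        NumReadPartitions:  2,
        NumWritePartitions: 1,
    }
    fmt.Printf("read=%d write=%d\n", len(cfg.ReadPartitions), len(cfg.WritePartitions))
}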
Why?
How did you test it?
Potential risks
Detailed Description
map[int]TaskListPartition.
Impact Analysis
Testing Plan
Rollout Plan
Release notes
Documentation Changes