-
Notifications
You must be signed in to change notification settings - Fork 911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a processor for the archival queue #3560
Conversation
859f3d9
to
86440bb
Compare
86440bb
to
f826cf1
Compare
@@ -506,6 +508,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis | |||
|
|||
// Archival related | |||
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100), | |||
ArchivalTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ArchivalTaskMaxRetryCount, math.MaxInt32), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this use? and the maxInt32 doesn't make sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use it in a downstream PR, but I'll remove it from this one.
shardContext shard.Context | ||
workflowCache workflow.Cache | ||
logger log.Logger | ||
metricsClient metrics.MetricsHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metricsHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
metricsClient metrics.MetricsHandler | ||
} | ||
|
||
func newArchivalQueueTaskExecutor(archiver archival.Archiver, shardContext shard.Context, workflowCache workflow.Cache, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: one argument per line could be cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
func (e *archivalQueueTaskExecutor) Execute(ctx context.Context, executable queues.Executable) (tags []metrics.Tag, | ||
isActive bool, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
func (e *archivalQueueTaskExecutor) Execute(
ctx context.Context,
executable queues.Executable,
) (tags []metrics.Tag,isActive bool, err error) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
if err != nil { | ||
return err | ||
} | ||
if mutableState == nil || mutableState.IsWorkflowExecutionRunning() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the mutable state is nil, I think we should return nil to ack this task. If the workflow is deleted manually prior to archival.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if retention == 0 { | ||
retention = 7 * 24 * time.Hour | ||
} | ||
deleteTime := closeTime.Add(retention) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a jitter for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in #3565
} | ||
retention := namespaceEntry.Retention() | ||
if retention == 0 { | ||
retention = 7 * 24 * time.Hour |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it a const?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
StartTime: *startTime, | ||
ExecutionTime: *executionTime, | ||
CloseTime: *closeTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the archival takes in an optional of those fields so you don't need those sanity check? And if those sanity check need to be there, I think it belongs to archiver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, fixed
Moved to #3663 |
What changed?
Added a processor which executes tasks on the archival queue. A followup PR will actually enable the queue factory.
Why?
I made these changes to enable the archival queue.
How did you test it?
I tested this using unit tests because it's not actually hooked into prod yet.
Potential risks
No--it's not wired into main yet.
Is hotfix candidate?
No.