Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle errors when registering new queue readers #4055

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,9 @@ func QueueReaderID(readerID int32) ZapTag {
return NewInt32("queue-reader-id", readerID)
}

// QueueAlertType returns tag for queue alert type
func QueueAlertType(alertType string) ZapTag {
return NewStringTag("queue-alert-type", alertType)
}

// QueueAlertAttributes returns tag for queue alert attributes
func QueueAlertAttributes(attributes interface{}) ZapTag {
return NewAnyTag("queue-alert-attributes", attributes)
// QueueAlert returns a logging tag carrying the queue alert payload.
func QueueAlert(alert interface{}) ZapTag {
	const key = "queue-alert"
	return NewAnyTag(key, alert)
}

// Task returns tag for Task
Expand Down
3 changes: 2 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
TaskTypeTagName = "task_type"
TaskPriorityTagName = "task_priority"
QueueReaderIDTagName = "queue_reader_id"
QueueAlertTypeTagName = "queue_alert_type"
QueueActionTagName = "queue_action"
QueueTypeTagName = "queue_type"
visibilityTypeTagName = "visibility_type"
ErrorTypeTagName = "error_type"
Expand Down Expand Up @@ -1333,6 +1333,7 @@ var (
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
QueueActionCounter = NewCounterDef("queue_actions")
QueueActionFailures = NewCounterDef("queue_action_errors")
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
Expand Down
4 changes: 2 additions & 2 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ func QueueReaderIDTag(readerID int32) Tag {
return &tagImpl{key: QueueReaderIDTagName, value: strconv.Itoa(int(readerID))}
}

func QueueAlertTypeTag(value string) Tag {
func QueueActionTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return &tagImpl{key: QueueAlertTypeTagName, value: value}
return &tagImpl{key: QueueActionTagName, value: value}
}

func QueueTypeTag(value string) Tag {
Expand Down
14 changes: 4 additions & 10 deletions service/history/queues/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@

package queues

var (
_ Action = (*actionReaderStuck)(nil)
)

type (
// Action is operations that can be run on a ReaderGroup.
// It is created by Mitigator upon receiving an Alert and
// run by a Queue to resolve the alert.
// Action is a set of operations that can be run on a ReaderGroup.
// It is created and run by Mitigator upon receiving an Alert.
Action interface {
Run(*ReaderGroup)
Name() string
Run(*ReaderGroup) error
}

actionCompletionFn func()
)
73 changes: 55 additions & 18 deletions service/history/queues/action_pending_task_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ const (
clearSliceThrottleDuration = 10 * time.Second
)

var _ Action = (*actionQueuePendingTask)(nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment to explain what this action is about, what alert it reacts to, and how it mitigates the alert.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I will send follow up PRs to comment and add tests for those actions.


type (
actionQueuePendingTask struct {
attributes *AlertAttributesQueuePendingTaskCount
monitor Monitor
maxReaderCount int
completionFn actionCompletionFn

// state of the action, used when running the action
tasksPerNamespace map[namespace.ID]int
Expand All @@ -58,28 +59,28 @@ func newQueuePendingTaskAction(
attributes *AlertAttributesQueuePendingTaskCount,
monitor Monitor,
maxReaderCount int,
completionFn actionCompletionFn,
) Action {
) *actionQueuePendingTask {
return &actionQueuePendingTask{
attributes: attributes,
monitor: monitor,
maxReaderCount: maxReaderCount,
completionFn: completionFn,
}
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns the identifier of this action, e.g. for logging and metric tags.
func (a *actionQueuePendingTask) Name() string {
return "queue-pending-task"
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) error {
// first check if the alert is still valid
if a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount {
return
return nil
}

// then try to shrink existing slices, which may reduce pending task count
readers := readerGroup.Readers()
if a.tryShrinkSlice(readers) {
return
return nil
}

// have to unload pending tasks to reduce pending task count
Expand All @@ -88,7 +89,7 @@ func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
a.findSliceToClear(
int(float64(a.attributes.CiriticalPendingTaskCount) * targetLoadFactor),
)
a.splitAndClearSlice(readers, readerGroup)
return a.splitAndClearSlice(readers, readerGroup)
}

func (a *actionQueuePendingTask) tryShrinkSlice(
Expand Down Expand Up @@ -177,7 +178,11 @@ func (a *actionQueuePendingTask) findSliceToClear(
func (a *actionQueuePendingTask) splitAndClearSlice(
readers map[int32]Reader,
readerGroup *ReaderGroup,
) {
) error {
if err := a.ensureNewReaders(readers, readerGroup); err != nil {
return err
}

for readerID, reader := range readers {
if readerID == int32(a.maxReaderCount)-1 {
// we can't do further split, have to clear entire slice
Expand Down Expand Up @@ -216,17 +221,49 @@ func (a *actionQueuePendingTask) splitAndClearSlice(
}

nextReader, ok := readerGroup.ReaderByID(readerID + 1)
if ok {
nextReader.MergeSlices(splitSlices...)
} else {
nextReader = readerGroup.NewReader(readerID+1, splitSlices...)
if !ok {
// this should never happen, we already ensured all readers are created.
// we have no choice but to put those slices back
reader.MergeSlices(splitSlices...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write some noticeable logs, or even emit a metric here?

continue
}

nextReader.MergeSlices(splitSlices...)
nextReader.Pause(clearSliceThrottleDuration)
}

// it's likely that after a split, slice range can be shrinked
// as tasks blocking the min key from moving have been moved to another slice/reader
for _, reader := range readers {
reader.ShrinkSlices()
// ShrinkSlices will be triggered as part of checkpointing process
// see queueBase.handleAlert() and queueBase.checkpoint()
return nil
}

// ensureNewReaders pre-creates the reader with ID readerID+1 for every reader
// that owns at least one slice scheduled for split & clear, so that the later
// split step can rely on the target reader existing. Returns the first error
// encountered when creating a reader.
func (a *actionQueuePendingTask) ensureNewReaders(
	readers map[int32]Reader,
	readerGroup *ReaderGroup,
) error {
	lastReaderID := int32(a.maxReaderCount) - 1
	for readerID, reader := range readers {
		if readerID == lastReaderID {
			// slices in the last reader are cleared in place, never split,
			// so no new reader is needed for them
			continue
		}

		hasSliceToSplit := false
		reader.WalkSlices(func(s Slice) {
			// namespaceToClearPerSlice holds every slice that must be
			// split & cleared
			if _, found := a.namespaceToClearPerSlice[s]; found {
				hasSliceToSplit = true
			}
		})

		if !hasSliceToSplit {
			continue
		}

		if _, err := readerGroup.GetOrCreateReader(readerID + 1); err != nil {
			return err
		}
	}

	return nil
}
37 changes: 20 additions & 17 deletions service/history/queues/action_reader_stuck.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,34 @@ import (
"go.temporal.io/server/service/history/tasks"
)

var _ Action = (*actionReaderStuck)(nil)

type (
actionReaderStuck struct {
attributes *AlertAttributesReaderStuck
completionFn actionCompletionFn
logger log.Logger
attributes *AlertAttributesReaderStuck
logger log.Logger
}
)

func newReaderStuckAction(
attributes *AlertAttributesReaderStuck,
completionFn actionCompletionFn,
logger log.Logger,
) *actionReaderStuck {
return &actionReaderStuck{
attributes: attributes,
completionFn: completionFn,
logger: logger,
attributes: attributes,
logger: logger,
}
}

func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns the identifier of this action, e.g. for logging and metric tags.
func (a *actionReaderStuck) Name() string {
return "reader-stuck"
}

func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) error {
reader, ok := readerGroup.ReaderByID(a.attributes.ReaderID)
if !ok {
a.logger.Error("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID))
return
a.logger.Info("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID))
return nil
}

stuckRange := NewRange(
Expand Down Expand Up @@ -100,14 +101,16 @@ func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
})

if len(splitSlices) == 0 {
return
return nil
}

nextReader, ok := readerGroup.ReaderByID(a.attributes.ReaderID + 1)
if ok {
nextReader.MergeSlices(splitSlices...)
return
nextReader, err := readerGroup.GetOrCreateReader(a.attributes.ReaderID + 1)
if err != nil {
// unable to create new reader, merge split slices back to the original reader
reader.MergeSlices(splitSlices...)
return err
}

readerGroup.NewReader(a.attributes.ReaderID+1, splitSlices...)
nextReader.MergeSlices(splitSlices...)
return nil
}
32 changes: 17 additions & 15 deletions service/history/queues/action_slice_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
"go.temporal.io/server/service/history/tasks"
)

var _ Action = (*actionSliceCount)(nil)

type (
actionSliceCount struct {
attributes *AlertAttributesSlicesCount
monitor Monitor
completionFn actionCompletionFn
attributes *AlertAttributesSlicesCount
monitor Monitor
}

compactCandidate struct {
Expand All @@ -47,21 +48,21 @@ type (
func newSliceCountAction(
attributes *AlertAttributesSlicesCount,
monitor Monitor,
completionFn actionCompletionFn,
) Action {
) *actionSliceCount {
return &actionSliceCount{
attributes: attributes,
monitor: monitor,
completionFn: completionFn,
attributes: attributes,
monitor: monitor,
}
}

func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns the identifier of this action, e.g. for logging and metric tags.
func (a *actionSliceCount) Name() string {
return "slice-count"
}

func (a *actionSliceCount) Run(readerGroup *ReaderGroup) error {
// first check if the alert is still valid
if a.monitor.GetTotalSliceCount() <= a.attributes.CriticalSliceCount {
return
return nil
}

// then try to shrink existing slices, which may reduce slice count
Expand All @@ -71,7 +72,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
}
currentSliceCount := a.monitor.GetTotalSliceCount()
if currentSliceCount <= a.attributes.CriticalSliceCount {
return
return nil
}

// have to compact (force merge) slices to reduce slice count
Expand Down Expand Up @@ -102,7 +103,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return nil
}

if a.findAndCompactCandidates(
Expand All @@ -111,7 +112,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return nil
}

if a.findAndCompactCandidates(
Expand All @@ -120,7 +121,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isUniversalPredicate,
a.attributes.CriticalSliceCount,
) {
return
return nil
}

a.findAndCompactCandidates(
Expand All @@ -129,6 +130,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isUniversalPredicate,
a.attributes.CriticalSliceCount,
)
return nil
}

func (a *actionSliceCount) findAndCompactCandidates(
Expand Down
Loading