Skip to content

Commit

Permalink
Handle errors when registering new queue readers (temporalio#4055)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and samanbarghi committed May 2, 2023
1 parent c9c2d29 commit a42b9ee
Show file tree
Hide file tree
Showing 22 changed files with 568 additions and 240 deletions.
11 changes: 3 additions & 8 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,9 @@ func QueueReaderID(readerID int32) ZapTag {
return NewInt32("queue-reader-id", readerID)
}

// QueueAlertType returns tag for queue alert type
func QueueAlertType(alertType string) ZapTag {
return NewStringTag("queue-alert-type", alertType)
}

// QueueAlertAttributes returns tag for queue alert attributes
func QueueAlertAttributes(attributes interface{}) ZapTag {
return NewAnyTag("queue-alert-attributes", attributes)
// QueueAlert returns a tag that records the given queue alert value
// under the "queue-alert" key; the alert may be any type (logged via NewAnyTag).
func QueueAlert(alert interface{}) ZapTag {
return NewAnyTag("queue-alert", alert)
}

// Task returns tag for Task
Expand Down
3 changes: 2 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
TaskTypeTagName = "task_type"
TaskPriorityTagName = "task_priority"
QueueReaderIDTagName = "queue_reader_id"
QueueAlertTypeTagName = "queue_alert_type"
QueueActionTagName = "queue_action"
QueueTypeTagName = "queue_type"
visibilityTypeTagName = "visibility_type"
ErrorTypeTagName = "error_type"
Expand Down Expand Up @@ -1333,6 +1333,7 @@ var (
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
QueueActionCounter = NewCounterDef("queue_actions")
QueueActionFailures = NewCounterDef("queue_action_errors")
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
Expand Down
4 changes: 2 additions & 2 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ func QueueReaderIDTag(readerID int32) Tag {
return &tagImpl{key: QueueReaderIDTagName, value: strconv.Itoa(int(readerID))}
}

func QueueAlertTypeTag(value string) Tag {
func QueueActionTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return &tagImpl{key: QueueAlertTypeTagName, value: value}
return &tagImpl{key: QueueActionTagName, value: value}
}

func QueueTypeTag(value string) Tag {
Expand Down
14 changes: 4 additions & 10 deletions service/history/queues/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@

package queues

var (
_ Action = (*actionReaderStuck)(nil)
)

type (
// Action is operations that can be run on a ReaderGroup.
// It is created by Mitigator upon receiving an Alert and
// run by a Queue to resolve the alert.
// Action is a set of operations that can be run on a ReaderGroup.
// It is created and run by Mitigator upon receiving an Alert.
Action interface {
Run(*ReaderGroup)
Name() string
Run(*ReaderGroup) error
}

actionCompletionFn func()
)
73 changes: 55 additions & 18 deletions service/history/queues/action_pending_task_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ const (
clearSliceThrottleDuration = 10 * time.Second
)

var _ Action = (*actionQueuePendingTask)(nil)

type (
actionQueuePendingTask struct {
attributes *AlertAttributesQueuePendingTaskCount
monitor Monitor
maxReaderCount int
completionFn actionCompletionFn

// state of the action, used when running the action
tasksPerNamespace map[namespace.ID]int
Expand All @@ -58,28 +59,28 @@ func newQueuePendingTaskAction(
attributes *AlertAttributesQueuePendingTaskCount,
monitor Monitor,
maxReaderCount int,
completionFn actionCompletionFn,
) Action {
) *actionQueuePendingTask {
return &actionQueuePendingTask{
attributes: attributes,
monitor: monitor,
maxReaderCount: maxReaderCount,
completionFn: completionFn,
}
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns a short, stable identifier for this action
// (presumably used as a logging/metrics tag value — confirm against callers).
func (a *actionQueuePendingTask) Name() string {
return "queue-pending-task"
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) error {
// first check if the alert is still valid
if a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount {
return
return nil
}

// then try to shrink existing slices, which may reduce pending task count
readers := readerGroup.Readers()
if a.tryShrinkSlice(readers) {
return
return nil
}

// have to unload pending tasks to reduce pending task count
Expand All @@ -88,7 +89,7 @@ func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
a.findSliceToClear(
int(float64(a.attributes.CiriticalPendingTaskCount) * targetLoadFactor),
)
a.splitAndClearSlice(readers, readerGroup)
return a.splitAndClearSlice(readers, readerGroup)
}

func (a *actionQueuePendingTask) tryShrinkSlice(
Expand Down Expand Up @@ -177,7 +178,11 @@ func (a *actionQueuePendingTask) findSliceToClear(
func (a *actionQueuePendingTask) splitAndClearSlice(
readers map[int32]Reader,
readerGroup *ReaderGroup,
) {
) error {
if err := a.ensureNewReaders(readers, readerGroup); err != nil {
return err
}

for readerID, reader := range readers {
if readerID == int32(a.maxReaderCount)-1 {
// we can't do further split, have to clear entire slice
Expand Down Expand Up @@ -216,17 +221,49 @@ func (a *actionQueuePendingTask) splitAndClearSlice(
}

nextReader, ok := readerGroup.ReaderByID(readerID + 1)
if ok {
nextReader.MergeSlices(splitSlices...)
} else {
nextReader = readerGroup.NewReader(readerID+1, splitSlices...)
if !ok {
// this should never happen, we already ensured all readers are created.
// we have no choice but to put those slices back
reader.MergeSlices(splitSlices...)
continue
}

nextReader.MergeSlices(splitSlices...)
nextReader.Pause(clearSliceThrottleDuration)
}

// it's likely that after a split, the slice range can be shrunk
// as tasks blocking the min key from moving have been moved to another slice/reader
for _, reader := range readers {
reader.ShrinkSlices()
// ShrinkSlices will be triggered as part of checkpointing process
// see queueBase.handleAlert() and queueBase.checkpoint()
return nil
}

// ensureNewReaders guarantees that, for every reader holding at least one
// slice scheduled to be split & cleared, the reader with the next higher ID
// already exists in the group, creating it via GetOrCreateReader when absent.
// It returns the first creation error encountered, or nil on success.
func (a *actionQueuePendingTask) ensureNewReaders(
	readers map[int32]Reader,
	readerGroup *ReaderGroup,
) error {
	for readerID, reader := range readers {
		// the highest-ID reader never splits, so it needs no successor
		if readerID == int32(a.maxReaderCount)-1 {
			continue
		}

		splitNeeded := false
		reader.WalkSlices(func(s Slice) {
			// a slice keyed in namespaceToClearPerSlice is one that
			// will be split & cleared by this action
			if _, found := a.namespaceToClearPerSlice[s]; found {
				splitNeeded = true
			}
		})

		if splitNeeded {
			if _, err := readerGroup.GetOrCreateReader(readerID + 1); err != nil {
				return err
			}
		}
	}

	return nil
}
37 changes: 20 additions & 17 deletions service/history/queues/action_reader_stuck.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,34 @@ import (
"go.temporal.io/server/service/history/tasks"
)

var _ Action = (*actionReaderStuck)(nil)

type (
actionReaderStuck struct {
attributes *AlertAttributesReaderStuck
completionFn actionCompletionFn
logger log.Logger
attributes *AlertAttributesReaderStuck
logger log.Logger
}
)

func newReaderStuckAction(
attributes *AlertAttributesReaderStuck,
completionFn actionCompletionFn,
logger log.Logger,
) *actionReaderStuck {
return &actionReaderStuck{
attributes: attributes,
completionFn: completionFn,
logger: logger,
attributes: attributes,
logger: logger,
}
}

func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns a short, stable identifier for this action
// (presumably used as a logging/metrics tag value — confirm against callers).
func (a *actionReaderStuck) Name() string {
return "reader-stuck"
}

func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) error {
reader, ok := readerGroup.ReaderByID(a.attributes.ReaderID)
if !ok {
a.logger.Error("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID))
return
a.logger.Info("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID))
return nil
}

stuckRange := NewRange(
Expand Down Expand Up @@ -100,14 +101,16 @@ func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
})

if len(splitSlices) == 0 {
return
return nil
}

nextReader, ok := readerGroup.ReaderByID(a.attributes.ReaderID + 1)
if ok {
nextReader.MergeSlices(splitSlices...)
return
nextReader, err := readerGroup.GetOrCreateReader(a.attributes.ReaderID + 1)
if err != nil {
// unable to create new reader, merge split slices back to the original reader
reader.MergeSlices(splitSlices...)
return err
}

readerGroup.NewReader(a.attributes.ReaderID+1, splitSlices...)
nextReader.MergeSlices(splitSlices...)
return nil
}
32 changes: 17 additions & 15 deletions service/history/queues/action_slice_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
"go.temporal.io/server/service/history/tasks"
)

var _ Action = (*actionSliceCount)(nil)

type (
actionSliceCount struct {
attributes *AlertAttributesSlicesCount
monitor Monitor
completionFn actionCompletionFn
attributes *AlertAttributesSlicesCount
monitor Monitor
}

compactCandidate struct {
Expand All @@ -47,21 +48,21 @@ type (
func newSliceCountAction(
attributes *AlertAttributesSlicesCount,
monitor Monitor,
completionFn actionCompletionFn,
) Action {
) *actionSliceCount {
return &actionSliceCount{
attributes: attributes,
monitor: monitor,
completionFn: completionFn,
attributes: attributes,
monitor: monitor,
}
}

func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
defer a.completionFn()
// Name returns a short, stable identifier for this action
// (presumably used as a logging/metrics tag value — confirm against callers).
func (a *actionSliceCount) Name() string {
return "slice-count"
}

func (a *actionSliceCount) Run(readerGroup *ReaderGroup) error {
// first check if the alert is still valid
if a.monitor.GetTotalSliceCount() <= a.attributes.CriticalSliceCount {
return
return nil
}

// then try to shrink existing slices, which may reduce slice count
Expand All @@ -71,7 +72,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
}
currentSliceCount := a.monitor.GetTotalSliceCount()
if currentSliceCount <= a.attributes.CriticalSliceCount {
return
return nil
}

// have to compact (force merge) slices to reduce slice count
Expand Down Expand Up @@ -102,7 +103,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return nil
}

if a.findAndCompactCandidates(
Expand All @@ -111,7 +112,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return nil
}

if a.findAndCompactCandidates(
Expand All @@ -120,7 +121,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isUniversalPredicate,
a.attributes.CriticalSliceCount,
) {
return
return nil
}

a.findAndCompactCandidates(
Expand All @@ -129,6 +130,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
isUniversalPredicate,
a.attributes.CriticalSliceCount,
)
return nil
}

func (a *actionSliceCount) findAndCompactCandidates(
Expand Down
Loading

0 comments on commit a42b9ee

Please sign in to comment.