diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 723df8ea27d..a27926d997d 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -555,14 +555,9 @@ func QueueReaderID(readerID int32) ZapTag { return NewInt32("queue-reader-id", readerID) } -// QueueAlertType returns tag for queue alert type -func QueueAlertType(alertType string) ZapTag { - return NewStringTag("queue-alert-type", alertType) -} - -// QueueAlertAttributes returns tag for queue alert attributes -func QueueAlertAttributes(attributes interface{}) ZapTag { - return NewAnyTag("queue-alert-attributes", attributes) +// QueueAlert returns tag for queue alert +func QueueAlert(alert interface{}) ZapTag { + return NewAnyTag("queue-alert", alert) } // Task returns tag for Task diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 3001f8bda6e..f068f8fe686 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -46,7 +46,7 @@ const ( TaskTypeTagName = "task_type" TaskPriorityTagName = "task_priority" QueueReaderIDTagName = "queue_reader_id" - QueueAlertTypeTagName = "queue_alert_type" + QueueActionTagName = "queue_action" QueueTypeTagName = "queue_type" visibilityTypeTagName = "visibility_type" ErrorTypeTagName = "error_type" @@ -1333,6 +1333,7 @@ var ( QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count") QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count") QueueActionCounter = NewCounterDef("queue_actions") + QueueActionFailures = NewCounterDef("queue_action_errors") ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency") AckLevelUpdateCounter = NewCounterDef("ack_level_update") AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed") diff --git a/common/metrics/tags.go b/common/metrics/tags.go index e812333c8f9..51f5e497ddb 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -214,11 +214,11 @@ func QueueReaderIDTag(readerID int32) Tag { return &tagImpl{key: QueueReaderIDTagName, value: strconv.Itoa(int(readerID))} } -func QueueAlertTypeTag(value string) Tag { +func QueueActionTag(value string) Tag { if len(value) == 0 { value = unknownValue } - return &tagImpl{key: QueueAlertTypeTagName, value: value} + return &tagImpl{key: QueueActionTagName, value: value} } func QueueTypeTag(value string) Tag { diff --git a/service/history/queues/action.go b/service/history/queues/action.go index 009a75be9f8..363827df42f 100644 --- a/service/history/queues/action.go +++ b/service/history/queues/action.go @@ -24,17 +24,11 @@ package queues -var ( - _ Action = (*actionReaderStuck)(nil) -) - type ( - // Action is operations that can be run on a ReaderGroup. - // It is created by Mitigator upon receiving an Alert and - // run by a Queue to resolve the alert. + // Action is a set of operations that can be run on a ReaderGroup. + // It is created and run by Mitigator upon receiving an Alert. Action interface { - Run(*ReaderGroup) + Name() string + Run(*ReaderGroup) error } - - actionCompletionFn func() ) diff --git a/service/history/queues/action_pending_task_count.go b/service/history/queues/action_pending_task_count.go index a4c6af22e42..734a50a9a85 100644 --- a/service/history/queues/action_pending_task_count.go +++ b/service/history/queues/action_pending_task_count.go @@ -39,12 +39,13 @@ const ( clearSliceThrottleDuration = 10 * time.Second ) +var _ Action = (*actionQueuePendingTask)(nil) + type ( actionQueuePendingTask struct { attributes *AlertAttributesQueuePendingTaskCount monitor Monitor maxReaderCount int - completionFn actionCompletionFn // state of the action, used when running the action tasksPerNamespace map[namespace.ID]int @@ -58,28 +59,28 @@ func newQueuePendingTaskAction( attributes *AlertAttributesQueuePendingTaskCount, monitor Monitor, maxReaderCount int, - completionFn actionCompletionFn, -) Action { +) *actionQueuePendingTask { return &actionQueuePendingTask{ attributes: attributes, monitor: monitor, maxReaderCount: maxReaderCount, - completionFn: completionFn, } } -func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) { - defer a.completionFn() +func (a *actionQueuePendingTask) Name() string { + return "queue-pending-task" +} +func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) error { // first check if the alert is still valid if a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount { - return + return nil } // then try to shrink existing slices, which may reduce pending task count readers := readerGroup.Readers() if a.tryShrinkSlice(readers) { - return + return nil } // have to unload pending tasks to reduce pending task count @@ -88,7 +89,7 @@ func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) { a.findSliceToClear( int(float64(a.attributes.CiriticalPendingTaskCount) * targetLoadFactor), ) - a.splitAndClearSlice(readers, readerGroup) + return a.splitAndClearSlice(readers, readerGroup) } func (a *actionQueuePendingTask) tryShrinkSlice( @@ -177,7 +178,11 @@ func (a *actionQueuePendingTask) findSliceToClear( func (a *actionQueuePendingTask) splitAndClearSlice( readers map[int32]Reader, readerGroup *ReaderGroup, -) { +) error { + if err := a.ensureNewReaders(readers, readerGroup); err != nil { + return err + } + for readerID, reader := range readers { if readerID == int32(a.maxReaderCount)-1 { // we can't do further split, have to clear entire slice @@ -216,17 +221,49 @@ func (a *actionQueuePendingTask) splitAndClearSlice( } nextReader, ok := readerGroup.ReaderByID(readerID + 1) - if ok { - nextReader.MergeSlices(splitSlices...) - } else { - nextReader = readerGroup.NewReader(readerID+1, splitSlices...) + if !ok { + // this should never happen, we already ensured all readers are created. + // we have no choice but to put those slices back + reader.MergeSlices(splitSlices...) + continue } + + nextReader.MergeSlices(splitSlices...) nextReader.Pause(clearSliceThrottleDuration) } - // it's likely that after a split, slice range can be shrinked - // as tasks blocking the min key from moving have been moved to another slice/reader - for _, reader := range readers { - reader.ShrinkSlices() + // ShrinkSlices will be triggered as part of checkpointing process + // see queueBase.handleAlert() and queueBase.checkpoint() + return nil +} + +func (a *actionQueuePendingTask) ensureNewReaders( + readers map[int32]Reader, + readerGroup *ReaderGroup, +) error { + for readerID, reader := range readers { + if readerID == int32(a.maxReaderCount)-1 { + // we won't perform split + continue + } + + needNewReader := false + reader.WalkSlices(func(s Slice) { + // namespaceToClearPerSlice contains all the slices + // that needs to be split & cleared + _, ok := a.namespaceToClearPerSlice[s] + needNewReader = needNewReader || ok + }) + + if !needNewReader { + continue + } + + _, err := readerGroup.GetOrCreateReader(readerID + 1) + if err != nil { + return err + } } + + return nil } diff --git a/service/history/queues/action_reader_stuck.go b/service/history/queues/action_reader_stuck.go index 2035e1d40eb..7cf510317bd 100644 --- a/service/history/queues/action_reader_stuck.go +++ b/service/history/queues/action_reader_stuck.go @@ -30,33 +30,34 @@ import ( "go.temporal.io/server/service/history/tasks" ) +var _ Action = (*actionReaderStuck)(nil) + type ( actionReaderStuck struct { - attributes *AlertAttributesReaderStuck - completionFn actionCompletionFn - logger log.Logger + attributes *AlertAttributesReaderStuck + logger log.Logger } ) func newReaderStuckAction( attributes *AlertAttributesReaderStuck, - completionFn actionCompletionFn, logger log.Logger, ) *actionReaderStuck { return &actionReaderStuck{ - attributes: attributes, - completionFn: completionFn, - logger: logger, + attributes: attributes, + logger: logger, } } -func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) { - defer a.completionFn() +func (a *actionReaderStuck) Name() string { + return "reader-stuck" +} +func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) error { reader, ok := readerGroup.ReaderByID(a.attributes.ReaderID) if !ok { - a.logger.Error("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID)) - return + a.logger.Info("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID)) + return nil } stuckRange := NewRange( @@ -100,14 +101,16 @@ func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) { }) if len(splitSlices) == 0 { - return + return nil } - nextReader, ok := readerGroup.ReaderByID(a.attributes.ReaderID + 1) - if ok { - nextReader.MergeSlices(splitSlices...) - return + nextReader, err := readerGroup.GetOrCreateReader(a.attributes.ReaderID + 1) + if err != nil { + // unable to create new reader, merge split slices back to the original reader + reader.MergeSlices(splitSlices...) + return err } - readerGroup.NewReader(a.attributes.ReaderID+1, splitSlices...) + nextReader.MergeSlices(splitSlices...) + return nil } diff --git a/service/history/queues/action_slice_count.go b/service/history/queues/action_slice_count.go index feec65bf177..627a1b0f044 100644 --- a/service/history/queues/action_slice_count.go +++ b/service/history/queues/action_slice_count.go @@ -31,11 +31,12 @@ import ( "go.temporal.io/server/service/history/tasks" ) +var _ Action = (*actionSliceCount)(nil) + type ( actionSliceCount struct { - attributes *AlertAttributesSlicesCount - monitor Monitor - completionFn actionCompletionFn + attributes *AlertAttributesSlicesCount + monitor Monitor } compactCandidate struct { @@ -47,21 +48,21 @@ type ( func newSliceCountAction( attributes *AlertAttributesSlicesCount, monitor Monitor, - completionFn actionCompletionFn, -) Action { +) *actionSliceCount { return &actionSliceCount{ - attributes: attributes, - monitor: monitor, - completionFn: completionFn, + attributes: attributes, + monitor: monitor, } } -func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { - defer a.completionFn() +func (a *actionSliceCount) Name() string { + return "slice-count" +} +func (a *actionSliceCount) Run(readerGroup *ReaderGroup) error { // first check if the alert is still valid if a.monitor.GetTotalSliceCount() <= a.attributes.CriticalSliceCount { - return + return nil } // then try to shrink existing slices, which may reduce slice count @@ -71,7 +72,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { } currentSliceCount := a.monitor.GetTotalSliceCount() if currentSliceCount <= a.attributes.CriticalSliceCount { - return + return nil } // have to compact (force merge) slices to reduce slice count @@ -102,7 +103,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { isNotUniversalPredicate, preferredSliceCount, ) { - return + return nil } if a.findAndCompactCandidates( @@ -111,7 +112,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { isNotUniversalPredicate, preferredSliceCount, ) { - return + return nil } if a.findAndCompactCandidates( @@ -120,7 +121,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { isUniversalPredicate, a.attributes.CriticalSliceCount, ) { - return + return nil } a.findAndCompactCandidates( @@ -129,6 +130,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) { isUniversalPredicate, a.attributes.CriticalSliceCount, ) + return nil } func (a *actionSliceCount) findAndCompactCandidates( diff --git a/service/history/queues/action_slice_predicate.go b/service/history/queues/action_slice_predicate.go index 733efefd6fb..4ae112c43f0 100644 --- a/service/history/queues/action_slice_predicate.go +++ b/service/history/queues/action_slice_predicate.go @@ -33,6 +33,8 @@ const ( moveSliceDefaultReaderMinSliceCount = 3 ) +var _ Action = (*slicePredicateAction)(nil) + type ( // slicePredicateAction will move all slices in default reader // with non-universal predicate to the next reader so that upon restart @@ -57,21 +59,25 @@ type ( func newSlicePredicateAction( monitor Monitor, maxReaderCount int, -) Action { +) *slicePredicateAction { return &slicePredicateAction{ monitor: monitor, maxReaderCount: maxReaderCount, } } -func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) { +func (a *slicePredicateAction) Name() string { + return "slice-predicate" +} + +func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) error { reader, ok := readerGroup.ReaderByID(DefaultReaderId) if !ok { - return + return nil } if a.maxReaderCount <= DefaultReaderId+1 { - return + return nil } sliceCount := a.monitor.GetSliceCount(DefaultReaderId) @@ -90,7 +96,7 @@ func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) { if !hasNonUniversalPredicate || (pendingTasks < moveSliceDefaultReaderMinPendingTaskCount && sliceCount < moveSliceDefaultReaderMinSliceCount) { - return + return nil } var moveSlices []Slice @@ -104,13 +110,16 @@ func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) { }) if len(moveSlices) == 0 { - return + return nil } - nextReader, ok := readerGroup.ReaderByID(DefaultReaderId + 1) - if !ok { - readerGroup.NewReader(DefaultReaderId+1, moveSlices...) - } else { - nextReader.MergeSlices(moveSlices...) + nextReader, err := readerGroup.GetOrCreateReader(DefaultReaderId + 1) + if err != nil { + // unable to create new reader, merge split slices back + reader.MergeSlices(moveSlices...) + return err } + + nextReader.MergeSlices(moveSlices...) + return nil } diff --git a/service/history/queues/alerts.go b/service/history/queues/alerts.go index c28a683932f..cb18171c89f 100644 --- a/service/history/queues/alerts.go +++ b/service/history/queues/alerts.go @@ -25,8 +25,6 @@ package queues import ( - "strconv" - "go.temporal.io/server/service/history/tasks" ) @@ -63,20 +61,3 @@ const ( AlertTypeReaderStuck AlertTypeSliceCount ) - -var ( - alertTypeNames = map[AlertType]string{ - 0: "Unspecified", - 1: "QueuePendingTaskCount", - 2: "ReaderStuck", - 3: "SliceCount", - } -) - -func (a AlertType) String() string { - s, ok := alertTypeNames[a] - if ok { - return s - } - return strconv.Itoa(int(a)) -} diff --git a/service/history/queues/mitigator.go b/service/history/queues/mitigator.go index effdfd72a14..2b83d763125 100644 --- a/service/history/queues/mitigator.go +++ b/service/history/queues/mitigator.go @@ -36,36 +36,47 @@ import ( var _ Mitigator = (*mitigatorImpl)(nil) type ( - // Mitigator generates an Action for resolving the given Alert + // Mitigator generates and runs an Action for resolving the given Alert Mitigator interface { - Mitigate(Alert) Action + Mitigate(Alert) } + actionRunner func(Action, *ReaderGroup, metrics.Handler, log.Logger) error + mitigatorImpl struct { sync.Mutex + readerGroup *ReaderGroup monitor Monitor logger log.Logger metricsHandler metrics.Handler maxReaderCount dynamicconfig.IntPropertyFn + + // this is for overriding the behavior in unit tests + // since we don't really want to run the action in Mitigator unit tests + actionRunner actionRunner } ) func newMitigator( + readerGroup *ReaderGroup, monitor Monitor, logger log.Logger, metricsHandler metrics.Handler, maxReaderCount dynamicconfig.IntPropertyFn, ) *mitigatorImpl { return &mitigatorImpl{ + readerGroup: readerGroup, monitor: monitor, logger: logger, metricsHandler: metricsHandler, maxReaderCount: maxReaderCount, + + actionRunner: runAction, } } -func (m *mitigatorImpl) Mitigate(alert Alert) Action { +func (m *mitigatorImpl) Mitigate(alert Alert) { m.Lock() defer m.Unlock() @@ -76,38 +87,50 @@ func (m *mitigatorImpl) Mitigate(alert Alert) Action { alert.AlertAttributesQueuePendingTaskCount, m.monitor, m.maxReaderCount(), - m.newActionCompletionFn(alert.AlertType, alert.AlertAttributesQueuePendingTaskCount), ) case AlertTypeReaderStuck: action = newReaderStuckAction( alert.AlertAttributesReaderStuck, - m.newActionCompletionFn(alert.AlertType, alert.AlertAttributesReaderStuck), m.logger, ) case AlertTypeSliceCount: action = newSliceCountAction( alert.AlertAttributesSliceCount, m.monitor, - m.newActionCompletionFn(alert.AlertType, alert.AlertAttributesSliceCount), ) default: - m.logger.Error("Unknown queue alert type", tag.QueueAlertType(alert.AlertType.String())) - return nil + m.logger.Error("Unknown queue alert type", tag.QueueAlert(alert)) + return + } + + if err := m.actionRunner( + action, + m.readerGroup, + m.metricsHandler, + log.With(m.logger, tag.QueueAlert(alert)), + ); err != nil { + m.monitor.SilenceAlert(alert.AlertType) + return } - return action + m.monitor.ResolveAlert(alert.AlertType) } -func (m *mitigatorImpl) newActionCompletionFn( - alertType AlertType, - alertAttributes interface{}, -) func() { - return func() { - m.monitor.ResolveAlert(alertType) - m.logger.Info("Action completed for queue alert", - tag.QueueAlertType(alertType.String()), - tag.QueueAlertAttributes(alertAttributes), - ) - m.metricsHandler.Counter(metrics.QueueActionCounter.GetMetricName()).Record(1, metrics.QueueAlertTypeTag(alertType.String())) +func runAction( + action Action, + readerGroup *ReaderGroup, + metricsHandler metrics.Handler, + logger log.Logger, +) error { + metricsHandler = metricsHandler.WithTags(metrics.QueueActionTag(action.Name())) + metricsHandler.Counter(metrics.QueueActionCounter.GetMetricName()).Record(1) + + if err := action.Run(readerGroup); err != nil { + logger.Error("Queue action failed", tag.Error(err)) + metricsHandler.Counter(metrics.QueueActionFailures.GetMetricName()).Record(1) + return err } + + logger.Info("Queue action completed") + return nil } diff --git a/service/history/queues/mitigator_test.go b/service/history/queues/mitigator_test.go index 2a067b21a6b..ad4ef451ddb 100644 --- a/service/history/queues/mitigator_test.go +++ b/service/history/queues/mitigator_test.go @@ -25,15 +25,16 @@ package queues import ( + "errors" "testing" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/service/history/tasks" ) type ( @@ -41,9 +42,18 @@ type ( suite.Suite *require.Assertions - monitor Monitor + mockTimeSource *clock.EventTimeSource + + monitor *testMonitor mitigator *mitigatorImpl } + + testMonitor struct { + Monitor + + resolvedAlertType AlertType + silencedAlertType AlertType + } ) func TestMitigatorSuite(t *testing.T) { @@ -54,8 +64,13 @@ func TestMitigatorSuite(t *testing.T) { func (s *mitigatorSuite) SetupTest() { s.Assertions = require.New(s.T()) - s.monitor = newMonitor(tasks.CategoryTypeImmediate, nil) + s.mockTimeSource = clock.NewEventTimeSource() + s.monitor = &testMonitor{} + + // we use a different actionRunner implementation, + // which doesn't require readerGroup s.mitigator = newMitigator( + nil, s.monitor, log.NewTestLogger(), metrics.NoopMetricsHandler, @@ -63,7 +78,70 @@ func (s *mitigatorSuite) SetupTest() { ) } -func (s *mitigatorSuite) TestQueuePendingTaskAlert() { +func (s *mitigatorSuite) TestMitigate_ActionMatchAlert() { + testCases := []struct { + alert Alert + expectedAction Action + }{ + { + alert: Alert{ + AlertType: AlertTypeQueuePendingTaskCount, + AlertAttributesQueuePendingTaskCount: &AlertAttributesQueuePendingTaskCount{ + CurrentPendingTaskCount: 1000, + CiriticalPendingTaskCount: 500, + }, + }, + expectedAction: &actionQueuePendingTask{}, + }, + { + alert: Alert{ + AlertType: AlertTypeReaderStuck, + AlertAttributesReaderStuck: &AlertAttributesReaderStuck{ + ReaderID: 1, + CurrentWatermark: NewRandomKey(), + }, + }, + expectedAction: &actionReaderStuck{}, + }, + { + alert: Alert{ + AlertType: AlertTypeSliceCount, + AlertAttributesSliceCount: &AlertAttributesSlicesCount{ + CurrentSliceCount: 1000, + CriticalSliceCount: 500, + }, + }, + expectedAction: &actionSliceCount{}, + }, + } + + var actualAction Action + s.mitigator.actionRunner = func( + action Action, + _ *ReaderGroup, + _ metrics.Handler, + _ log.Logger, + ) error { + actualAction = action + return nil + } + + for _, tc := range testCases { + s.mitigator.Mitigate(tc.alert) + s.IsType(tc.expectedAction, actualAction) + } +} + +func (s *mitigatorSuite) TestMitigate_ResolveAlert() { + s.mitigator.actionRunner = func( + action Action, + _ *ReaderGroup, + _ metrics.Handler, + _ log.Logger, + ) error { + return nil + } + alert := Alert{ AlertType: AlertTypeQueuePendingTaskCount, AlertAttributesQueuePendingTaskCount: &AlertAttributesQueuePendingTaskCount{ @@ -71,48 +149,39 @@ func (s *mitigatorSuite) TestQueuePendingTaskAlert() { CiriticalPendingTaskCount: 500, }, } + s.mitigator.Mitigate(alert) - action := s.mitigator.Mitigate(alert) - s.IsType(&actionQueuePendingTask{}, action) + s.Equal(alert.AlertType, s.monitor.resolvedAlertType) + s.Equal(AlertTypeUnspecified, s.monitor.silencedAlertType) } -func (s *mitigatorSuite) TestReaderWatermarkAlert() { - alert := Alert{ - AlertType: AlertTypeReaderStuck, - AlertAttributesReaderStuck: &AlertAttributesReaderStuck{ - ReaderID: 1, - CurrentWatermark: NewRandomKey(), - }, +func (s *mitigatorSuite) TestMitigate_SilenceAlert() { + s.mitigator.actionRunner = func( + action Action, + _ *ReaderGroup, + _ metrics.Handler, + _ log.Logger, + ) error { + return errors.New("some random error") } - action := s.mitigator.Mitigate(alert) - s.IsType(&actionReaderStuck{}, action) -} - -func (s *mitigatorSuite) TestSliceCountAlert() { alert := Alert{ - AlertType: AlertTypeSliceCount, - AlertAttributesSliceCount: &AlertAttributesSlicesCount{ - CurrentSliceCount: 1000, - CriticalSliceCount: 500, + AlertType: AlertTypeQueuePendingTaskCount, + AlertAttributesQueuePendingTaskCount: &AlertAttributesQueuePendingTaskCount{ + CurrentPendingTaskCount: 1000, + CiriticalPendingTaskCount: 500, }, } + s.mitigator.Mitigate(alert) - action := s.mitigator.Mitigate(alert) - s.IsType(&actionSliceCount{}, action) + s.Equal(alert.AlertType, s.monitor.silencedAlertType) + s.Equal(AlertTypeUnspecified, s.monitor.resolvedAlertType) } -func (s *mitigatorSuite) TestActionComplectionFn() { - // manually set pending alerts in monitor - monitor := s.monitor.(*monitorImpl) - monitor.pendingAlerts = map[AlertType]struct{}{ - AlertTypeQueuePendingTaskCount: {}, - } - - s.mitigator.newActionCompletionFn( - AlertTypeQueuePendingTaskCount, - &AlertAttributesQueuePendingTaskCount{}, - )() +func (m *testMonitor) ResolveAlert(alertType AlertType) { + m.resolvedAlertType = alertType +} - s.Empty(monitor.pendingAlerts) +func (m *testMonitor) SilenceAlert(alertType AlertType) { + m.silencedAlertType = alertType } diff --git a/service/history/queues/monitor.go b/service/history/queues/monitor.go index f343a3b80a6..c0d2a7138c5 100644 --- a/service/history/queues/monitor.go +++ b/service/history/queues/monitor.go @@ -28,6 +28,7 @@ import ( "sync" "time" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/service/history/tasks" ) @@ -35,7 +36,8 @@ import ( var _ Monitor = (*monitorImpl)(nil) const ( - monitorWatermarkPrecision = time.Second + monitorWatermarkPrecision = time.Second + defaultAlertSilenceDuration = 10 * time.Second alertChSize = 10 ) @@ -59,6 +61,7 @@ type ( RemoveReader(readerID int32) ResolveAlert(AlertType) + SilenceAlert(AlertType) AlertCh() <-chan *Alert Close() } @@ -79,11 +82,13 @@ type ( sliceStats map[Slice]sliceStats categoryType tasks.CategoryType + timeSource clock.TimeSource options *MonitorOptions - pendingAlerts map[AlertType]struct{} - alertCh chan *Alert - shutdownCh chan struct{} + pendingAlerts map[AlertType]struct{} + silencedAlerts map[AlertType]time.Time // silenced alertType => expiration + alertCh chan *Alert + shutdownCh chan struct{} } readerStats struct { @@ -103,16 +108,19 @@ type ( func newMonitor( categoryType tasks.CategoryType, + timeSource clock.TimeSource, options *MonitorOptions, ) *monitorImpl { return &monitorImpl{ - readerStats: make(map[int32]readerStats), - sliceStats: make(map[Slice]sliceStats), - categoryType: categoryType, - options: options, - pendingAlerts: make(map[AlertType]struct{}), - alertCh: make(chan *Alert, alertChSize), - shutdownCh: make(chan struct{}), + readerStats: make(map[int32]readerStats), + sliceStats: make(map[Slice]sliceStats), + categoryType: categoryType, + timeSource: timeSource, + options: options, + pendingAlerts: make(map[AlertType]struct{}), + silencedAlerts: make(map[AlertType]time.Time), + alertCh: make(chan *Alert, alertChSize), + shutdownCh: make(chan struct{}), } } @@ -281,6 +289,14 @@ func (m *monitorImpl) ResolveAlert(alertType AlertType) { delete(m.pendingAlerts, alertType) } +func (m *monitorImpl) SilenceAlert(alertType AlertType) { + m.Lock() + defer m.Unlock() + + delete(m.pendingAlerts, alertType) + m.silencedAlerts[alertType] = m.timeSource.Now().Add(defaultAlertSilenceDuration) +} + func (m *monitorImpl) AlertCh() <-chan *Alert { return m.alertCh } @@ -308,6 +324,10 @@ func (m *monitorImpl) sendAlertLocked(alert *Alert) { return } + if m.timeSource.Now().Before(m.silencedAlerts[alert.AlertType]) { + return + } + // dedup alerts, we only need one outstanding alert per alert type if _, ok := m.pendingAlerts[alert.AlertType]; ok { return diff --git a/service/history/queues/monitor_test.go b/service/history/queues/monitor_test.go index 5e53392379a..28a2e4ef81a 100644 --- a/service/history/queues/monitor_test.go +++ b/service/history/queues/monitor_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/service/history/tasks" ) @@ -41,6 +42,8 @@ type ( suite.Suite *require.Assertions + mockTimeSource *clock.EventTimeSource + monitor *monitorImpl alertCh <-chan *Alert } @@ -54,7 +57,10 @@ func TestMonitorSuite(t *testing.T) { func (s *monitorSuite) SetupTest() { s.Assertions = require.New(s.T()) - s.monitor = newMonitor(tasks.CategoryTypeScheduled, + s.mockTimeSource = clock.NewEventTimeSource() + s.monitor = newMonitor( + tasks.CategoryTypeScheduled, + s.mockTimeSource, &MonitorOptions{ PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000), ReaderStuckCriticalAttempts: dynamicconfig.GetIntPropertyFn(5), @@ -205,3 +211,66 @@ func (s *monitorSuite) TestSliceCount() { }, }, *alert) } + +func (s *monitorSuite) TestResolveAlert() { + sliceCount := s.monitor.options.SliceCountCriticalThreshold() * 2 + + s.monitor.SetSliceCount(DefaultReaderId, sliceCount) // trigger an alert + + alert := <-s.alertCh + s.NotNil(alert) + s.monitor.ResolveAlert(alert.AlertType) + + // alert should be resolved, + // which means we can trigger the same alert type again + s.monitor.SetSliceCount(DefaultReaderId, sliceCount) + select { + case alert := <-s.alertCh: + s.NotNil(alert) + default: + s.FailNow("Can't trigger new alert, previous alert likely not resolved") + } +} + +func (s *monitorSuite) TestSilenceAlert() { + now := time.Now() + s.mockTimeSource.Update(now) + + sliceCount := s.monitor.options.SliceCountCriticalThreshold() * 2 + s.monitor.SetSliceCount(DefaultReaderId, sliceCount) // trigger an alert + + alert := <-s.alertCh + s.NotNil(alert) + s.monitor.SilenceAlert(alert.AlertType) + + // alert should be silenced, + // which means we can't trigger the same alert type again + s.monitor.SetSliceCount(DefaultReaderId, sliceCount) + select { + case <-s.alertCh: + s.FailNow("Alert not silenced") + default: + } + + // other alert types should still be able to fire + pendingTaskCount := s.monitor.options.PendingTasksCriticalCount() * 2 + s.monitor.SetSlicePendingTaskCount(&SliceImpl{}, pendingTaskCount) + select { + case alert := <-s.alertCh: + s.NotNil(alert) + default: + s.FailNow("Alerts with a different type should still be able to fire") + } + + now = now.Add(defaultAlertSilenceDuration * 2) + s.mockTimeSource.Update(now) + + // same alert should be able to fire after the silence duration + s.monitor.SetSliceCount(DefaultReaderId, sliceCount) + select { + case alert := <-s.alertCh: + s.NotNil(alert) + default: + s.FailNow("Same alert type should fire after silence duration") + } +} diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index ae9b82c7562..efa8fef8fae 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -154,10 +154,6 @@ func newQueueBase( } timeSource := shard.GetTimeSource() - - monitor := newMonitor(category.Type(), &options.MonitorOptions) - mitigator := newMitigator(monitor, logger, metricsHandler, options.MaxReaderCount) - executableInitializer := func(readerID int32, t tasks.Task) Executable { return NewExecutable( readerID, @@ -174,6 +170,7 @@ func newQueueBase( ) } + monitor := newMonitor(category.Type(), timeSource, &options.MonitorOptions) readerRateLimiter := newShardReaderRateLimiter( options.MaxPollRPS, hostReaderRateLimiter, @@ -205,6 +202,7 @@ func newQueueBase( ) } + resetReaderScope := false exclusiveDeletionHighWatermark := exclusiveReaderHighWatermark readerGroup := NewReaderGroup(shard.GetShardID(), shard.GetOwner(), category, readerInitializer, shard.GetExecutionManager()) for readerID, scopes := range readerScopes { @@ -216,10 +214,30 @@ func newQueueBase( for _, scope := range scopes { slices = append(slices, NewSlice(paginationFnProvider, executableInitializer, monitor, scope)) } - readerGroup.NewReader(readerID, slices...) + if _, err := readerGroup.NewReader(readerID, slices...); err != nil { + // we are not able to re-create the scopes & readers we previously have + // but we may still be able to run with only one reader. + // Pick the lowest key among all scopes and start from there + logger.Error("Failed to create history queue reader on initialization", tag.QueueReaderID(readerID), tag.Error(err)) + + resetReaderScope = true + + // don't break here, still need to update exclusiveDeletionHighWatermark + } exclusiveDeletionHighWatermark = tasks.MinKey(exclusiveDeletionHighWatermark, scopes[0].Range.InclusiveMin) } + if resetReaderScope { + // start from the lowest key of all reader scopes + exclusiveReaderHighWatermark = exclusiveDeletionHighWatermark + + // some readers may already be created + // stop them and create a new empty reader group + readerGroup.Stop() + readerGroup = NewReaderGroup(shard.GetShardID(), shard.GetOwner(), category, readerInitializer, shard.GetExecutionManager()) + } + + mitigator := newMitigator(readerGroup, monitor, logger, metricsHandler, options.MaxReaderCount) return &queueBase{ shard: shard, @@ -300,6 +318,12 @@ func (p *queueBase) processNewRange() { panic(fmt.Sprintf("Unknown task category type: %v", categoryType.String())) } + reader, err := p.readerGroup.GetOrCreateReader(DefaultReaderId) + if err != nil { + p.logger.Error("Unable to create default reader", tag.Error(err), tag.QueueReaderID(DefaultReaderId)) + return + } + slices := make([]Slice, 0, 1) if p.nonReadableScope.CanSplitByRange(newMaxKey) { var newReadScope Scope @@ -312,12 +336,6 @@ func (p *queueBase) processNewRange() { )) } - reader, ok := p.readerGroup.ReaderByID(DefaultReaderId) - if !ok { - p.readerGroup.NewReader(DefaultReaderId, slices...) - return - } - if now := p.timeSource.Now(); now.After(p.nextForceNewSliceTime) { reader.AppendSlices(slices...) p.nextForceNewSliceTime = now.Add(forceNewSliceDuration) @@ -330,10 +348,15 @@ func (p *queueBase) checkpoint() { p.readerGroup.ForEach(func(_ int32, r Reader) { r.ShrinkSlices() }) + // Run slicePredicateAction to move slices with non-universal predicate to non-default reader - // so that upon shard reload, task loading for those slices won't block other slices in the default - // reader. - newSlicePredicateAction(p.monitor, p.mitigator.maxReaderCount()).Run(p.readerGroup) + // so that upon shard reload, task loading for those slices won't block other slices in the default reader. + _ = runAction( + newSlicePredicateAction(p.monitor, p.mitigator.maxReaderCount()), + p.readerGroup, + p.metricsHandler, + p.logger, + ) readerScopes := make(map[int32][]Scope) newExclusiveDeletionHighWatermark := p.nonReadableScope.Range.InclusiveMin @@ -486,20 +509,11 @@ func (p *queueBase) resetCheckpointTimer(checkPointErr error) { } func (p *queueBase) handleAlert(alert *Alert) { - // Upon getting an Alert from monitor, - // send it to the mitigator for deduping and generating the corresponding Action. - // Then run the returned Action to resolve the Alert. - if alert == nil { return } - action := p.mitigator.Mitigate(*alert) - if action == nil { - return - } - - action.Run(p.readerGroup) + p.mitigator.Mitigate(*alert) // checkpoint the action taken & update reader progress p.checkpoint() diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index 2615c332d5d..17231835b8f 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -26,6 +26,7 @@ package queues import ( "context" + "errors" "testing" "time" @@ -151,7 +152,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() { s.Equal(ackLevel+1, base.nonReadableScope.Range.InclusiveMin.TaskID) } -func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() { +func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState_RestoreSucceed() { persistenceState := &persistencespb.QueueState{ ReaderStates: map[int32]*persistencespb.QueueReaderState{ DefaultReaderId: { @@ -215,7 +216,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() { }, s.config, ) - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(2) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(2) base := newQueueBase( mockShard, @@ -244,6 +245,80 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() { s.Equal(persistenceState, ToPersistenceQueueState(queueState)) } +func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState_RestoreFailed() { + persistenceState := &persistencespb.QueueState{ + ReaderStates: map[int32]*persistencespb.QueueReaderState{ + DefaultReaderId: { + Scopes: []*persistencespb.QueueSliceScope{ + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: &persistencespb.TaskKey{FireTime: timestamp.TimePtr(tasks.DefaultFireTime), TaskId: 1000}, + ExclusiveMax: &persistencespb.TaskKey{FireTime: timestamp.TimePtr(tasks.DefaultFireTime), TaskId: 2000}, + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + }, + DefaultReaderId + 1: { + Scopes: []*persistencespb.QueueSliceScope{ + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: &persistencespb.TaskKey{FireTime: timestamp.TimePtr(tasks.DefaultFireTime), TaskId: 500}, + ExclusiveMax: &persistencespb.TaskKey{FireTime: timestamp.TimePtr(tasks.DefaultFireTime), TaskId: 1000}, + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + }, + }, + ExclusiveReaderHighWatermark: &persistencespb.TaskKey{FireTime: timestamp.TimePtr(tasks.DefaultFireTime), TaskId: 4000}, + } + + mockShard := shard.NewTestContext( + s.controller, + &persistencespb.ShardInfo{ + ShardId: 0, + RangeId: 10, + QueueStates: map[int32]*persistencespb.QueueState{ + tasks.CategoryIDTransfer: persistenceState, + }, + }, + s.config, + ) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(errors.New("some random error")).Times(2) + + base := newQueueBase( + mockShard, + tasks.CategoryTransfer, + nil, + s.mockScheduler, + s.mockRescheduler, + NewNoopPriorityAssigner(), + nil, + s.options, + s.rateLimiter, + NoopReaderCompletionFn, + s.logger, + s.metricsHandler, + ) + + s.Empty(base.readerGroup.Readers()) + s.Equal( + NewScope( + // Range should start from the smallest key among all scopes + NewRange(tasks.NewImmediateKey(500), tasks.MaximumKey), + predicates.Universal[tasks.Task](), + ), + base.nonReadableScope, + ) +} + func (s *queueBaseSuite) TestStartStop() { mockShard := shard.NewTestContext( s.controller, @@ -259,7 +334,7 @@ func (s *queueBaseSuite) TestStartStop() { s.config, ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(1) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(1) mockShard.Resource.ExecutionMgr.EXPECT().UnregisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(1) paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { @@ -328,7 +403,7 @@ func (s *queueBaseSuite) TestProcessNewRange() { s.config, ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(1) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(1) base := newQueueBase( mockShard, @@ -387,7 +462,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(len(readerIDs)) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(len(readerIDs)) base := newQueueBase( mockShard, @@ -550,7 +625,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() { ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(2) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(2) base := newQueueBase( mockShard, @@ -621,7 +696,7 @@ func (s *queueBaseSuite) TestUpdateReaderProgress() { ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() - mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Times(2) + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(2) base := newQueueBase( mockShard, diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 803643486af..10d7e2def46 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -267,8 +267,8 @@ func (p *scheduledQueue) lookAheadTask() { ctx, cancel := newQueueIOContext() defer cancel() - if err := p.registerLookAheadReader(); err != nil { - p.logger.Error("Failed to load look ahead task", tag.Error(err)) + if err := p.ensureLookAheadReader(); err != nil { + p.logger.Error("Failed to create look ahead reader", tag.Error(err)) p.timerGate.Update(lookAheadMinTime) return } @@ -308,19 +308,9 @@ func (p *scheduledQueue) lookAheadTask() { p.timerGate.Update(lookAheadMaxTime) } -func (p *scheduledQueue) registerLookAheadReader() error { - _, ok := p.readerGroup.ReaderByID(lookAheadReaderID) - if ok { - return nil - } - - // This should not happen actually - // since lookAheadReadID == DefaultReaderID and defaultReaderID should - // always be available (unless during shutdown) - - // TODO: return error from NewReader - p.readerGroup.NewReader(lookAheadReaderID) - return nil +func (p *scheduledQueue) ensureLookAheadReader() error { + _, err := p.readerGroup.GetOrCreateReader(lookAheadReaderID) + return err } // IsTimeExpired checks if the testing time is equal or before diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index 5bef2f5fd2a..a1db24a83f4 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -240,7 +240,7 @@ func (s *scheduledQueueSuite) TestLookAheadTask_ErrorLookAhead() { ShardOwner: s.mockShard.GetOwner(), TaskCategory: tasks.CategoryTimer, ReaderID: lookAheadReaderID, - }).Times(1) + }).Return(nil).Times(1) s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()). Return(nil, errors.New("some random error")).Times(1) s.scheduledQueue.lookAheadTask() @@ -275,7 +275,7 @@ func (s *scheduledQueueSuite) setupLookAheadMock( ShardOwner: s.mockShard.GetOwner(), TaskCategory: tasks.CategoryTimer, ReaderID: lookAheadReaderID, - }).Times(1) + }).Return(nil).Times(1) s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) { s.Equal(s.mockShard.GetShardID(), request.ShardID) s.Equal(tasks.CategoryTimer, request.TaskCategory) diff --git a/service/history/queues/reader_group.go b/service/history/queues/reader_group.go index 0597af458af..2da79379c3c 100644 --- a/service/history/queues/reader_group.go +++ b/service/history/queues/reader_group.go @@ -26,6 +26,7 @@ package queues import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -37,6 +38,10 @@ import ( "go.temporal.io/server/service/history/tasks" ) +var ( + errReaderGroupStopped = errors.New("queue reader group already stopped") +) + type ( ReaderInitializer func(readerID int32, slices []Slice) Reader @@ -94,15 +99,24 @@ func (g *ReaderGroup) Stop() { g.Lock() defer g.Unlock() + // Stop() method is protected by g.status, so g.readerMap will never be nil here for readerID := range g.readerMap { g.removeReaderLocked(readerID) } + + // This guarantee no new reader can be created/registered after Stop() returns + // also all registered readers will be unregistered. + g.readerMap = nil } func (g *ReaderGroup) ForEach(f func(int32, Reader)) { g.Lock() defer g.Unlock() + if g.readerMap == nil { + return + } + for readerID, reader := range g.readerMap { f(readerID, reader) } @@ -118,28 +132,52 @@ func (g *ReaderGroup) Readers() map[int32]Reader { return readerMapCopy } +func (g *ReaderGroup) GetOrCreateReader(readerID int32) (Reader, error) { + g.Lock() + defer g.Unlock() + + reader, ok := g.getReaderByIDLocked(readerID) + if ok { + return reader, nil + } + + return g.newReaderLocked(readerID) +} + func (g *ReaderGroup) ReaderByID(readerID int32) (Reader, bool) { g.Lock() defer g.Unlock() + return g.getReaderByIDLocked(readerID) +} + +func (g *ReaderGroup) getReaderByIDLocked(readerID int32) (Reader, bool) { + if g.readerMap == nil { + return nil, false + } + reader, ok := g.readerMap[readerID] return reader, ok } -// TODO: this method should return error -// if reader registration fails or reader group already closed -func (g *ReaderGroup) NewReader(readerID int32, slices ...Slice) Reader { - reader := g.initializer(readerID, slices) - +func (g *ReaderGroup) NewReader(readerID int32, slices ...Slice) (Reader, error) { g.Lock() defer g.Unlock() + return g.newReaderLocked(readerID, slices...) +} + +func (g *ReaderGroup) newReaderLocked(readerID int32, slices ...Slice) (Reader, error) { + reader := g.initializer(readerID, slices) + + if g.readerMap == nil { + return nil, errReaderGroupStopped + } + if _, ok := g.readerMap[readerID]; ok { panic(fmt.Sprintf("reader with ID %v already exists", readerID)) } - // TODO: return error if reader group already stopped - g.readerMap[readerID] = reader err := g.executionManager.RegisterHistoryTaskReader(context.Background(), &persistence.RegisterHistoryTaskReaderRequest{ ShardID: g.shardID, @@ -148,13 +186,12 @@ func (g *ReaderGroup) NewReader(readerID int32, slices ...Slice) Reader { ReaderID: readerID, }) if err != nil { - // TODO: bubble up the error when registring task reader - panic(fmt.Sprintf("Unable to register history task reader: %v", err)) + return nil, fmt.Errorf("unable to register history task reader: %w", err) } if g.isStarted() { reader.Start() } - return reader + return reader, nil } func (g *ReaderGroup) RemoveReader(readerID int32) { @@ -165,12 +202,16 @@ func (g *ReaderGroup) RemoveReader(readerID int32) { } func (g *ReaderGroup) removeReaderLocked(readerID int32) { + if g.readerMap == nil { + return + } + reader, ok := g.readerMap[readerID] if !ok { return } - // TODO: reader.Stop() does not guarantee reader won't issue new read requests + // NOTE: reader.Stop() does not guarantee reader won't issue new read requests // But it's very unlikely as it waits for 1min. // If UnregisterHistoryTaskReader requires no more read requests after the call // we need to wait for the separate reader goroutine to complete. diff --git a/service/history/queues/reader_group_test.go b/service/history/queues/reader_group_test.go index b7a27bc0583..1a1191cb4a6 100644 --- a/service/history/queues/reader_group_test.go +++ b/service/history/queues/reader_group_test.go @@ -91,7 +91,8 @@ func (s *readerGroupSuite) TearDownTest() { func (s *readerGroupSuite) TestStartStop() { readerID := int32(DefaultReaderId) s.setupRegisterReaderMock(readerID) - r := s.readerGroup.NewReader(readerID) + r, err := s.readerGroup.NewReader(readerID) + s.NoError(err) s.Equal(common.DaemonStatusInitialized, r.(*testReader).status) s.readerGroup.Start() @@ -99,7 +100,8 @@ func (s *readerGroupSuite) TestStartStop() { readerID = int32(DefaultReaderId + 1) s.setupRegisterReaderMock(readerID) - r = s.readerGroup.NewReader(readerID) + r, err = s.readerGroup.NewReader(readerID) + s.NoError(err) s.Equal(common.DaemonStatusStarted, r.(*testReader).status) var readers []*testReader @@ -114,9 +116,9 @@ func (s *readerGroupSuite) TestStartStop() { } readerID = int32(DefaultReaderId + 2) - s.setupRegisterReaderMock(readerID) - r = s.readerGroup.NewReader(readerID) - s.Equal(common.DaemonStatusInitialized, r.(*testReader).status) + r, err = s.readerGroup.NewReader(readerID) + s.Nil(r) + s.Equal(errReaderGroupStopped, err) } func (s *readerGroupSuite) TestAddGetReader() { @@ -128,7 +130,8 @@ func (s *readerGroupSuite) TestAddGetReader() { for i := int32(0); i < 3; i++ { s.setupRegisterReaderMock(i) - r = s.readerGroup.NewReader(i) + r, err := s.readerGroup.NewReader(i) + s.NoError(err) readers := s.readerGroup.Readers() s.Len(readers, int(i)+1) @@ -140,7 +143,7 @@ func (s *readerGroupSuite) TestAddGetReader() { } s.Panics(func() { - s.readerGroup.NewReader(DefaultReaderId) + _, _ = s.readerGroup.NewReader(DefaultReaderId) }) } @@ -151,7 +154,8 @@ func (s *readerGroupSuite) TestRemoveReader() { readerID := int32(DefaultReaderId) s.setupRegisterReaderMock(readerID) - r := s.readerGroup.NewReader(readerID) + r, err := s.readerGroup.NewReader(readerID) + s.NoError(err) s.setupUnRegisterReaderMock(readerID) s.readerGroup.RemoveReader(readerID) @@ -164,7 +168,8 @@ func (s *readerGroupSuite) TestForEach() { readerIDs := []int32{1, 2, 3} for _, readerID := range readerIDs { s.setupRegisterReaderMock(readerID) - s.readerGroup.NewReader(readerID) + _, err := s.readerGroup.NewReader(readerID) + s.NoError(err) } forEachResult := make(map[int32]Reader) diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index 6e31ccb2750..df828d39f25 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -79,7 +79,7 @@ func (s *readerSuite) SetupTest() { s.executableInitializer = func(readerID int32, t tasks.Task) Executable { return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler) } - s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{ + s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000), ReaderStuckCriticalAttempts: dynamicconfig.GetIntPropertyFn(5), SliceCountCriticalThreshold: dynamicconfig.GetIntPropertyFn(50), diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index d7dc34fa035..1ac3a1a3cdb 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -71,7 +71,7 @@ func (s *sliceSuite) SetupTest() { s.executableInitializer = func(readerID int32, t tasks.Task) Executable { return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler) } - s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{ + s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000), ReaderStuckCriticalAttempts: dynamicconfig.GetIntPropertyFn(5), SliceCountCriticalThreshold: dynamicconfig.GetIntPropertyFn(50), diff --git a/service/history/replication/ack_manager_test.go b/service/history/replication/ack_manager_test.go index fa97608aee1..9dd192245f1 100644 --- a/service/history/replication/ack_manager_test.go +++ b/service/history/replication/ack_manager_test.go @@ -111,7 +111,7 @@ func (s *ackManagerSuite) SetupTest() { ShardOwner: s.mockShard.GetOwner(), TaskCategory: tasks.CategoryReplication, ReaderID: common.DefaultQueueReaderID, - }).MaxTimes(1) + }).Return(nil).MaxTimes(1) s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()