From c9124547f55f11452cbc96f9af4809e77ba0decf Mon Sep 17 00:00:00 2001
From: Michael Snowden
Date: Wed, 28 Dec 2022 09:33:52 -0800
Subject: [PATCH] Fix errcheck in service/history/shard (#3755)

---
 service/history/shard/context_impl.go    | 26 +++++++++++++++---------
 service/history/shard/context_test.go    |  9 +++++---
 service/history/shard/controller_test.go |  4 +++-
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go
index 7e0be653a11..53f2fa7225f 100644
--- a/service/history/shard/context_impl.go
+++ b/service/history/shard/context_impl.go
@@ -35,6 +35,7 @@ import (
 	commonpb "go.temporal.io/api/common/v1"
 	"go.temporal.io/api/enums/v1"
 	"go.temporal.io/api/serviceerror"
+	"go.uber.org/multierr"
 	"golang.org/x/exp/maps"
 
 	"go.temporal.io/server/api/adminservice/v1"
@@ -1482,8 +1483,7 @@ func (s *ContextImpl) handleReadError(err error) error {
 	case *persistence.ShardOwnershipLostError:
 		// Shard is stolen, trigger shutdown of history engine.
 		// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))
 
 	default:
 		return err
@@ -1517,8 +1517,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 	case *persistence.ShardOwnershipLostError:
 		// Shard is stolen, trigger shutdown of history engine.
 		// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))
 
 	default:
 		// We have no idea if the write failed or will eventually make it to persistence. Try to re-acquire
@@ -1527,8 +1526,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 		// reliably check the outcome by performing a read. If we fail, we'll shut down the shard.
 		// Note that reacquiring the shard will cause the max read level to be updated
 		// to the new range (i.e. past newMaxReadLevel).
-		s.transition(contextRequestLost{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestLost{}))
 	}
 }
 
@@ -1551,18 +1549,24 @@ func (s *ContextImpl) createEngine() Engine {
 
 // start should only be called by the controller.
 func (s *ContextImpl) start() {
-	s.transition(contextRequestAcquire{})
+	if err := s.transition(contextRequestAcquire{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to start shard", tag.Error(err))
+	}
 }
 
 func (s *ContextImpl) Unload() {
-	s.transition(contextRequestStop{})
+	if err := s.transition(contextRequestStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to unload shard", tag.Error(err))
+	}
 }
 
 // finishStop should only be called by the controller.
 func (s *ContextImpl) finishStop() {
 	// After this returns, engineFuture.Set may not be called anymore, so if we don't get see
 	// an Engine here, we won't ever have one.
-	s.transition(contextRequestFinishStop{})
+	if err := s.transition(contextRequestFinishStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to stop shard", tag.Error(err))
+	}
 
 	// use a context that we know is cancelled so that this doesn't block
 	engine, _ := s.engineFuture.Get(s.lifecycleCtx)
@@ -2064,7 +2068,9 @@
 
 		// On any error, initiate shutting down the shard. If we already changed state
 		// because we got a ShardOwnershipLostError, this won't do anything.
-		s.transition(contextRequestStop{})
+		if err := s.transition(contextRequestStop{}); err != nil {
+			s.contextTaggedLogger.Error("Error stopping shard", tag.Error(err))
+		}
 	}
 }
 
diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go
index dcf360e44d1..a4cf9c04d18 100644
--- a/service/history/shard/context_test.go
+++ b/service/history/shard/context_test.go
@@ -205,14 +205,17 @@ func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
 	s.timeSource.Update(now)
 
 	// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
+	s.NoError(err)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	s.NoError(err)
 
 	now = time.Now().Add(time.Minute)
 	s.timeSource.Update(now)
 
 	// update in single processor mode
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	s.NoError(err)
 	scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
 	s.Len(scheduledTaskMaxReadLevelMap, 2)
 	s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go
index 1aa1e43fff0..555c64cbd5f 100644
--- a/service/history/shard/controller_test.go
+++ b/service/history/shard/controller_test.go
@@ -790,7 +790,9 @@ func (s *controllerSuite) TestShardControllerFuzz() {
 			shardID := int32(rand.Intn(int(s.config.NumberOfShards))) + 1
 			switch rand.Intn(5) {
 			case 0:
-				s.shardController.GetShardByID(shardID)
+				if _, err := s.shardController.GetShardByID(shardID); err != nil {
+					return err
+				}
 			case 1:
 				if shard, err := s.shardController.GetShardByID(shardID); err == nil {
 					_, _ = shard.GetEngine(ctx)
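Editorial note on the pattern (not part of the patch itself): the fix stops discarding the error returned by s.transition. Where the surrounding function already returns an error, the transition error is folded into it with go.uber.org/multierr; where the caller returns nothing, the error is logged instead. The following is a minimal, self-contained sketch of the multierr.Combine half of that pattern; stopShard, handleReadError, and the error messages are simplified stand-ins for illustration, not the real shard context code.

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

// stopShard stands in for s.transition(contextRequestStop{}): a secondary
// action whose error should no longer be silently dropped.
func stopShard() error {
	return errors.New("transition rejected: shard already stopped")
}

// handleReadError mirrors the shape of the fixed code: the primary error is
// always part of the result, and any error from the secondary action is
// appended to it rather than ignored.
func handleReadError(readErr error) error {
	return multierr.Combine(readErr, stopShard())
}

func main() {
	err := handleReadError(errors.New("shard ownership lost"))
	// multierr.Combine returns a single error wrapping both; printing it shows
	// both messages, and multierr.Errors(err) recovers the individual errors.
	fmt.Println(err)
	fmt.Println(len(multierr.Errors(err))) // 2
}

Because multierr.Combine drops nil arguments, the caller still sees exactly the original persistence error whenever the transition succeeds, so the rewritten branches behave the same as the old code on the happy path.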