Fix errcheck in service/history/shard (#3755)
MichaelSnowden authored Dec 28, 2022
1 parent 5640a98 commit c912454
Showing 3 changed files with 25 additions and 14 deletions.
26 changes: 16 additions & 10 deletions service/history/shard/context_impl.go
@@ -35,6 +35,7 @@ import (
 	commonpb "go.temporal.io/api/common/v1"
 	"go.temporal.io/api/enums/v1"
 	"go.temporal.io/api/serviceerror"
+	"go.uber.org/multierr"
 	"golang.org/x/exp/maps"
 
 	"go.temporal.io/server/api/adminservice/v1"
@@ -1482,8 +1483,7 @@ func (s *ContextImpl) handleReadError(err error) error {
 	case *persistence.ShardOwnershipLostError:
 		// Shard is stolen, trigger shutdown of history engine.
 		// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))
 
 	default:
 		return err
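
The fix relies on the combining semantics of go.uber.org/multierr: Combine drops nil arguments, so when the transition succeeds the caller still receives the original ShardOwnershipLostError unchanged, and nothing is silently swallowed when it fails. A minimal standalone sketch (the error messages are illustrative, not from the codebase):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	solErr := errors.New("shard ownership lost")

	// Combine skips nil values, so a successful transition leaves the
	// original error untouched.
	fmt.Println(multierr.Combine(solErr, nil)) // shard ownership lost

	// When both calls fail, Combine wraps the errors together and
	// Error() joins the messages with "; ".
	combined := multierr.Combine(solErr, errors.New("transition failed"))
	fmt.Println(combined) // shard ownership lost; transition failed

	// The individual errors stay inspectable.
	fmt.Println(len(multierr.Errors(combined))) // 2
}
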
@@ -1517,8 +1517,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 	case *persistence.ShardOwnershipLostError:
 		// Shard is stolen, trigger shutdown of history engine.
 		// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))
 
 	default:
 		// We have no idea if the write failed or will eventually make it to persistence. Try to re-acquire
@@ -1527,8 +1526,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 		// reliably check the outcome by performing a read. If we fail, we'll shut down the shard.
 		// Note that reacquiring the shard will cause the max read level to be updated
 		// to the new range (i.e. past newMaxReadLevel).
-		s.transition(contextRequestLost{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestLost{}))
 	}
 }

@@ -1551,18 +1549,24 @@ func (s *ContextImpl) createEngine() Engine {
 
 // start should only be called by the controller.
 func (s *ContextImpl) start() {
-	s.transition(contextRequestAcquire{})
+	if err := s.transition(contextRequestAcquire{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to start shard", tag.Error(err))
+	}
 }
 
 func (s *ContextImpl) Unload() {
-	s.transition(contextRequestStop{})
+	if err := s.transition(contextRequestStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to unload shard", tag.Error(err))
+	}
 }
 
 // finishStop should only be called by the controller.
 func (s *ContextImpl) finishStop() {
 	// After this returns, engineFuture.Set may not be called anymore, so if we don't see
 	// an Engine here, we won't ever have one.
-	s.transition(contextRequestFinishStop{})
+	if err := s.transition(contextRequestFinishStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to stop shard", tag.Error(err))
+	}
 
 	// use a context that we know is cancelled so that this doesn't block
 	engine, _ := s.engineFuture.Get(s.lifecycleCtx)
@@ -2064,7 +2068,9 @@ func (s *ContextImpl) acquireShard() {
 
 		// On any error, initiate shutting down the shard. If we already changed state
 		// because we got a ShardOwnershipLostError, this won't do anything.
-		s.transition(contextRequestStop{})
+		if err := s.transition(contextRequestStop{}); err != nil {
+			s.contextTaggedLogger.Error("Error stopping shard", tag.Error(err))
+		}
 	}
 }
 
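The hunks above show the commit's two remediation patterns: where a caller can receive the error, it is merged into the return value with multierr.Combine; where there is no caller to inform (lifecycle hooks such as start, Unload, and finishStop, which are invoked by the controller), the error is logged instead. A condensed sketch of both, using the standard library logger and an illustrative doWork in place of s.transition (neither name is from the Temporal codebase):

package main

import (
	"errors"
	"log"

	"go.uber.org/multierr"
)

// doWork stands in for a call like s.transition; the name and the
// failure mode are illustrative only.
func doWork() error { return errors.New("transition rejected") }

// propagate merges a freshly produced error into one already being
// returned, so neither is silently dropped.
func propagate(err error) error {
	return multierr.Combine(err, doWork())
}

// fireAndForget logs the error when the call site has no way to
// return it, mirroring the start/Unload/finishStop changes above.
func fireAndForget() {
	if err := doWork(); err != nil {
		log.Printf("work failed: %v", err)
	}
}

func main() {
	log.Println(propagate(errors.New("original failure")))
	fireAndForget()
}
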
9 changes: 6 additions & 3 deletions service/history/shard/context_test.go
@@ -205,14 +205,17 @@ func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
 	s.timeSource.Update(now)
 
 	// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
+	s.NoError(err)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	s.NoError(err)
 
 	now = time.Now().Add(time.Minute)
 	s.timeSource.Update(now)
 
 	// update in single processor mode
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	s.NoError(err)
 	scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
 	s.Len(scheduledTaskMaxReadLevelMap, 2)
 	s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
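
In the tests, the previously discarded return values are now bound and asserted with s.NoError. In a testify suite this records a failure but lets the test method continue, whereas s.Require().NoError would abort it immediately. A small self-contained sketch (demoSuite and the error value are invented for illustration):

package example

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/suite"
)

type demoSuite struct {
	suite.Suite
}

func TestDemoSuite(t *testing.T) {
	suite.Run(t, new(demoSuite))
}

func (s *demoSuite) TestNoError() {
	var err error
	// Passes: err is nil.
	s.NoError(err)

	err = errors.New("boom")
	// s.NoError(err) here would mark the test failed but keep running;
	// s.Require().NoError(err) would stop this test method at once.
	s.Error(err)
}
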
4 changes: 3 additions & 1 deletion service/history/shard/controller_test.go
@@ -790,7 +790,9 @@ func (s *controllerSuite) TestShardControllerFuzz() {
 			shardID := int32(rand.Intn(int(s.config.NumberOfShards))) + 1
 			switch rand.Intn(5) {
 			case 0:
-				s.shardController.GetShardByID(shardID)
+				if _, err := s.shardController.GetShardByID(shardID); err != nil {
+					return err
+				}
 			case 1:
 				if shard, err := s.shardController.GetShardByID(shardID); err == nil {
 					_, _ = shard.GetEngine(ctx)