Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling service-discovery driven shutdown of matching engine #6198

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
58eb90c
WIP commit
davidporter-id-au Jul 30, 2024
a8240d0
fixing some logs
davidporter-id-au Jul 30, 2024
6e9c59b
Add recover
davidporter-id-au Jul 30, 2024
c974255
Merge branch 'master' into bugfix/enabling-shutdown-through-service-d…
davidporter-id-au Aug 5, 2024
6acc273
WIP, snapshotting
davidporter-id-au Aug 6, 2024
4ff1e16
Moving this to engine
davidporter-id-au Aug 6, 2024
9c048eb
debugging
davidporter-id-au Aug 6, 2024
165b111
wip
davidporter-id-au Aug 7, 2024
f3efac6
Merge branch 'master' into bugfix/enabling-shutdown-through-service-d…
davidporter-id-au Aug 7, 2024
b1b77c3
Fix and add a small amount of coverage
davidporter-id-au Aug 7, 2024
bc10553
removing
davidporter-id-au Aug 7, 2024
7d4e592
Update service/matching/handler/engine.go
davidporter-id-au Aug 8, 2024
3236cc8
Update service/matching/handler/engine.go
davidporter-id-au Aug 8, 2024
84098f3
fixing some tests
davidporter-id-au Aug 8, 2024
c21d56a
Fix config
davidporter-id-au Aug 8, 2024
f695c38
Update service/matching/handler/membership.go
davidporter-id-au Aug 8, 2024
4d870b1
fixing test
davidporter-id-au Aug 13, 2024
9adba98
lint
davidporter-id-au Aug 13, 2024
8e22562
Merge branch 'master' into bugfix/enabling-shutdown-through-service-d…
davidporter-id-au Aug 13, 2024
2eeaca5
feedback
davidporter-id-au Aug 13, 2024
40f716d
fixing up
davidporter-id-au Aug 14, 2024
2f74aec
fix copy of sync wg
davidporter-id-au Aug 14, 2024
c36df6a
coverage
davidporter-id-au Aug 14, 2024
15ec5ca
maybe fix data race
davidporter-id-au Aug 14, 2024
25e649b
race debugging
davidporter-id-au Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,13 @@ const (
// Default value: false
// Allowed filters: DomainID
MatchingEnableTaskInfoLogByDomainID
// MatchingEnableTasklistGuardAgainstOwnershipShardLoss
// enables guards that prevent tasklists from processing once it is detected that the host
// is no longer active or no longer owns the shard
// KeyName: matching.enableTasklistGuardAgainstOwnershipLoss
// Value type: Bool
// Default value: false
MatchingEnableTasklistGuardAgainstOwnershipShardLoss

// key for history

Expand Down Expand Up @@ -4109,6 +4116,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID",
DefaultValue: false,
},
MatchingEnableTasklistGuardAgainstOwnershipShardLoss: {
KeyName: "matching.enableTasklistGuardAgainstOwnershipLoss",
Description: "allows guards to ensure that tasklists don't continue processing if there's signal that they've lost ownership",
DefaultValue: false,
},
EventsCacheGlobalEnable: {
KeyName: "history.eventsCacheGlobalEnable",
Description: "EventsCacheGlobalEnable is enables global cache over all history shards",
Expand Down
10 changes: 5 additions & 5 deletions common/errors/taskListNotOwnedByHostError.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ package errors

import "fmt"

var _ error = &TaskListNotOwnnedByHostError{}
var _ error = &TaskListNotOwnedByHostError{}
davidporter-id-au marked this conversation as resolved.
Show resolved Hide resolved

type TaskListNotOwnnedByHostError struct {
// TaskListNotOwnedByHostError is returned when a request targets a task list
// that the handling host does not (or can no longer claim to) own.
type TaskListNotOwnedByHostError struct {
	// OwnedByIdentity is the identity of the host reported as the task list owner.
	OwnedByIdentity string
	// MyIdentity is the identity of the host that produced this error.
	MyIdentity string
	// TasklistName is the name of the task list the request was for.
	TasklistName string
}

func (m *TaskListNotOwnnedByHostError) Error() string {
// Error implements the error interface. The message names the reported owner,
// this host's identity and the task list, in that order.
func (m *TaskListNotOwnedByHostError) Error() string {
	const format = "task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s"

	owner, me, name := m.OwnedByIdentity, m.MyIdentity, m.TasklistName
	return fmt.Sprintf(format, owner, me, name)
}

func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError {
return &TaskListNotOwnnedByHostError{
func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError {
return &TaskListNotOwnedByHostError{
OwnedByIdentity: ownedByIdentity,
MyIdentity: myIdentity,
TasklistName: tasklistName,
Expand Down
6 changes: 6 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag {
return newStringTag("visibility-query", query)
}

// MembershipChangeEvent is a predefined tag for logging hashring change events.
// The event is attached under the name "membership-change-event" and is
// expected to be of type membership.ChangeEvent.
func MembershipChangeEvent(event interface{}) Tag {
	return newPredefinedDynamicTag("membership-change-event", event)
}

// Dynamic uses reflection-based logging for arbitrary values.
// Note that this kind of logging is not very performant.
func Dynamic(key string, v interface{}) Tag {
Expand Down
9 changes: 9 additions & 0 deletions common/membership/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package membership

import (
"fmt"
"sync"
"sync/atomic"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -84,6 +85,7 @@ type MultiringResolver struct {
status int32

provider PeerProvider
mu sync.Mutex
rings map[string]*ring
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is the first instance of multiple accessors to the rings map, it needs guards

}

Expand All @@ -110,6 +112,7 @@ func NewMultiringResolver(
provider: provider,
rings: make(map[string]*ring),
metrics: metricsClient,
mu: sync.Mutex{},
}

for _, s := range services {
Expand All @@ -130,6 +133,8 @@ func (rpo *MultiringResolver) Start() {

rpo.provider.Start()

rpo.mu.Lock()
defer rpo.mu.Unlock()
for _, ring := range rpo.rings {
ring.Start()
}
Expand All @@ -145,6 +150,8 @@ func (rpo *MultiringResolver) Stop() {
return
}

rpo.mu.Lock()
defer rpo.mu.Unlock()
for _, ring := range rpo.rings {
ring.Stop()
}
Expand All @@ -163,6 +170,8 @@ func (rpo *MultiringResolver) EvictSelf() error {
}

func (rpo *MultiringResolver) getRing(service string) (*ring, error) {
rpo.mu.Lock()
defer rpo.mu.Unlock()
ring, found := rpo.rings[service]
if !found {
return nil, fmt.Errorf("service %q is not tracked by Resolver", service)
Expand Down
3 changes: 3 additions & 0 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type (
TaskDispatchRPSTTL time.Duration
// task gc configuration
MaxTimeBetweenTaskDeletes time.Duration

EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn
}

ForwarderConfig struct {
Expand Down Expand Up @@ -156,6 +158,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss),
LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
HostName: hostName,
TaskDispatchRPS: 100000.0,
Expand Down
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestNewConfig(t *testing.T) {
"TaskDispatchRPS": {nil, 100000.0},
"TaskDispatchRPSTTL": {nil, time.Minute},
"MaxTimeBetweenTaskDeletes": {nil, time.Second},
"EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false},
}
client := dynamicconfig.NewInMemoryClient()
for fieldName, expected := range fields {
Expand Down
94 changes: 73 additions & 21 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type (
}

matchingEngineImpl struct {
shutdownCompletion *sync.WaitGroup
shutdown chan struct{}
taskManager persistence.TaskManager
clusterMetadata cluster.Metadata
historyService history.Client
Expand Down Expand Up @@ -120,7 +122,8 @@ var (
var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented

// NewEngine creates an instance of matching engine
func NewEngine(taskManager persistence.TaskManager,
func NewEngine(
taskManager persistence.TaskManager,
clusterMetadata cluster.Metadata,
historyService history.Client,
matchingClient matching.Client,
Expand All @@ -132,7 +135,10 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner partition.Partitioner,
timeSource clock.TimeSource,
) Engine {

e := &matchingEngineImpl{
shutdown: make(chan struct{}),
shutdownCompletion: &sync.WaitGroup{},
taskManager: taskManager,
clusterMetadata: clusterMetadata,
historyService: historyService,
Expand All @@ -149,19 +155,24 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner: partitioner,
timeSource: timeSource,
}

e.shutdownCompletion.Add(1)
go e.subscribeToMembershipChanges()

e.waitForQueryResultFn = e.waitForQueryResult
return e
}

// Start is intentionally a no-op; it performs no work when called.
func (e *matchingEngineImpl) Start() {
	// As task lists are initialized lazily nothing is done on startup at this point.
}

func (e *matchingEngineImpl) Stop() {
close(e.shutdown)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not waiting for subscribeToMembershipChanges to complete, possibly goroutine leak?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true: I was too lazy and didn't think to add a full WG setup to engine. What do you think? It does mean that I didn't toggle it on for the leak-detector

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for not leaving goroutines behind. let's wait here until subscribeToMembershipChanges returns (via waitgroup)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I don't love adding a bunch of complexity back to the shutdown process, but fair. Added a waitgroup.

// Executes Stop() on each task list outside of lock
for _, l := range e.getTaskLists(math.MaxInt32) {
l.Stop()
}
e.shutdownCompletion.Wait()
}

func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
Expand Down Expand Up @@ -200,26 +211,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
}
e.taskListsLock.RUnlock()

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
err := e.errIfShardLoss(taskList)
if err != nil {
return nil, fmt.Errorf("failed to lookup task list owner: %w", err)
}

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return nil, fmt.Errorf("failed to lookup self im membership: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
return nil, cadence_errors.NewTaskListNotOwnnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
return nil, err
}

// If it gets here, write lock and check again in case a task list is created between the two locks
Expand Down Expand Up @@ -1202,6 +1196,64 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog(
}
}

// errIfShardLoss guards task list access against loss of ownership.
//
// It returns nil when the EnableTasklistOwnershipGuard config is disabled, or
// when the membership resolver reports this host as the owner of taskList.
// It returns a *TaskListNotOwnedByHostError when the engine is shutting down
// or when the resolver reports a different owner, and a wrapped error when
// either membership lookup (WhoAmI or Lookup) fails.
func (e *matchingEngineImpl) errIfShardLoss(taskList *tasklist.Identifier) error {
	if !e.config.EnableTasklistOwnershipGuard() {
		return nil
	}

	// The host's own identity is needed for every rejection error below,
	// so it is resolved before any of the ownership checks.
	self, err := e.membershipResolver.WhoAmI()
	if err != nil {
		return fmt.Errorf("failed to lookup self in membership: %w", err)
	}

	if e.isShuttingDown() {
		e.logger.Warn("request to get tasklist is being rejected because engine is shutting down",
			tag.WorkflowDomainID(taskList.GetDomainID()),
			tag.WorkflowTaskListType(taskList.GetType()),
			tag.WorkflowTaskListName(taskList.GetName()),
		)

		// No owner lookup is performed while shutting down, so the owner
		// is reported with a placeholder.
		return cadence_errors.NewTaskListNotOwnedByHostError(
			"not known",
			self.Identity(),
			taskList.GetName(),
		)
	}

	// Defensive check to make sure this host actually owns the task list.
	// Creating a task list manager steals the task list from its current
	// owner, so that must only happen when the current host is the owner;
	// otherwise an error is returned instead.
	taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
	if err != nil {
		return fmt.Errorf("failed to lookup task list owner: %w", err)
	}

	if taskListOwner.Identity() != self.Identity() {
		e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard",
			tag.WorkflowDomainID(taskList.GetDomainID()),
			tag.WorkflowTaskListType(taskList.GetType()),
			tag.WorkflowTaskListName(taskList.GetName()),
		)
		return cadence_errors.NewTaskListNotOwnedByHostError(
			taskListOwner.Identity(),
			self.Identity(),
			taskList.GetName(),
		)
	}

	return nil
}

// isShuttingDown reports whether a receive from e.shutdown would succeed
// right now, which is the case once the channel has been closed.
// The check never blocks.
func (e *matchingEngineImpl) isShuttingDown() bool {
	stopping := false
	select {
	case <-e.shutdown:
		stopping = true
	default:
		// Channel still open and empty: not shutting down.
	}
	return stopping
}
jakobht marked this conversation as resolved.
Show resolved Hide resolved

func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
m.Lock()
defer m.Unlock()
Expand Down
57 changes: 3 additions & 54 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
Expand All @@ -45,7 +44,6 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -55,6 +53,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
"github.com/uber/cadence/service/matching/tasklist"
Expand Down Expand Up @@ -131,6 +130,7 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockMembershipResolver = membership.NewMockResolver(s.controller)
s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes()
s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller)
dcClient := dynamicconfig.NewInMemoryClient()
dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true)
Expand Down Expand Up @@ -1303,58 +1303,6 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() {
s.EqualValues(configEmpty.HostName, "")
}

func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() {
davidporter-id-au marked this conversation as resolved.
Show resolved Hide resolved
testCases := []struct {
name string
lookUpResult string
lookUpErr error
whoAmIResult string
whoAmIErr error

expectedError error
}{
{
name: "Not owned by current host",
lookUpResult: "A",
whoAmIResult: "B",
expectedError: new(cadence_errors.TaskListNotOwnnedByHostError),
},
{
name: "LookupError",
lookUpErr: assert.AnError,
expectedError: assert.AnError,
},
{
name: "WhoAmIError",
whoAmIErr: assert.AnError,
expectedError: assert.AnError,
},
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
resolverMock := membership.NewMockResolver(s.controller)
s.matchingEngine.membershipResolver = resolverMock

resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(
membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr,
).AnyTimes()
resolverMock.EXPECT().WhoAmI().Return(
membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr,
).AnyTimes()

taskListKind := types.TaskListKindNormal

_, err := s.matchingEngine.getTaskListManager(
tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity),
&taskListKind,
)

assert.ErrorAs(s.T(), err, &tc.expectedError)
})
}
}

func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64,
scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent {
historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled)
Expand Down Expand Up @@ -1402,6 +1350,7 @@ func defaultTestConfig() *config.Config {
config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10)
config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
config.MaxTimeBetweenTaskDeletes = time.Duration(0)
config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true }
return config
}

Expand Down
Loading
Loading