Skip to content

Commit

Permalink
Merge branch 'master' into domain-mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jun 5, 2018
2 parents 7c6ab95 + 42f44b6 commit 66c401a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
17 changes: 14 additions & 3 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
)

Expand All @@ -42,16 +44,18 @@ type (
AdminHandler struct {
numberOfHistoryShards int
service.Service
history history.Client
history history.Client
domainCache cache.DomainCache
}
)

// NewAdminHandler creates a thrift handler for the cadence admin service
func NewAdminHandler(
sVice service.Service, numberOfHistoryShards int) *AdminHandler {
sVice service.Service, numberOfHistoryShards int, metadataMgr persistence.MetadataManager) *AdminHandler {
handler := &AdminHandler{
numberOfHistoryShards: numberOfHistoryShards,
Service: sVice,
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetLogger()),
}
return handler
}
Expand All @@ -60,6 +64,7 @@ func NewAdminHandler(
func (adh *AdminHandler) Start() error {
adh.Service.GetDispatcher().Register(adminserviceserver.New(adh))
adh.Service.Start()
adh.domainCache.Start()
var err error
adh.history, err = adh.Service.GetClientFactory().NewHistoryClient()
if err != nil {
Expand All @@ -70,6 +75,7 @@ func (adh *AdminHandler) Start() error {

// Stop stops the handler
func (adh *AdminHandler) Stop() {
adh.domainCache.Stop()
adh.Service.Stop()
}

Expand All @@ -92,11 +98,16 @@ func (adh *AdminHandler) DescribeWorkflowExecution(ctx context.Context, request
return nil, adh.error(err)
}

domainID, err := adh.domainCache.GetDomainID(request.GetDomain())

historyAddr := historyHost.GetAddress()
resp, err := adh.history.DescribeMutableState(ctx, &hist.DescribeMutableStateRequest{
DomainUUID: request.Domain,
DomainUUID: &domainID,
Execution: request.Execution,
})
if err != nil {
return &admin.DescribeWorkflowExecutionResponse{}, err
}
return &admin.DescribeWorkflowExecutionResponse{
ShardId: common.StringPtr(shardIDForOutput),
HistoryAddr: common.StringPtr(historyAddr),
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *Service) Start() {
wfHandler := NewWorkflowHandler(base, s.config, metadata, history, visibility, kafkaProducer)
wfHandler.Start()

adminHandler := NewAdminHandler(base, p.CassandraConfig.NumHistoryShards)
adminHandler := NewAdminHandler(base, p.CassandraConfig.NumHistoryShards, metadata)
adminHandler.Start()

log.Infof("%v started", common.FrontendServiceName)
Expand Down

0 comments on commit 66c401a

Please sign in to comment.