Skip to content

Commit

Permalink
Add Archiver interface (#2106)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 25, 2019
1 parent d2b0df7 commit 992c745
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 13 deletions.
83 changes: 83 additions & 0 deletions common/archiver/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"context"

"github.com/uber/cadence/.gen/go/shared"
)

type (
// ArchiveHistoryRequest is request to Archive
ArchiveHistoryRequest struct {
ShardID int
DomainID string
DomainName string
WorkflowID string
RunID string
EventStoreVersion int32
BranchToken []byte
NextEventID int64
CloseFailoverVersion int64
}

// GetHistoryRequest is the request to Get archived history
GetHistoryRequest struct {
DomainID string
WorkflowID string
RunID string
CloseFailoverVersion *int64
NextPageToken []byte
PageSize int
}

// GetHistoryResponse is the response of Get archived history
GetHistoryResponse struct {
HistoryBatches []*shared.History
NextPageToken []byte
}

// HistoryArchiver is used to archive history and read archived history
HistoryArchiver interface {
Archive(ctx context.Context, URI string, request ArchiveHistoryRequest, opts ...ArchiveOption) error
Get(ctx context.Context, URI string, request GetHistoryRequest) (GetHistoryResponse, error)
ValidateURI(URI string) bool
}

// ycyang TODO: implement visibility archiver

// ArchiveVisibilityRequest is request to Archive
ArchiveVisibilityRequest struct{}

// GetVisibilityRequest is the request to Get archived visibility records
GetVisibilityRequest struct{}

// GetVisibilityResponse is the response of Get archived visibility records
GetVisibilityResponse struct{}

// VisibilityArchiver is used to archive visibility and read archived visibility
VisibilityArchiver interface {
Archive(ctx context.Context, URI string, request ArchiveVisibilityRequest, opts ...ArchiveOption) error
Get(ctx context.Context, URI string, request GetVisibilityRequest) (GetVisibilityResponse, error)
ValidateURI(URI string) bool
}
)
78 changes: 78 additions & 0 deletions common/archiver/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"context"
"errors"

"go.uber.org/cadence/activity"
)

type (
// ArchiveOption is used to provide options for addding features to
// the Archive method of History/Visibility Archiver
ArchiveOption func(featureCatalog *ArchiveFeatureCatalog)

// ArchiveFeatureCatalog is a collection features for the Archive method of
// History/Visibility Archiver
ArchiveFeatureCatalog struct {
ProgressManager ProgressManager
}

// ProgressManager is used to record and load archive progress
ProgressManager interface {
RecordProgress(ctx context.Context, progress interface{}) error
LoadProgress(ctx context.Context, valuePtr interface{}) error
}
)

// GetFeatureCatalog applies all the ArchiveOptions to the catalog and returns the catalog.
// It should be called inside the Archive method.
func GetFeatureCatalog(opts ...ArchiveOption) *ArchiveFeatureCatalog {
catalog := &ArchiveFeatureCatalog{}
for _, opt := range opts {
opt(catalog)
}
return catalog
}

// GetHeartbeatArchiveOption returns an ArchiveOption for enabling heartbeating.
// It should be used when the Archive method is invoked inside an activity.
func GetHeartbeatArchiveOption() ArchiveOption {
return func(catalog *ArchiveFeatureCatalog) {
catalog.ProgressManager = &heartbeatProgessManager{}
}
}

type heartbeatProgessManager struct{}

func (h *heartbeatProgessManager) RecordProgress(ctx context.Context, progress interface{}) error {
activity.RecordHeartbeat(ctx, progress)
return nil
}

func (h *heartbeatProgessManager) LoadProgress(ctx context.Context, valuePtr interface{}) error {
if !activity.HasHeartbeatDetails(ctx) {
return errors.New("no progess information in the context")
}
return activity.GetHeartbeatDetails(ctx, valuePtr)
}
4 changes: 3 additions & 1 deletion common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"github.com/uber/cadence/common/clock"

"github.com/uber-go/tally"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cluster"
es "github.com/uber/cadence/common/elasticsearch"
Expand Down Expand Up @@ -78,6 +78,8 @@ type (
BlobstoreClient blobstore.Client
DCRedirectionPolicy config.DCRedirectionPolicy
PublicClient workflowserviceclient.Interface
HistoryArchivers map[string]archiver.HistoryArchiver
VisibilityArchivers map[string]archiver.VisibilityArchiver
}

// MembershipMonitorFactory provides a bootstrapped membership monitor
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]string, startWG *sync.Wai
frontendConfig := frontend.NewConfig(dc, c.historyConfig.NumHistoryShards, c.workerConfig.EnableIndexer)
c.frontendHandler = frontend.NewWorkflowHandler(
c.frontEndService, frontendConfig, c.metadataMgr, c.historyMgr, c.historyV2Mgr,
c.visibilityMgr, kafkaProducer, params.BlobstoreClient)
c.visibilityMgr, kafkaProducer, params.BlobstoreClient, nil, nil)
dcRedirectionHandler := frontend.NewDCRedirectionHandler(c.frontendHandler, params.DCRedirectionPolicy)
dcRedirectionHandler.RegisterHandler()

Expand Down Expand Up @@ -481,7 +481,7 @@ func (c *cadenceImpl) startHistory(hosts map[string][]string, startWG *sync.Wait
historyConfig.HistoryCountLimitError = dynamicconfig.GetIntPropertyFilteredByDomain(hConfig.HistoryCountLimitError)
}
handler := history.NewHandler(service, historyConfig, c.shardMgr, c.metadataMgr,
c.visibilityMgr, c.historyMgr, c.historyV2Mgr, c.executionMgrFactory, params.PublicClient)
c.visibilityMgr, c.historyMgr, c.historyV2Mgr, c.executionMgrFactory, params.PublicClient, nil, nil)
handler.RegisterHandler()

service.Start()
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/dcRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() {
s.mockClientBean.On("GetRemoteFrontendClient", s.alternativeClusterName).Return(s.mockRemoteFrontendClient)
s.service = service.NewTestService(s.mockClusterMetadata, nil, metricsClient, s.mockClientBean)

frontendHandler := NewWorkflowHandler(s.service, s.config, s.mockMetadataMgr, nil, nil, nil, nil, nil)
frontendHandler := NewWorkflowHandler(s.service, s.config, s.mockMetadataMgr, nil, nil, nil, nil, nil, nil, nil)
frontendHandler.metricsClient = metricsClient
frontendHandler.startWG.Done()

Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *Service) Start() {
}

metricsBlobstore := blobstore.NewMetricClient(params.BlobstoreClient, base.GetMetricsClient())
wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer, metricsBlobstore)
wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer, metricsBlobstore, params.HistoryArchivers, params.VisibilityArchivers)
dcRedirectionHandler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy)
dcRedirectionHandler.RegisterHandler()

Expand Down
8 changes: 7 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
carchiver "github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
Expand Down Expand Up @@ -78,6 +79,8 @@ type (
domainHandler *domainHandlerImpl
visibilityQueryValidator *common.VisibilityQueryValidator
historyBlobDownloader archiver.HistoryBlobDownloader
historyArchivers map[string]carchiver.HistoryArchiver
visibilityArchivers map[string]carchiver.VisibilityArchiver
service.Service
}

Expand Down Expand Up @@ -149,7 +152,8 @@ var (
func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager,
visibilityMgr persistence.VisibilityManager, kafkaProducer messaging.Producer,
blobstoreClient blobstore.Client) *WorkflowHandler {
blobstoreClient blobstore.Client, historyArchivers map[string]carchiver.HistoryArchiver,
visibilityArchivers map[string]carchiver.VisibilityArchiver) *WorkflowHandler {
handler := &WorkflowHandler{
Service: sVice,
config: config,
Expand All @@ -173,6 +177,8 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
),
visibilityQueryValidator: common.NewQueryValidator(config.ValidSearchAttributes),
historyBlobDownloader: archiver.NewHistoryBlobDownloader(blobstoreClient),
historyArchivers: historyArchivers,
visibilityArchivers: visibilityArchivers,
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *workflowHandlerSuite) TearDownTest() {

func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler {
return NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr,
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient, nil, nil)
}

func (s *workflowHandlerSuite) getWorkflowHandlerHelper() *WorkflowHandler {
Expand Down Expand Up @@ -472,7 +472,7 @@ func (s *workflowHandlerSuite) getWorkflowHandlerWithParams(mService cs.Service,
mMetadataManager persistence.MetadataManager, blobStore *mocks.BlobstoreClient) *WorkflowHandler {
s.mockBlobstoreClient = blobStore
return NewWorkflowHandler(mService, config, mMetadataManager, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, blobStore)
s.mockVisibilityMgr, s.mockProducer, blobStore, nil, nil)
}

func (s *workflowHandlerSuite) TestRegisterDomain_Failure_BucketNotExists() {
Expand Down
10 changes: 8 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
hc "github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -70,6 +71,8 @@ type (
historyEventNotifier historyEventNotifier
publisher messaging.Producer
rateLimiter tokenbucket.TokenBucket
historyArchivers map[string]archiver.HistoryArchiver
visibilityArchivers map[string]archiver.VisibilityArchiver
service.Service
}
)
Expand All @@ -93,7 +96,8 @@ var (
func NewHandler(sVice service.Service, config *Config, shardManager persistence.ShardManager,
metadataMgr persistence.MetadataManager, visibilityMgr persistence.VisibilityManager,
historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager,
executionMgrFactory persistence.ExecutionManagerFactory, publicClient workflowserviceclient.Interface) *Handler {
executionMgrFactory persistence.ExecutionManagerFactory, publicClient workflowserviceclient.Interface,
historyArchivers map[string]archiver.HistoryArchiver, visibilityArchivers map[string]archiver.VisibilityArchiver) *Handler {
handler := &Handler{
Service: sVice,
config: config,
Expand All @@ -106,6 +110,8 @@ func NewHandler(sVice service.Service, config *Config, shardManager persistence.
tokenSerializer: common.NewJSONTaskTokenSerializer(),
rateLimiter: tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource()),
publicClient: publicClient,
historyArchivers: historyArchivers,
visibilityArchivers: visibilityArchivers,
}

// prevent us from trying to serve requests before shard controller is started and ready
Expand Down Expand Up @@ -182,7 +188,7 @@ func (h *Handler) Stop() {
// CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
func (h *Handler) CreateEngine(context ShardContext) Engine {
return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient,
h.publicClient, h.historyEventNotifier, h.publisher, h.config)
h.publicClient, h.historyEventNotifier, h.publisher, h.config, h.historyArchivers, h.visibilityArchivers)
}

// Health is for health check
Expand Down
7 changes: 7 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
hc "github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
carchiver "github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
Expand Down Expand Up @@ -80,6 +81,8 @@ type (
config *Config
archivalClient archiver.Client
resetor workflowResetor
historyArchivers map[string]carchiver.HistoryArchiver
visibilityArchivers map[string]carchiver.VisibilityArchiver
}

// shardContextWrapper wraps ShardContext to notify transferQueueProcessor on new tasks.
Expand Down Expand Up @@ -147,6 +150,8 @@ func NewEngineWithShardContext(
historyEventNotifier historyEventNotifier,
publisher messaging.Producer,
config *Config,
historyArchivers map[string]carchiver.HistoryArchiver,
visibilityArchivers map[string]carchiver.VisibilityArchiver,
) Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
shardWrapper := &shardContextWrapper{
Expand Down Expand Up @@ -176,6 +181,8 @@ func NewEngineWithShardContext(
historyEventNotifier: historyEventNotifier,
config: config,
archivalClient: archiver.NewClient(shard.GetMetricsClient(), shard.GetLogger(), publicClient, shard.GetConfig().NumArchiveSystemWorkflows, shard.GetConfig().ArchiveRequestRPS),
historyArchivers: historyArchivers,
visibilityArchivers: visibilityArchivers,
}

txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, logger)
Expand Down
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *Service) Start() {
log.Fatal("Creating historyV2 manager persistence failed", tag.Error(err))
}

handler := NewHandler(base, s.config, shardMgr, metadata, visibility, history, historyV2, pFactory, params.PublicClient)
handler := NewHandler(base, s.config, shardMgr, metadata, visibility, history, historyV2, pFactory, params.PublicClient, params.HistoryArchivers, params.VisibilityArchivers)
handler.RegisterHandler()

// must start base service first
Expand Down
2 changes: 2 additions & 0 deletions service/worker/archiver/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/uber/cadence/common"
carchiver "github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
Expand Down Expand Up @@ -62,6 +63,7 @@ type (
Blobstore blobstore.Client
DomainCache cache.DomainCache
Config *Config
HistoryArchivers map[string]carchiver.HistoryArchiver

// the following are only set in testing code
HistoryBlobReader HistoryBlobReader
Expand Down
Loading

0 comments on commit 992c745

Please sign in to comment.