Skip to content

Commit

Permalink
refactor existing domain API for cross DC, refactor existing domain p… (
Browse files Browse the repository at this point in the history
#527)

* refactor existing domain API for cross DC, refactor existing domain persistence layer, refactor existing domain cache
  • Loading branch information
wxing1292 authored Feb 1, 2018
1 parent 051f9de commit 1a9baeb
Show file tree
Hide file tree
Showing 27 changed files with 1,675 additions and 283 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

538 changes: 518 additions & 20 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/service/frontend"
Expand Down Expand Up @@ -100,10 +101,15 @@ func (s *server) startService() common.Daemon {
}

svcCfg := s.cfg.Services[s.name]

params.MetricScope = svcCfg.Metrics.NewScope()
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
params.ClusterMetadata = cluster.NewMetadata(
s.cfg.ClustersInfo.InitialFailoverVersion,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterNames,
)

var daemon common.Daemon

Expand Down
74 changes: 41 additions & 33 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
domainCacheInitialSize = 1024
domainCacheMaxSize = 16 * 1024
domainCacheTTL = time.Hour
domainEntryRefreshInterval = int64(10 * time.Second)
domainEntryRefreshInterval = 10 * time.Second
)

type (
Expand All @@ -44,8 +44,8 @@ type (
// in updating the domain entry every 10 seconds but in the case of a cassandra failure we can still keep on serving
// requests using the stale entry from cache upto an hour
DomainCache interface {
GetDomain(name string) (*persistence.DomainInfo, *persistence.DomainConfig, error)
GetDomainByID(id string) (*persistence.DomainInfo, *persistence.DomainConfig, error)
GetDomain(name string) (*domainCacheEntry, error)
GetDomainByID(id string) (*domainCacheEntry, error)
GetDomainID(name string) (string, error)
}

Expand All @@ -58,9 +58,10 @@ type (
}

domainCacheEntry struct {
info *persistence.DomainInfo
config *persistence.DomainConfig
expiry int64
Info *persistence.DomainInfo
Config *persistence.DomainConfig
ReplicationConfig *persistence.DomainReplicationConfig
expiry time.Time
sync.RWMutex
}
)
Expand All @@ -86,50 +87,44 @@ func newDomainCacheEntry() *domainCacheEntry {

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomain(name string) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
func (c *domainCache) GetDomain(name string) (*domainCacheEntry, error) {
return c.getDomain(name, "", name, c.cacheByName)
}

// GetDomainByID retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomainByID(id string) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
func (c *domainCache) GetDomainByID(id string) (*domainCacheEntry, error) {
return c.getDomain(id, id, "", c.cacheByID)
}

// GetDomainID retrieves domainID by using GetDomain
func (c *domainCache) GetDomainID(name string) (string, error) {
info, _, err := c.GetDomain(name)
entry, err := c.GetDomain(name)
if err != nil {
return "", err
}
return info.ID, nil
return entry.Info.ID, nil
}

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) getDomain(key, id, name string, cache Cache) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
now := c.timeSource.Now().UnixNano()
refreshCache := false
var info *persistence.DomainInfo
var config *persistence.DomainConfig
func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCacheEntry, error) {
now := c.timeSource.Now()
var result *domainCacheEntry

entry, cacheHit := cache.Get(key).(*domainCacheEntry)
if cacheHit {
// Found the information in the cache, lets check if it needs to be refreshed before returning back
entry.RLock()
info = entry.info
config = entry.config

if entry.expiry == 0 || now >= entry.expiry {
refreshCache = true
if !entry.isExpired(now) {
result = entry.duplicate()
entry.RUnlock()
return result, nil
}
// cache expired, need to refresh
entry.RUnlock()
}

// Found a cache entry and no need to refresh. Return immediately
if cacheHit && !refreshCache {
return info, config, nil
}

// Cache entry not found, Let's create an entry and add it to cache
if !cacheHit {
elem, _ := cache.PutIfNotExist(key, newDomainCacheEntry())
Expand All @@ -141,25 +136,38 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*persistence
defer entry.Unlock()

// Check again under the lock to make sure someone else did not update the entry
if entry.expiry == 0 || now >= entry.expiry {
if entry.isExpired(now) {
response, err := c.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: name,
ID: id,
})

// Failed to get domain. Return stale entry if we have one, otherwise just return error
if err != nil {
if entry.expiry > 0 {
return entry.info, entry.config, nil
if !entry.expiry.IsZero() {
return entry, nil
}

return nil, nil, err
return nil, err
}

entry.info = response.Info
entry.config = response.Config
entry.expiry = now + domainEntryRefreshInterval
entry.Info = response.Info
entry.Config = response.Config
entry.ReplicationConfig = response.ReplicationConfig
entry.expiry = now.Add(domainEntryRefreshInterval)
}

return entry.info, entry.config, nil
return entry.duplicate(), nil
}

func (entry *domainCacheEntry) duplicate() *domainCacheEntry {
result := newDomainCacheEntry()
result.Info = entry.Info
result.Config = entry.Config
result.ReplicationConfig = entry.ReplicationConfig
return result
}

func (entry *domainCacheEntry) isExpired(now time.Time) bool {
return entry.expiry.IsZero() || now.After(entry.expiry)
}
25 changes: 15 additions & 10 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
if c.isEntryExpired(entry, time.Now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
}
}
}

c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
}
return existing, nil
}

entry := &entryImpl{
Expand Down
95 changes: 95 additions & 0 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cluster

type (
// Metadata provides information about clusters
Metadata interface {
GetNextFailoverVersion(int64) int64
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterNames return the all cluster names, as a set
GetAllClusterNames() map[string]bool
}

MetadataImpl struct {
// initialFailoverVersion is the initial failover version
initialFailoverVersion int64
// failoverVersionIncrement is the increment of each cluster failover version
failoverVersionIncrement int64
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterNames contains all cluster names, as a set
clusterNames map[string]bool
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(initialFailoverVersion int64, failoverVersionIncrement int64,
currentClusterName string, clusterNames []string) Metadata {

if initialFailoverVersion < 0 {
panic("Bad initial failover version")
} else if failoverVersionIncrement <= initialFailoverVersion {
panic("Bad failover version increment")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(clusterNames) == 0 {
panic("Total number of all cluster names is 0")
}

clusters := make(map[string]bool)
for _, clusterName := range clusterNames {
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
clusters[clusterName] = true
}
if _, ok := clusters[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
}

return &MetadataImpl{
initialFailoverVersion: initialFailoverVersion,
failoverVersionIncrement: failoverVersionIncrement,
currentClusterName: currentClusterName,
clusterNames: clusters,
}
}

// GetNextFailoverVersion return the next failover version based on input
func (metadata *MetadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 {
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement + metadata.initialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
return failoverVersion
}

// GetCurrentClusterName return the current cluster name
func (metadata *MetadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterNames return the all cluster names
func (metadata *MetadataImpl) GetAllClusterNames() map[string]bool {
return metadata.clusterNames
}
Loading

0 comments on commit 1a9baeb

Please sign in to comment.