Skip to content

Commit

Permalink
make domain cache duplication deep copy, add more test
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed May 25, 2018
1 parent ba13d2b commit 61bd092
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 11 deletions.
28 changes: 23 additions & 5 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (c *domainCache) refreshLoop() {
// the domains in the v1 table will be refreshed if cache is stale
func (c *domainCache) refreshDomains() error {
// first load the metadata record, then load domains
// this can guarentee that domains in the cache is not stale
// since we are using domainNotificationVersion as indication
// this can guarentee that domains in the cache is not
// more update to date then the metadata record
domainNotificationVersion, err := c.metadataMgr.GetMetadata()
if err != nil {
return err
Expand Down Expand Up @@ -267,7 +267,20 @@ func (c *domainCache) refreshDomains() error {
// since history shard have to update the shard info
// with domain change version.
sort.Sort(domains)
c.RLock()
domainNotificationVersion = c.domainNotificationVersion
c.RUnlock()

UpdateLoop:
for _, domain := range domains {
if domain.NotificationVersion >= domainNotificationVersion {
// this guarantee that domain change events before the
// domainNotificationVersion is loaded into the cache.

// the domain change events after the domainNotificationVersion
// will be loaded into cache in the next refresh
continue UpdateLoop
}
c.updateIDToDomainCache(domain.Info.ID, domain)
c.updateNameToIDCache(domain.Info.Name, domain.Info.ID)
}
Expand Down Expand Up @@ -409,9 +422,14 @@ func (c *domainCache) triggerDomainChangeCallback(prevDomain *DomainCacheEntry,

func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
result := newDomainCacheEntry(entry.clusterMetadata)
result.info = entry.info
result.config = entry.config
result.replicationConfig = entry.replicationConfig
result.info = &*entry.info
result.config = &*entry.config
result.replicationConfig = &persistence.DomainReplicationConfig{
ActiveClusterName: entry.replicationConfig.ActiveClusterName,
}
for _, cluster := range entry.replicationConfig.Clusters {
result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &*cluster)
}
result.configVersion = entry.configVersion
result.failoverVersion = entry.failoverVersion
result.isGlobalDomain = entry.isGlobalDomain
Expand Down
158 changes: 152 additions & 6 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *domainCacheSuite) TearDownTest() {
}

func (s *domainCacheSuite) TestListDomain() {
domainNotificationVersion := int64(0)
domainRecord1 := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "some random domain name"},
Config: &persistence.DomainConfig{Retention: 1},
Expand All @@ -88,8 +89,11 @@ func (s *domainCacheSuite) TestListDomain() {
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
FailoverNotificationVersion: 0,
NotificationVersion: domainNotificationVersion,
}
entry1 := s.buildEntryFromRecord(domainRecord1)
domainNotificationVersion++

domainRecord2 := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "another random domain name"},
Expand All @@ -101,11 +105,30 @@ func (s *domainCacheSuite) TestListDomain() {
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
FailoverNotificationVersion: 0,
NotificationVersion: domainNotificationVersion,
}
entry2 := s.buildEntryFromRecord(domainRecord2)
domainNotificationVersion++

domainRecord3 := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "yet another random domain name"},
Config: &persistence.DomainConfig{Retention: 3},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
FailoverNotificationVersion: 0,
NotificationVersion: domainNotificationVersion,
}
// there is no domainNotificationVersion++ here
// this is to test that if new domain change event happen during the pagination,
// new change will not be loaded to domain cache

pageToken := []byte("some random page token")
domainNotificationVersion := int64(123)

s.metadataMgr.On("GetMetadata").Return(domainNotificationVersion, nil)
s.clusterMetadata.On("IsGlobalDomainEnabled").Return(true)
Expand All @@ -121,12 +144,13 @@ func (s *domainCacheSuite) TestListDomain() {
PageSize: domainCacheRefreshPageSize,
NextPageToken: pageToken,
}).Return(&persistence.ListDomainResponse{
Domains: []*persistence.GetDomainResponse{domainRecord2},
Domains: []*persistence.GetDomainResponse{domainRecord2, domainRecord3},
NextPageToken: nil,
}, nil).Once()

// load domains
s.domainCache.Start()
defer s.domainCache.Stop()
s.Equal(domainNotificationVersion, s.domainCache.GetDomainNotificationVersion())

entryByName1, err := s.domainCache.GetDomain(domainRecord1.Info.Name)
Expand Down Expand Up @@ -199,7 +223,122 @@ func (s *domainCacheSuite) TestGetDomain_NonLoaded_GetByID() {
s.Equal(entry, entryByID)
}

func (s *domainCacheSuite) TestUpdateCache_Trigger() {
func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
domainNotificationVersion := int64(0)
domainRecord1Old := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "some random domain name"},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
ConfigVersion: 10,
FailoverVersion: 11,
FailoverNotificationVersion: 0,
NotificationVersion: domainNotificationVersion,
}
entry1Old := s.buildEntryFromRecord(domainRecord1Old)
domainNotificationVersion++

domainRecord2Old := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "another random domain name"},
Config: &persistence.DomainConfig{Retention: 2},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
ConfigVersion: 20,
FailoverVersion: 21,
FailoverNotificationVersion: 0,
NotificationVersion: domainNotificationVersion,
}
entry2Old := s.buildEntryFromRecord(domainRecord2Old)
domainNotificationVersion++

s.metadataMgr.On("GetMetadata").Return(domainNotificationVersion, nil).Once()
s.clusterMetadata.On("IsGlobalDomainEnabled").Return(true)
s.metadataMgr.On("ListDomain", &persistence.ListDomainRequest{
PageSize: domainCacheRefreshPageSize,
NextPageToken: nil,
}).Return(&persistence.ListDomainResponse{
Domains: []*persistence.GetDomainResponse{domainRecord1Old, domainRecord2Old},
NextPageToken: nil,
}, nil).Once()

// load domains
s.Nil(s.domainCache.refreshDomains())
s.Equal(domainNotificationVersion, s.domainCache.GetDomainNotificationVersion())

domainRecord2New := &persistence.GetDomainResponse{
Info: &*domainRecord2Old.Info,
Config: &*domainRecord2Old.Config,
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName, // only this changed
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
ConfigVersion: domainRecord2Old.ConfigVersion,
FailoverVersion: domainRecord2Old.FailoverVersion + 1,
FailoverNotificationVersion: domainNotificationVersion,
NotificationVersion: domainNotificationVersion,
}
entry2New := s.buildEntryFromRecord(domainRecord2New)
domainNotificationVersion++

domainRecord1New := &persistence.GetDomainResponse{ // only the description changed
Info: &persistence.DomainInfo{ID: domainRecord1Old.Info.ID, Name: domainRecord1Old.Info.Name, Description: "updated description"},
Config: &*domainRecord2Old.Config,
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName},
},
},
ConfigVersion: domainRecord1Old.ConfigVersion + 1,
FailoverVersion: domainRecord1Old.FailoverVersion,
FailoverNotificationVersion: domainRecord1Old.FailoverNotificationVersion,
NotificationVersion: domainNotificationVersion,
}
entry1New := s.buildEntryFromRecord(domainRecord1New)
domainNotificationVersion++

entriesOld := []*DomainCacheEntry{}
entriesNew := []*DomainCacheEntry{}
s.domainCache.RegisterDomainChangeCallback(0, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOld = append(entriesOld, prevDomain)
entriesNew = append(entriesNew, nextDomain)
})
s.Empty(entriesOld)
s.Empty(entriesNew)

s.metadataMgr.On("GetMetadata").Return(domainNotificationVersion, nil).Once()
s.metadataMgr.On("ListDomain", &persistence.ListDomainRequest{
PageSize: domainCacheRefreshPageSize,
NextPageToken: nil,
}).Return(&persistence.ListDomainResponse{
Domains: []*persistence.GetDomainResponse{domainRecord1New, domainRecord2New},
NextPageToken: nil,
}, nil).Once()
s.Nil(s.domainCache.refreshDomains())

// the order matters here: the record 2 got updated first, thus with a lower notification version
// the record 1 got updated later, thus a higher notification version.
// making sure notifying from lower to higher version helps the shard to keep track the
// domain change events
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOld)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNew)
}

func (s *domainCacheSuite) TestUpdateCache_GetTrigger() {
s.clusterMetadata.On("IsGlobalDomainEnabled").Return(true)
domainRecordOld := &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: uuid.New(), Name: "some random domain name"},
Expand Down Expand Up @@ -310,11 +449,18 @@ func (s *domainCacheSuite) TestGetUpdateCache_ConcurrentAccess() {

func (s *domainCacheSuite) buildEntryFromRecord(record *persistence.GetDomainResponse) *DomainCacheEntry {
newEntry := newDomainCacheEntry(s.clusterMetadata)
newEntry.info = record.Info
newEntry.config = record.Config
newEntry.replicationConfig = record.ReplicationConfig
newEntry.info = &*record.Info
newEntry.config = &*record.Config
newEntry.replicationConfig = &persistence.DomainReplicationConfig{
ActiveClusterName: record.ReplicationConfig.ActiveClusterName,
}
for _, cluster := range record.ReplicationConfig.Clusters {
newEntry.replicationConfig.Clusters = append(newEntry.replicationConfig.Clusters, &*cluster)
}
newEntry.configVersion = record.ConfigVersion
newEntry.failoverVersion = record.FailoverVersion
newEntry.isGlobalDomain = record.IsGlobalDomain
newEntry.failoverNotificationVersion = record.FailoverNotificationVersion
newEntry.notificationVersion = record.NotificationVersion
return newEntry
}

0 comments on commit 61bd092

Please sign in to comment.