Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Coordinator by Refactoring to Locker and PubSub #1136

Merged
merged 3 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

authWebhookMaxWaitInterval time.Duration
authWebhookCacheTTL time.Duration
projectInfoCacheTTL time.Duration
projectCacheTTL time.Duration

conf = server.NewConfig()
)
Expand All @@ -65,7 +65,7 @@ func newServerCmd() *cobra.Command {

conf.Backend.AuthWebhookMaxWaitInterval = authWebhookMaxWaitInterval.String()
conf.Backend.AuthWebhookCacheTTL = authWebhookCacheTTL.String()
conf.Backend.ProjectInfoCacheTTL = projectInfoCacheTTL.String()
conf.Backend.ProjectCacheTTL = projectCacheTTL.String()

conf.Housekeeping.Interval = housekeepingInterval.String()

Expand Down Expand Up @@ -320,15 +320,15 @@ func init() {
"TTL value to set when caching authorization webhook response.",
)
cmd.Flags().IntVar(
&conf.Backend.ProjectInfoCacheSize,
&conf.Backend.ProjectCacheSize,
"project-info-cache-size",
server.DefaultProjectInfoCacheSize,
server.DefaultProjectCacheSize,
"The cache size of the project info.",
)
cmd.Flags().DurationVar(
&projectInfoCacheTTL,
&projectCacheTTL,
"project-info-cache-ttl",
server.DefaultProjectInfoCacheTTL,
server.DefaultProjectCacheTTL,
"TTL value to set when caching project info.",
)
cmd.Flags().StringVar(
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ type LRUExpireCache[K comparable, V any] struct {
}

// NewLRUExpireCache creates an expiring cache with the given size
func NewLRUExpireCache[K comparable, V any](maxSize int) (*LRUExpireCache[K, V], error) {
func NewLRUExpireCache[K comparable, V any](maxSize int) *LRUExpireCache[K, V] {
if maxSize <= 0 {
return nil, ErrInvalidMaxSize
panic(ErrInvalidMaxSize)
}

return &LRUExpireCache[K, V]{
maxSize: maxSize,
entries: map[K]*list.Element{},
}, nil
}
}

type cacheEntry[K comparable, V any] struct {
Expand Down
21 changes: 7 additions & 14 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,16 @@ import (

func TestCache(t *testing.T) {
t.Run("create lru expire cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)
lruCache := cache.NewLRUExpireCache[string, string](1)
assert.NotNil(t, lruCache)

lruCache, err = cache.NewLRUExpireCache[string, string](0)
assert.ErrorIs(t, err, cache.ErrInvalidMaxSize)
assert.Nil(t, lruCache)
assert.PanicsWithError(t, cache.ErrInvalidMaxSize.Error(), func() {
cache.NewLRUExpireCache[string, string](0)
})
})

t.Run("add test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
lruCache.Add("request1", "response1", time.Second)
response1, ok := lruCache.Get("request1")
assert.True(t, ok)
Expand All @@ -41,9 +38,7 @@ func TestCache(t *testing.T) {
})

t.Run("get expired cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
ttl := time.Millisecond
lruCache.Add("request", "response", ttl)

Expand All @@ -54,9 +49,7 @@ func TestCache(t *testing.T) {
})

t.Run("update expired cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
var ttl time.Duration
ttl = time.Minute

Expand Down
24 changes: 6 additions & 18 deletions pkg/webhook/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@ func TestHMAC(t *testing.T) {
defer testServer.Close()

t.Run("webhook client with valid HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -95,12 +92,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client with invalid HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -118,12 +112,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client without HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
client := webhook.NewClient[testRequest](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -140,12 +131,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client with empty body test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
client := webhook.NewClient[testRequest](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand Down
78 changes: 30 additions & 48 deletions server/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/rs/xid"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/cache"
Expand All @@ -35,26 +32,35 @@ import (
memdb "github.com/yorkie-team/yorkie/server/backend/database/memory"
"github.com/yorkie-team/yorkie/server/backend/database/mongo"
"github.com/yorkie-team/yorkie/server/backend/housekeeping"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
"github.com/yorkie-team/yorkie/server/backend/sync"
memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory"
"github.com/yorkie-team/yorkie/server/logging"
"github.com/yorkie-team/yorkie/server/profiling/prometheus"
)

// Backend manages Yorkie's backend such as Database and Coordinator. And it
// has the server status such as the information of this Server.
// Backend manages Yorkie's backend such as Database and Coordinator. It also
// provides in-memory cache, pubsub, and locker.
type Backend struct {
Config *Config
serverInfo *sync.ServerInfo
Config *Config

// AuthWebhookCache is used to cache the response of the auth webhook.
WebhookCache *cache.LRUExpireCache[string, pkgtypes.Pair[
int,
*types.AuthWebhookResponse,
]]

Metrics *prometheus.Metrics
DB database.Database
Coordinator sync.Coordinator
Background *background.Background
// PubSub is used to publish/subscribe events to/from clients.
PubSub *pubsub.PubSub
// Locker is used to lock/unlock resources.
Locker *sync.LockerManager

// Metrics is used to expose metrics.
Metrics *prometheus.Metrics
// DB is the database instance.
DB database.Database

// Background is used to manage background tasks.
Background *background.Background
// Housekeeping is used to manage background batch tasks.
Housekeeping *housekeeping.Housekeeping
}

Expand All @@ -76,27 +82,20 @@ func New(
conf.Hostname = hostname
}

serverInfo := &sync.ServerInfo{
ID: xid.New().String(),
Hostname: hostname,
UpdatedAt: time.Now(),
}

// 02. Create the auth webhook cache. The auth webhook cache is used to
// cache the response of the auth webhook.
webhookCache, err := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]](
// 02. Create in-memory cache, pubsub, and locker.
cache := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]](
conf.AuthWebhookCacheSize,
)
if err != nil {
return nil, err
}
locker := sync.New()
pubsub := pubsub.New()

// 03. Create the background instance. The background instance is used to
// manage background tasks.
bg := background.New(metrics)

// 04. Create the database instance. If the MongoDB configuration is given,
// create a MongoDB instance. Otherwise, create a memory database instance.
var err error
var db database.Database
if mongoConf != nil {
db, err = mongo.Dial(mongoConf)
Expand All @@ -110,13 +109,6 @@ func New(
}
}

// 05. Create the coordinator instance. The coordinator is used to manage
// the synchronization between the Yorkie servers.
// TODO(hackerwins): Implement the coordinator for a shard. For now, we
// distribute workloads to all shards per document. In the future, we
// will need to distribute workloads of a document.
coordinator := memsync.NewCoordinator(serverInfo)

// 06. Create the housekeeping instance. The housekeeping is used
// to manage keeping tasks such as deactivating inactive clients.
keeping, err := housekeeping.New(housekeepingConf)
Expand Down Expand Up @@ -144,19 +136,18 @@ func New(
}

logging.DefaultLogger().Infof(
"backend created: id: %s, db: %s",
serverInfo.ID,
"backend created: db: %s",
dbInfo,
)

return &Backend{
Config: conf,
serverInfo: serverInfo,
WebhookCache: webhookCache,
WebhookCache: cache,
Locker: locker,
PubSub: pubsub,

Metrics: metrics,
DB: db,
Coordinator: coordinator,
Background: bg,
Housekeeping: keeping,
}, nil
Expand All @@ -168,7 +159,7 @@ func (b *Backend) Start() error {
return err
}

logging.DefaultLogger().Infof("backend started: id: %s", b.serverInfo.ID)
logging.DefaultLogger().Infof("backend started")
return nil
}

Expand All @@ -180,19 +171,10 @@ func (b *Backend) Shutdown() error {

b.Background.Close()

if err := b.Coordinator.Close(); err != nil {
logging.DefaultLogger().Error(err)
}

if err := b.DB.Close(); err != nil {
logging.DefaultLogger().Error(err)
}

logging.DefaultLogger().Infof("backend stopped: id: %s", b.serverInfo.ID)
logging.DefaultLogger().Infof("backend stopped")
return nil
}

// Members returns the members of this cluster.
func (b *Backend) Members() map[string]*sync.ServerInfo {
return b.Coordinator.Members()
}
20 changes: 10 additions & 10 deletions server/backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ type Config struct {
// AuthWebhookCacheTTL is the TTL value to set when caching the authorized result.
AuthWebhookCacheTTL string `yaml:"AuthWebhookCacheTTL"`

// ProjectInfoCacheSize is the cache size of the project info.
ProjectInfoCacheSize int `yaml:"ProjectInfoCacheSize"`
// ProjectCacheSize is the cache size of the project metadata.
ProjectCacheSize int `yaml:"ProjectCacheSize"`

// ProjectInfoCacheTTL is the TTL value to set when caching the project info.
ProjectInfoCacheTTL string `yaml:"ProjectInfoCacheTTL"`
// ProjectCacheTTL is the TTL value to set when caching the project metadata.
ProjectCacheTTL string `yaml:"ProjectCacheTTL"`

// Hostname is yorkie server hostname. hostname is used by metrics.
Hostname string `yaml:"Hostname"`
Expand Down Expand Up @@ -109,10 +109,10 @@ func (c *Config) Validate() error {
)
}

if _, err := time.ParseDuration(c.ProjectInfoCacheTTL); err != nil {
if _, err := time.ParseDuration(c.ProjectCacheTTL); err != nil {
return fmt.Errorf(
`invalid argument "%s" for "--project-info-cache-ttl" flag: %w`,
c.ProjectInfoCacheTTL,
c.ProjectCacheTTL,
err,
)
}
Expand Down Expand Up @@ -153,11 +153,11 @@ func (c *Config) ParseAuthWebhookCacheTTL() time.Duration {
return result
}

// ParseProjectInfoCacheTTL returns TTL for project info cache.
func (c *Config) ParseProjectInfoCacheTTL() time.Duration {
result, err := time.ParseDuration(c.ProjectInfoCacheTTL)
// ParseProjectCacheTTL returns TTL for project metadata cache.
func (c *Config) ParseProjectCacheTTL() time.Duration {
result, err := time.ParseDuration(c.ProjectCacheTTL)
if err != nil {
fmt.Fprintln(os.Stderr, "parse project info cache ttl: %w", err)
fmt.Fprintln(os.Stderr, "parse project metadata cache ttl: %w", err)
os.Exit(1)
}

Expand Down
4 changes: 2 additions & 2 deletions server/backend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestConfig(t *testing.T) {
ClientDeactivateThreshold: "1h",
AuthWebhookMaxWaitInterval: "0ms",
AuthWebhookCacheTTL: "10s",
ProjectInfoCacheTTL: "10m",
ProjectCacheTTL: "10m",
}
assert.NoError(t, validConf.Validate())

Expand All @@ -47,7 +47,7 @@ func TestConfig(t *testing.T) {
assert.Error(t, conf3.Validate())

conf4 := validConf
conf4.ProjectInfoCacheTTL = "10 minutes"
conf4.ProjectCacheTTL = "10 minutes"
assert.Error(t, conf4.Validate())
})
}
Loading
Loading