diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index f8673fd60..4e5ef04e4 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -49,7 +49,7 @@ var ( authWebhookMaxWaitInterval time.Duration authWebhookCacheTTL time.Duration - projectInfoCacheTTL time.Duration + projectCacheTTL time.Duration conf = server.NewConfig() ) @@ -65,7 +65,7 @@ func newServerCmd() *cobra.Command { conf.Backend.AuthWebhookMaxWaitInterval = authWebhookMaxWaitInterval.String() conf.Backend.AuthWebhookCacheTTL = authWebhookCacheTTL.String() - conf.Backend.ProjectInfoCacheTTL = projectInfoCacheTTL.String() + conf.Backend.ProjectCacheTTL = projectCacheTTL.String() conf.Housekeeping.Interval = housekeepingInterval.String() @@ -320,15 +320,15 @@ func init() { "TTL value to set when caching authorization webhook response.", ) cmd.Flags().IntVar( - &conf.Backend.ProjectInfoCacheSize, + &conf.Backend.ProjectCacheSize, "project-info-cache-size", - server.DefaultProjectInfoCacheSize, + server.DefaultProjectCacheSize, "The cache size of the project info.", ) cmd.Flags().DurationVar( - &projectInfoCacheTTL, + &projectCacheTTL, "project-info-cache-ttl", - server.DefaultProjectInfoCacheTTL, + server.DefaultProjectCacheTTL, "TTL value to set when caching project info.", ) cmd.Flags().StringVar( diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 13beba916..b0f22d512 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -43,15 +43,15 @@ type LRUExpireCache[K comparable, V any] struct { } // NewLRUExpireCache creates an expiring cache with the given size -func NewLRUExpireCache[K comparable, V any](maxSize int) (*LRUExpireCache[K, V], error) { +func NewLRUExpireCache[K comparable, V any](maxSize int) *LRUExpireCache[K, V] { if maxSize <= 0 { - return nil, ErrInvalidMaxSize + panic(ErrInvalidMaxSize) } return &LRUExpireCache[K, V]{ maxSize: maxSize, entries: map[K]*list.Element{}, - }, nil + } } type cacheEntry[K comparable, V any] struct { diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 275ded3bb..1830379a4 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -11,19 +11,16 @@ import ( func TestCache(t *testing.T) { t.Run("create lru expire cache test", func(t *testing.T) { - lruCache, err := cache.NewLRUExpireCache[string, string](1) - assert.NoError(t, err) + lruCache := cache.NewLRUExpireCache[string, string](1) assert.NotNil(t, lruCache) - lruCache, err = cache.NewLRUExpireCache[string, string](0) - assert.ErrorIs(t, err, cache.ErrInvalidMaxSize) - assert.Nil(t, lruCache) + assert.PanicsWithError(t, cache.ErrInvalidMaxSize.Error(), func() { + cache.NewLRUExpireCache[string, string](0) + }) }) t.Run("add test", func(t *testing.T) { - lruCache, err := cache.NewLRUExpireCache[string, string](1) - assert.NoError(t, err) - + lruCache := cache.NewLRUExpireCache[string, string](1) lruCache.Add("request1", "response1", time.Second) response1, ok := lruCache.Get("request1") assert.True(t, ok) @@ -41,9 +38,7 @@ func TestCache(t *testing.T) { }) t.Run("get expired cache test", func(t *testing.T) { - lruCache, err := cache.NewLRUExpireCache[string, string](1) - assert.NoError(t, err) - + lruCache := cache.NewLRUExpireCache[string, string](1) ttl := time.Millisecond lruCache.Add("request", "response", ttl) @@ -54,9 +49,7 @@ func TestCache(t *testing.T) { }) t.Run("update expired cache test", func(t *testing.T) { - lruCache, err := cache.NewLRUExpireCache[string, string](1) - assert.NoError(t, err) - + lruCache := cache.NewLRUExpireCache[string, string](1) var ttl time.Duration ttl = time.Minute diff --git a/pkg/webhook/client_test.go b/pkg/webhook/client_test.go index 20fa698c4..61f7004eb 100644 --- a/pkg/webhook/client_test.go +++ b/pkg/webhook/client_test.go @@ -71,12 +71,9 @@ func TestHMAC(t *testing.T) { defer testServer.Close() t.Run("webhook client with valid HMAC key test", func(t *testing.T) { - testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100) - assert.NoError(t, err) - client := webhook.NewClient[testRequest, testResponse]( testServer.URL, - testCache, + cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100), webhook.Options{ CacheKeyPrefix: "testPrefix-hmac", CacheTTL: 5 * time.Second, @@ -95,12 +92,9 @@ func TestHMAC(t *testing.T) { }) t.Run("webhook client with invalid HMAC key test", func(t *testing.T) { - testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100) - assert.NoError(t, err) - client := webhook.NewClient[testRequest, testResponse]( testServer.URL, - testCache, + cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100), webhook.Options{ CacheKeyPrefix: "testPrefix-hmac", CacheTTL: 5 * time.Second, @@ -118,12 +112,9 @@ func TestHMAC(t *testing.T) { }) t.Run("webhook client without HMAC key test", func(t *testing.T) { - testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100) - assert.NoError(t, err) - - client := webhook.NewClient[testRequest, testResponse]( + client := webhook.NewClient[testRequest]( testServer.URL, - testCache, + cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100), webhook.Options{ CacheKeyPrefix: "testPrefix-hmac", CacheTTL: 5 * time.Second, @@ -140,12 +131,9 @@ func TestHMAC(t *testing.T) { }) t.Run("webhook client with empty body test", func(t *testing.T) { - testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100) - assert.NoError(t, err) - - client := webhook.NewClient[testRequest, testResponse]( + client := webhook.NewClient[testRequest]( testServer.URL, - testCache, + cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100), webhook.Options{ CacheKeyPrefix: "testPrefix-hmac", CacheTTL: 5 * time.Second, diff --git a/server/backend/backend.go b/server/backend/backend.go index 4bba83fdd..cf9bd4bf3 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -23,9 +23,6 @@ import ( "context" "fmt" "os" - "time" - - "github.com/rs/xid" "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/cache" @@ -35,26 +32,35 @@ import ( memdb "github.com/yorkie-team/yorkie/server/backend/database/memory" "github.com/yorkie-team/yorkie/server/backend/database/mongo" "github.com/yorkie-team/yorkie/server/backend/housekeeping" + "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/backend/sync" - memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory" "github.com/yorkie-team/yorkie/server/logging" "github.com/yorkie-team/yorkie/server/profiling/prometheus" ) -// Backend manages Yorkie's backend such as Database and Coordinator. And it -// has the server status such as the information of this Server. +// Backend manages Yorkie's backend such as Database and Coordinator. It also +// provides in-memory cache, pubsub, and locker. type Backend struct { - Config *Config - serverInfo *sync.ServerInfo + Config *Config + + // AuthWebhookCache is used to cache the response of the auth webhook. WebhookCache *cache.LRUExpireCache[string, pkgtypes.Pair[ int, *types.AuthWebhookResponse, ]] - - Metrics *prometheus.Metrics - DB database.Database - Coordinator sync.Coordinator - Background *background.Background + // PubSub is used to publish/subscribe events to/from clients. + PubSub *pubsub.PubSub + // Locker is used to lock/unlock resources. + Locker *sync.LockerManager + + // Metrics is used to expose metrics. + Metrics *prometheus.Metrics + // DB is the database instance. + DB database.Database + + // Background is used to manage background tasks. + Background *background.Background + // Housekeeping is used to manage background batch tasks. Housekeeping *housekeeping.Housekeeping } @@ -76,20 +82,12 @@ func New( conf.Hostname = hostname } - serverInfo := &sync.ServerInfo{ - ID: xid.New().String(), - Hostname: hostname, - UpdatedAt: time.Now(), - } - - // 02. Create the auth webhook cache. The auth webhook cache is used to - // cache the response of the auth webhook. - webhookCache, err := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]]( + // 02. Create in-memory cache, pubsub, and locker. + cache := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]]( conf.AuthWebhookCacheSize, ) - if err != nil { - return nil, err - } + locker := sync.New() + pubsub := pubsub.New() // 03. Create the background instance. The background instance is used to // manage background tasks. @@ -97,6 +95,7 @@ func New( // 04. Create the database instance. If the MongoDB configuration is given, // create a MongoDB instance. Otherwise, create a memory database instance. + var err error var db database.Database if mongoConf != nil { db, err = mongo.Dial(mongoConf) @@ -110,13 +109,6 @@ func New( } } - // 05. Create the coordinator instance. The coordinator is used to manage - // the synchronization between the Yorkie servers. - // TODO(hackerwins): Implement the coordinator for a shard. For now, we - // distribute workloads to all shards per document. In the future, we - // will need to distribute workloads of a document. - coordinator := memsync.NewCoordinator(serverInfo) - // 06. Create the housekeeping instance. The housekeeping is used // to manage keeping tasks such as deactivating inactive clients. keeping, err := housekeeping.New(housekeepingConf) @@ -144,19 +136,18 @@ func New( } logging.DefaultLogger().Infof( - "backend created: id: %s, db: %s", - serverInfo.ID, + "backend created: db: %s", dbInfo, ) return &Backend{ Config: conf, - serverInfo: serverInfo, - WebhookCache: webhookCache, + WebhookCache: cache, + Locker: locker, + PubSub: pubsub, Metrics: metrics, DB: db, - Coordinator: coordinator, Background: bg, Housekeeping: keeping, }, nil @@ -168,7 +159,7 @@ func (b *Backend) Start() error { return err } - logging.DefaultLogger().Infof("backend started: id: %s", b.serverInfo.ID) + logging.DefaultLogger().Infof("backend started") return nil } @@ -180,19 +171,10 @@ func (b *Backend) Shutdown() error { b.Background.Close() - if err := b.Coordinator.Close(); err != nil { - logging.DefaultLogger().Error(err) - } - if err := b.DB.Close(); err != nil { logging.DefaultLogger().Error(err) } - logging.DefaultLogger().Infof("backend stopped: id: %s", b.serverInfo.ID) + logging.DefaultLogger().Infof("backend stopped") return nil } - -// Members returns the members of this cluster. -func (b *Backend) Members() map[string]*sync.ServerInfo { - return b.Coordinator.Members() -} diff --git a/server/backend/config.go b/server/backend/config.go index a7e194b86..89399ddb7 100644 --- a/server/backend/config.go +++ b/server/backend/config.go @@ -70,11 +70,11 @@ type Config struct { // AuthWebhookCacheTTL is the TTL value to set when caching the authorized result. AuthWebhookCacheTTL string `yaml:"AuthWebhookCacheTTL"` - // ProjectInfoCacheSize is the cache size of the project info. - ProjectInfoCacheSize int `yaml:"ProjectInfoCacheSize"` + // ProjectCacheSize is the cache size of the project metadata. + ProjectCacheSize int `yaml:"ProjectCacheSize"` - // ProjectInfoCacheTTL is the TTL value to set when caching the project info. - ProjectInfoCacheTTL string `yaml:"ProjectInfoCacheTTL"` + // ProjectCacheTTL is the TTL value to set when caching the project metadata. + ProjectCacheTTL string `yaml:"ProjectCacheTTL"` // Hostname is yorkie server hostname. hostname is used by metrics. Hostname string `yaml:"Hostname"` @@ -109,10 +109,10 @@ func (c *Config) Validate() error { ) } - if _, err := time.ParseDuration(c.ProjectInfoCacheTTL); err != nil { + if _, err := time.ParseDuration(c.ProjectCacheTTL); err != nil { return fmt.Errorf( `invalid argument "%s" for "--project-info-cache-ttl" flag: %w`, - c.ProjectInfoCacheTTL, + c.ProjectCacheTTL, err, ) } @@ -153,11 +153,11 @@ func (c *Config) ParseAuthWebhookCacheTTL() time.Duration { return result } -// ParseProjectInfoCacheTTL returns TTL for project info cache. -func (c *Config) ParseProjectInfoCacheTTL() time.Duration { - result, err := time.ParseDuration(c.ProjectInfoCacheTTL) +// ParseProjectCacheTTL returns TTL for project metadata cache. +func (c *Config) ParseProjectCacheTTL() time.Duration { + result, err := time.ParseDuration(c.ProjectCacheTTL) if err != nil { - fmt.Fprintln(os.Stderr, "parse project info cache ttl: %w", err) + fmt.Fprintln(os.Stderr, "parse project metadata cache ttl: %w", err) os.Exit(1) } diff --git a/server/backend/config_test.go b/server/backend/config_test.go index 84e83a2f0..c4b8247eb 100644 --- a/server/backend/config_test.go +++ b/server/backend/config_test.go @@ -30,7 +30,7 @@ func TestConfig(t *testing.T) { ClientDeactivateThreshold: "1h", AuthWebhookMaxWaitInterval: "0ms", AuthWebhookCacheTTL: "10s", - ProjectInfoCacheTTL: "10m", + ProjectCacheTTL: "10m", } assert.NoError(t, validConf.Validate()) @@ -47,7 +47,7 @@ func TestConfig(t *testing.T) { assert.Error(t, conf3.Validate()) conf4 := validConf - conf4.ProjectInfoCacheTTL = "10 minutes" + conf4.ProjectCacheTTL = "10 minutes" assert.Error(t, conf4.Validate()) }) } diff --git a/server/backend/sync/memory/publisher.go b/server/backend/pubsub/publisher.go similarity index 94% rename from server/backend/sync/memory/publisher.go rename to server/backend/pubsub/publisher.go index 5beae0a5f..c158b2660 100644 --- a/server/backend/sync/memory/publisher.go +++ b/server/backend/pubsub/publisher.go @@ -14,7 +14,8 @@ * limitations under the License. */ -package memory +// Package pubsub provides a pub-sub implementation. +package pubsub import ( "strconv" @@ -24,7 +25,6 @@ import ( "go.uber.org/zap" - "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/logging" ) @@ -41,7 +41,7 @@ func (c *loggerID) next() string { type BatchPublisher struct { logger *zap.SugaredLogger mutex gosync.Mutex - events []sync.DocEvent + events []DocEvent window time.Duration closeChan chan struct{} @@ -63,7 +63,7 @@ func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublishe // Publish adds the given event to the batch. If the batch is full, it publishes // the batch. -func (bp *BatchPublisher) Publish(event sync.DocEvent) { +func (bp *BatchPublisher) Publish(event DocEvent) { bp.mutex.Lock() defer bp.mutex.Unlock() diff --git a/server/backend/sync/memory/pubsub.go b/server/backend/pubsub/pubsub.go similarity index 79% rename from server/backend/sync/memory/pubsub.go rename to server/backend/pubsub/pubsub.go index 0858e104e..6483f1783 100644 --- a/server/backend/sync/memory/pubsub.go +++ b/server/backend/pubsub/pubsub.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package memory +package pubsub import ( "context" @@ -25,44 +25,56 @@ import ( "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/cmap" "github.com/yorkie-team/yorkie/pkg/document/time" - "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/logging" ) +const ( + // publishTimeout is the timeout for publishing an event. + publishTimeout = 100 * gotime.Millisecond +) + +// DocEvent represents events that occur related to the document. +type DocEvent struct { + Type types.DocEventType + Publisher *time.ActorID + DocumentRefKey types.DocRefKey + Body types.DocEventBody +} + // Subscriptions is a map of Subscriptions. type Subscriptions struct { docKey types.DocRefKey - internalMap *cmap.Map[string, *sync.Subscription] + internalMap *cmap.Map[string, *Subscription] publisher *BatchPublisher } func newSubscriptions(docKey types.DocRefKey) *Subscriptions { s := &Subscriptions{ docKey: docKey, - internalMap: cmap.New[string, *sync.Subscription](), + internalMap: cmap.New[string, *Subscription](), } s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond) return s } // Set adds the given subscription. -func (s *Subscriptions) Set(sub *sync.Subscription) { +func (s *Subscriptions) Set(sub *Subscription) { s.internalMap.Set(sub.ID(), sub) } // Values returns the values of these subscriptions. -func (s *Subscriptions) Values() []*sync.Subscription { +func (s *Subscriptions) Values() []*Subscription { return s.internalMap.Values() } // Publish publishes the given event. -func (s *Subscriptions) Publish(event sync.DocEvent) { +func (s *Subscriptions) Publish(event DocEvent) { s.publisher.Publish(event) } // Delete deletes the subscription of the given id. func (s *Subscriptions) Delete(id string) { - s.internalMap.Delete(id, func(sub *sync.Subscription, exists bool) bool { + s.internalMap.Delete(id, func(sub *Subscription, exists bool) bool { if exists { sub.Close() } @@ -85,8 +97,8 @@ type PubSub struct { subscriptionsMap *cmap.Map[types.DocRefKey, *Subscriptions] } -// NewPubSub creates an instance of PubSub. -func NewPubSub() *PubSub { +// New creates an instance of PubSub. +func New() *PubSub { return &PubSub{ subscriptionsMap: cmap.New[types.DocRefKey, *Subscriptions](), } @@ -97,7 +109,7 @@ func (m *PubSub) Subscribe( ctx context.Context, subscriber *time.ActorID, docKey types.DocRefKey, -) (*sync.Subscription, error) { +) (*Subscription, []*time.ActorID, error) { if logging.Enabled(zap.DebugLevel) { logging.From(ctx).Debugf( `Subscribe(%s,%s) Start`, @@ -113,7 +125,7 @@ func (m *PubSub) Subscribe( return subs }) - sub := sync.NewSubscription(subscriber) + sub := NewSubscription(subscriber) subs.Set(sub) if logging.Enabled(zap.DebugLevel) { @@ -123,14 +135,16 @@ func (m *PubSub) Subscribe( subscriber, ) } - return sub, nil + + ids := m.ClientIDs(docKey) + return sub, ids, nil } // Unsubscribe unsubscribes the given docKeys. func (m *PubSub) Unsubscribe( ctx context.Context, docKey types.DocRefKey, - sub *sync.Subscription, + sub *Subscription, ) { if logging.Enabled(zap.DebugLevel) { logging.From(ctx).Debugf( @@ -168,8 +182,12 @@ func (m *PubSub) Unsubscribe( func (m *PubSub) Publish( ctx context.Context, publisherID *time.ActorID, - event sync.DocEvent, + event DocEvent, ) { + // NOTE(hackerwins): String() triggers the cache of ActorID to avoid + // race condition of concurrent access to the cache. + _ = event.Publisher.String() + docKey := event.DocumentRefKey if logging.Enabled(zap.DebugLevel) { diff --git a/server/backend/sync/memory/pubsub_test.go b/server/backend/pubsub/pubsub_test.go similarity index 87% rename from server/backend/sync/memory/pubsub_test.go rename to server/backend/pubsub/pubsub_test.go index ea375f854..a3022605d 100644 --- a/server/backend/sync/memory/pubsub_test.go +++ b/server/backend/pubsub/pubsub_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package memory_test +package pubsub_test import ( "context" @@ -25,8 +25,7 @@ import ( "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/document/time" - "github.com/yorkie-team/yorkie/server/backend/sync" - "github.com/yorkie-team/yorkie/server/backend/sync/memory" + "github.com/yorkie-team/yorkie/server/backend/pubsub" ) func TestPubSub(t *testing.T) { @@ -36,12 +35,12 @@ func TestPubSub(t *testing.T) { assert.NoError(t, err) t.Run("publish subscribe test", func(t *testing.T) { - pubSub := memory.NewPubSub() + pubSub := pubsub.New() refKey := types.DocRefKey{ ProjectID: types.ID("000000000000000000000000"), DocID: types.ID("000000000000000000000000"), } - docEvent := sync.DocEvent{ + docEvent := pubsub.DocEvent{ Type: types.DocumentWatchedEvent, Publisher: idB, DocumentRefKey: refKey, @@ -49,7 +48,7 @@ func TestPubSub(t *testing.T) { ctx := context.Background() // subscribe the documents by actorA - subA, err := pubSub.Subscribe(ctx, idA, refKey) + subA, _, err := pubSub.Subscribe(ctx, idA, refKey) assert.NoError(t, err) defer func() { pubSub.Unsubscribe(ctx, refKey, subA) diff --git a/server/backend/sync/pubsub.go b/server/backend/pubsub/subscription.go similarity index 82% rename from server/backend/sync/pubsub.go rename to server/backend/pubsub/subscription.go index 4f54d4ed0..cfeff9bf2 100644 --- a/server/backend/sync/pubsub.go +++ b/server/backend/pubsub/subscription.go @@ -1,5 +1,5 @@ /* - * Copyright 2021 The Yorkie Authors. All rights reserved. + * Copyright 2025 The Yorkie Authors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package sync +package pubsub import ( "sync" @@ -22,15 +22,9 @@ import ( "github.com/rs/xid" - "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/document/time" ) -const ( - // publishTimeout is the timeout for publishing an event. - publishTimeout = 100 * gotime.Millisecond -) - // Subscription represents a subscription of a subscriber to documents. type Subscription struct { id string @@ -55,14 +49,6 @@ func (s *Subscription) ID() string { return s.id } -// DocEvent represents events that occur related to the document. -type DocEvent struct { - Type types.DocEventType - Publisher *time.ActorID - DocumentRefKey types.DocRefKey - Body types.DocEventBody -} - // Events returns the DocEvent channel of this subscription. func (s *Subscription) Events() chan DocEvent { return s.events diff --git a/server/backend/sync/coordinator.go b/server/backend/sync/coordinator.go deleted file mode 100644 index 4fa3dd2a4..000000000 --- a/server/backend/sync/coordinator.go +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2020 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Package sync provides the synchronization primitives for the server. -package sync - -import ( - "context" - gotime "time" - - "github.com/yorkie-team/yorkie/api/types" - "github.com/yorkie-team/yorkie/pkg/document/time" -) - -// ServerInfo represents the information of the Server. -type ServerInfo struct { - ID string `json:"id"` - Hostname string `json:"hostname"` - UpdatedAt gotime.Time `json:"updated_at"` -} - -// Coordinator provides synchronization functions such as locks and event Pub/Sub. -type Coordinator interface { - // NewLocker creates a sync.Locker. - NewLocker(ctx context.Context, key Key) (Locker, error) - - // Subscribe subscribes to the given documents. - Subscribe( - ctx context.Context, - subscriber *time.ActorID, - documentRefKey types.DocRefKey, - ) (*Subscription, []*time.ActorID, error) - - // Unsubscribe unsubscribes from the given documents. - Unsubscribe( - ctx context.Context, - documentRefKey types.DocRefKey, - sub *Subscription, - ) error - - // Publish publishes the given event. - Publish(ctx context.Context, publisherID *time.ActorID, event DocEvent) - - // Members returns the members of this cluster. - Members() map[string]*ServerInfo - - // Close closes all resources of this Coordinator. - Close() error -} diff --git a/server/backend/sync/locker.go b/server/backend/sync/locker.go index 7988f8ed2..4d8eaa96d 100644 --- a/server/backend/sync/locker.go +++ b/server/backend/sync/locker.go @@ -14,11 +14,14 @@ * limitations under the License. */ +// Package sync provides a locker implementation. package sync import ( "context" "errors" + + "github.com/yorkie-team/yorkie/pkg/locker" ) // ErrAlreadyLocked is returned when the lock is already locked. @@ -37,6 +40,29 @@ func (k Key) String() string { return string(k) } +// LockerManager manages Lockers. +type LockerManager struct { + locks *locker.Locker +} + +// New creates a new instance of LockerManager. +func New() *LockerManager { + return &LockerManager{ + locks: locker.New(), + } +} + +// NewLocker creates locker of the given key. +func (c *LockerManager) NewLocker( + _ context.Context, + key Key, +) (Locker, error) { + return &internalLocker{ + key.String(), + c.locks, + }, nil +} + // A Locker represents an object that can be locked and unlocked. type Locker interface { // Lock locks the mutex with a cancelable context @@ -48,3 +74,33 @@ type Locker interface { // Unlock unlocks the mutex. Unlock(ctx context.Context) error } + +type internalLocker struct { + key string + locks *locker.Locker +} + +// Lock locks the mutex. +func (il *internalLocker) Lock(_ context.Context) error { + il.locks.Lock(il.key) + + return nil +} + +// TryLock locks the mutex if not already locked by another session. +func (il *internalLocker) TryLock(_ context.Context) error { + if !il.locks.TryLock(il.key) { + return ErrAlreadyLocked + } + + return nil +} + +// Unlock unlocks the mutex. +func (il *internalLocker) Unlock(_ context.Context) error { + if err := il.locks.Unlock(il.key); err != nil { + return err + } + + return nil +} diff --git a/server/backend/sync/memory/coordinator.go b/server/backend/sync/memory/coordinator.go deleted file mode 100644 index 17b93790a..000000000 --- a/server/backend/sync/memory/coordinator.go +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2021 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Package memory provides the memory implementation of the sync package. -package memory - -import ( - "context" - - "github.com/yorkie-team/yorkie/api/types" - "github.com/yorkie-team/yorkie/pkg/document/time" - "github.com/yorkie-team/yorkie/pkg/locker" - "github.com/yorkie-team/yorkie/server/backend/sync" -) - -// Coordinator is a memory-based implementation of sync.Coordinator. -type Coordinator struct { - serverInfo *sync.ServerInfo - - locks *locker.Locker - pubSub *PubSub -} - -// NewCoordinator creates an instance of Coordinator. -func NewCoordinator(serverInfo *sync.ServerInfo) *Coordinator { - return &Coordinator{ - serverInfo: serverInfo, - locks: locker.New(), - pubSub: NewPubSub(), - } -} - -// NewLocker creates locker of the given key. -func (c *Coordinator) NewLocker( - _ context.Context, - key sync.Key, -) (sync.Locker, error) { - return &internalLocker{ - key.String(), - c.locks, - }, nil -} - -// Subscribe subscribes to the given documents. -func (c *Coordinator) Subscribe( - ctx context.Context, - subscriber *time.ActorID, - documentRefKey types.DocRefKey, -) (*sync.Subscription, []*time.ActorID, error) { - sub, err := c.pubSub.Subscribe(ctx, subscriber, documentRefKey) - if err != nil { - return nil, nil, err - } - - ids := c.pubSub.ClientIDs(documentRefKey) - return sub, ids, nil -} - -// Unsubscribe unsubscribes the given documents. -func (c *Coordinator) Unsubscribe( - ctx context.Context, - documentRefKey types.DocRefKey, - sub *sync.Subscription, -) error { - c.pubSub.Unsubscribe(ctx, documentRefKey, sub) - return nil -} - -// Publish publishes the given event. -func (c *Coordinator) Publish( - ctx context.Context, - publisherID *time.ActorID, - event sync.DocEvent, -) { - // NOTE(hackerwins): String() triggers the cache of ActorID to avoid - // race condition of concurrent access to the cache. - _ = event.Publisher.String() - c.pubSub.Publish(ctx, publisherID, event) -} - -// Members returns the members of this cluster. -func (c *Coordinator) Members() map[string]*sync.ServerInfo { - members := make(map[string]*sync.ServerInfo) - members[c.serverInfo.ID] = c.serverInfo - return members -} - -// Close closes all resources of this Coordinator. -func (c *Coordinator) Close() error { - return nil -} diff --git a/server/backend/sync/memory/coordinator_test.go b/server/backend/sync/memory/coordinator_test.go deleted file mode 100644 index 9f0dcc7db..000000000 --- a/server/backend/sync/memory/coordinator_test.go +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2023 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package memory_test - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/yorkie-team/yorkie/api/types" - "github.com/yorkie-team/yorkie/pkg/document/time" - "github.com/yorkie-team/yorkie/server/backend/sync/memory" -) - -func TestCoordinator(t *testing.T) { - t.Run("subscriptions map test", func(t *testing.T) { - coordinator := memory.NewCoordinator(nil) - docRefKey := types.DocRefKey{ - ProjectID: types.ID("000000000000000000000000"), - DocID: types.ID("000000000000000000000000"), - } - ctx := context.Background() - - for i := 0; i < 5; i++ { - id, err := time.ActorIDFromBytes([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, byte(i)}) - assert.NoError(t, err) - - _, clientIDs, err := coordinator.Subscribe(ctx, id, docRefKey) - assert.NoError(t, err) - assert.Len(t, clientIDs, i+1) - } - }) -} diff --git a/server/backend/sync/memory/locker.go b/server/backend/sync/memory/locker.go deleted file mode 100644 index be23937be..000000000 --- a/server/backend/sync/memory/locker.go +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2021 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package memory - -import ( - "context" - - "github.com/yorkie-team/yorkie/pkg/locker" - "github.com/yorkie-team/yorkie/server/backend/sync" -) - -type internalLocker struct { - key string - locks *locker.Locker -} - -// Lock locks the mutex. -func (il *internalLocker) Lock(_ context.Context) error { - il.locks.Lock(il.key) - - return nil -} - -// TryLock locks the mutex if not already locked by another session. -func (il *internalLocker) TryLock(_ context.Context) error { - if !il.locks.TryLock(il.key) { - return sync.ErrAlreadyLocked - } - - return nil -} - -// Unlock unlocks the mutex. -func (il *internalLocker) Unlock(_ context.Context) error { - if err := il.locks.Unlock(il.key); err != nil { - return err - } - - return nil -} diff --git a/server/clients/housekeeping.go b/server/clients/housekeeping.go index 57a632ec7..c98e7ce99 100644 --- a/server/clients/housekeeping.go +++ b/server/clients/housekeeping.go @@ -41,7 +41,7 @@ func DeactivateInactives( ) (types.ID, error) { start := time.Now() - locker, err := be.Coordinator.NewLocker(ctx, deactivateCandidatesKey) + locker, err := be.Locker.NewLocker(ctx, deactivateCandidatesKey) if err != nil { return database.DefaultProjectID, err } diff --git a/server/config.go b/server/config.go index 6067729de..fd8589dbd 100644 --- a/server/config.go +++ b/server/config.go @@ -64,8 +64,8 @@ const ( DefaultAuthWebhookMaxWaitInterval = 3000 * time.Millisecond DefaultAuthWebhookCacheSize = 5000 DefaultAuthWebhookCacheTTL = 10 * time.Second - DefaultProjectInfoCacheSize = 256 - DefaultProjectInfoCacheTTL = 10 * time.Minute + DefaultProjectCacheSize = 256 + DefaultProjectCacheTTL = 10 * time.Minute DefaultHostname = "" DefaultGatewayAddr = "localhost:8080" @@ -189,12 +189,12 @@ func (c *Config) ensureDefaultValue() { c.Backend.AuthWebhookCacheTTL = DefaultAuthWebhookCacheTTL.String() } - if c.Backend.ProjectInfoCacheSize == 0 { - c.Backend.ProjectInfoCacheSize = DefaultProjectInfoCacheSize + if c.Backend.ProjectCacheSize == 0 { + c.Backend.ProjectCacheSize = DefaultProjectCacheSize } - if c.Backend.ProjectInfoCacheTTL == "" { - c.Backend.ProjectInfoCacheTTL = DefaultProjectInfoCacheTTL.String() + if c.Backend.ProjectCacheTTL == "" { + c.Backend.ProjectCacheTTL = DefaultProjectCacheTTL.String() } if c.Mongo != nil { diff --git a/server/config.sample.yml b/server/config.sample.yml index 124e6ac2d..b7f124b11 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -74,11 +74,11 @@ Backend: # AuthWebhookCacheTTL is the TTL value to set when caching the authorized result. AuthWebhookCacheTTL: "10s" - # ProjectInfoCacheSize is the size of the project info cache. - ProjectInfoCacheSize: 256 + # ProjectCacheSize is the size of the project metadata cache. + ProjectCacheSize: 256 - # ProjectInfoCacheTTL is the TTL value to set when caching the project info. - ProjectInfoCacheTTL: "10m" + # ProjectCacheTTL is the TTL value to set when caching the project metadata. + ProjectCacheTTL: "10m" # Hostname is the hostname of the server. If not provided, the hostname will be # determined automatically by the OS (Optional, default: os.Hostname()). diff --git a/server/config_test.go b/server/config_test.go index ed5604aeb..5ca5008a9 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -74,8 +74,8 @@ func TestNewConfigFromFile(t *testing.T) { assert.NoError(t, err) assert.Equal(t, authWebhookCacheTTL, server.DefaultAuthWebhookCacheTTL) - projectInfoCacheTTL, err := time.ParseDuration(conf.Backend.ProjectInfoCacheTTL) + projectCacheTTL, err := time.ParseDuration(conf.Backend.ProjectCacheTTL) assert.NoError(t, err) - assert.Equal(t, projectInfoCacheTTL, server.DefaultProjectInfoCacheTTL) + assert.Equal(t, projectCacheTTL, server.DefaultProjectCacheTTL) }) } diff --git a/server/packs/packs.go b/server/packs/packs.go index f3a10487a..dcc6c11fe 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -34,6 +34,7 @@ import ( "github.com/yorkie-team/yorkie/pkg/units" "github.com/yorkie-team/yorkie/server/backend" "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/logging" ) @@ -184,17 +185,17 @@ func PushPull( return } - be.Coordinator.Publish( + be.PubSub.Publish( ctx, publisherID, - sync.DocEvent{ + pubsub.DocEvent{ Type: types.DocumentChangedEvent, Publisher: publisherID, DocumentRefKey: docRefKey, }, ) - locker, err := be.Coordinator.NewLocker(ctx, SnapshotKey(project.ID, reqPack.DocumentKey)) + locker, err := be.Locker.NewLocker(ctx, SnapshotKey(project.ID, reqPack.DocumentKey)) if err != nil { logging.From(ctx).Error(err) return diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index a207a1b15..fa7413cfd 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -100,8 +100,8 @@ func TestMain(m *testing.M) { ClientDeactivateThreshold: helper.ClientDeactivateThreshold, SnapshotThreshold: helper.SnapshotThreshold, AuthWebhookCacheSize: helper.AuthWebhookSize, - ProjectInfoCacheSize: helper.ProjectInfoCacheSize, - ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), + ProjectCacheSize: helper.ProjectCacheSize, + ProjectCacheTTL: helper.ProjectCacheTTL.String(), AdminTokenDuration: helper.AdminTokenDuration, }, &mongo.Config{ ConnectionURI: helper.MongoConnectionURI, diff --git a/server/rpc/admin_server.go b/server/rpc/admin_server.go index 6add24e2d..bff582210 100644 --- a/server/rpc/admin_server.go +++ b/server/rpc/admin_server.go @@ -30,7 +30,7 @@ import ( "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend" - "github.com/yorkie-team/yorkie/server/backend/sync" + "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/documents" "github.com/yorkie-team/yorkie/server/logging" "github.com/yorkie-team/yorkie/server/packs" @@ -404,7 +404,7 @@ func (s *adminServer) RemoveDocumentByAdmin( } // TODO(hackerwins): Rename PushPullKey to something else like DocWriteLockKey?. - locker, err := s.backend.Coordinator.NewLocker(ctx, packs.PushPullKey(project.ID, docInfo.Key)) + locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, docInfo.Key)) if err != nil { return nil, err } @@ -428,10 +428,10 @@ func (s *adminServer) RemoveDocumentByAdmin( // TODO(emplam27): Change the publisherID to the actual user ID. This is a temporary solution. publisherID := time.InitialActorID - s.backend.Coordinator.Publish( + s.backend.PubSub.Publish( ctx, publisherID, - sync.DocEvent{ + pubsub.DocEvent{ Type: types.DocumentChangedEvent, Publisher: publisherID, DocumentRefKey: docInfo.RefKey(), diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index bf3e001a8..25b756125 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -62,7 +62,7 @@ func (s *clusterServer) DetachDocument( summary := converter.FromDocumentSummary(req.Msg.DocumentSummary) project := converter.FromProject(req.Msg.Project) - locker, err := s.backend.Coordinator.NewLocker(ctx, packs.PushPullKey(project.ID, summary.Key)) + locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, summary.Key)) if err != nil { return nil, err } diff --git a/server/rpc/interceptors/yorkie.go b/server/rpc/interceptors/yorkie.go index 2eb0afefa..5e3c55249 100644 --- a/server/rpc/interceptors/yorkie.go +++ b/server/rpc/interceptors/yorkie.go @@ -42,21 +42,18 @@ func isYorkieService(method string) bool { // YorkieServiceInterceptor is an interceptor for building additional context // and handling authentication for YorkieService. type YorkieServiceInterceptor struct { - backend *backend.Backend - requestID *requestID - projectInfoCache *cache.LRUExpireCache[string, *types.Project] + backend *backend.Backend + requestID *requestID + projectCache *cache.LRUExpireCache[string, *types.Project] } // NewYorkieServiceInterceptor creates a new instance of YorkieServiceInterceptor. func NewYorkieServiceInterceptor(be *backend.Backend) *YorkieServiceInterceptor { - projectInfoCache, err := cache.NewLRUExpireCache[string, *types.Project](be.Config.ProjectInfoCacheSize) - if err != nil { - logging.DefaultLogger().Fatal("Failed to create project info cache: %v", err) - } + cache := cache.NewLRUExpireCache[string, *types.Project](be.Config.ProjectCacheSize) return &YorkieServiceInterceptor{ - backend: be, - requestID: newRequestID("r"), - projectInfoCache: projectInfoCache, + backend: be, + requestID: newRequestID("r"), + projectCache: cache, } } @@ -171,14 +168,14 @@ func (i *YorkieServiceInterceptor) buildContext(ctx context.Context, header http cacheKey := md.APIKey // 02. building project - if _, ok := i.projectInfoCache.Get(cacheKey); !ok { + if _, ok := i.projectCache.Get(cacheKey); !ok { prj, err := projects.GetProjectFromAPIKey(ctx, i.backend, md.APIKey) if err != nil { return nil, connecthelper.ToStatusError(err) } - i.projectInfoCache.Add(cacheKey, prj, i.backend.Config.ParseProjectInfoCacheTTL()) + i.projectCache.Add(cacheKey, prj, i.backend.Config.ParseProjectCacheTTL()) } - project, _ := i.projectInfoCache.Get(cacheKey) + project, _ := i.projectCache.Get(cacheKey) ctx = projects.With(ctx, project) // 03. building logger diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index 159c686ed..205733b35 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -74,8 +74,8 @@ func TestMain(m *testing.M) { ClientDeactivateThreshold: helper.ClientDeactivateThreshold, SnapshotThreshold: helper.SnapshotThreshold, AuthWebhookCacheSize: helper.AuthWebhookSize, - ProjectInfoCacheSize: helper.ProjectInfoCacheSize, - ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), + ProjectCacheSize: helper.ProjectCacheSize, + ProjectCacheTTL: helper.ProjectCacheTTL.String(), AdminTokenDuration: helper.AdminTokenDuration, }, &mongo.Config{ ConnectionURI: helper.MongoConnectionURI, diff --git a/server/rpc/yorkie_server.go b/server/rpc/yorkie_server.go index 4408fb38b..ab1adbfb2 100644 --- a/server/rpc/yorkie_server.go +++ b/server/rpc/yorkie_server.go @@ -29,6 +29,7 @@ import ( "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/clients" "github.com/yorkie-team/yorkie/server/documents" @@ -131,7 +132,7 @@ func (s *yorkieServer) AttachDocument( } project := projects.From(ctx) - locker, err := s.backend.Coordinator.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) + locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) if err != nil { return nil, err } @@ -207,7 +208,7 @@ func (s *yorkieServer) DetachDocument( } project := projects.From(ctx) - locker, err := s.backend.Coordinator.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) + locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) if err != nil { return nil, err } @@ -302,7 +303,7 @@ func (s *yorkieServer) PushPullChanges( project := projects.From(ctx) if pack.HasChanges() { - locker, err := s.backend.Coordinator.NewLocker( + locker, err := s.backend.Locker.NewLocker( ctx, packs.PushPullKey(project.ID, pack.DocumentKey), ) @@ -409,7 +410,7 @@ func (s *yorkieServer) WatchDocument( return err } - locker, err := s.backend.Coordinator.NewLocker( + locker, err := s.backend.Locker.NewLocker( ctx, sync.NewKey(fmt.Sprintf("watchdoc-%s-%s", clientID, docID)), ) @@ -518,7 +519,7 @@ func (s *yorkieServer) RemoveDocument( project := projects.From(ctx) if pack.HasChanges() { - locker, err := s.backend.Coordinator.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) + locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) if err != nil { return nil, err } @@ -572,17 +573,17 @@ func (s *yorkieServer) watchDoc( ctx context.Context, clientID *time.ActorID, documentRefKey types.DocRefKey, -) (*sync.Subscription, []*time.ActorID, error) { - subscription, clientIDs, err := s.backend.Coordinator.Subscribe(ctx, clientID, documentRefKey) +) (*pubsub.Subscription, []*time.ActorID, error) { + subscription, clientIDs, err := s.backend.PubSub.Subscribe(ctx, clientID, documentRefKey) if err != nil { logging.From(ctx).Error(err) return nil, nil, err } - s.backend.Coordinator.Publish( + s.backend.PubSub.Publish( ctx, subscription.Subscriber(), - sync.DocEvent{ + pubsub.DocEvent{ Type: types.DocumentWatchedEvent, Publisher: subscription.Subscriber(), DocumentRefKey: documentRefKey, @@ -600,19 +601,15 @@ func (s *yorkieServer) watchDoc( func (s *yorkieServer) unwatchDoc( ctx context.Context, - subscription *sync.Subscription, + subscription *pubsub.Subscription, documentRefKey types.DocRefKey, ) error { - err := s.backend.Coordinator.Unsubscribe(ctx, documentRefKey, subscription) - if err != nil { - logging.From(ctx).Error(err) - return err - } + s.backend.PubSub.Unsubscribe(ctx, documentRefKey, subscription) - s.backend.Coordinator.Publish( + s.backend.PubSub.Publish( ctx, subscription.Subscriber(), - sync.DocEvent{ + pubsub.DocEvent{ Type: types.DocumentUnwatchedEvent, Publisher: subscription.Subscriber(), DocumentRefKey: documentRefKey, @@ -672,10 +669,10 @@ func (s *yorkieServer) Broadcast( } docEventType := types.DocumentBroadcastEvent - s.backend.Coordinator.Publish( + s.backend.PubSub.Publish( ctx, clientID, - sync.DocEvent{ + pubsub.DocEvent{ Type: docEventType, Publisher: clientID, DocumentRefKey: docRefKey, diff --git a/test/bench/sync_bench_test.go b/test/bench/sync_bench_test.go index 8de5c3eb9..954836ba8 100644 --- a/test/bench/sync_bench_test.go +++ b/test/bench/sync_bench_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/yorkie-team/yorkie/server/backend/sync" - "github.com/yorkie-team/yorkie/server/backend/sync/memory" ) func BenchmarkSync(b *testing.B) { @@ -49,7 +48,7 @@ func BenchmarkSync(b *testing.B) { func benchmarkMemorySync(cnt int, b *testing.B) { for i := 0; i < b.N; i++ { - coordinator := memory.NewCoordinator(nil) + locker := sync.New() sum := 0 var wg gosync.WaitGroup @@ -59,7 +58,7 @@ func benchmarkMemorySync(cnt int, b *testing.B) { defer wg.Done() ctx := context.Background() - locker, err := coordinator.NewLocker(ctx, sync.Key(b.Name())) + locker, err := locker.NewLocker(ctx, sync.Key(b.Name())) assert.NoError(b, err) assert.NoError(b, locker.Lock(ctx)) sum += 1 diff --git a/test/complex/main_test.go b/test/complex/main_test.go index 557925c74..26b530b99 100644 --- a/test/complex/main_test.go +++ b/test/complex/main_test.go @@ -79,8 +79,8 @@ func TestMain(m *testing.M) { ClientDeactivateThreshold: helper.ClientDeactivateThreshold, SnapshotThreshold: helper.SnapshotThreshold, AuthWebhookCacheSize: helper.AuthWebhookSize, - ProjectInfoCacheSize: helper.ProjectInfoCacheSize, - ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), + ProjectCacheSize: helper.ProjectCacheSize, + ProjectCacheTTL: helper.ProjectCacheTTL.String(), AdminTokenDuration: helper.AdminTokenDuration, GatewayAddr: fmt.Sprintf("localhost:%d", helper.RPCPort), }, &mongo.Config{ diff --git a/test/helper/helper.go b/test/helper/helper.go index 708bc6248..9068b5302 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -77,8 +77,8 @@ var ( AuthWebhookMaxWaitInterval = 3 * gotime.Millisecond AuthWebhookSize = 100 AuthWebhookCacheTTL = 10 * gotime.Second - ProjectInfoCacheSize = 256 - ProjectInfoCacheTTL = 5 * gotime.Second + ProjectCacheSize = 256 + ProjectCacheTTL = 5 * gotime.Second MongoConnectionURI = "mongodb://localhost:27017" MongoConnectionTimeout = "5s" @@ -266,8 +266,8 @@ func TestConfig() *server.Config { AuthWebhookMaxWaitInterval: AuthWebhookMaxWaitInterval.String(), AuthWebhookCacheSize: AuthWebhookSize, AuthWebhookCacheTTL: AuthWebhookCacheTTL.String(), - ProjectInfoCacheSize: ProjectInfoCacheSize, - ProjectInfoCacheTTL: ProjectInfoCacheTTL.String(), + ProjectCacheSize: ProjectCacheSize, + ProjectCacheTTL: ProjectCacheTTL.String(), GatewayAddr: fmt.Sprintf("localhost:%d", RPCPort+portOffset), }, Mongo: &mongo.Config{ diff --git a/test/integration/auth_webhook_test.go b/test/integration/auth_webhook_test.go index baf1e0bf9..21bca5c2a 100644 --- a/test/integration/auth_webhook_test.go +++ b/test/integration/auth_webhook_test.go @@ -232,8 +232,8 @@ func TestProjectAuthWebhook(t *testing.T) { ) assert.NoError(t, err) - projectInfoCacheTTL := 5 * time.Second - time.Sleep(projectInfoCacheTTL) + projectCacheTTL := 5 * time.Second + time.Sleep(projectCacheTTL) cli, err := client.Dial( svr.RPCAddr(), client.WithAPIKey(project.PublicKey),