Skip to content

Commit

Permalink
Extract events package
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Feb 4, 2025
1 parent eded115 commit 626303b
Show file tree
Hide file tree
Showing 15 changed files with 128 additions and 97 deletions.
11 changes: 6 additions & 5 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/crdt"
Expand Down Expand Up @@ -200,16 +201,16 @@ func FromDocumentID(pbID string) (types.ID, error) {
}

// FromEventType converts the given Protobuf formats to model format.
func FromEventType(pbDocEventType api.DocEventType) (types.DocEventType, error) {
func FromEventType(pbDocEventType api.DocEventType) (events.DocEventType, error) {
switch pbDocEventType {
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED:
return types.DocumentChangedEvent, nil
return events.DocChangedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED:
return types.DocumentWatchedEvent, nil
return events.DocWatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED:
return types.DocumentUnwatchedEvent, nil
return events.DocUnwatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST:
return types.DocumentBroadcastEvent, nil
return events.DocBroadcastEvent, nil
}
return "", fmt.Errorf("%v: %w", pbDocEventType, ErrUnsupportedEventType)
}
Expand Down
11 changes: 6 additions & 5 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/crdt"
Expand Down Expand Up @@ -193,15 +194,15 @@ func ToVersionVector(vector time.VersionVector) (*api.VersionVector, error) {
}

// ToDocEventType converts the given model format to Protobuf format.
func ToDocEventType(eventType types.DocEventType) (api.DocEventType, error) {
func ToDocEventType(eventType events.DocEventType) (api.DocEventType, error) {
switch eventType {
case types.DocumentChangedEvent:
case events.DocChangedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED, nil
case types.DocumentWatchedEvent:
case events.DocWatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED, nil
case types.DocumentUnwatchedEvent:
case events.DocUnwatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED, nil
case types.DocumentBroadcastEvent:
case events.DocBroadcastEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST, nil
default:
return 0, fmt.Errorf("%s: %w", eventType, ErrUnsupportedEventType)
Expand Down
33 changes: 0 additions & 33 deletions api/types/event.go

This file was deleted.

62 changes: 62 additions & 0 deletions api/types/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package events

Check failure on line 17 in api/types/events/events.go

View workflow job for this annotation

GitHub Actions / build

package-comments: should have a package comment (revive)

import (
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document/time"
)

// DocEventType represents the event that the Server delivers to the client.
type DocEventType string

const (
// DocChangedEvent is an event indicating that document is being
// modified by a change.
DocChangedEvent DocEventType = "document-changed"

// DocWatchedEvent is an event that occurs when document is watched
// by other clients.
DocWatchedEvent DocEventType = "document-watched"

// DocUnwatchedEvent is an event that occurs when document is
// unwatched by other clients.
DocUnwatchedEvent DocEventType = "document-unwatched"

// DocBroadcastEvent is an event that occurs when a payload is broadcasted
// on a specific topic.
DocBroadcastEvent DocEventType = "document-broadcast"
)

// DocEventBody includes additional data specific to the DocEvent.
type DocEventBody struct {
Topic string
Payload []byte
}

// PayloadLen returns the size of the payload.
func (b *DocEventBody) PayloadLen() int {
return len(b.Payload)
}

// DocEvent represents events that occur related to the document.
type DocEvent struct {
Type DocEventType
Publisher *time.ActorID
DocumentRefKey types.DocRefKey
Body DocEventBody
}
9 changes: 5 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect"
"github.com/yorkie-team/yorkie/pkg/document"
Expand Down Expand Up @@ -619,9 +620,9 @@ func handleResponse(
}

switch eventType {
case types.DocumentChangedEvent:
case events.DocChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
case events.DocWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
Expand All @@ -633,7 +634,7 @@ func handleResponse(
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
case events.DocUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
Expand All @@ -646,7 +647,7 @@ func handleResponse(
cli.String(): p,
},
}, nil
case types.DocumentBroadcastEvent:
case events.DocBroadcastEvent:
eventBody := resp.Event.Body
// If the handler exists, it means that the broadcast topic has been subscribed to.
if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/webhook/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (c *Client[Req, Res]) Send(ctx context.Context, req Req) (*Res, int, error)
}
defer func() {
if err := resp.Body.Close(); err != nil {
// TODO(hackerwins): Consider to remove the dependency of logging.
logging.From(ctx).Error(err)
}
}()
Expand Down
5 changes: 3 additions & 2 deletions server/backend/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/server/logging"
)

Expand All @@ -41,7 +42,7 @@ func (c *loggerID) next() string {
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []DocEvent
events []events.DocEvent

window time.Duration
closeChan chan struct{}
Expand All @@ -63,7 +64,7 @@ func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublishe

// Publish adds the given event to the batch. If the batch is full, it publishes
// the batch.
func (bp *BatchPublisher) Publish(event DocEvent) {
func (bp *BatchPublisher) Publish(event events.DocEvent) {
bp.mutex.Lock()
defer bp.mutex.Unlock()

Expand Down
13 changes: 3 additions & 10 deletions server/backend/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/cmap"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/logging"
Expand All @@ -33,14 +34,6 @@ const (
publishTimeout = 100 * gotime.Millisecond
)

// DocEvent represents events that occur related to the document.
type DocEvent struct {
Type types.DocEventType
Publisher *time.ActorID
DocumentRefKey types.DocRefKey
Body types.DocEventBody
}

// Subscriptions is a map of Subscriptions.
type Subscriptions struct {
docKey types.DocRefKey
Expand Down Expand Up @@ -68,7 +61,7 @@ func (s *Subscriptions) Values() []*Subscription {
}

// Publish publishes the given event.
func (s *Subscriptions) Publish(event DocEvent) {
func (s *Subscriptions) Publish(event events.DocEvent) {
s.publisher.Publish(event)
}

Expand Down Expand Up @@ -182,7 +175,7 @@ func (m *PubSub) Unsubscribe(
func (m *PubSub) Publish(
ctx context.Context,
publisherID *time.ActorID,
event DocEvent,
event events.DocEvent,
) {
// NOTE(hackerwins): String() triggers the cache of ActorID to avoid
// race condition of concurrent access to the cache.
Expand Down
5 changes: 3 additions & 2 deletions server/backend/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
)
Expand All @@ -40,8 +41,8 @@ func TestPubSub(t *testing.T) {
ProjectID: types.ID("000000000000000000000000"),
DocID: types.ID("000000000000000000000000"),
}
docEvent := pubsub.DocEvent{
Type: types.DocumentWatchedEvent,
docEvent := events.DocEvent{
Type: events.DocWatchedEvent,
Publisher: idB,
DocumentRefKey: refKey,
}
Expand Down
9 changes: 5 additions & 4 deletions server/backend/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/rs/xid"

"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document/time"
)

Expand All @@ -31,15 +32,15 @@ type Subscription struct {
subscriber *time.ActorID
mu sync.Mutex
closed bool
events chan DocEvent
events chan events.DocEvent
}

// NewSubscription creates a new instance of Subscription.
func NewSubscription(subscriber *time.ActorID) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
events: make(chan DocEvent, 1),
events: make(chan events.DocEvent, 1),
closed: false,
}
}
Expand All @@ -50,7 +51,7 @@ func (s *Subscription) ID() string {
}

// Events returns the DocEvent channel of this subscription.
func (s *Subscription) Events() chan DocEvent {
func (s *Subscription) Events() chan events.DocEvent {
return s.events
}

Expand All @@ -71,7 +72,7 @@ func (s *Subscription) Close() {
}

// Publish publishes the given event to the subscriber.
func (s *Subscription) Publish(event DocEvent) bool {
func (s *Subscription) Publish(event events.DocEvent) bool {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/units"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/logging"
)
Expand Down Expand Up @@ -188,8 +188,8 @@ func PushPull(
be.PubSub.Publish(
ctx,
publisherID,
pubsub.DocEvent{
Type: types.DocumentChangedEvent,
events.DocEvent{
Type: events.DocChangedEvent,
Publisher: publisherID,
DocumentRefKey: docRefKey,
},
Expand Down
5 changes: 3 additions & 2 deletions server/profiling/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/internal/version"
)

Expand Down Expand Up @@ -350,7 +351,7 @@ func (m *Metrics) RemoveWatchDocumentConnections(hostname string, project *types
}

// AddWatchDocumentEvents adds the number of events in document watch stream connections.
func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project, docEventType types.DocEventType) {
func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project, docEventType events.DocEventType) {
m.watchDocumentEventsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
Expand All @@ -361,7 +362,7 @@ func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project

// AddWatchDocumentEventPayloadBytes adds the bytes of event payload in document watch stream connections.
func (m *Metrics) AddWatchDocumentEventPayloadBytes(hostname string, project *types.Project,
docEventType types.DocEventType, bytes int) {
docEventType events.DocEventType, bytes int) {
m.watchDocumentEventsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
Expand Down
Loading

0 comments on commit 626303b

Please sign in to comment.