Skip to content

Commit bbd06b2

Browse files
humdrumhackerwins
andcommitted
Extend PushPull to support sync mode by adding push-only flag (#500)
When push-only is set to true, the Server will respond with an empty operations unless there have been actual changes to the data. In cases where the push-pull response contains changes that have already been made on the client side, empty changes will be sent to prevent redoing those operations. Due to these modifications, deep copying of changeInfo is required. --------- Co-authored-by: Youngteac Hong <[email protected]>
1 parent 6f9c5ad commit bbd06b2

File tree

12 files changed

+311
-63
lines changed

12 files changed

+311
-63
lines changed

api/types/sync_mode.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2023 The Yorkie Authors. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package types
18+
19+
// SyncMode is the mode of synchronization. It is used to determine whether to
20+
// push and pull changes in PushPullChanges API.
21+
type SyncMode int
22+
23+
const (
24+
// SyncModePushPull is the mode that pushes and pulls changes.
25+
SyncModePushPull SyncMode = iota
26+
27+
// SyncModePushOnly is the mode that pushes changes only.
28+
SyncModePushOnly
29+
)

api/yorkie/v1/yorkie.pb.go

+85-42
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/yorkie/v1/yorkie.proto

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ message PushPullChangesRequest {
108108
bytes client_id = 1;
109109
string document_id = 2;
110110
ChangePack change_pack = 3;
111+
bool push_only = 4;
111112
}
112113

113114
message PushPullChangesResponse {

client/client.go

+33-8
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,21 @@ var (
6565
ErrUnsupportedWatchResponseType = errors.New("unsupported watch response type")
6666
)
6767

68+
// SyncOption is an option for sync. It contains the key of the document to
69+
// sync and the sync mode.
70+
type SyncOption struct {
71+
key key.Key
72+
mode types.SyncMode
73+
}
74+
75+
// WithPushOnly returns a SyncOption with the sync mode set to PushOnly.
76+
func (o SyncOption) WithPushOnly() SyncOption {
77+
return SyncOption{
78+
key: o.key,
79+
mode: types.SyncModePushOnly,
80+
}
81+
}
82+
6883
// Attachment represents the document attached and peers.
6984
type Attachment struct {
7085
doc *document.Document
@@ -356,18 +371,26 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document) error {
356371
return nil
357372
}
358373

374+
// WithDocKey creates a SyncOption with the given document key.
375+
func WithDocKey(k key.Key) SyncOption {
376+
return SyncOption{
377+
key: k,
378+
mode: types.SyncModePushPull,
379+
}
380+
}
381+
359382
// Sync pushes local changes of the attached documents to the server and
360383
// receives changes of the remote replica from the server then apply them to
361384
// local documents.
362-
func (c *Client) Sync(ctx context.Context, keys ...key.Key) error {
363-
if len(keys) == 0 {
385+
func (c *Client) Sync(ctx context.Context, options ...SyncOption) error {
386+
if len(options) == 0 {
364387
for _, attachment := range c.attachments {
365-
keys = append(keys, attachment.doc.Key())
388+
options = append(options, WithDocKey(attachment.doc.Key()))
366389
}
367390
}
368391

369-
for _, k := range keys {
370-
if err := c.pushPull(ctx, k); err != nil {
392+
for _, opt := range options {
393+
if err := c.pushPullChanges(ctx, opt); err != nil {
371394
return err
372395
}
373396
}
@@ -573,12 +596,13 @@ func (c *Client) IsActive() bool {
573596
return c.status == activated
574597
}
575598

576-
func (c *Client) pushPull(ctx context.Context, key key.Key) error {
599+
// pushPullChanges pushes the changes of the document to the server and pulls the changes from the server.
600+
func (c *Client) pushPullChanges(ctx context.Context, opt SyncOption) error {
577601
if c.status != activated {
578602
return ErrClientNotActivated
579603
}
580604

581-
attachment, ok := c.attachments[key]
605+
attachment, ok := c.attachments[opt.key]
582606
if !ok {
583607
return ErrDocumentNotAttached
584608
}
@@ -589,11 +613,12 @@ func (c *Client) pushPull(ctx context.Context, key key.Key) error {
589613
}
590614

591615
res, err := c.client.PushPullChanges(
592-
withShardKey(ctx, c.options.APIKey, key.String()),
616+
withShardKey(ctx, c.options.APIKey, opt.key.String()),
593617
&api.PushPullChangesRequest{
594618
ClientId: c.id.Bytes(),
595619
DocumentId: attachment.docID.String(),
596620
ChangePack: pbChangePack,
621+
PushOnly: opt.mode == types.SyncModePushOnly,
597622
},
598623
)
599624
if err != nil {

server/backend/database/change_info.go

+12
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,15 @@ func (i *ChangeInfo) ToChange() (*change.Change, error) {
9090

9191
return c, nil
9292
}
93+
94+
// DeepCopy returns a deep copy of this ChangeInfo.
95+
func (i *ChangeInfo) DeepCopy() *ChangeInfo {
96+
if i == nil {
97+
return nil
98+
}
99+
100+
clone := &ChangeInfo{}
101+
*clone = *i
102+
103+
return clone
104+
}

server/backend/database/memory/database.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ func (d *DB) FindChangeInfosBetweenServerSeqs(
881881
if info.DocID != docID || info.ServerSeq > to {
882882
break
883883
}
884-
infos = append(infos, info)
884+
infos = append(infos, info.DeepCopy())
885885
}
886886
return infos, nil
887887
}

server/packs/packs.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func PushPull(
5757
clientInfo *database.ClientInfo,
5858
docInfo *database.DocInfo,
5959
reqPack *change.Pack,
60+
mode types.SyncMode,
6061
) (*ServerPack, error) {
6162
start := gotime.Now()
6263
defer func() {
@@ -73,7 +74,7 @@ func PushPull(
7374
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())
7475

7576
// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
76-
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq)
77+
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, mode)
7778
if err != nil {
7879
return nil, err
7980
}

server/packs/pushpull.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323

2424
"github.com/yorkie-team/yorkie/api/converter"
25+
"github.com/yorkie-team/yorkie/api/types"
2526
"github.com/yorkie-team/yorkie/pkg/document/change"
2627
"github.com/yorkie-team/yorkie/server/backend"
2728
"github.com/yorkie-team/yorkie/server/backend/database"
@@ -86,7 +87,17 @@ func pullPack(
8687
reqPack *change.Pack,
8788
cpAfterPush change.Checkpoint,
8889
initialServerSeq int64,
90+
mode types.SyncMode,
8991
) (*ServerPack, error) {
92+
// If the client is push-only, it does not need to pull changes.
93+
// So, just return the checkpoint with server seq after pushing changes.
94+
if mode == types.SyncModePushOnly {
95+
return NewServerPack(docInfo.Key, change.Checkpoint{
96+
ServerSeq: reqPack.Checkpoint.ServerSeq,
97+
ClientSeq: cpAfterPush.ClientSeq,
98+
}, nil, nil), nil
99+
}
100+
90101
if initialServerSeq < reqPack.Checkpoint.ServerSeq {
91102
return nil, fmt.Errorf(
92103
"serverSeq of CP greater than serverSeq of clientInfo(clientInfo %d, cp %d): %w",
@@ -182,19 +193,35 @@ func pullChangeInfos(
182193
return change.InitialCheckpoint, nil, err
183194
}
184195

196+
// NOTE(hackerwins, humdrum): Remove changes from the pulled if the client already has them.
197+
// This could happen when the client has pushed changes and the server receives the changes
198+
// and stores them in the DB, but fails to send the response to the client.
199+
// And it could also happen when the client sync with push-only mode and then sync with pull mode.
200+
//
201+
// See the following test case for more details:
202+
// "sync option with mixed mode test" in integration/client_test.go
203+
var filteredChanges []*database.ChangeInfo
204+
for _, pulledChange := range pulledChanges {
205+
if clientInfo.ID == pulledChange.ActorID && cpAfterPush.ClientSeq >= pulledChange.ClientSeq {
206+
continue
207+
}
208+
filteredChanges = append(filteredChanges, pulledChange)
209+
}
210+
185211
cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)
186212

187213
if len(pulledChanges) > 0 {
188214
logging.From(ctx).Infof(
189-
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s",
215+
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s, filtered changes: %d",
190216
clientInfo.ID,
191217
len(pulledChanges),
192218
pulledChanges[0].ServerSeq,
193219
pulledChanges[len(pulledChanges)-1].ServerSeq,
194220
docInfo.Key,
195221
cpAfterPull.String(),
222+
len(filteredChanges),
196223
)
197224
}
198225

199-
return cpAfterPull, pulledChanges, nil
226+
return cpAfterPull, filteredChanges, nil
200227
}

0 commit comments

Comments
 (0)