Skip to content

Commit 20b929b

Browse files
committed
Add gRPC MaxConnectionAgeGrace option for rpc server to complete stream close
1 parent e60b8e6 commit 20b929b

File tree

7 files changed

+121
-22
lines changed

7 files changed

+121
-22
lines changed

cmd/yorkie/server.go

+6
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@ func init() {
197197
server.DefaultRPCMaxConnectionAge.String(),
198198
"Maximum duration of connection may exist before it will be closed by sending a GoAway.",
199199
)
200+
cmd.Flags().StringVar(
201+
&conf.RPC.MaxConnectionAgeGrace,
202+
"rpc-max-connection-age-grace",
203+
server.DefaultRPCMaxConnectionAgeGrace.String(),
204+
"Additional grace period after MaxConnectionAge after which connections will be forcibly closed.",
205+
)
200206
cmd.Flags().IntVar(
201207
&conf.Profiling.Port,
202208
"profiling-port",

server/config.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ import (
3434

3535
// Below are the values of the default values of Yorkie config.
3636
const (
37-
DefaultRPCPort = 11101
38-
DefaultRPCMaxRequestsBytes = 4 * 1024 * 1024 // 4MiB
39-
DefaultRPCMaxConnectionAge = 1 * time.Minute
37+
DefaultRPCPort = 11101
38+
DefaultRPCMaxRequestsBytes = 4 * 1024 * 1024 // 4MiB
39+
DefaultRPCMaxConnectionAge = 50 * time.Second
40+
DefaultRPCMaxConnectionAgeGrace = 10 * time.Second
4041

4142
DefaultProfilingPort = 11102
4243

@@ -157,6 +158,10 @@ func (c *Config) ensureDefaultValue() {
157158
c.RPC.MaxConnectionAge = DefaultRPCMaxConnectionAge.String()
158159
}
159160

161+
if c.RPC.MaxConnectionAgeGrace == "" {
162+
c.RPC.MaxConnectionAgeGrace = DefaultRPCMaxConnectionAgeGrace.String()
163+
}
164+
160165
if c.Profiling.Port == 0 {
161166
c.Profiling.Port = DefaultProfilingPort
162167
}

server/config.sample.yml

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ RPC:
88

99
# MaxConnectionAge is a duration for the maximum amount of time a connection may exist
1010
# before it will be closed by sending a GoAway.
11-
MaxConnectionAge: "60s"
11+
MaxConnectionAge: "50s"
12+
13+
# MaxConnectionAgeGrace is a duration for the amount of time after receiving a GoAway
14+
# for pending RPCs to complete before forcibly closing connections.
15+
MaxConnectionAgeGrace: "10s"
1216

1317
# CertFile is the file containing the TLS certificate.
1418
CertFile: ""

server/rpc/config.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ var (
3232
ErrInvalidKeyFile = errors.New("invalid key file for RPC server")
3333
// ErrInvalidMaxConnectionAge occurs when the max connection age is invalid.
3434
ErrInvalidMaxConnectionAge = errors.New("invalid max connection age for RPC server")
35+
// ErrInvalidMaxConnectionAgeGrace occurs when the max connection age grace is invalid.
36+
ErrInvalidMaxConnectionAgeGrace = errors.New("invalid max connection age grace for RPC server")
3537
)
3638

3739
// Config is the configuration for creating a Server instance.
@@ -51,6 +53,10 @@ type Config struct {
5153
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist
5254
// before it will be closed by sending a GoAway.
5355
MaxConnectionAge string `yaml:"MaxConnectionAge"`
56+
57+
// MaxConnectionAgeGrace is a duration for the amount of time after receiving a GoAway
58+
// for pending RPCs to complete before forcibly closing connections.
59+
MaxConnectionAgeGrace string `yaml:"MaxConnectionAgeGrace"`
5460
}
5561

5662
// Validate validates the port number and the files for certification.
@@ -74,11 +80,19 @@ func (c *Config) Validate() error {
7480

7581
if _, err := time.ParseDuration(c.MaxConnectionAge); err != nil {
7682
return fmt.Errorf(
77-
"must be a valid time duration string format %d: %w",
83+
"%s: %w",
7884
c.MaxConnectionAge,
7985
ErrInvalidMaxConnectionAge,
8086
)
8187
}
8288

89+
if _, err := time.ParseDuration(c.MaxConnectionAgeGrace); err != nil {
90+
return fmt.Errorf(
91+
"%s: %w",
92+
c.MaxConnectionAgeGrace,
93+
ErrInvalidMaxConnectionAgeGrace,
94+
)
95+
}
96+
8397
return nil
8498
}

server/rpc/server.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,22 @@ package rpc
2121
import (
2222
"context"
2323
"fmt"
24-
"google.golang.org/grpc/keepalive"
2524
"math"
2625
"net"
2726
"time"
2827

2928
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/credentials"
31+
"google.golang.org/grpc/health"
32+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
33+
"google.golang.org/grpc/keepalive"
34+
3035
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
3136
"github.com/yorkie-team/yorkie/server/backend"
3237
"github.com/yorkie-team/yorkie/server/grpchelper"
3338
"github.com/yorkie-team/yorkie/server/logging"
3439
"github.com/yorkie-team/yorkie/server/rpc/interceptors"
35-
"google.golang.org/grpc"
36-
"google.golang.org/grpc/credentials"
37-
"google.golang.org/grpc/health"
38-
healthpb "google.golang.org/grpc/health/grpc_health_v1"
3940
)
4041

4142
// Server is a normal server that processes the logic requested by the client.
@@ -79,11 +80,17 @@ func NewServer(conf *Config, be *backend.Backend) (*Server, error) {
7980
return nil, fmt.Errorf("parse max connection age: %w", err)
8081
}
8182

83+
maxConnectionAgeGrace, err := time.ParseDuration(conf.MaxConnectionAgeGrace)
84+
if err != nil {
85+
return nil, fmt.Errorf("parse max connection age grace: %w", err)
86+
}
87+
8288
opts = append(opts, grpc.MaxRecvMsgSize(int(conf.MaxRequestBytes)))
8389
opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
8490
opts = append(opts, grpc.MaxConcurrentStreams(math.MaxUint32))
8591
opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{
86-
MaxConnectionAge: maxConnectionAge,
92+
MaxConnectionAge: maxConnectionAge,
93+
MaxConnectionAgeGrace: maxConnectionAgeGrace,
8794
}))
8895

8996
yorkieServiceCtx, yorkieServiceCancel := context.WithCancel(context.Background())

server/rpc/server_test.go

+65-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"log"
2424
"os"
2525
"testing"
26+
"time"
2627

2728
"github.com/stretchr/testify/assert"
2829
"google.golang.org/grpc"
@@ -90,8 +91,10 @@ func TestMain(m *testing.M) {
9091
}
9192

9293
testRPCServer, err = rpc.NewServer(&rpc.Config{
93-
Port: helper.RPCPort,
94-
MaxRequestBytes: helper.RPCMaxRequestBytes,
94+
Port: helper.RPCPort,
95+
MaxRequestBytes: helper.RPCMaxRequestBytes,
96+
MaxConnectionAge: helper.RPCMaxConnectionAge.String(),
97+
MaxConnectionAgeGrace: helper.RPCMaxConnectionAgeGrace.String(),
9598
}, be)
9699
if err != nil {
97100
log.Fatal(err)
@@ -622,6 +625,50 @@ func TestRPCServerBackend(t *testing.T) {
622625
)
623626
assert.Equal(t, codes.FailedPrecondition, status.Convert(err).Code())
624627
})
628+
629+
t.Run("watch document test", func(t *testing.T) {
630+
activateResp, err := testClient.ActivateClient(
631+
context.Background(),
632+
&api.ActivateClientRequest{ClientKey: t.Name()},
633+
)
634+
assert.NoError(t, err)
635+
636+
packWithNoChanges := &api.ChangePack{
637+
DocumentKey: helper.TestDocKey(t).String(),
638+
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0},
639+
}
640+
641+
resPack, err := testClient.AttachDocument(
642+
context.Background(),
643+
&api.AttachDocumentRequest{
644+
ClientId: activateResp.ClientId,
645+
ChangePack: packWithNoChanges,
646+
},
647+
)
648+
assert.NoError(t, err)
649+
650+
// watch document
651+
watchResp, err := testClient.WatchDocument(
652+
context.Background(),
653+
&api.WatchDocumentRequest{
654+
Client: &api.Client{Id: activateResp.ClientId, Presence: &api.Presence{}},
655+
DocumentId: resPack.DocumentId,
656+
},
657+
)
658+
assert.NoError(t, err)
659+
660+
// check if stream is open
661+
_, err = watchResp.Recv()
662+
assert.NoError(t, err)
663+
664+
// wait for MaxConnectionAge + MaxConnectionAgeGrace
665+
time.Sleep(helper.RPCMaxConnectionAge + helper.RPCMaxConnectionAgeGrace)
666+
667+
// check if stream has closed by server (EOF)
668+
_, err = watchResp.Recv()
669+
assert.Equal(t, codes.Unavailable, status.Code(err))
670+
assert.Contains(t, err.Error(), "EOF")
671+
})
625672
}
626673

627674
func TestConfig_Validate(t *testing.T) {
@@ -633,9 +680,23 @@ func TestConfig_Validate(t *testing.T) {
633680
{config: &rpc.Config{Port: 11101, CertFile: "noSuchCertFile"}, expected: rpc.ErrInvalidCertFile},
634681
{config: &rpc.Config{Port: 11101, KeyFile: "noSuchKeyFile"}, expected: rpc.ErrInvalidKeyFile},
635682
// not to use tls
636-
{config: &rpc.Config{Port: 11101, CertFile: "", KeyFile: ""}, expected: nil},
683+
{config: &rpc.Config{
684+
Port: 11101,
685+
CertFile: "",
686+
KeyFile: "",
687+
MaxConnectionAge: "50s",
688+
MaxConnectionAgeGrace: "10s",
689+
},
690+
expected: nil},
637691
// pass any file existing
638-
{config: &rpc.Config{Port: 11101, CertFile: "server_test.go", KeyFile: "server_test.go"}, expected: nil},
692+
{config: &rpc.Config{
693+
Port: 11101,
694+
CertFile: "server_test.go",
695+
KeyFile: "server_test.go",
696+
MaxConnectionAge: "50s",
697+
MaxConnectionAgeGrace: "10s",
698+
},
699+
expected: nil},
639700
}
640701
for _, scenario := range scenarios {
641702
assert.ErrorIs(t, scenario.config.Validate(), scenario.expected, "provided config: %#v", scenario.config)

test/helper/helper.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,18 @@ var testStartedAt int64
4545

4646
// Below are the values of the Yorkie config used in the test.
4747
var (
48-
RPCPort = 21101
49-
RPCMaxRequestBytes = uint64(4 * 1024 * 1024)
50-
RPCMaxConnectionAge = 10 * gotime.Second
48+
RPCPort = 21101
49+
RPCMaxRequestBytes = uint64(4 * 1024 * 1024)
50+
RPCMaxConnectionAge = 4 * gotime.Second
51+
RPCMaxConnectionAgeGrace = 1 * gotime.Second
5152

5253
ProfilingPort = 21102
5354

5455
AdminPort = 21103
5556

5657
AdminUser = server.DefaultAdminUser
5758
AdminPassword = server.DefaultAdminPassword
58-
HousekeepingInterval = 1 * gotime.Second
59+
HousekeepingInterval = 10 * gotime.Second
5960
HousekeepingCandidatesLimitPerProject = 10
6061

6162
ClientDeactivateThreshold = "10s"
@@ -114,9 +115,10 @@ func TestConfig() *server.Config {
114115
portOffset += 100
115116
return &server.Config{
116117
RPC: &rpc.Config{
117-
Port: RPCPort + portOffset,
118-
MaxRequestBytes: RPCMaxRequestBytes,
119-
MaxConnectionAge: RPCMaxConnectionAge.String(),
118+
Port: RPCPort + portOffset,
119+
MaxRequestBytes: RPCMaxRequestBytes,
120+
MaxConnectionAge: RPCMaxConnectionAge.String(),
121+
MaxConnectionAgeGrace: RPCMaxConnectionAgeGrace.String(),
120122
},
121123
Profiling: &profiling.Config{
122124
Port: ProfilingPort + portOffset,

0 commit comments

Comments
 (0)