Skip to content

Commit

Permalink
Add grpc keepalive configuration.
Browse files Browse the repository at this point in the history
Prior to the introduction of this configuration, grpc keepalive messages were
sent after 2 hours of inactivity on the stream. This posed issues in various
scenarios where the server-side xds connection balancing was unaware that envoy
instances were uncleanly killed / force-closed, since the connections would
only be cleaned up once ~5 minutes of TCP timeouts had elapsed. Setting this
config to a 30 second interval with a 20 second timeout ensures that it takes
at most 50 seconds for a dead xds connection to be closed.
  • Loading branch information
hashi-derek committed Oct 23, 2023
1 parent 62dec7e commit c2200d1
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 8 deletions.
9 changes: 9 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
Expand Down Expand Up @@ -722,6 +723,10 @@ func (a *Agent) Start(ctx context.Context) error {
metrics.Default(),
a.tlsConfigurator,
incomingRPCLimiter,
keepalive.ServerParameters{
Time: a.config.GRPCKeepaliveInterval,
Timeout: a.config.GRPCKeepaliveTimeout,
},
)

var pt *proxytracker.ProxyTracker
Expand Down Expand Up @@ -757,6 +762,10 @@ func (a *Agent) Start(ctx context.Context) error {
metrics.Default(),
a.tlsConfigurator,
rpcRate.NullRequestLimitsHandler(),
keepalive.ServerParameters{
Time: a.config.GRPCKeepaliveInterval,
Timeout: a.config.GRPCKeepaliveTimeout,
},
)

client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
Expand Down
2 changes: 2 additions & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,8 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
GRPCPort: grpcPort,
GRPCTLSAddrs: grpcTlsAddrs,
GRPCTLSPort: grpcTlsPort,
GRPCKeepaliveInterval: b.durationValWithDefaultMin("performance.grpc_keepalive_interval", c.Performance.GRPCKeepaliveInterval, 30*time.Second, time.Second),
GRPCKeepaliveTimeout: b.durationValWithDefaultMin("performance.grpc_keepalive_timeout", c.Performance.GRPCKeepaliveTimeout, 20*time.Second, time.Second),
HTTPMaxConnsPerClient: intVal(c.Limits.HTTPMaxConnsPerClient),
HTTPSHandshakeTimeout: b.durationVal("limits.https_handshake_timeout", c.Limits.HTTPSHandshakeTimeout),
KVMaxValueSize: uint64Val(c.Limits.KVMaxValueSize),
Expand Down
8 changes: 5 additions & 3 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,11 @@ type HTTPConfig struct {
}

type Performance struct {
LeaveDrainTime *string `mapstructure:"leave_drain_time"`
RaftMultiplier *int `mapstructure:"raft_multiplier"` // todo(fs): validate as uint
RPCHoldTimeout *string `mapstructure:"rpc_hold_timeout"`
LeaveDrainTime *string `mapstructure:"leave_drain_time"`
RaftMultiplier *int `mapstructure:"raft_multiplier"` // todo(fs): validate as uint
RPCHoldTimeout *string `mapstructure:"rpc_hold_timeout"`
GRPCKeepaliveInterval *string `mapstructure:"grpc_keepalive_interval"`
GRPCKeepaliveTimeout *string `mapstructure:"grpc_keepalive_timeout"`
}

type Telemetry struct {
Expand Down
2 changes: 2 additions & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func DefaultSource() Source {
leave_drain_time = "5s"
raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `
rpc_hold_timeout = "7s"
grpc_keepalive_interval = "30s"
grpc_keepalive_timeout = "20s"
}
ports = {
dns = 8600
Expand Down
13 changes: 13 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,19 @@ type RuntimeConfig struct {
// hcl: client_addr = string addresses { grpc_tls = string } ports { grpc_tls = int }
GRPCTLSAddrs []net.Addr

// GRPCKeepaliveInterval determines how frequently an HTTP/2 keepalive ping is sent
// on a GRPC connection while it is idle. This helps detect xds connections that have died.
//
// Since the xds load balancing between servers relies on knowing how many connections
// are active, this configuration ensures that dead connections are routinely detected
// and cleaned up on an interval.
GRPCKeepaliveInterval time.Duration

// GRPCKeepaliveTimeout specifies how long a GRPC client has to reply to the keepalive
// messages spawned from GRPCKeepaliveInterval. If a client does not reply in this amount of
// time, the connection will be closed by the server.
GRPCKeepaliveTimeout time.Duration

// HTTPAddrs contains the list of TCP addresses and UNIX sockets the HTTP
// server will bind to. If the HTTP endpoint is disabled (ports.http <= 0)
// the list is empty.
Expand Down
2 changes: 2 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6560,6 +6560,8 @@ func TestLoad_FullConfig(t *testing.T) {
GRPCAddrs: []net.Addr{tcpAddr("32.31.61.91:4881")},
GRPCTLSPort: 5201,
GRPCTLSAddrs: []net.Addr{tcpAddr("23.14.88.19:5201")},
GRPCKeepaliveInterval: 33 * time.Second,
GRPCKeepaliveTimeout: 22 * time.Second,
HTTPAddrs: []net.Addr{tcpAddr("83.39.91.39:7999")},
HTTPBlockEndpoints: []string{"RBvAFcGD", "fWOWFznh"},
AllowWriteHTTPFrom: []*net.IPNet{cidr("127.0.0.0/8"), cidr("22.33.44.55/32"), cidr("0.0.0.0/0")},
Expand Down
2 changes: 2 additions & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@
"GRPCPort": 0,
"GRPCTLSAddrs": [],
"GRPCTLSPort": 0,
"GRPCKeepaliveInterval": "0s",
"GRPCKeepaliveTimeout": "0s",
"GossipLANGossipInterval": "0s",
"GossipLANGossipNodes": 0,
"GossipLANProbeInterval": "0s",
Expand Down
2 changes: 2 additions & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ performance {
leave_drain_time = "8265s"
raft_multiplier = 5
rpc_hold_timeout = "15707s"
grpc_keepalive_interval = "33s"
grpc_keepalive_timeout = "22s"
}
pid_file = "43xN80Km"
ports {
Expand Down
4 changes: 3 additions & 1 deletion agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,9 @@
"performance": {
"leave_drain_time": "8265s",
"raft_multiplier": 5,
"rpc_hold_timeout": "15707s"
"rpc_hold_timeout": "15707s",
"grpc_keepalive_interval": "33s",
"grpc_keepalive_timeout": "22s"
},
"pid_file": "43xN80Km",
"ports": {
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/hashicorp/consul-net-rpc/net/rpc"

Expand Down Expand Up @@ -339,7 +340,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
oldNotify()
}
}
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler())
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler(), keepalive.ServerParameters{})
proxyUpdater := proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{})
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, proxyUpdater)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion agent/grpc-external/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ var (

// NewServer constructs a gRPC server for the external gRPC port, to which
// handlers can be registered.
func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter rate.RequestLimitsHandler) *grpc.Server {
func NewServer(
logger agentmiddleware.Logger,
metricsObj *metrics.Metrics,
tls *tlsutil.Configurator,
limiter rate.RequestLimitsHandler,
keepaliveParams keepalive.ServerParameters,
) *grpc.Server {
if metricsObj == nil {
metricsObj = metrics.Default()
}
Expand Down Expand Up @@ -56,6 +62,7 @@ func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
middleware.WithUnaryServerChain(unaryInterceptors...),
middleware.WithStreamServerChain(streamInterceptors...),
grpc.KeepaliveParams(keepaliveParams),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
// This must be less than the keepalive.ClientParameters Time setting, otherwise
// the server will disconnect the client for sending too many keepalive pings.
Expand Down
3 changes: 2 additions & 1 deletion agent/grpc-external/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/hashicorp/go-hclog"

Expand All @@ -27,7 +28,7 @@ import (
func TestServer_EmitsStats(t *testing.T) {
sink, metricsObj := testutil.NewFakeSink(t)

srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler())
srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{})

testservice.RegisterSimpleServer(srv, &testservice.Simple{})

Expand Down
3 changes: 2 additions & 1 deletion agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -1818,7 +1819,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta()

deps := newDefaultDeps(t, conf)
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler())
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{})

server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger, nil)
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions website/content/docs/agent/config/config-files.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ Refer to the [formatting specification](https://golang.org/pkg/time/#ParseDurati
This was added in Consul 1.0. Must be a duration value such as 10s. Defaults
to 7s.

- `grpc_keepalive_interval` - A duration that determines how frequently servers send keepalive messages to inactive gRPC clients. This configuration can be used to modify how quickly uncleanly closed xDS or peering connections are detected and removed. Defaults to 30s.

- `grpc_keepalive_timeout` - A duration that determines how long a server will wait for a reply to a keepalive message before considering a gRPC connection unhealthy and forcibly removing it. Defaults to 20s.

- `pid_file` Equivalent to the [`-pid-file` command line flag](/consul/docs/agent/config/cli-flags#_pid_file).

- `ports` This is a nested object that allows setting the bind ports for the following keys:
Expand Down

0 comments on commit c2200d1

Please sign in to comment.