-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathmt_proxy.go
207 lines (182 loc) · 6.48 KB
/
mt_proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package cli
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"time"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
)
// proxyOpts holds the SQL proxy options. Its fields are bound to the
// start-proxy command-line flags in init and the value is handed to
// sqlproxyccl.NewProxyHandler by runStartSQLProxy.
var proxyOpts sqlproxyccl.ProxyOptions
// mtStartSQLProxyCmd is the cobra command that starts a SQL proxy. It accepts
// no positional arguments (Args is cobra.NoArgs); all configuration is
// supplied through flags.
var mtStartSQLProxyCmd = &cobra.Command{
	// The usage line must not advertise positional arguments, because Args
	// below rejects any that are supplied.
	Use:   "start-proxy",
	Short: "start a SQL proxy",
	Long: `Starts a SQL proxy.
This proxy accepts incoming connections and relays them to a backend server
determined by the arguments used.
`,
	RunE: MaybeDecorateGRPCError(runStartSQLProxy),
	Args: cobra.NoArgs,
}
// init registers the flags of the start-proxy command, binding each flag to
// the corresponding field of proxyOpts.
func init() {
	f := mtStartSQLProxyCmd.Flags()

	// Access control.
	f.StringVar(&proxyOpts.Denylist, "denylist-file", "",
		"Denylist file to limit access to IP addresses and tenant ids.")

	// Listeners.
	f.StringVar(&proxyOpts.ListenAddr, "listen-addr", "127.0.0.1:46257",
		"Listen address for incoming connections.")
	f.StringVar(&proxyOpts.ListenCert, "listen-cert", "",
		"File containing PEM-encoded x509 certificate for listen address.")
	f.StringVar(&proxyOpts.ListenKey, "listen-key", "",
		"File containing PEM-encoded x509 key for listen address.")
	// This listener is served by the HTTP metrics server, so its help text
	// must be distinguishable from that of --listen-addr.
	f.StringVar(&proxyOpts.MetricsAddress, "listen-metrics", "0.0.0.0:8080",
		"Listen address for incoming HTTP metrics connections.")

	// Backend routing.
	f.StringVar(&proxyOpts.RoutingRule, "routing-rule", "",
		"Routing rule for incoming connections. Use '{{clusterName}}' for substitution.")
	f.StringVar(&proxyOpts.DirectoryAddr, "directory", "",
		"Directory address for resolving from backend id to IP.")
	f.BoolVar(&proxyOpts.SkipVerify, "skip-verify", false,
		"If true, skip identity verification of backend. For testing only.")
	f.BoolVar(&proxyOpts.Insecure, "insecure", false,
		"If true, use insecure connection to the backend.")

	// Timing knobs.
	f.DurationVar(&proxyOpts.RatelimitBaseDelay, "ratelimit-base-delay", 50*time.Millisecond,
		"Initial backoff after a failed login attempt. Set to 0 to disable rate limiting.")
	f.DurationVar(&proxyOpts.ValidateAccessInterval, "validate-access-interval", 30*time.Second,
		"Time interval between validation that current connections are still valid.")
	f.DurationVar(&proxyOpts.PollConfigInterval, "poll-config-interval", 30*time.Second,
		"Polling interval for changes in config file.")
	f.DurationVar(&proxyOpts.IdleTimeout, "idle-timeout", 0,
		"Close connections idle for this duration.")
}
// runStartSQLProxy is the RunE body of the start-proxy command. It sets up
// logging and a stopper, opens the proxy and metrics listeners, starts one
// async task per listener, and then blocks in waitForSignals until a serving
// error, a signal, or a stop request ends the process.
//
// The returned error is the first setup error encountered, or whatever
// waitForSignals returns.
func runStartSQLProxy(cmd *cobra.Command, args []string) (returnErr error) {
	// Initialize logging, stopper and context that can be canceled
	ctx, stopper, err := initLogging(cmd)
	if err != nil {
		return err
	}
	defer stopper.Stop(ctx)

	log.Infof(ctx, "New proxy with opts: %+v", proxyOpts)

	// Both listeners are registered as closers so that they are released when
	// the stopper stops, including on the early-return error paths below.
	proxyLn, err := net.Listen("tcp", proxyOpts.ListenAddr)
	if err != nil {
		return err
	}
	stopper.AddCloser(stop.CloserFn(func() { _ = proxyLn.Close() }))

	metricsLn, err := net.Listen("tcp", proxyOpts.MetricsAddress)
	if err != nil {
		return err
	}
	stopper.AddCloser(stop.CloserFn(func() { _ = metricsLn.Close() }))

	handler, err := sqlproxyccl.NewProxyHandler(ctx, stopper, proxyOpts)
	if err != nil {
		return err
	}

	server := sqlproxyccl.NewServer(handler.Handle)

	// Two tasks may each report one error, so reserve one buffer slot per
	// sender. With a smaller buffer, a task whose error nobody reads (for
	// example during a signal-driven shutdown) would block forever on its
	// send and never terminate.
	errChan := make(chan error, 2)

	if err := stopper.RunAsyncTask(ctx, "serve-http", func(ctx context.Context) {
		log.Infof(ctx, "HTTP metrics server listening at %s", metricsLn.Addr())
		if err := server.ServeHTTP(ctx, metricsLn); err != nil {
			errChan <- err
		}
	}); err != nil {
		return err
	}

	if err := stopper.RunAsyncTask(ctx, "serve-proxy", func(ctx context.Context) {
		log.Infof(ctx, "proxy server listening at %s", proxyLn.Addr())
		if err := server.Serve(ctx, proxyLn); err != nil {
			errChan <- err
		}
	}); err != nil {
		return err
	}

	return waitForSignals(ctx, stopper, errChan)
}
// initLogging prepares logging for the proxy process and returns the context
// and stopper the rest of the command runs under.
//
// On failure of the logging setup, the background context, whatever stopper
// the setup call returned, and the error are returned as-is.
func initLogging(cmd *cobra.Command) (ctx context.Context, stopper *stop.Stopper, err error) {
	// Clear the default store specs so they are not used to set up logging;
	// logging then defaults to stderr unless --log-dir is given. This process
	// is stateless at the time of writing and may not be provisioned with
	// suitable storage.
	serverCfg.Stores.Specs = nil
	serverCfg.ClusterName = ""

	ctx = context.Background()
	if stopper, err = setupAndInitializeLoggingAndProfiling(ctx, cmd, false /* isServerCmd */); err != nil {
		return ctx, stopper, err
	}

	// Tie the returned context to the stopper; the cancel function is not
	// needed by callers and is deliberately dropped.
	ctx, _ = stopper.WithCancelOnQuiesce(ctx)
	return ctx, stopper, nil
}
// waitForSignals blocks until the process should exit and returns the error
// to report to the caller.
//
// It first waits for one of four events:
//   - an error on errChan: logs are flushed and the error is returned
//     immediately, without waiting for the stopper;
//   - the stopper starting to quiesce: waits until the stopper has stopped;
//   - SIGINT or SIGTERM: starts stopper.Stop in a goroutine; for SIGINT the
//     eventual return value is an "interrupted" error;
//   - the log package's fatal channel: stops the stopper and blocks forever.
//
// It then waits for the stopper to report that it has stopped. A further
// os.Interrupt received meanwhile is logged and ignored; any other signal
// delivered on signalCh panics to force a hard shutdown.
//
// SIGQUIT is handled separately for the life of the process: each one dumps
// the goroutine stacks to the log.
func waitForSignals(
ctx context.Context, stopper *stop.Stopper, errChan chan error,
) (returnErr error) {
// Need to alias the signals if this has to run on non-unix OSes too.
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, unix.SIGINT, unix.SIGTERM)
// Dump the stacks when QUIT is received.
quitSignalCh := make(chan os.Signal, 1)
signal.Notify(quitSignalCh, unix.SIGQUIT)
// This goroutine is never stopped; it runs until the process exits.
go func() {
for {
<-quitSignalCh
log.DumpStacks(context.Background())
}
}()
// Phase 1: wait for the first event that triggers shutdown.
select {
case err := <-errChan:
// A serving task failed; flush logs and report its error directly.
log.StartAlwaysFlush()
return err
case <-stopper.ShouldQuiesce():
// Stop has been requested through the stopper's Stop
<-stopper.IsStopped()
// StartAlwaysFlush both flushes and ensures that subsequent log
// writes are flushed too.
log.StartAlwaysFlush()
case sig := <-signalCh: // INT or TERM
log.StartAlwaysFlush() // In case the caller follows up with KILL
log.Ops.Infof(ctx, "received signal '%s'", sig)
// Only SIGINT produces a non-nil return error; SIGTERM leaves
// returnErr nil.
if sig == os.Interrupt {
returnErr = errors.New("interrupted")
}
// Stop asynchronously so that the loop below can keep reacting to
// further signals while the shutdown is in progress.
go func() {
log.Infof(ctx, "server stopping")
stopper.Stop(ctx)
}()
case <-log.FatalChan():
stopper.Stop(ctx)
select {} // Block and wait for logging go routine to shut down the process
}
// Phase 2: wait for the stopper to finish, handling signals that arrive
// in the meantime. The trailing break leaves the loop after any select
// case that does not `continue`.
for {
select {
case sig := <-signalCh:
switch sig {
// NOTE(review): this comment previously read "SIGTERM after SIGTERM",
// but os.Interrupt is SIGINT. As written, a repeated SIGINT continues
// the graceful shutdown and a SIGTERM falls through to the hard
// shutdown below — confirm which signal was intended.
case os.Interrupt: // SIGINT received while shutting down
log.Ops.Infof(ctx, "received additional signal '%s'; continuing graceful shutdown", sig)
continue
}
log.Ops.Shoutf(ctx, severity.ERROR,
"received signal '%s' during shutdown, initiating hard shutdown", log.Safe(sig))
panic("terminate")
case <-stopper.IsStopped():
// Announce completion both in the log and on stdout.
const msgDone = "server shutdown completed"
log.Ops.Infof(ctx, msgDone)
fmt.Fprintln(os.Stdout, msgDone)
}
break
}
return returnErr
}