Skip to content

Commit

Permalink
support TLS for components (#904)
Browse files Browse the repository at this point in the history
Truly support TLS for components.
Before this PR, if TLS is enabled for components:

- `tidb` will fail to connect to `pump`
- no TLS between drainer and pump
- TLS is not enabled for the TiKV client in `drainer`
- `binlogctl` does not actually work
...

[related docs](https://pingcap.com/docs/stable/how-to/secure/enable-tls-between-components/) ([Chinese version](https://pingcap.com/docs-cn/stable/how-to/secure/enable-tls-between-components/))
This Commit:
- properly handle TLS-related setup when TLS is enabled
- enable TLS in the integration tests
- log pump config at startup time
  • Loading branch information
july2993 committed Mar 15, 2020
1 parent e4f2ba2 commit 738f0ca
Show file tree
Hide file tree
Showing 22 changed files with 324 additions and 131 deletions.
30 changes: 15 additions & 15 deletions binlogctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ const (

// Config holds the configuration of binlogctl
type Config struct {
*flag.FlagSet

Command string `toml:"cmd" json:"cmd"`
NodeID string `toml:"node-id" json:"node-id"`
DataDir string `toml:"data-dir" json:"data-dir"`
TimeZone string `toml:"time-zone" json:"time-zone"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
SSLCA string `toml:"ssl-ca" json:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
tls *tls.Config
*flag.FlagSet `toml:"-" json:"-"`

Command string `toml:"cmd" json:"cmd"`
NodeID string `toml:"node-id" json:"node-id"`
DataDir string `toml:"data-dir" json:"data-dir"`
TimeZone string `toml:"time-zone" json:"time-zone"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
SSLCA string `toml:"ssl-ca" json:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
TLS *tls.Config `toml:"-" json:"-"`
printVersion bool
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func (cfg *Config) Parse(args []string) error {
SSLCert: cfg.SSLCert,
SSLKey: cfg.SSLKey,
}
cfg.tls, err = sCfg.ToTLSConfig()
cfg.TLS, err = sCfg.ToTLSConfig()
if err != nil {
return errors.Errorf("tls config error %v", err)
}
Expand Down
39 changes: 28 additions & 11 deletions binlogctl/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package binlogctl

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"time"
Expand All @@ -34,8 +35,8 @@ var (
)

// QueryNodesByKind returns specified nodes, like pumps/drainers
func QueryNodesByKind(urls string, kind string, showOffline bool) error {
registry, err := createRegistryFuc(urls)
func QueryNodesByKind(urls string, kind string, showOffline bool, tlsConfig *tls.Config) error {
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -56,12 +57,12 @@ func QueryNodesByKind(urls string, kind string, showOffline bool) error {
}

// UpdateNodeState updates a pump's or drainer's state.
func UpdateNodeState(urls, kind, nodeID, state string) error {
func UpdateNodeState(urls, kind, nodeID, state string, tlsConfig *tls.Config) error {
/*
node's state can be online, pausing, paused, closing and offline.
if the state is one of them, will update the node's state saved in etcd directly.
*/
registry, err := createRegistryFuc(urls)
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -81,12 +82,12 @@ func UpdateNodeState(urls, kind, nodeID, state string) error {
}

// createRegistry returns an etcd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
func createRegistry(urls string, tlsConfig *tls.Config) (*node.EtcdRegistry, error) {
ectdEndpoints, err := flags.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
}
cli, err := newEtcdClientFromCfgFunc(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
cli, err := newEtcdClientFromCfgFunc(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -95,8 +96,8 @@ func createRegistry(urls string) (*node.EtcdRegistry, error) {
}

// ApplyAction applies action on pump or drainer
func ApplyAction(urls, kind, nodeID string, action string) error {
registry, err := createRegistryFuc(urls)
func ApplyAction(urls, kind, nodeID string, action string, tlsConfig *tls.Config) error {
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -106,18 +107,34 @@ func ApplyAction(urls, kind, nodeID string, action string) error {
return errors.Trace(err)
}

var client http.Client
url := fmt.Sprintf("http://%s/state/%s/%s", n.Addr, n.NodeID, action)
schema := "http"
if tlsConfig != nil {
schema = "https"
}

url := fmt.Sprintf("%s://%s/state/%s/%s", schema, n.Addr, n.NodeID, action)
log.Debug("send put http request", zap.String("url", url))
req, err := http.NewRequest("PUT", url, nil)
if err != nil {
return errors.Trace(err)
}
_, err = client.Do(req)
_, err = getClient(tlsConfig).Do(req)
if err == nil {
log.Info("Apply action on node success", zap.String("action", action), zap.String("NodeID", n.NodeID))
return nil
}

return errors.Trace(err)
}

// getClient builds the HTTP client used to send a request to a node.
//
// When tlsConfig is nil the returned client is a zero-value http.Client.
// Otherwise the client is given its own http.Transport whose
// TLSClientConfig is the supplied tlsConfig (the pointer is stored as-is,
// not copied).
func getClient(tlsConfig *tls.Config) *http.Client {
	client := new(http.Client)
	if tlsConfig != nil {
		// Only attach a custom transport when TLS settings were provided.
		client.Transport = &http.Transport{TLSClientConfig: tlsConfig}
	}
	return client
}
16 changes: 8 additions & 8 deletions binlogctl/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type testNodesSuite struct{}
func (s *testNodesSuite) SetUpTest(c *C) {
newEtcdClientFromCfgFunc = newFakeEtcdClientFromCfg
createRegistryFuc = createMockRegistry
_, err := createMockRegistry("127.0.0.1:2379")
_, err := createMockRegistry("127.0.0.1:2379", nil)
c.Assert(err, IsNil)
}

Expand All @@ -63,29 +63,29 @@ func (s *testNodesSuite) TestApplyAction(c *C) {

registerPumpForTest(c, "test", url)

err := ApplyAction("127.0.0.1:2379", "pumps", "test2", PausePump)
err := ApplyAction("127.0.0.1:2379", "pumps", "test2", PausePump, nil)
c.Assert(errors.IsNotFound(err), IsTrue)

// TODO: handle log information and add check
err = ApplyAction("127.0.0.1:2379", "pumps", "test", PausePump)
err = ApplyAction("127.0.0.1:2379", "pumps", "test", PausePump, nil)
c.Assert(err, IsNil)
}

func (s *testNodesSuite) TestQueryNodesByKind(c *C) {
registerPumpForTest(c, "test", "127.0.0.1:8255")

// TODO: handle log information and add check
err := QueryNodesByKind("127.0.0.1:2379", "pumps", false)
err := QueryNodesByKind("127.0.0.1:2379", "pumps", false, nil)
c.Assert(err, IsNil)
}

func (s *testNodesSuite) TestUpdateNodeState(c *C) {
registerPumpForTest(c, "test", "127.0.0.1:8255")

err := UpdateNodeState("127.0.0.1:2379", "pumps", "test2", node.Paused)
err := UpdateNodeState("127.0.0.1:2379", "pumps", "test2", node.Paused, nil)
c.Assert(err, ErrorMatches, ".*not found.*")

err = UpdateNodeState("127.0.0.1:2379", "pumps", "test", node.Paused)
err = UpdateNodeState("127.0.0.1:2379", "pumps", "test", node.Paused, nil)
c.Assert(err, IsNil)

// check node's state is changed to paused
Expand All @@ -104,7 +104,7 @@ func (s *testNodesSuite) TestUpdateNodeState(c *C) {

func (s *testNodesSuite) TestCreateRegistry(c *C) {
urls := "127.0.0.1:2379"
registry, err := createRegistry(urls)
registry, err := createRegistry(urls, nil)
c.Assert(err, IsNil)
c.Assert(registry, NotNil)

Expand All @@ -131,7 +131,7 @@ func (s *testNodesSuite) TestCreateRegistry(c *C) {

}

func createMockRegistry(urls string) (*node.EtcdRegistry, error) {
func createMockRegistry(urls string, _ *tls.Config) (*node.EtcdRegistry, error) {
if fakeRegistry != nil {
return fakeRegistry, nil
}
Expand Down
16 changes: 8 additions & 8 deletions cmd/binlogctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ func main() {
case ctl.GenerateMeta:
err = ctl.GenerateMetaInfo(cfg)
case ctl.QueryPumps:
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.PumpNode, cfg.ShowOfflineNodes)
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.PumpNode, cfg.ShowOfflineNodes, cfg.TLS)
case ctl.QueryDrainers:
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.DrainerNode, cfg.ShowOfflineNodes)
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.DrainerNode, cfg.ShowOfflineNodes, cfg.TLS)
case ctl.UpdatePump:
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, cfg.State)
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, cfg.State, cfg.TLS)
case ctl.UpdateDrainer:
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, cfg.State)
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, cfg.State, cfg.TLS)
case ctl.PausePump:
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, pause)
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, pause, cfg.TLS)
case ctl.PauseDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, pause)
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, pause, cfg.TLS)
case ctl.OfflinePump:
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close)
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close, cfg.TLS)
case ctl.OfflineDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close)
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close, cfg.TLS)
case ctl.Encrypt:
if len(cfg.Text) == 0 {
err = errors.New("need to specify the text to be encrypt")
Expand Down
1 change: 1 addition & 0 deletions cmd/pump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func main() {
log.Fatal("Failed to initialize log", zap.Error(err))
}
version.PrintVersionInfo("Pump")
log.Info("start pump...", zap.Reflect("config", cfg))

p, err := pump.NewServer(cfg)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -49,6 +50,7 @@ type notifyResult struct {
// Collector collects binlog from all pump, and send binlog to syncer.
type Collector struct {
clusterID uint64
tls *tls.Config
interval time.Duration
reg *node.EtcdRegistry
tiStore kv.Storage
Expand Down Expand Up @@ -106,6 +108,7 @@ func NewCollector(cfg *Config, clusterID uint64, s *Syncer, cpt checkpoint.Check

c := &Collector{
clusterID: clusterID,
tls: cfg.tls,
interval: time.Duration(cfg.DetectInterval) * time.Second,
reg: node.NewEtcdRegistry(cli, cfg.EtcdTimeout),
pumps: make(map[string]*Pump),
Expand Down Expand Up @@ -308,7 +311,7 @@ func (c *Collector) handlePumpStatusUpdate(ctx context.Context, n *node.Status)
}

commitTS := c.merger.GetLatestTS()
p := NewPump(n.NodeID, n.Addr, c.clusterID, commitTS, c.errCh)
p := NewPump(n.NodeID, n.Addr, c.tls, c.clusterID, commitTS, c.errCh)
c.pumps[n.NodeID] = p
c.merger.AddSource(MergeSource{
ID: n.NodeID,
Expand Down
15 changes: 13 additions & 2 deletions drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"crypto/tls"
"strings"
"sync/atomic"
"time"
Expand All @@ -29,6 +30,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

Expand All @@ -40,6 +42,7 @@ const (
type Pump struct {
nodeID string
addr string
tlsConfig *tls.Config
clusterID uint64
// the latest binlog ts that pump had handled
latestTS int64
Expand All @@ -56,11 +59,12 @@ type Pump struct {
}

// NewPump returns an instance of Pump
func NewPump(nodeID, addr string, clusterID uint64, startTs int64, errCh chan error) *Pump {
func NewPump(nodeID, addr string, tlsConfig *tls.Config, clusterID uint64, startTs int64, errCh chan error) *Pump {
nodeID = pump.FormatNodeID(nodeID)
return &Pump{
nodeID: nodeID,
addr: addr,
tlsConfig: tlsConfig,
clusterID: clusterID,
latestTS: startTs,
errCh: errCh,
Expand Down Expand Up @@ -204,7 +208,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
callOpts = append(callOpts, grpc.UseCompressor(compressor))
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...))
dialOpts := []grpc.DialOption{grpc.WithDefaultCallOptions(callOpts...)}
if p.tlsConfig != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(p.tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}

conn, err := grpc.Dial(p.addr, dialOpts...)
if err != nil {
p.logger.Error("pump create grpc dial failed", zap.Error(err))
p.pullCli = nil
Expand Down
2 changes: 1 addition & 1 deletion drainer/pump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (x *mockPumpPullBinlogsClient) Recv() (*binlog.PullBinlogResp, error) {

func (s *pumpSuite) TestPullBinlog(c *C) {
errChan := make(chan error, 10)
p := NewPump("pump_test", "", 0, 5, errChan)
p := NewPump("pump_test", "", nil, 0, 5, errChan)
p.grpcConn = &grpc.ClientConn{}
binlogBytesChan := make(chan []byte, 10)
p.pullCli = &mockPumpPullBinlogsClient{binlogBytesChan: binlogBytesChan}
Expand Down
17 changes: 9 additions & 8 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package drainer

import (
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -98,6 +97,11 @@ func NewServer(cfg *Config) (*Server, error) {
return nil, err
}

if cfg.tls != nil {
// TODO: avoid this magic enabling TLS for tikv client.
var _ = cfg.Security.ToTiDBSecurityConfig()
}

// get pd client and cluster ID
pdCli, err := getPdClient(cfg.EtcdURLs, cfg.Security)
if err != nil {
Expand Down Expand Up @@ -284,15 +288,12 @@ func (s *Server) Start() error {
}
})

// start a TCP listener
tcpURL, err := url.Parse(s.tcpAddr)
if err != nil {
return errors.Annotatef(err, "invalid listening tcp addr (%s)", s.tcpAddr)
}
tcpLis, err := net.Listen("tcp", tcpURL.Host)
// We need to manage TLS here for cmux to distinguish between HTTP and gRPC.
tcpLis, err := util.Listen("tcp", s.tcpAddr, s.cfg.tls)
if err != nil {
return errors.Annotatef(err, "fail to start TCP listener on %s", tcpURL.Host)
return errors.Trace(err)
}

m := cmux.New(tcpLis)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
Expand Down
2 changes: 2 additions & 0 deletions pkg/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (c *Config) ToTiDBSecurityConfig() config.Security {
ClusterSSLKey: c.SSLKey,
}

// The TiKV client(kvstore.New) we use will use this global var as the TLS config.
// TODO: avoid this implicit change of global state when calling this func.
config.GetGlobalConfig().Security = security
return security
}
Loading

0 comments on commit 738f0ca

Please sign in to comment.