Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate cluster repair #888

Merged
merged 4 commits into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions pkg/clusterstatus/apiserverstatus/apiserverstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,31 @@ type Report struct {

// Get uses the /healthz endpoint to check are all API server instances healthy
func Get(s *state.State, node kubeoneapi.HostConfig) (*Report, error) {
tunneler, err := sshtunnel.NewHTTPTunnel(s.Connector, node, &tls.Config{InsecureSkipVerify: true})
roundTripper, err := sshtunnel.NewHTTPTransport(s.Connector, node, &tls.Config{InsecureSkipVerify: true})
if err != nil {
return nil, err
}

health, err := apiserverHealth(tunneler, node.PrivateAddress)
health, err := apiserverHealth(roundTripper, node.PrivateAddress)
if err != nil {
return &Report{
Health: false,
}, err
}

return &Report{
Health: health,
}, nil
return &Report{Health: health}, nil
}

// apiserverHealth checks is API server healthy
func apiserverHealth(t sshtunnel.Doer, nodeAddress string) (bool, error) {
func apiserverHealth(t http.RoundTripper, nodeAddress string) (bool, error) {
endpoint := fmt.Sprintf(healthzEndpoint, nodeAddress)
request, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return false, err
}

resp, err := t.Do(request)
httpClient := http.Client{Transport: t}
resp, err := httpClient.Do(request)
if err != nil {
return false, err
}
Expand Down
109 changes: 22 additions & 87 deletions pkg/clusterstatus/etcdstatus/etcdstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ limitations under the License.
package etcdstatus

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"

"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"

kubeoneapi "github.com/kubermatic/kubeone/pkg/apis/kubeone"
"github.com/kubermatic/kubeone/pkg/etcdutil"
"github.com/kubermatic/kubeone/pkg/ssh/sshtunnel"
"github.com/kubermatic/kubeone/pkg/state"
)
Expand All @@ -49,33 +44,23 @@ type Report struct {
}

func MemberList(s *state.State) (*clientv3.MemberListResponse, error) {
tunnel, err := s.Connector.Tunnel(s.Cluster.RandomHost())
if err != nil {
return nil, err
etcdEndpoints := []string{}
for _, node := range s.Cluster.Hosts {
etcdEndpoints = append(etcdEndpoints, fmt.Sprintf(clientEndpointFmt, node.PrivateAddress))
}

tlsConfig, err := loadTLSConfig(s)
leader, err := s.Cluster.Leader()
if err != nil {
return nil, err
}

etcdEndpoints := []string{}
for _, node := range s.Cluster.Hosts {
etcdEndpoints = append(etcdEndpoints, fmt.Sprintf(clientEndpointFmt, node.PrivateAddress))
etcdcfg, err := etcdutil.NewClientConfig(s, leader)
if err != nil {
return nil, err
}

etcdcli, err := clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
TLS: tlsConfig,
Context: s.Context,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return tunnel.TunnelTo(ctx, "tcp4", addr)
}),
},
})
etcdcfg.Endpoints = etcdEndpoints
etcdcli, err := clientv3.New(*etcdcfg)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to etcd cluster")
}
Expand All @@ -87,18 +72,23 @@ func MemberList(s *state.State) (*clientv3.MemberListResponse, error) {

// Get analyzes health of an etcd cluster member
func Get(s *state.State, node kubeoneapi.HostConfig, etcdRing *clientv3.MemberListResponse) (*Report, error) {
tlsConfig, err := loadTLSConfig(s)
sshconn, err := s.Connector.Connect(node)
if err != nil {
return nil, err
}

etcdTLSConfig, err := etcdutil.LoadTLSConfig(sshconn)
if err != nil {
return nil, err
}

tunneler, err := sshtunnel.NewHTTPTunnel(s.Connector, node, tlsConfig)
roundTripper, err := sshtunnel.NewHTTPTransport(s.Connector, node, etcdTLSConfig)
if err != nil {
return nil, err
}

// Check etcd member health
health, err := memberHealth(tunneler, node.PrivateAddress)
health, err := memberHealth(roundTripper, node.PrivateAddress)
if err != nil {
return nil, err
}
Expand All @@ -118,7 +108,7 @@ func Get(s *state.State, node kubeoneapi.HostConfig, etcdRing *clientv3.MemberLi
}

// memberHealth returns health for a requested etcd member
func memberHealth(t sshtunnel.Doer, nodeAddress string) (bool, error) {
func memberHealth(t http.RoundTripper, nodeAddress string) (bool, error) {
endpoint := fmt.Sprintf(healthEndpointFmt, nodeAddress)

request, err := http.NewRequest("GET", endpoint, nil)
Expand All @@ -127,7 +117,9 @@ func memberHealth(t sshtunnel.Doer, nodeAddress string) (bool, error) {
}

request.Header.Set("Content-type", "application/json")
resp, err := t.Do(request)

httpClient := http.Client{Transport: t}
resp, err := httpClient.Do(request)
if err != nil {
return false, err
}
Expand All @@ -148,60 +140,3 @@ func memberHealth(t sshtunnel.Doer, nodeAddress string) (bool, error) {

return strconv.ParseBool(h.Health)
}

// loadTLSConfig creates the tls.Config structure used in an http client to securely connect to etcd
func loadTLSConfig(s *state.State) (*tls.Config, error) {
caBytes, certBytes, keyBytes, err := downloadEtcdCerts(s)
if err != nil {
return nil, err
}

// Add certificate and key to the TLS config
cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return nil, err
}

// Add CA certificate to the TLS config
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caBytes)

return &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
}, nil
}

// downloadEtcdCerts returns CA certificate, certificate, and key used to securely access etcd
func downloadEtcdCerts(s *state.State) ([]byte, []byte, []byte, error) {
// Connect to the host
host, err := s.Cluster.Leader()
if err != nil {
return nil, nil, nil, err
}

conn, err := s.Connector.Connect(host)
if err != nil {
return nil, nil, nil, err
}

// Download CA
caCert, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/ca.crt")
if err != nil {
return nil, nil, nil, err
}

// Download cert
cert, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/server.crt")
if err != nil {
return nil, nil, nil, err
}

// Download key
key, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/server.key")
if err != nil {
return nil, nil, nil, err
}

return []byte(caCert), []byte(cert), []byte(key), nil
}
101 changes: 101 additions & 0 deletions pkg/etcdutil/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2019 The KubeOne Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package etcdutil

import (
"crypto/tls"
"crypto/x509"
"fmt"
"time"

"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"

"github.com/kubermatic/kubeone/pkg/apis/kubeone"
"github.com/kubermatic/kubeone/pkg/ssh"
"github.com/kubermatic/kubeone/pkg/ssh/sshtunnel"
"github.com/kubermatic/kubeone/pkg/state"
)

// NewClientConfig returns etcd clientv3 Config configured with TLS certificates
// and tunneled over SSH
func NewClientConfig(s *state.State, host kubeone.HostConfig) (*clientv3.Config, error) {
sshconn, err := s.Connector.Connect(host)
if err != nil {
return nil, err
}

grpcDialer, err := sshtunnel.NewGRPCDialer(s.Connector, host)
if err != nil {
return nil, errors.Wrap(err, "failed to create grpc tunnel dialer")
}

tlsConf, err := LoadTLSConfig(sshconn)
if err != nil {
return nil, err
}

return &clientv3.Config{
Endpoints: []string{fmt.Sprintf("%s:2379", host.PrivateAddress)},
TLS: tlsConf,
Context: s.Context,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpcDialer,
},
}, nil
}

// LoadTLSConfig creates the tls.Config structure used securely connect to etcd,
// certificates and key are downloaded over SSH from the
// /etc/kubernetes/pki/etcd/ directory.
func LoadTLSConfig(conn ssh.Connection) (*tls.Config, error) {
// Download CA
caCertPem, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/ca.crt")
if err != nil {
return nil, err
}

// Download cert
certPem, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/server.crt")
if err != nil {
return nil, err
}

// Download key
keyPem, _, _, err := conn.Exec("sudo cat /etc/kubernetes/pki/etcd/server.key")
if err != nil {
return nil, err
}

// Add certificate and key to the TLS config
cert, err := tls.X509KeyPair([]byte(certPem), []byte(keyPem))
if err != nil {
return nil, err
}

// Add CA certificate to the TLS config
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCertPem))

return &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
}, nil
}
41 changes: 41 additions & 0 deletions pkg/ssh/sshtunnel/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2019 The KubeOne Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sshtunnel

import (
"context"
"net"

"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/kubermatic/kubeone/pkg/apis/kubeone"
"github.com/kubermatic/kubeone/pkg/ssh"
)

// NewGRPCDialer initialize gRPC dialer that will use ssh tunnel as
// transport
func NewGRPCDialer(connector *ssh.Connector, target kubeone.HostConfig) (grpc.DialOption, error) {
tunnel, err := connector.Tunnel(target)
if err != nil {
return nil, errors.Wrap(err, "failed to establish SSH tunnel")
}

return grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return tunnel.TunnelTo(ctx, "tcp4", addr)
}), nil
}
20 changes: 4 additions & 16 deletions pkg/ssh/sshtunnel/httptunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,20 @@ import (
"github.com/kubermatic/kubeone/pkg/ssh"
)

type httpTunnel struct {
*http.Client
}

type Doer interface {
Do(*http.Request) (*http.Response, error)
}

func NewHTTPTunnel(connector *ssh.Connector, target kubeone.HostConfig, tlsConfig *tls.Config) (Doer, error) {
// NewHTTPTransport initialize net/http Transport that will use SSH tunnel as
// transport
func NewHTTPTransport(connector *ssh.Connector, target kubeone.HostConfig, tlsConfig *tls.Config) (http.RoundTripper, error) {
tunn, err := connector.Tunnel(target)
if err != nil {
return nil, errors.Wrap(err, "failed to get SSH tunnel")
}

transport := &http.Transport{
return &http.Transport{
DialContext: tunn.TunnelTo,
TLSClientConfig: tlsConfig,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

return &httpTunnel{
Client: &http.Client{
Transport: transport,
},
}, nil
}
Loading