Skip to content

Commit

Permalink
Fix the bug of falsely wrong annotation added to etcd-member lease of…
Browse files Browse the repository at this point in the history
… TLS not enabled. (#564)

* Fix bug of wrong annotation is added to etcd-member lease of TLS not enabled.

* Small fixes.

* Address review comments.
  • Loading branch information
ishan16696 authored Feb 9, 2023
1 parent 5b9739a commit 0b5fbee
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 89 deletions.
6 changes: 5 additions & 1 deletion pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8s
}

// RenewMemberLeasePeriodically has a timer and will periodically call RenewMemberLeases to renew the member lease until stopped
func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig, peerURLTLSEnabled bool) error {
func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hconfig *brtypes.HealthConfig, logger *logrus.Entry, etcdConfig *brtypes.EtcdConnectionConfig) error {
peerURLTLSEnabled, err := miscellaneous.IsPeerURLTLSEnabled()
if err != nil {
return fmt.Errorf("unable to check peer TLS enabled or not: %v", err)
}

clientSet, err := miscellaneous.GetKubernetesClientSetOrError()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ var _ = Describe("Heartbeat", func() {
})
Context("With fail to create clientset", func() {
It("Should return an error", func() {
err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig, true)
err := heartbeat.RenewMemberLeasePeriodically(testCtx, mmStopCh, hConfig, logger, etcdConnectionConfig)
Expect(err).Should(HaveOccurred())
})
})
Expand Down
17 changes: 8 additions & 9 deletions pkg/member/member_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Control interface {
PromoteMember(context.Context) error

// UpdateMemberPeerURL updates the peer address of a specified etcd cluster member.
UpdateMemberPeerURL(context.Context, client.ClusterCloser) (string, error)
UpdateMemberPeerURL(context.Context, client.ClusterCloser) error

// RemoveMember removes the member from the etcd cluster.
RemoveMember(context.Context) error
Expand Down Expand Up @@ -188,24 +188,23 @@ func getMemberPeerURL(configFile string, podName string) (string, error) {
}

// doUpdateMemberPeerAddress updated the peer address of a specified etcd member
func (m *memberControl) doUpdateMemberPeerAddress(ctx context.Context, cli client.ClusterCloser, id uint64) (string, error) {
// Already existing clusters have `http://localhost:2380` as the peer address. This needs to explicitly updated to the new address
// TODO: Remove this peer address updation logic on etcd-br v0.20.0
func (m *memberControl) doUpdateMemberPeerAddress(ctx context.Context, cli client.ClusterCloser, id uint64) error {
// Already existing clusters or cluster after restoration have `http://localhost:2380` as the peer address. This needs to explicitly updated to the correct peer address.
m.logger.Infof("Updating member peer URL for %s", m.podName)

memberPeerURL, err := getMemberPeerURL(m.configFile, m.podName)
if err != nil {
return "", fmt.Errorf("could not fetch member URL : %v", err)
return fmt.Errorf("could not fetch member URL : %v", err)
}

memberUpdateCtx, cancel := context.WithTimeout(ctx, EtcdTimeout)
defer cancel()

if _, err = cli.MemberUpdate(memberUpdateCtx, id, []string{memberPeerURL}); err == nil {
m.logger.Info("Successfully updated the member peer URL")
return memberPeerURL, nil
return nil
}
return "", err
return err
}

// PromoteMember promotes an etcd member from a learner to a voting member of the cluster. This will succeed only if its logs are caught up with the leader
Expand Down Expand Up @@ -246,14 +245,14 @@ func findMember(existingMembers []*etcdserverpb.Member, memberName string) *etcd
}

// UpdateMemberPeerURL updates the peer address of a specified etcd cluster member.
func (m *memberControl) UpdateMemberPeerURL(ctx context.Context, cli client.ClusterCloser) (string, error) {
func (m *memberControl) UpdateMemberPeerURL(ctx context.Context, cli client.ClusterCloser) error {
m.logger.Infof("Attempting to update the member Info: %v", m.podName)
ctx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdConnectionTimeout)
defer cancel()

membersInfo, err := cli.MemberList(ctx)
if err != nil {
return "", fmt.Errorf("error listing members: %v", err)
return fmt.Errorf("error listing members: %v", err)
}

return m.doUpdateMemberPeerAddress(ctx, cli, membersInfo.Header.GetMemberId())
Expand Down
6 changes: 2 additions & 4 deletions pkg/member/member_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,8 @@ var _ = Describe("Membercontrol", func() {

cl.EXPECT().MemberUpdate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)

peerUrl, err := m.UpdateMemberPeerURL(context.TODO(), client)
err = m.UpdateMemberPeerURL(context.TODO(), client)
Expect(err).ShouldNot(HaveOccurred())
expectedPeerUrl := fmt.Sprintf("http://%s.%s.%s.svc:2380", podName, "etcd-main-peer", "default")
Expect(peerUrl).To(Equal(expectedPeerUrl))
})
})

Expand Down Expand Up @@ -166,7 +164,7 @@ var _ = Describe("Membercontrol", func() {

cl.EXPECT().MemberUpdate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("unable to connect to dummy etcd"))

_, err = m.UpdateMemberPeerURL(context.TODO(), client)
err = m.UpdateMemberPeerURL(context.TODO(), client)
Expect(err).Should(HaveOccurred())
})
})
Expand Down
34 changes: 32 additions & 2 deletions pkg/miscellaneous/miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
ClusterStateNew = "new"
// ClusterStateExisting defines the "existing" state of etcd cluster.
ClusterStateExisting = "existing"

https = "https"
)

// GetLatestFullSnapshotAndDeltaSnapList returns the latest snapshot
Expand Down Expand Up @@ -501,12 +503,12 @@ func RemoveMemberFromCluster(ctx context.Context, cli etcdClient.ClusterCloser,
func ReadConfigFileAsMap(path string) (map[string]interface{}, error) {
configYML, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("unable to read etcd c file at path: %s : %v", path, err)
return nil, fmt.Errorf("unable to read etcd config file at path: %s : %v", path, err)
}

c := map[string]interface{}{}
if err := yaml.Unmarshal(configYML, &c); err != nil {
return nil, fmt.Errorf("unable to unmarshal etcd c yaml file at path: %s : %v", path, err)
return nil, fmt.Errorf("unable to unmarshal etcd config yaml file at path: %s : %v", path, err)
}
return c, nil
}
Expand All @@ -520,3 +522,31 @@ func ParsePeerURL(initialAdvertisePeerURLs, podName string) (string, error) {
domaiName := fmt.Sprintf("%s.%s.%s", tokens[1], tokens[2], "svc")
return fmt.Sprintf("%s://%s.%s:%s", tokens[0], podName, domaiName, tokens[3]), nil
}

// IsPeerURLTLSEnabled checks whether the peer address is TLS enabled or not.
func IsPeerURLTLSEnabled() (bool, error) {
podName, err := GetEnvVarOrError("POD_NAME")
if err != nil {
return false, err
}

configFile := GetConfigFilePath()

config, err := ReadConfigFileAsMap(configFile)
if err != nil {
return false, err
}
initAdPeerURL := config["initial-advertise-peer-urls"]

memberPeerURL, err := ParsePeerURL(initAdPeerURL.(string), podName)
if err != nil {
return false, err
}

peerURL, err := url.Parse(memberPeerURL)
if err != nil {
return false, err
}

return peerURL.Scheme == https, nil
}
61 changes: 61 additions & 0 deletions pkg/miscellaneous/miscellaneous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"time"
Expand Down Expand Up @@ -661,6 +662,66 @@ var _ = Describe("Miscellaneous Tests", func() {
})
})

Describe("Checking IsPeerURLTLSEnabled", func() {
var (
outfile = "/tmp/etcd.conf.yaml"
)
BeforeEach(func() {
Expect(os.Setenv("POD_NAME", "test_pod")).To(Succeed())
Expect(os.Setenv("ETCD_CONF", outfile)).To(Succeed())
})
AfterEach(func() {
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expect(os.Unsetenv("ETCD_CONF")).To(Succeed())
})

Context("with non-TLS enabled peer url", func() {
BeforeEach(func() {
etcdConfigYaml := `name: etcd1
initial-advertise-peer-urls: http@etcd-main-peer@default@2380
initial-cluster: etcd1=http://0.0.0.0:2380`
err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755)
Expect(err).ShouldNot(HaveOccurred())
})
It("should return false", func() {
enabled, err := IsPeerURLTLSEnabled()
Expect(err).To(BeNil())
Expect(enabled).To(BeFalse())
})

})

Context("with TLS enabled peer url", func() {
BeforeEach(func() {
etcdConfigYaml := `name: etcd1
initial-advertise-peer-urls: https@etcd-main-peer@default@2380
initial-cluster: etcd1=https://0.0.0.0:2380`
err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755)
Expect(err).ShouldNot(HaveOccurred())
})
It("should return true", func() {
enabled, err := IsPeerURLTLSEnabled()
Expect(err).To(BeNil())
Expect(enabled).To(BeTrue())
})
})

Context("with empty peer url passed", func() {
BeforeEach(func() {
etcdConfigYaml := `name: etcd1
initial-advertise-peer-urls: ""
initial-cluster: etcd1=http://0.0.0.0:2380`
err := os.WriteFile(outfile, []byte(etcdConfigYaml), 0755)
Expect(err).ShouldNot(HaveOccurred())
})
It("should return error", func() {
enabled, err := IsPeerURLTLSEnabled()
Expect(err).Should(HaveOccurred())
Expect(enabled).To(BeFalse())
})
})
})

})

func emptyStatefulSet(name, namespace string) *appsv1.StatefulSet {
Expand Down
46 changes: 0 additions & 46 deletions pkg/server/backrestoreserver_test.go

This file was deleted.

33 changes: 7 additions & 26 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -59,10 +58,6 @@ var (
retryTimeout = 5 * time.Second
)

const (
https = "https"
)

// NewBackupRestoreServer return new backup restore server.
func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponentConfig) (*BackupRestoreServer, error) {
serverLogger := logger.WithField("actor", "backup-restore-server")
Expand Down Expand Up @@ -197,26 +192,22 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
}
}

var memberPeerURL string

m := member.NewMemberControl(b.config.EtcdConnectionConfig)
err := retry.OnError(retry.DefaultBackoff, errors.AnyError, func() error {
if err := retry.OnError(retry.DefaultBackoff, errors.AnyError, func() error {
cli, err := etcdutil.NewFactory(*b.config.EtcdConnectionConfig).NewCluster()
if err != nil {
return err
}
memberPeerURL, err = m.UpdateMemberPeerURL(ctx, cli)
if err != nil {
defer cli.Close()

if err := m.UpdateMemberPeerURL(ctx, cli); err != nil {
return err
}
return nil
})
if err != nil {
}); err != nil {
b.logger.Errorf("failed to update member peer url: %v", err)
}

peerURLTLSEnabled := b.isPeerURLTLSEnabled(memberPeerURL)

leaderCallbacks := &brtypes.LeaderCallbacks{
OnStartedLeading: func(leCtx context.Context) {
ssrStopCh = make(chan struct{})
Expand Down Expand Up @@ -263,7 +254,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
mmStopCh = make(chan struct{})
if b.config.HealthConfig.MemberLeaseRenewalEnabled {
go func() {
if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig, peerURLTLSEnabled); err != nil {
if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig); err != nil {
b.logger.Fatalf("failed RenewMemberLeases: %v", err)
}
}()
Expand Down Expand Up @@ -303,7 +294,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype

if b.config.HealthConfig.MemberLeaseRenewalEnabled {
go func() {
if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig, peerURLTLSEnabled); err != nil {
if err := heartbeat.RenewMemberLeasePeriodically(ctx, mmStopCh, b.config.HealthConfig, b.logger, b.config.EtcdConnectionConfig); err != nil {
b.logger.Fatalf("failed RenewMemberLeases: %v", err)
}
}()
Expand Down Expand Up @@ -528,13 +519,3 @@ func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, ssr *snapsh
}
}
}

func (b *BackupRestoreServer) isPeerURLTLSEnabled(memberPeerURL string) bool {
peerURL, err := url.Parse(memberPeerURL)
if err != nil {
b.logger.WithFields(logrus.Fields{
"memberPeerURL": memberPeerURL,
}).Fatalf("malformed member peer url: %v", err)
}
return peerURL.Scheme == https
}

0 comments on commit 0b5fbee

Please sign in to comment.