From e176d36eeffd8c708c5ce61ac1992b396ccb999f Mon Sep 17 00:00:00 2001
From: Quentin Machu
Date: Thu, 6 May 2021 20:32:28 -0700
Subject: [PATCH] *: switch logging from logrus to zap, and align formatting
 with etcd's embed, whose logging is not configurable

---
 cmd/operator/config.go              |  7 ++--
 cmd/operator/operator.go            | 14 +------
 cmd/tester/config.go                |  4 +-
 cmd/tester/tester.go                | 11 ++----
 go.mod                              |  1 -
 pkg/etcd/client.go                  |  2 +-
 pkg/etcd/server.go                  | 55 ++++++++++++---------
 pkg/logger/level_converter.go       | 45 ----------------------------
 pkg/logger/logger.go                | 58 ++++++++++++++++++++++-------
 pkg/operator/acl.go                 | 17 +++++----
 pkg/operator/misc.go                | 14 +++----
 pkg/operator/operator.go            | 36 +++++++++---------
 pkg/providers/asg/docker/docker.go  |  4 +-
 pkg/providers/asg/sts/sts.go        |  7 ++--
 pkg/providers/snapshot/file/file.go |  7 ++--
 pkg/providers/snapshot/s3/s3.go     | 10 ++---
 pkg/tester/tester.go                | 24 ++++++------
 17 files changed, 142 insertions(+), 174 deletions(-)
 delete mode 100644 pkg/logger/level_converter.go

diff --git a/cmd/operator/config.go b/cmd/operator/config.go
index b1644adc..f11b8b00 100644
--- a/cmd/operator/config.go
+++ b/cmd/operator/config.go
@@ -15,15 +15,16 @@
 package main
 
 import (
 	"io/ioutil"
 	"os"
 	"time"
 
+	"go.uber.org/zap"
+	"gopkg.in/yaml.v2"
+
 	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/operator"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot"
-	log "github.com/sirupsen/logrus"
-	"gopkg.in/yaml.v2"
 )
 
 // config represents a YAML configuration file that namespaces all ECO
@@ -86,6 +87,6 @@ func loadConfig(path string) (config, error) {
 		return config, err
 	}
 
-	log.Infof("loaded configuration file %v", path)
+	zap.S().Infof("loaded configuration file %v", path)
 	return config, err
 }
diff --git a/cmd/operator/operator.go b/cmd/operator/operator.go
index 13716ed2..8a032fa7 100644
--- a/cmd/operator/operator.go
+++ b/cmd/operator/operator.go
@@ -17,13 +17,9 @@ package main
 
 import (
 	"flag"
-	"io/ioutil"
 	"os"
-	"strings"
 
-	log "github.com/sirupsen/logrus"
 	"go.uber.org/zap"
-	"google.golang.org/grpc/grpclog"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/logger"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/operator"
@@ -44,18 +40,12 @@ func main() {
 	flag.Parse()
 
 	// Initialize logging system.
-	logLevel, err := log.ParseLevel(strings.ToUpper(*flagLogLevel))
-	log.SetOutput(os.Stdout)
-	log.SetLevel(logLevel)
-	log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
-
-	grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
-	zap.ReplaceGlobals(logger.BuildZapLogger(*flagLogLevel))
+	logger.Configure(*flagLogLevel)
 
 	// Read configuration.
 	config, err := loadConfig(*flagConfigPath)
 	if err != nil {
-		log.WithError(err).Fatal("failed to load configuration")
+		zap.S().With(zap.Error(err)).Fatal("failed to load configuration")
 	}
 
 	// Run.
diff --git a/cmd/tester/config.go b/cmd/tester/config.go index f3316230..c82d3bfd 100644 --- a/cmd/tester/config.go +++ b/cmd/tester/config.go @@ -18,7 +18,7 @@ import ( "io/ioutil" "os" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "gopkg.in/yaml.v2" "github.com/quentin-m/etcd-cloud-operator/pkg/tester" @@ -53,6 +53,6 @@ func loadConfig(path string) (config, error) { return config, err } - log.Infof("loaded configuration file %v", path) + zap.S().Infof("loaded configuration file %v", path) return config, err } diff --git a/cmd/tester/tester.go b/cmd/tester/tester.go index 290f6c04..d58eee22 100644 --- a/cmd/tester/tester.go +++ b/cmd/tester/tester.go @@ -18,10 +18,10 @@ package main import ( "flag" "os" - "strings" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" + "github.com/quentin-m/etcd-cloud-operator/pkg/logger" "github.com/quentin-m/etcd-cloud-operator/pkg/tester" ) @@ -33,15 +33,12 @@ func main() { flag.Parse() // Initialize logging system. - logLevel, err := log.ParseLevel(strings.ToUpper(*flagLogLevel)) - log.SetOutput(os.Stdout) - log.SetLevel(logLevel) - log.SetFormatter(&log.TextFormatter{FullTimestamp: true}) + logger.Configure(*flagLogLevel) // Read configuration. config, err := loadConfig(*flagConfigPath) if err != nil { - log.WithError(err).Fatal("failed to load configuration") + zap.S().With(zap.Error(err)).Fatal("failed to load configuration") } // Run. diff --git a/go.mod b/go.mod index 3af5a606..cefa154d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.16 require ( github.com/aws/aws-sdk-go v1.38.34 github.com/prometheus/client_golang v1.11.0 - github.com/sirupsen/logrus v1.7.0 go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 6374c950..dc8eb0c6 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -62,7 +62,7 @@ func NewClient(clientsAddresses []string, sc SecurityConfig, autoSync bool) (*Cl DialTimeout: defaultDialTimeout, TLS: tc, AutoSyncInterval: autoSyncInterval, - LogConfig: logger.BuildZapConfig(logger.GetZapLogLevelFromLogrus().String()), + LogConfig: logger.Config, }) if err != nil { return nil, err diff --git a/pkg/etcd/server.go b/pkg/etcd/server.go index 22b7f693..31f54c08 100644 --- a/pkg/etcd/server.go +++ b/pkg/etcd/server.go @@ -19,18 +19,14 @@ import ( "errors" "fmt" "io" - "io/ioutil" "os" "time" - log "github.com/sirupsen/logrus" - "go.uber.org/zap" - "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" etcdsnap "go.etcd.io/etcd/etcdutl/v3/snapshot" "go.etcd.io/etcd/server/v3/embed" - "google.golang.org/grpc/grpclog" + "go.uber.org/zap" "github.com/quentin-m/etcd-cloud-operator/pkg/logger" "github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot" @@ -139,7 +135,7 @@ func (c *Server) Join(cluster *Client) error { // Attempt to re-join the server directly if we are still a member, and we have local data. 
if memberID != 0 && localSnapErr == nil { - log.Info("attempting to rejoin cluster under existing identity with local data") + zap.S().Info("attempting to rejoin cluster under existing identity with local data") ctx, cancel := context.WithTimeout(context.Background(), defaultStartRejoinTimeout) defer cancel() @@ -147,9 +143,9 @@ func (c *Server) Join(cluster *Client) error { return nil } - log.Warn("failed to join as an existing member, resetting") + zap.S().Warn("failed to join as an existing member, resetting") if err := cluster.RemoveMember(c.cfg.Name, memberID); err != nil { - log.WithError(err).Warning("failed to remove ourselves from the cluster's member list") + zap.S().With(zap.Error(err)).Warn("failed to remove ourselves from the cluster's member list") } } os.RemoveAll(c.cfg.DataDir) @@ -172,7 +168,7 @@ func (c *Server) Join(cluster *Client) error { } func (c *Server) Restore(metadata *snapshot.Metadata) error { - log.Infof("restoring snapshot %q (rev: %016x, size: %.3f MB)", metadata.Name, metadata.Revision, toMB(metadata.Size)) + zap.S().Infof("restoring snapshot %q (rev: %016x, size: %.3f MB)", metadata.Name, metadata.Revision, toMB(metadata.Size)) path, shouldDelete, err := metadata.Source.Get(metadata) if err != nil && err != snapshot.ErrNoSnapshot { @@ -218,14 +214,14 @@ func (c *Server) Snapshot() error { minRev = metadata.Revision } else { if err != snapshot.ErrNoSnapshot { - log.WithError(err).Warn("failed to find latest snapshot revision, snapshotting anyways") + zap.S().With(zap.Error(err)).Warn("failed to find latest snapshot revision, snapshotting anyways") } } // Initiate a snapshot. rc, rev, err := c.snapshot(minRev) if err == ErrMemberRevisionTooOld { - log.Infof("skipping snapshot: current revision %016x <= latest snapshot %016x", rev, minRev) + zap.S().Infof("skipping snapshot: current revision %016x <= latest snapshot %016x", rev, minRev) return nil } if err != nil { @@ -239,7 +235,7 @@ func (c *Server) Snapshot() error { return fmt.Errorf("failed to save snapshot: %v", err) } - log.Infof("snapshot %q saved successfully in %v (%.2f MB)", metadata.Filename(), time.Since(t), toMB(metadata.Size)) + zap.S().Infof("snapshot %q saved successfully in %v (%.2f MB)", metadata.Filename(), time.Since(t), toMB(metadata.Size)) return nil } @@ -251,14 +247,14 @@ func (c *Server) SnapshotInfo() (*snapshot.Metadata, error) { if !c.isRunning { localSnap, localErr = localSnapshotProvider(c.cfg.DataDir).Info() if localErr != nil && localErr != snapshot.ErrNoSnapshot { - log.WithError(localErr).Warn("failed to retrieve local snapshot info") + zap.S().With(zap.Error(localErr)).Warn("failed to retrieve local snapshot info") } } // Read snapshot info from the configured snapshot provider. cfgSnap, cfgErr = c.cfg.SnapshotProvider.Info() if cfgErr != nil && cfgErr != snapshot.ErrNoSnapshot { - log.WithError(cfgErr).Warn("failed to retrieve snapshot info") + zap.S().With(zap.Error(cfgErr)).Warn("failed to retrieve snapshot info") } // Return the highest revision one, or the one that worked. @@ -289,12 +285,12 @@ func (c *Server) snapshot(minRevision int64) (io.ReadCloser, int64, error) { // Forward the snapshot to the pipe. 
n, err := snapshot.WriteTo(pw) if err != nil { - log.WithError(err).Errorf("failed to write etcd snapshot out [written bytes: %d]", n) + zap.S().With(zap.Error(err)).Errorf("failed to write etcd snapshot out [written bytes: %d]", n) } pw.CloseWithError(err) if err := snapshot.Close(); err != nil { - log.WithError(err).Errorf("failed to close etcd snapshot [written bytes: %d]", n) + zap.S().With(zap.Error(err)).Errorf("failed to close etcd snapshot [written bytes: %d]", n) } }() @@ -311,7 +307,7 @@ func (c *Server) Stop(graceful, snapshot bool) { } if snapshot { if err := c.Snapshot(); err != nil { - log.WithError(err).Error("failed to snapshot before graceful stop") + zap.S().With(zap.Error(err)).Error("failed to snapshot before graceful stop") } } if !graceful { @@ -347,7 +343,8 @@ func (c *Server) startServer(ctx context.Context) error { etcdCfg.AutoCompactionMode = c.cfg.AutoCompactionMode etcdCfg.AutoCompactionRetention = c.cfg.AutoCompactionRetention etcdCfg.ZapLoggerBuilder = logger.BuildZapConfigBuilder() - etcdCfg.LogLevel = logger.GetZapLogLevelFromLogrus().String() + etcdCfg.LogLevel = logger.Config.Level.String() + etcdCfg.LogOutputs = []string{embed.StdOutLogOutput} etcdCfg.SocketOpts = transport.SocketOpts{ ReusePort: true, ReuseAddress: true, @@ -362,15 +359,11 @@ func (c *Server) startServer(ctx context.Context) error { // Start the server. c.server, err = embed.StartEtcd(etcdCfg) - - // Discard the gRPC logs, as the embed server will set that regardless of what was set before (i.e. at startup). - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr)) - if err != nil { return fmt.Errorf("failed to start etcd: %s", err) } c.isRunning = true - log.Infof("embedded etcd server is now running") + zap.S().Infof("embedded etcd server is now running") // Wait until the server announces its ready, or until the start timeout is exceeded. 
// @@ -401,11 +394,11 @@ func (c *Server) startServer(ctx context.Context) error { func (c *Server) runErrorWatcher() { select { case <-c.server.Server.StopNotify(): - log.Warnf("etcd server is stopping") + zap.S().Warnf("etcd server is stopping") c.isRunning = false return case <-c.server.Err(): - log.Warnf("etcd server has crashed") + zap.S().Warnf("etcd server has crashed") c.Stop(false, false) } } @@ -455,18 +448,18 @@ func (c *Server) runMemberCleaner() { if time.Since(member.lastSeenHealthy) < c.cfg.UnhealthyMemberTTL { continue } - log.Infof("removing member %q that's been unhealthy for %v", member.name, c.cfg.UnhealthyMemberTTL) + zap.S().Infof("removing member %q that's been unhealthy for %v", member.name, c.cfg.UnhealthyMemberTTL) cl, err := NewClient([]string{c.cfg.PrivateAddress}, c.cfg.ClientSC, false) if err != nil { - log.WithError(err).Warn("failed to create etcd cluster client") + zap.S().With(zap.Error(err)).Warn("failed to create etcd cluster client") continue } if err := cl.RemoveMember(member.name, uint64(id)); err == context.DeadlineExceeded { - log.Warnf("failed to remove unhealthy member %q, it might be starting", member.name) + zap.S().Warnf("failed to remove unhealthy member %q, it might be starting", member.name) continue } else if err != nil { - log.WithError(err).Warnf("failed to remove unhealthy member %q", member.name) + zap.S().With(zap.Error(err)).Warnf("failed to remove unhealthy member %q", member.name) continue } @@ -477,7 +470,7 @@ func (c *Server) runMemberCleaner() { func (c *Server) runSnapshotter() { if c.cfg.SnapshotProvider == nil || c.cfg.SnapshotInterval == 0 { - log.Warn("periodic snapshots are disabled") + zap.S().Warn("periodic snapshots are disabled") return } @@ -491,7 +484,7 @@ func (c *Server) runSnapshotter() { } if err := c.Snapshot(); err != nil { - log.WithError(err).Error("failed to snapshot") + zap.S().With(zap.Error(err)).Error("failed to snapshot") } } } diff --git a/pkg/logger/level_converter.go b/pkg/logger/level_converter.go deleted file mode 100644 index 566ad389..00000000 --- a/pkg/logger/level_converter.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2017 Quentin Machu & eco authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package logger - -import ( - "fmt" - - log "github.com/sirupsen/logrus" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -// Converts from the current logrus log-level into a log-level that etcd's zap can parse. -// -// Not my proudest moment, but gotta do with current time constraint. 
-func GetZapLogLevelFromLogrus() zapcore.Level {
-	switch log.GetLevel().String() {
-	case "trace", "debug":
-		return zap.DebugLevel
-	case "info":
-		return zap.InfoLevel
-	case "warn", "warning":
-		return zap.WarnLevel
-	case "error":
-		return zap.ErrorLevel
-	case "fatal":
-		return zap.FatalLevel
-	case "panic":
-		return zap.PanicLevel
-	default:
-		panic(fmt.Sprintf("unable to convert logrus->zap log-level %q", log.GetLevel().String()))
-	}
-}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 84f1bb10..b0f691bd 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -15,11 +15,16 @@ package logger
 
 import (
+	"io/ioutil"
 	"moul.io/zapfilter"
+	"os"
 	"regexp"
 	"strings"
 
-	log "github.com/sirupsen/logrus"
+	"go.etcd.io/etcd/client/pkg/v3/logutil"
 	"go.etcd.io/etcd/server/v3/embed"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zapgrpc"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/grpclog"
 )
@@ -27,9 +32,42 @@ import (
 
 var lbConnEOF = regexp.MustCompile(`embed: rejected connection from "[^"]*" \(error "EOF", ServerName ""\)`)
 
-// BuildZapLogger builds a full zap.Logger that can be used in the server.
-func BuildZapLogger(lvl string) *zap.Logger {
-	zl, err := BuildZapConfig(lvl).Build(zap.WrapCore(func (z zapcore.Core) zapcore.Core {
+var Config *zap.Config
+var Logger *zap.Logger
+
+func Configure(lvl string) {
+	// Build the logger and its configuration.
+	buildZapLogger(lvl)
+
+	zap.ReplaceGlobals(Logger)
+	if lvl != "debug" {
+		grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
+	} else {
+		grpc.EnableTracing = true
+		grpclog.SetLoggerV2(zapgrpc.NewLogger(Logger))
+	}
+}
+
+// buildZapLogger builds a *zap.Logger end-to-end based on the specified logging level, and stores the logger alongside
+// its configuration in global variables, so that they can be referenced by other packages which configure logging
+// independently.
+func buildZapLogger(lvl string) {
+	if Logger != nil {
+		return
+	}
+
+	// Build the configuration the same way etcd's embed package does, as it is currently not possible to set a
+	// *zap.Logger directly into embed, and we'd like to be consistent. https://github.com/etcd-io/etcd/issues/12326
+	zlCfg := logutil.DefaultZapLoggerConfig
+	zlCfg.ErrorOutputPaths = []string{"stdout"}
+	zlCfg.OutputPaths = []string{"stdout"}
+	zlCfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(lvl))
+	zlCfg.Development = lvl == "debug"
+	zlCfg.Sampling = nil
+	Config = &zlCfg
+
+	// Build the logger.
+	zl, err := Config.Build(zap.WrapCore(func(z zapcore.Core) zapcore.Core {
 		return zapfilter.NewFilteringCore(z, func(e zapcore.Entry, f []zapcore.Field) bool {
 			// Mute ClientV3 hammering broken endpoints, which is common in ECO and bogus LB health checks connections.
 			return !strings.Contains(e.Message, "retrying of unary invoker failed") &&
@@ -37,19 +75,11 @@ func BuildZapLogger(lvl string) *zap.Logger {
 				!lbConnEOF.MatchString(e.Message)})
 	}))
 	if err != nil {
-		log.WithError(err).Fatal("unable to parse zap's logging level")
+		zap.S().With(zap.Error(err)).Fatal("unable to build logger")
 	}
-	return zl
+	Logger = zl
 }
 
-// BuildZapConfig creates a uniform zap.Config for the etcd server & client.
-func BuildZapConfig(lvl string) *zap.Config {
-	zlCfg := zap.NewProductionConfig()
-	zlCfg.Level.UnmarshalText([]byte(lvl))
-	zlCfg.Sampling = nil
-	zlCfg.Encoding = "console"
-	return &zlCfg
-}
+
 
 // BuildZapConfigBuilder returns a configuration builder for the etcd server.
 //
diff --git a/pkg/operator/acl.go b/pkg/operator/acl.go
index 99436951..41652912 100644
--- a/pkg/operator/acl.go
+++ b/pkg/operator/acl.go
@@ -19,15 +19,16 @@ import (
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
 	"io/ioutil"
 	"strings"
 	"time"
 
-	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
-	log "github.com/sirupsen/logrus"
-	"go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
-	"gopkg.in/yaml.v2"
+	"go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+	"gopkg.in/yaml.v2"
+
+	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
 )
 
 const (
@@ -140,25 +141,25 @@ func (s *Operator) reconcileInitACLConfig(config *etcd.ACLConfig) error {
 	defer cancel()
 
 	if err := s.enableACL(ctx, config); err != nil {
-		log.WithError(err).Errorf("failed to enable ACL")
+		zap.S().With(zap.Error(err)).Error("failed to enable ACL")
 		return err
 	}
 
 	oldACLConfig, err := s.getOldACLConfig(ctx)
 	if err != nil {
-		log.WithError(err).Errorf("failed to get old ACL config")
+		zap.S().With(zap.Error(err)).Error("failed to get old ACL config")
 		return err
 	}
 
 	if !oldACLConfig.Equal(config) {
 		if oldACLConfig != nil {
 			if err := s.removeACLConfig(ctx, oldACLConfig); err != nil {
-				log.WithError(err).Errorf("failed to remove ACL config")
+				zap.S().With(zap.Error(err)).Error("failed to remove ACL config")
 				return err
 			}
 		}
 
 		if err := s.applyACLConfig(ctx, config); err != nil {
-			log.WithError(err).Errorf("failed to apply ACL config")
+			zap.S().With(zap.Error(err)).Error("failed to apply ACL config")
 			return err
 		}
 	}
 }
diff --git a/pkg/operator/misc.go b/pkg/operator/misc.go
index 3e54fc0c..64b9a595 100644
--- a/pkg/operator/misc.go
+++ b/pkg/operator/misc.go
@@ -23,7 +23,7 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/asg"
@@ -44,14 +44,14 @@ type status struct {
 
 func initProviders(cfg Config) (asg.Provider, snapshot.Provider) {
 	if cfg.ASG.Provider == "" {
-		log.Fatal("no auto-scaling group provider configuration given")
+		zap.S().Fatal("no auto-scaling group provider configuration given")
 	}
 	asgProvider, ok := asg.AsMap()[cfg.ASG.Provider]
 	if !ok {
-		log.Fatalf("unknown auto-scaling group provider %q, available providers: %v", cfg.ASG.Provider, asg.AsList())
+		zap.S().Fatalf("unknown auto-scaling group provider %q, available providers: %v", cfg.ASG.Provider, asg.AsList())
 	}
 	if err := asgProvider.Configure(cfg.ASG); err != nil {
-		log.WithError(err).Fatal("failed to configure auto-scaling group provider")
+		zap.S().With(zap.Error(err)).Fatal("failed to configure auto-scaling group provider")
 	}
 
 	if cfg.Snapshot.Provider == "" {
@@ -59,10 +59,10 @@ func initProviders(cfg Config) (asg.Provider, snapshot.Provider) {
 	}
 	snapshotProvider, ok := snapshot.AsMap()[cfg.Snapshot.Provider]
 	if !ok {
-		log.Fatalf("unknown snapshot provider %q, available providers: %v", cfg.Snapshot.Provider, snapshot.AsList())
+		zap.S().Fatalf("unknown snapshot provider %q, available providers: %v", cfg.Snapshot.Provider, snapshot.AsList())
 	}
 	if err := snapshotProvider.Configure(cfg.Snapshot); err != nil {
-		log.WithError(err).Fatal("failed to configure snapshot provider")
+		zap.S().With(zap.Error(err)).Fatal("failed to configure snapshot provider")
 	}
 
 	return asgProvider, snapshotProvider
@@ -88,7 +88,7 @@ func fetchStatuses(httpClient *http.Client, etcdClient *etcd.Client, asgInstance
 
 		st, err := fetchStatus(httpClient, asgInstance)
 		if err != nil {
-			log.WithError(err).Warnf("failed to query %s's ECO instance", asgInstance.Name())
+			zap.S().With(zap.Error(err)).Warnf("failed to query %s's ECO instance", asgInstance.Name())
 			return
 		}
 
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index 5f035958..29c27afe 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -24,7 +24,7 @@ import (
 	"syscall"
 	"time"
 
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/asg"
@@ -79,7 +79,7 @@ func New(cfg Config) *Operator {
 	// Initialize providers.
 	asgProvider, snapshotProvider := initProviders(cfg)
 	if snapshotProvider == nil || cfg.Snapshot.Interval == 0 {
-		log.Fatal("snapshots must be enabled for disaster recovery")
+		zap.S().Fatal("snapshots must be enabled for disaster recovery")
 	}
 
 	// Setup signal handler.
@@ -102,12 +102,12 @@ func (s *Operator) Run() {
 
 	for {
 		if err := s.evaluate(); err != nil {
-			log.WithError(err).Warn("could not evaluate cluster state")
+			zap.S().With(zap.Error(err)).Warn("could not evaluate cluster state")
 			s.wait()
 			continue
 		}
 		if err := s.execute(); err != nil {
-			log.WithError(err).Warn("could not execute action")
+			zap.S().With(zap.Error(err)).Warn("could not execute action")
 		}
 		s.wait()
 	}
@@ -123,7 +123,7 @@ func (s *Operator) evaluate() error {
 	// Create the etcd cluster client.
 	client, err := etcd.NewClient(instancesAddresses(asgInstances), s.cfg.Etcd.ClientTransportSecurity, true)
 	if err != nil {
-		log.WithError(err).Warn("failed to create etcd cluster client")
+		zap.S().With(zap.Error(err)).Warn("failed to create etcd cluster client")
 	}
 
 	// Output.
@@ -149,7 +149,7 @@ func (s *Operator) execute() error {
 	switch {
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	case s.shutdown:
-		log.Info("STATUS: Received SIGTERM -> Snapshot + Stop")
+		zap.S().Info("STATUS: Received SIGTERM -> Snapshot + Stop")
 		s.state = "PENDING"
 
 		s.server.Stop(s.etcdHealthy, true)
@@ -157,23 +157,23 @@
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	case s.etcdHealthy && !s.etcdRunning:
-		log.Info("STATUS: Healthy + Not running -> Join")
+		zap.S().Info("STATUS: Healthy + Not running -> Join")
 		s.state = "PENDING"
 
 		if err := s.server.Join(s.etcdClient); err != nil {
-			log.WithError(err).Error("failed to join the cluster")
+			zap.S().With(zap.Error(err)).Error("failed to join the cluster")
 		}
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	case s.etcdHealthy && s.etcdRunning:
-		log.Info("STATUS: Healthy + Running -> Standby")
+		zap.S().Info("STATUS: Healthy + Running -> Standby")
 		s.state = "OK"
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	case !s.etcdHealthy && s.etcdRunning && s.states["OK"] >= s.clusterSize/2+1:
-		log.Info("STATUS: Unhealthy + Running -> Pending confirmation from other ECO instances")
+		zap.S().Info("STATUS: Unhealthy + Running -> Pending confirmation from other ECO instances")
 		s.state = "PENDING"
 	case !s.etcdHealthy && s.etcdRunning && s.states["OK"] < s.clusterSize/2+1:
-		log.Info("STATUS: Unhealthy + Running + No quorum -> Snapshot + Stop")
+		zap.S().Info("STATUS: Unhealthy + Running + No quorum -> Snapshot + Stop")
 		s.state = "PENDING"
 
 		s.server.Stop(false, true)
@@ -186,15 +186,15 @@
 				return err
 			}
 		}
-		log.Info("STATUS: Unhealthy + Not running -> Ready to start + Pending all ready / seeder")
+		zap.S().Info("STATUS: Unhealthy + Not running -> Ready to start + Pending all ready / seeder")
 		s.state = "START"
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	case !s.etcdHealthy && !s.etcdRunning && s.states["START"] == s.clusterSize && s.isSeeder:
-		log.Info("STATUS: Unhealthy + Not running + All ready + Seeder status -> Seeding cluster")
+		zap.S().Info("STATUS: Unhealthy + Not running + All ready + Seeder status -> Seeding cluster")
 		s.state = "START"
 
 		if err := s.server.Seed(s.etcdSnapshot); err != nil {
-			log.WithError(err).Error("failed to seed the cluster")
+			zap.S().With(zap.Error(err)).Error("failed to seed the cluster")
 		}
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -206,7 +206,7 @@
 
 	if s.state == "OK" && s.isSeeder && s.cfg.Etcd.InitACL != nil {
 		if err := s.reconcileInitACLConfig(s.cfg.Etcd.InitACL); err != nil {
-			log.WithError(err).Error("failed to reconcile initial ACL config")
+			zap.S().With(zap.Error(err)).Error("failed to reconcile initial ACL config")
 			return err
 		}
 	}
@@ -222,14 +222,14 @@ func (s *Operator) webserver() {
 		}
 		b, err := json.Marshal(&st)
 		if err != nil {
-			log.WithError(err).Warn("failed to marshal status")
+			zap.S().With(zap.Error(err)).Warn("failed to marshal status")
 			return
 		}
 		if _, err := w.Write(b); err != nil {
-			log.WithError(err).Warn("failed to write status")
+			zap.S().With(zap.Error(err)).Warn("failed to write status")
 		}
 	})
 
-	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", webServerPort), nil))
+	zap.S().Fatal(http.ListenAndServe(fmt.Sprintf(":%d", webServerPort), nil))
 }
 
 func (s *Operator) wait() {
diff --git a/pkg/providers/asg/docker/docker.go b/pkg/providers/asg/docker/docker.go
index 3fe8ab19..cea75898 100644
--- a/pkg/providers/asg/docker/docker.go
+++ b/pkg/providers/asg/docker/docker.go
@@ -19,7 +19,7 @@ import (
 	"os"
 	"strings"
 
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/asg"
@@ -69,6 +69,6 @@ func (d *docker) AutoScalingGroupStatus() (instances []asg.Instance, self asg.In
 	}
 	size = d.config.Size
 
-	log.Debugf("Discovered %d / %d replicas: %s", len(instances), d.config.Size, strings.Join(instancesStr, ", "))
+	zap.S().Debugf("Discovered %d / %d replicas: %s", len(instances), d.config.Size, strings.Join(instancesStr, ", "))
 	return
 }
diff --git a/pkg/providers/asg/sts/sts.go b/pkg/providers/asg/sts/sts.go
index 5fd6d295..27f496e8 100644
--- a/pkg/providers/asg/sts/sts.go
+++ b/pkg/providers/asg/sts/sts.go
@@ -21,8 +21,9 @@ import (
 	"strconv"
 	"strings"
 
+	"go.uber.org/zap"
+
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/asg"
-	log "github.com/sirupsen/logrus"
 )
 
 func init() {
@@ -88,7 +89,7 @@ func (a *sts) Configure(providerConfig asg.Config) (err error) {
 	}
 	a.self.address = fmt.Sprintf("%s.%s.%s.svc.%s", a.self.name, a.serviceName, a.namespace, a.dnsClusterSuffix)
 
-	log.Debugf("Running as %s within Statefulset %s of %d replicas, with headless service %s.%s.svc.%s", a.self.address, a.name, a.replicas, a.serviceName, a.namespace, a.dnsClusterSuffix)
+	zap.S().Debugf("Running as %s within Statefulset %s of %d replicas, with headless service %s.%s.svc.%s", a.self.address, a.name, a.replicas, a.serviceName, a.namespace, a.dnsClusterSuffix)
 
 	return
 }
@@ -104,7 +105,7 @@ func (a *sts) AutoScalingGroupStatus() ([]asg.Instance, asg.Instance, int, error
 		instances = append(instances, &instance)
 		instancesStr = append(instancesStr, instance.address)
 	}
-	log.Debugf("Discovered %d / %d replicas: %s", len(instances), a.replicas, strings.Join(instancesStr, ", "))
+	zap.S().Debugf("Discovered %d / %d replicas: %s", len(instances), a.replicas, strings.Join(instancesStr, ", "))
 
 	return instances, &a.self, a.replicas, nil
 }
diff --git a/pkg/providers/snapshot/file/file.go b/pkg/providers/snapshot/file/file.go
index 5dd63e2f..bfa7e4e6 100644
--- a/pkg/providers/snapshot/file/file.go
+++ b/pkg/providers/snapshot/file/file.go
@@ -23,9 +23,10 @@ import (
 	"sort"
 	"time"
 
+	"go.uber.org/zap"
+
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot"
-	log "github.com/sirupsen/logrus"
 )
 
 const filePermissions = 0600
@@ -94,7 +95,7 @@ func (f *file) Info() (*snapshot.Metadata, error) {
 
 		metadata, err := snapshot.NewMetadata(file.Name(), -1, file.Size(), f)
 		if err != nil {
-			log.Warnf("failed to parse metadata for snapshot %v", file.Name())
+			zap.S().Warnf("failed to parse metadata for snapshot %v", file.Name())
 			continue
 		}
 		metadatas = append(metadatas, metadata)
@@ -119,7 +120,7 @@ func (f *file) Purge(ttl time.Duration) error {
 
 	for _, file := range files {
 		if time.Since(file.ModTime()) > ttl {
-			log.Infof("purging snapshot file %q because it is that older than %v", file.Name(), ttl)
+			zap.S().Infof("purging snapshot file %q because it is older than %v", file.Name(), ttl)
 			os.Remove(filepath.Join(f.config.Dir, file.Name()))
 		}
 	}
diff --git a/pkg/providers/snapshot/s3/s3.go b/pkg/providers/snapshot/s3/s3.go
index a18abf6c..ca47a270 100644
--- a/pkg/providers/snapshot/s3/s3.go
+++ b/pkg/providers/snapshot/s3/s3.go
@@ -28,7 +28,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws/session"
 	ss3 "github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers"
 	"github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot"
@@ -102,7 +102,7 @@ func (s *s3) Save(r io.ReadCloser, metadata *snapshot.Metadata) error {
 		Key:    aws.String(key),
 	})
 	if err != nil {
-		log.WithError(err).Warnf("failed to get aws s3 object size for %q")
+		zap.S().With(zap.Error(err)).Warnf("failed to get aws s3 object size for %q", key)
 		return nil
 	}
 
@@ -152,7 +152,7 @@ func (s *s3) Info() (*snapshot.Metadata, error) {
 	for _, obj := range resp.Contents {
 		metadata, err := snapshot.NewMetadata(*obj.Key, -1, *obj.Size, s)
 		if err != nil {
-			log.Warnf("failed to parse metadata for snapshot %v", *obj.Key)
+			zap.S().Warnf("failed to parse metadata for snapshot %v", *obj.Key)
 			continue
 		}
 		metadatas = append(metadatas, metadata)
@@ -179,14 +179,14 @@ func (s *s3) Purge(ttl time.Duration) error {
 
 	for _, item := range resp.Contents {
 		if time.Since(*item.LastModified) > ttl {
-			log.Infof("purging snapshot file %q because it is that older than %v", *item.Key, ttl)
+			zap.S().Infof("purging snapshot file %q because it is older than %v", *item.Key, ttl)
 
 			_, err := s3s.DeleteObject(&ss3.DeleteObjectInput{
 				Bucket: aws.String(s.config.Bucket),
 				Key:    item.Key,
 			})
 			if err != nil {
-				log.WithError(err).Warn("failed to remove aws s3 object")
+				zap.S().With(zap.Error(err)).Warn("failed to remove aws s3 object")
 			}
 		}
 	}
diff --git a/pkg/tester/tester.go b/pkg/tester/tester.go
index bf9d1ae0..f2e46d7c 100644
--- a/pkg/tester/tester.go
+++ b/pkg/tester/tester.go
@@ -19,7 +19,7 @@ import (
 	"errors"
 	"time"
 
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 
 	"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
 )
@@ -47,7 +47,7 @@ func Run(cfg Config) {
 		false,
 	)
 	if err != nil {
-		log.WithError(err).Fatal("failed to create etcd cluster client")
+		zap.S().With(zap.Error(err)).Fatal("failed to create etcd cluster client")
 	}
 	defer c.Close()
 
@@ -58,12 +58,12 @@ func Run(cfg Config) {
 
 	// Compact and defrag the cluster up to this point.
 	if err := c.Cleanup(); err != nil {
-		log.WithError(err).Fatal("failed to compact/defrag the cluster")
+		zap.S().With(zap.Error(err)).Fatal("failed to compact/defrag the cluster")
 	}
 }
 
 func runTest(testCase testCase, client *etcd.Client, cfg Config, dm *dataMarker) {
-	log.Infof("starting test %q", testCase.name)
+	zap.S().Infof("starting test %q", testCase.name)
 	promSetRunningTest(testCase.name)
 	defer promSetRunningTest("")
 
@@ -71,18 +71,18 @@ func runTest(testCase testCase, client *etcd.Client, cfg Config, dm *dataMarker)
 
 	// Compact and defrag the cluster up to this point, and set a data marker.
 	if err := client.Cleanup(); err != nil {
-		log.WithError(err).Warning("failed to compact/defrag the cluster, next test might be affected")
+		zap.S().With(zap.Error(err)).Warn("failed to compact/defrag the cluster, next test might be affected")
 	}
 	if err := dm.setMarker(); err != nil {
-		log.WithError(err).Warning("failed to put data marker, skipping test")
+		zap.S().With(zap.Error(err)).Warn("failed to put data marker, skipping test")
 		return
 	}
 
 	// Verify that the cluster is healthy in the first place, and get the current leader.
 	leaderID, err := isHealthy(client, cfg.Cluster.Size)
 	if err != nil {
-		log.WithError(err).Fatal("cluster is unhealthy")
+		zap.S().With(zap.Error(err)).Fatal("cluster is unhealthy")
 	}
 
 	// Start stressing the cluster.
@@ -95,7 +95,7 @@ func runTest(testCase testCase, client *etcd.Client, cfg Config, dm *dataMarker)
 
 	promSetInjectedFailure(testCase.name)
 	if err := testCase.fi(failureInjectorConfig{client: client, testerConfig: cfg, leaderID: leaderID}); err != nil {
-		log.WithError(err).Warning("failed to inject failure, skipping test")
+		zap.S().With(zap.Error(err)).Warn("failed to inject failure, skipping test")
 		return
 	}
 
@@ -106,7 +106,7 @@ func runTest(testCase testCase, client *etcd.Client, cfg Config, dm *dataMarker)
 	for i := 0; i < 4; i++ {
 		time.Sleep(15 * time.Second)
 		if _, err := isHealthy(client, cfg.Cluster.Size); err != nil {
-			log.WithError(err).Warning("cluster is unhealthy")
+			zap.S().With(zap.Error(err)).Warn("cluster is unhealthy")
 			i = -1
 		}
 	}
@@ -116,10 +116,10 @@ func runTest(testCase testCase, client *etcd.Client, cfg Config, dm *dataMarker)
 
 	// Verify cluster's consistency and the presence of the data marker.
 	if err := client.IsConsistent(); err != nil {
-		log.WithError(err).Fatal("cluster is un-consistent, exiting")
+		zap.S().With(zap.Error(err)).Fatal("cluster is inconsistent, exiting")
 	}
 	if err := dm.verify(testCase.isLossy); err != nil {
-		log.Error(err)
+		zap.S().Error(err)
 	}
 }
 
@@ -148,7 +148,7 @@ func (dm *dataMarker) verify(isLossy bool) error {
 	}
 
 	if !t.Equal(dm.time) {
-		log.Infof("data marker was reset the value it held %v ago", time.Since(t))
+		zap.S().Infof("data marker was reset to a value it held %v ago", time.Since(t))
 		if !isLossy {
 			return errors.New("failed to verify data marker: unexpected value - cluster might have been restored to an old snapshot unexpectedly")
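
For reference, a minimal consumer of the new logger package looks like the sketch below. It is illustrative only (the flag name and the bare main are hypothetical); logger.Configure and the zap globals are the actual entry points this patch introduces, as used by cmd/operator and cmd/tester above.

    package main

    import (
    	"flag"

    	"go.uber.org/zap"

    	"github.com/quentin-m/etcd-cloud-operator/pkg/logger"
    )

    func main() {
    	// Hypothetical flag; cmd/operator and cmd/tester define their own.
    	flagLogLevel := flag.String("log-level", "info", "Define the logging level.")
    	flag.Parse()

    	// Builds the shared *zap.Logger and *zap.Config (exposed as logger.Logger and
    	// logger.Config so etcd's embed and clientv3 can reuse them), installs the logger
    	// as zap's global, and routes gRPC logs through it at the debug level.
    	logger.Configure(*flagLogLevel)

    	// Every package then logs through zap's global sugared logger.
    	zap.S().Infof("logger initialized at level %q", *flagLogLevel)
    }

Note that the level string is parsed with etcd's logutil.ConvertToZapLevel, so the accepted values follow etcd's convention (debug, info, warn, error, dpanic, panic, fatal).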