Skip to content

Commit

Permalink
*: switch to zap for logging rather than logrus, and align formatting…
Browse files Browse the repository at this point in the history
… onto etcd's embed given embed is not configurable
  • Loading branch information
Quentin-M committed Sep 13, 2021
1 parent 73f24e0 commit e176d36
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 174 deletions.
7 changes: 4 additions & 3 deletions cmd/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
package main

import (
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"time"

"go.uber.org/zap"

"github.com/quentin-m/etcd-cloud-operator/pkg/etcd"
"github.com/quentin-m/etcd-cloud-operator/pkg/operator"
"github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)

// config represents a YAML configuration file that namespaces all ECO
Expand Down Expand Up @@ -86,6 +87,6 @@ func loadConfig(path string) (config, error) {
return config, err
}

log.Infof("loaded configuration file %v", path)
zap.S().Infof("loaded configuration file %v", path)
return config, err
}
14 changes: 2 additions & 12 deletions cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ package main

import (
"flag"
"io/ioutil"
"os"
"strings"

log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"google.golang.org/grpc/grpclog"

"github.com/quentin-m/etcd-cloud-operator/pkg/logger"
"github.com/quentin-m/etcd-cloud-operator/pkg/operator"
Expand All @@ -44,18 +40,12 @@ func main() {
flag.Parse()

// Initialize logging system.
logLevel, err := log.ParseLevel(strings.ToUpper(*flagLogLevel))
log.SetOutput(os.Stdout)
log.SetLevel(logLevel)
log.SetFormatter(&log.TextFormatter{FullTimestamp: true})

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
zap.ReplaceGlobals(logger.BuildZapLogger(*flagLogLevel))
logger.Configure(*flagLogLevel)

// Read configuration.
config, err := loadConfig(*flagConfigPath)
if err != nil {
log.WithError(err).Fatal("failed to load configuration")
zap.S().With(zap.Error(err)).Fatal("failed to load configuration")
}

// Run.
Expand Down
4 changes: 2 additions & 2 deletions cmd/tester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"io/ioutil"
"os"

log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/quentin-m/etcd-cloud-operator/pkg/tester"
Expand Down Expand Up @@ -53,6 +53,6 @@ func loadConfig(path string) (config, error) {
return config, err
}

log.Infof("loaded configuration file %v", path)
zap.S().Infof("loaded configuration file %v", path)
return config, err
}
11 changes: 4 additions & 7 deletions cmd/tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package main
import (
"flag"
"os"
"strings"

log "github.com/sirupsen/logrus"
"go.uber.org/zap"

"github.com/quentin-m/etcd-cloud-operator/pkg/logger"
"github.com/quentin-m/etcd-cloud-operator/pkg/tester"
)

Expand All @@ -33,15 +33,12 @@ func main() {
flag.Parse()

// Initialize logging system.
logLevel, err := log.ParseLevel(strings.ToUpper(*flagLogLevel))
log.SetOutput(os.Stdout)
log.SetLevel(logLevel)
log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
logger.Configure(*flagLogLevel)

// Read configuration.
config, err := loadConfig(*flagConfigPath)
if err != nil {
log.WithError(err).Fatal("failed to load configuration")
zap.S().With(zap.Error(err)).Fatal("failed to load configuration")
}

// Run.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.16
require (
github.com/aws/aws-sdk-go v1.38.34
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.7.0
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewClient(clientsAddresses []string, sc SecurityConfig, autoSync bool) (*Cl
DialTimeout: defaultDialTimeout,
TLS: tc,
AutoSyncInterval: autoSyncInterval,
LogConfig: logger.BuildZapConfig(logger.GetZapLogLevelFromLogrus().String()),
LogConfig: logger.Config,
})
if err != nil {
return nil, err
Expand Down
55 changes: 24 additions & 31 deletions pkg/etcd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"time"

log "github.com/sirupsen/logrus"
"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
etcdsnap "go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc/grpclog"
"go.uber.org/zap"

"github.com/quentin-m/etcd-cloud-operator/pkg/logger"
"github.com/quentin-m/etcd-cloud-operator/pkg/providers/snapshot"
Expand Down Expand Up @@ -139,17 +135,17 @@ func (c *Server) Join(cluster *Client) error {

// Attempt to re-join the server directly if we are still a member, and we have local data.
if memberID != 0 && localSnapErr == nil {
log.Info("attempting to rejoin cluster under existing identity with local data")
zap.S().Info("attempting to rejoin cluster under existing identity with local data")

ctx, cancel := context.WithTimeout(context.Background(), defaultStartRejoinTimeout)
defer cancel()
if err := c.startServer(ctx); err == nil {
return nil
}

log.Warn("failed to join as an existing member, resetting")
zap.S().Warn("failed to join as an existing member, resetting")
if err := cluster.RemoveMember(c.cfg.Name, memberID); err != nil {
log.WithError(err).Warning("failed to remove ourselves from the cluster's member list")
zap.S().With(zap.Error(err)).Warn("failed to remove ourselves from the cluster's member list")
}
}
os.RemoveAll(c.cfg.DataDir)
Expand All @@ -172,7 +168,7 @@ func (c *Server) Join(cluster *Client) error {
}

func (c *Server) Restore(metadata *snapshot.Metadata) error {
log.Infof("restoring snapshot %q (rev: %016x, size: %.3f MB)", metadata.Name, metadata.Revision, toMB(metadata.Size))
zap.S().Infof("restoring snapshot %q (rev: %016x, size: %.3f MB)", metadata.Name, metadata.Revision, toMB(metadata.Size))

path, shouldDelete, err := metadata.Source.Get(metadata)
if err != nil && err != snapshot.ErrNoSnapshot {
Expand Down Expand Up @@ -218,14 +214,14 @@ func (c *Server) Snapshot() error {
minRev = metadata.Revision
} else {
if err != snapshot.ErrNoSnapshot {
log.WithError(err).Warn("failed to find latest snapshot revision, snapshotting anyways")
zap.S().With(zap.Error(err)).Warn("failed to find latest snapshot revision, snapshotting anyways")
}
}

// Initiate a snapshot.
rc, rev, err := c.snapshot(minRev)
if err == ErrMemberRevisionTooOld {
log.Infof("skipping snapshot: current revision %016x <= latest snapshot %016x", rev, minRev)
zap.S().Infof("skipping snapshot: current revision %016x <= latest snapshot %016x", rev, minRev)
return nil
}
if err != nil {
Expand All @@ -239,7 +235,7 @@ func (c *Server) Snapshot() error {
return fmt.Errorf("failed to save snapshot: %v", err)
}

log.Infof("snapshot %q saved successfully in %v (%.2f MB)", metadata.Filename(), time.Since(t), toMB(metadata.Size))
zap.S().Infof("snapshot %q saved successfully in %v (%.2f MB)", metadata.Filename(), time.Since(t), toMB(metadata.Size))
return nil
}

Expand All @@ -251,14 +247,14 @@ func (c *Server) SnapshotInfo() (*snapshot.Metadata, error) {
if !c.isRunning {
localSnap, localErr = localSnapshotProvider(c.cfg.DataDir).Info()
if localErr != nil && localErr != snapshot.ErrNoSnapshot {
log.WithError(localErr).Warn("failed to retrieve local snapshot info")
zap.S().With(zap.Error(localErr)).Warn("failed to retrieve local snapshot info")
}
}

// Read snapshot info from the configured snapshot provider.
cfgSnap, cfgErr = c.cfg.SnapshotProvider.Info()
if cfgErr != nil && cfgErr != snapshot.ErrNoSnapshot {
log.WithError(cfgErr).Warn("failed to retrieve snapshot info")
zap.S().With(zap.Error(cfgErr)).Warn("failed to retrieve snapshot info")
}

// Return the highest revision one, or the one that worked.
Expand Down Expand Up @@ -289,12 +285,12 @@ func (c *Server) snapshot(minRevision int64) (io.ReadCloser, int64, error) {
// Forward the snapshot to the pipe.
n, err := snapshot.WriteTo(pw)
if err != nil {
log.WithError(err).Errorf("failed to write etcd snapshot out [written bytes: %d]", n)
zap.S().With(zap.Error(err)).Errorf("failed to write etcd snapshot out [written bytes: %d]", n)
}
pw.CloseWithError(err)

if err := snapshot.Close(); err != nil {
log.WithError(err).Errorf("failed to close etcd snapshot [written bytes: %d]", n)
zap.S().With(zap.Error(err)).Errorf("failed to close etcd snapshot [written bytes: %d]", n)
}
}()

Expand All @@ -311,7 +307,7 @@ func (c *Server) Stop(graceful, snapshot bool) {
}
if snapshot {
if err := c.Snapshot(); err != nil {
log.WithError(err).Error("failed to snapshot before graceful stop")
zap.S().With(zap.Error(err)).Error("failed to snapshot before graceful stop")
}
}
if !graceful {
Expand Down Expand Up @@ -347,7 +343,8 @@ func (c *Server) startServer(ctx context.Context) error {
etcdCfg.AutoCompactionMode = c.cfg.AutoCompactionMode
etcdCfg.AutoCompactionRetention = c.cfg.AutoCompactionRetention
etcdCfg.ZapLoggerBuilder = logger.BuildZapConfigBuilder()
etcdCfg.LogLevel = logger.GetZapLogLevelFromLogrus().String()
etcdCfg.LogLevel = logger.Config.Level.String()
etcdCfg.LogOutputs = []string{embed.StdOutLogOutput}
etcdCfg.SocketOpts = transport.SocketOpts{
ReusePort: true,
ReuseAddress: true,
Expand All @@ -362,15 +359,11 @@ func (c *Server) startServer(ctx context.Context) error {

// Start the server.
c.server, err = embed.StartEtcd(etcdCfg)

// Discard the gRPC logs, as the embed server will set that regardless of what was set before (i.e. at startup).
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))

if err != nil {
return fmt.Errorf("failed to start etcd: %s", err)
}
c.isRunning = true
log.Infof("embedded etcd server is now running")
zap.S().Infof("embedded etcd server is now running")

// Wait until the server announces its ready, or until the start timeout is exceeded.
//
Expand Down Expand Up @@ -401,11 +394,11 @@ func (c *Server) startServer(ctx context.Context) error {
func (c *Server) runErrorWatcher() {
select {
case <-c.server.Server.StopNotify():
log.Warnf("etcd server is stopping")
zap.S().Warnf("etcd server is stopping")
c.isRunning = false
return
case <-c.server.Err():
log.Warnf("etcd server has crashed")
zap.S().Warnf("etcd server has crashed")
c.Stop(false, false)
}
}
Expand Down Expand Up @@ -455,18 +448,18 @@ func (c *Server) runMemberCleaner() {
if time.Since(member.lastSeenHealthy) < c.cfg.UnhealthyMemberTTL {
continue
}
log.Infof("removing member %q that's been unhealthy for %v", member.name, c.cfg.UnhealthyMemberTTL)
zap.S().Infof("removing member %q that's been unhealthy for %v", member.name, c.cfg.UnhealthyMemberTTL)

cl, err := NewClient([]string{c.cfg.PrivateAddress}, c.cfg.ClientSC, false)
if err != nil {
log.WithError(err).Warn("failed to create etcd cluster client")
zap.S().With(zap.Error(err)).Warn("failed to create etcd cluster client")
continue
}
if err := cl.RemoveMember(member.name, uint64(id)); err == context.DeadlineExceeded {
log.Warnf("failed to remove unhealthy member %q, it might be starting", member.name)
zap.S().Warnf("failed to remove unhealthy member %q, it might be starting", member.name)
continue
} else if err != nil {
log.WithError(err).Warnf("failed to remove unhealthy member %q", member.name)
zap.S().With(zap.Error(err)).Warnf("failed to remove unhealthy member %q", member.name)
continue
}

Expand All @@ -477,7 +470,7 @@ func (c *Server) runMemberCleaner() {

func (c *Server) runSnapshotter() {
if c.cfg.SnapshotProvider == nil || c.cfg.SnapshotInterval == 0 {
log.Warn("periodic snapshots are disabled")
zap.S().Warn("periodic snapshots are disabled")
return
}

Expand All @@ -491,7 +484,7 @@ func (c *Server) runSnapshotter() {
}

if err := c.Snapshot(); err != nil {
log.WithError(err).Error("failed to snapshot")
zap.S().With(zap.Error(err)).Error("failed to snapshot")
}
}
}
45 changes: 0 additions & 45 deletions pkg/logger/level_converter.go

This file was deleted.

Loading

0 comments on commit e176d36

Please sign in to comment.