Skip to content

Commit

Permalink
Support contextual logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bells17 committed May 4, 2024
1 parent 5f016e3 commit 7110cda
Show file tree
Hide file tree
Showing 190 changed files with 24,465 additions and 228 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ all: build

include release-tools/build.make

# Check contextual logging.
.PHONY: logcheck
test: logcheck
logcheck:
hack/verify-logcheck.sh
99 changes: 59 additions & 40 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -86,19 +90,27 @@ var (
)

func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
fg := featuregate.NewFeatureGate()
logsapi.AddFeatureGates(fg)
c := logsapi.NewLoggingConfiguration()
logsapi.AddGoFlags(c, flag.CommandLine)
logs.InitLogs()
flag.Parse()
logger := klog.Background()
if err := logsapi.ValidateAndApply(c, fg); err != nil {
logger.Error(err, "LoggingConfiguration is invalid")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
}
klog.Infof("Version: %s", version)
logger.Info("Version", "version", version)

if *metricsAddress != "" && *httpEndpoint != "" {
klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
os.Exit(1)
logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
addr := *metricsAddress
if addr == "" {
Expand All @@ -108,21 +120,21 @@ func main() {
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to build a Kubernetes config")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
config.QPS = (float32)(*kubeAPIQPS)
config.Burst = *kubeAPIBurst

if *workerThreads == 0 {
klog.Error("option -worker-threads must be greater than zero")
os.Exit(1)
logger.Error(nil, "Option -worker-threads must be greater than zero")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to create a Clientset")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

factory := informers.NewSharedInformerFactory(clientset, *resync)
Expand All @@ -133,41 +145,44 @@ func main() {
connection.SetMaxGRPCLogLength(*maxGRPCLogLength)
csiConn, err := connection.Connect(*csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

err = rpc.ProbeForever(csiConn, *timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to probe the CSI driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

// Find driver name.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
cancelationCtx, cancel := context.WithTimeout(context.Background(), csiTimeout)
cancelationCtx = klog.NewContext(cancelationCtx, logger)
defer cancel()
csiAttacher, err := rpc.GetDriverName(ctx, csiConn)
csiAttacher, err := rpc.GetDriverName(cancelationCtx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to get the CSI driver name")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.V(2).Infof("CSI driver name: %q", csiAttacher)
logger = klog.LoggerWithValues(logger, "driver", csiAttacher)
logger.V(2).Info("CSI driver name")
cancelationCtx = klog.NewContext(cancelationCtx, logger)

translator := csitrans.New()
if translator.IsMigratedCSIDriverByName(csiAttacher) {
metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration())
migratedCsiClient, err := connection.Connect(*csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
csiConn.Close()
csiConn = migratedCsiClient

err = rpc.ProbeForever(csiConn, *timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to probe the CSI driver", "migrated", true)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

Expand All @@ -177,18 +192,19 @@ func main() {
metricsManager.RegisterToServer(mux, *metricsPath)
metricsManager.SetDriverName(csiAttacher)
go func() {
klog.Infof("ServeMux listening at %q", addr)
logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath)
err := http.ListenAndServe(addr, mux)
if err != nil {
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
}

supportsService, err := supportsPluginControllerService(ctx, csiConn)
supportsService, err := supportsPluginControllerService(cancelationCtx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to check if the CSI Driver supports the CONTROLLER_SERVICE")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

var (
Expand All @@ -199,12 +215,12 @@ func main() {
)
if !supportsService {
handler = controller.NewTrivialHandler(clientset)
klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler")
logger.V(2).Info("CSI driver does not support Plugin Controller Service, using trivial handler")
} else {
supportsAttach, supportsReadOnly, supportsListVolumesPublishedNodes, supportsSingleNodeMultiWriter, err = supportsControllerCapabilities(ctx, csiConn)
supportsAttach, supportsReadOnly, supportsListVolumesPublishedNodes, supportsSingleNodeMultiWriter, err = supportsControllerCapabilities(cancelationCtx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
logger.Error(err, "Failed to controller capability check")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if supportsAttach {
Expand All @@ -227,18 +243,19 @@ func main() {
csitrans.New(),
*defaultFSType,
)
klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
logger.V(2).Info("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
handler = controller.NewTrivialHandler(clientset)
klog.V(2).Infof("CSI driver does not support ControllerPublishUnpublish, using trivial handler")
logger.V(2).Info("CSI driver does not support ControllerPublishUnpublish, using trivial handler")
}
}

if supportsListVolumesPublishedNodes {
klog.V(2).Infof("CSI driver supports list volumes published nodes. Using capability to reconcile volume attachment objects with actual backend state")
logger.V(2).Info("CSI driver supports list volumes published nodes. Using capability to reconcile volume attachment objects with actual backend state")
}

ctrl := controller.NewCSIAttachController(
logger,
clientset,
csiAttacher,
handler,
Expand All @@ -253,18 +270,19 @@ func main() {
run := func(ctx context.Context) {
stopCh := ctx.Done()
factory.Start(stopCh)
ctrl.Run(int(*workerThreads), stopCh)
ctrl.Run(ctx, int(*workerThreads))
}

if !*enableLeaderElection {
run(context.TODO())
run(klog.NewContext(context.Background(), logger))
} else {
// Create a new clientset for leader election. When the attacher
// gets busy and its client gets throttled, the leader election
// can proceed without issues.
leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
logger.Error(err, "Failed to create leaderelection client")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

// Name of config map with leader election lock
Expand All @@ -283,7 +301,8 @@ func main() {
le.WithRetryPeriod(*leaderElectionRetryPeriod)

if err := le.Run(); err != nil {
klog.Fatalf("failed to initialize leader election: %v", err)
logger.Error(err, "Failed to initialize leader election")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
k8s.io/client-go v0.29.0
k8s.io/component-base v0.29.0
k8s.io/csi-translation-lib v0.29.0
k8s.io/klog/v2 v2.120.1
)
Expand All @@ -25,6 +26,7 @@ require (
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
Expand All @@ -35,6 +37,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -46,11 +49,15 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand All @@ -62,7 +69,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/component-base v0.29.0 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
Loading

0 comments on commit 7110cda

Please sign in to comment.