Skip to content

Commit

Permalink
Implement distributed snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nearora-msft committed Nov 1, 2021
1 parent caef201 commit 4484ece
Show file tree
Hide file tree
Showing 12 changed files with 727 additions and 8 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh

* `--worker-threads`: Number of worker threads for running create snapshot and delete snapshot operations. Default value is 10.

* `--node-deployment`: Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes. Off by default.

* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second.

* `--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
Expand Down
31 changes: 25 additions & 6 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import (
"strings"
"time"

utils "github.com/kubernetes-csi/external-snapshotter/v4/pkg/utils"

"google.golang.org/grpc"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -72,11 +76,12 @@ var (
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")

metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.")
)

var (
Expand Down Expand Up @@ -116,6 +121,19 @@ func main() {

factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
var snapshotContentfactory informers.SharedInformerFactory
if *enableNodeDeployment {
node := os.Getenv("NODE_NAME")
if node == "" {
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
}
snapshotContentfactory = informers.NewSharedInformerFactoryWithOptions(snapClient, *resyncPeriod, informers.WithTweakListOptions(func(lo *v1.ListOptions) {
lo.LabelSelector = labels.Set{utils.VolumeSnapshotContentManagedByLabel: node}.AsSelector().String()
}),
)
} else {
snapshotContentfactory = factory
}

// Add Snapshot types to the default Kubernetes so events can be logged for them
snapshotscheme.AddToScheme(scheme.Scheme)
Expand Down Expand Up @@ -196,7 +214,7 @@ func main() {
snapClient,
kubeClient,
driverName,
factory.Snapshot().V1().VolumeSnapshotContents(),
snapshotContentfactory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
snapShotter,
*csiTimeout,
Expand All @@ -210,6 +228,7 @@ func main() {
run := func(context.Context) {
// run...
stopCh := make(chan struct{})
snapshotContentfactory.Start(stopCh)
factory.Start(stopCh)
coreFactory.Start(stopCh)
go ctrl.Run(*threads, stopCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ rules:
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["update", "patch"]

- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
k8s.io/apimachinery v0.22.0
k8s.io/client-go v0.22.0
k8s.io/component-base v0.22.0
k8s.io/component-helpers v0.22.0
k8s.io/klog/v2 v2.9.0
k8s.io/kubernetes v1.21.0
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,7 @@ k8s.io/cluster-bootstrap v0.22.0/go.mod h1:VeZXiGfH+yfnC2KtvkSwNTAqahg6yiCV/szbW
k8s.io/code-generator v0.22.0/go.mod h1:eV77Y09IopzeXOJzndrDyCI88UBok2h6WxAlBwpxa+o=
k8s.io/component-base v0.22.0 h1:ZTmX8hUqH9T9gc0mM42O+KDgtwTYbVTt2MwmLP0eK8A=
k8s.io/component-base v0.22.0/go.mod h1:SXj6Z+V6P6GsBhHZVbWCw9hFjUdUYnJerlhhPnYCBCg=
k8s.io/component-helpers v0.22.0 h1:OoTOtxTkg/T16FRS1K/WfABzxliTCq3RTbFHMBSod/o=
k8s.io/component-helpers v0.22.0/go.mod h1:YNIbQI59ayNiU8JHlPIxVkOUYycbKhk5Niy0pcyJOEY=
k8s.io/controller-manager v0.22.0/go.mod h1:KCFcmFIjh512sVIm1EhAPJ+4miASDvbZA5eO/2nbr2M=
k8s.io/cri-api v0.22.0/go.mod h1:mj5DGUtElRyErU5AZ8EM0ahxbElYsaLAMTPhLPQ40Eg=
Expand Down
39 changes: 39 additions & 0 deletions pkg/common-controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
corev1 "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"

crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
Expand Down Expand Up @@ -671,6 +672,16 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
},
}

nodeName, err := ctrl.getManagedByNode(snapshot)
if err != nil {
return nil, err
}
if nodeName != "" {
snapshotContent.Labels = map[string]string{
utils.VolumeSnapshotContentManagedByLabel: nodeName,
}
}

// Set AnnDeletionSecretRefName and AnnDeletionSecretRefNamespace
if snapshotterSecretRef != nil {
klog.V(5).Infof("createSnapshotContent: set annotation [%s] on content [%s].", utils.AnnDeletionSecretRefName, snapshotContent.Name)
Expand Down Expand Up @@ -1655,3 +1666,31 @@ func (ctrl *csiSnapshotCommonController) checkAndSetInvalidSnapshotLabel(snapsho

return updatedSnapshot, nil
}

// getManagedByNode returns the name of the node that should manage the
// VolumeSnapshotContent created for the given snapshot.
//
// It resolves the snapshot's source PV and returns the name of the first node
// in the cluster that satisfies the PV's required node affinity. An empty name
// with a nil error means no node applies: the PV has no required node
// affinity, or no node currently matches it.
//
// An error is returned when the PV cannot be resolved or when the node list
// cannot be fetched, so that the caller can retry instead of creating a
// content object without its managed-by information.
func (ctrl *csiSnapshotCommonController) getManagedByNode(snapshot *crdv1.VolumeSnapshot) (string, error) {
	pv, err := ctrl.getVolumeFromVolumeSnapshot(snapshot)
	if err != nil {
		return "", fmt.Errorf("failed to retrieve PV from snapshot[%s]: %w", snapshot.Name, err)
	}

	// Required is a pointer and may be unset even when NodeAffinity is present;
	// bail out before listing nodes or handing a nil selector to the matcher.
	if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
		klog.V(5).Infof("NodeAffinity not set for pv %s", pv.Name)
		return "", nil
	}
	nodeSelectorTerms := pv.Spec.NodeAffinity.Required

	nodes, err := ctrl.client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		// Propagate instead of swallowing: a transient API failure must not be
		// mistaken for "no node matches".
		return "", fmt.Errorf("failed to get the list of nodes: %w", err)
	}

	// Index into the slice rather than taking the address of the range
	// variable; this avoids copying every Node and aliasing the loop variable.
	for i := range nodes.Items {
		node := &nodes.Items[i]
		matched, matchErr := corev1.MatchNodeSelectorTerms(node, nodeSelectorTerms)
		if matchErr != nil {
			// Best effort: a selector evaluation problem is reported but does
			// not stop the search across the remaining nodes.
			klog.Errorf("failed to match node %s against the node affinity of pv[%s]: %v", node.Name, pv.Name, matchErr)
		}
		if matched {
			return node.Name, nil
		}
	}

	klog.Errorf("failed to find nodes that match the node affinity requirements for pv[%s]", pv.Name)
	return "", nil
}
3 changes: 2 additions & 1 deletion pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ const (
VolumeSnapshotContentInvalidLabel = "snapshot.storage.kubernetes.io/invalid-snapshot-content-resource"
// VolumeSnapshotInvalidLabel is applied to invalid snapshot as a label key. The value does not matter.
// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/177-volume-snapshot/tighten-validation-webhook-crd.md#automatic-labelling-of-invalid-objects
VolumeSnapshotInvalidLabel = "snapshot.storage.kubernetes.io/invalid-snapshot-resource"
VolumeSnapshotInvalidLabel = "snapshot.storage.kubernetes.io/invalid-snapshot-resource"
VolumeSnapshotContentManagedByLabel = "csi.storage.k8s.io/managed-by"
)

var SnapshotterSecretParams = secretParamsMap{
Expand Down
Loading

0 comments on commit 4484ece

Please sign in to comment.