diff --git a/charts/fluid/fluid/CHANGELOG.md b/charts/fluid/fluid/CHANGELOG.md index ccf9e96f121..7daf32e43bc 100644 --- a/charts/fluid/fluid/CHANGELOG.md +++ b/charts/fluid/fluid/CHANGELOG.md @@ -53,4 +53,5 @@ * Scale runtime controllers on demand ### 0.9.0 -* Support pass image pull secrets from fluid charts to alluxioruntime controller \ No newline at end of file +* Support pass image pull secrets from fluid charts to alluxioruntime controller +* Fix components rbacs and set Fluid CSI Plugin with node-authorized kube-client \ No newline at end of file diff --git a/charts/fluid/fluid/templates/csi/daemonset.yaml b/charts/fluid/fluid/templates/csi/daemonset.yaml index 16abe4d4529..a1bfaf39e9a 100644 --- a/charts/fluid/fluid/templates/csi/daemonset.yaml +++ b/charts/fluid/fluid/templates/csi/daemonset.yaml @@ -104,8 +104,16 @@ spec: - name: fluid-src-dir mountPath: {{ .Values.runtime.mountRoot | quote }} mountPropagation: "Bidirectional" - - name: host-etc-dir - mountPath: /host-etc + - name: kubelet-kube-config + mountPath: /etc/kubernetes/kubelet.conf + readOnly: true + - name: kubelet-cert-dir + mountPath: {{ .Values.csi.kubelet.certDir | quote }} + readOnly: true + - name: updatedb-conf + mountPath: /host-etc/updatedb.conf + - name: updatedb-conf-bak + mountPath: /host-etc/updatedb.conf.bak volumes: - name: kubelet-dir hostPath: @@ -124,6 +132,18 @@ spec: type: DirectoryOrCreate name: fluid-src-dir - hostPath: - path: /etc + path: {{ .Values.csi.kubelet.kubeConfigFile | quote }} + type: File + name: kubelet-kube-config + - hostPath: + path: {{ .Values.csi.kubelet.certDir | quote }} type: Directory - name: host-etc-dir + name: kubelet-cert-dir + - hostPath: + path: /etc/updatedb.conf + type: FileOrCreate + name: updatedb-conf + - hostPath: + path: /etc/updatedb.conf.backup + type: FileOrCreate + name: updatedb-conf-bak diff --git a/charts/fluid/fluid/templates/role/csi/rbac.yaml b/charts/fluid/fluid/templates/role/csi/rbac.yaml index c27f4adc222..5354a76fbc5 
100644 --- a/charts/fluid/fluid/templates/role/csi/rbac.yaml +++ b/charts/fluid/fluid/templates/role/csi/rbac.yaml @@ -41,13 +41,7 @@ rules: verbs: ["get"] - apiGroups: [""] resources: ["events"] - verbs: ["get", "list", "watch", "create", "update", "patch"] - - apiGroups: [""] - resources: ["nodes"] - verbs: ["get", "patch"] - - apiGroups: [""] - resources: ["nodes/proxy"] - verbs: ["*"] + verbs: ["create", "patch"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/charts/fluid/fluid/templates/role/webhook/rabc.yaml b/charts/fluid/fluid/templates/role/webhook/rabc.yaml index a2558ec2d3d..c7e46795b01 100644 --- a/charts/fluid/fluid/templates/role/webhook/rabc.yaml +++ b/charts/fluid/fluid/templates/role/webhook/rabc.yaml @@ -1,16 +1,59 @@ {{ if .Values.webhook.enabled -}} apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: fluid-webhook + namespace: {{ include "fluid.namespace" . }} +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - update + resourceNames: + - fluid-webhook-certs + # resourceNames won't protect create verb, so individually specify it for readability + - apiGroups: + - "" + resources: + - secrets + verbs: + - create +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: fluid-webhook-rolebinding + namespace: {{ include "fluid.namespace" . }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: fluid-webhook +subjects: + - kind: ServiceAccount + name: fluid-webhook + namespace: {{ include "fluid.namespace" . 
}} +--- +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: fluid-webhook rules: + # Can only list and watch secret `mutatingwebhookconfiguration` with a metadata.name field selector + # See https://kubernetes.io/docs/reference/access-authn-authz/rbac/#referring-to-resources - apiGroups: - admissionregistration.k8s.io resources: - - validatingwebhookconfigurations - mutatingwebhookconfigurations + resourceNames: + - fluid-pod-admission-webhook verbs: - - '*' + - get + - patch + - list + - watch - apiGroups: - data.fluid.io resources: @@ -38,9 +81,7 @@ rules: - apiGroups: - "" resources: - - secrets - configmaps - - events verbs: - get - create @@ -56,12 +97,6 @@ rules: - get - list - watch - - apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - '*' --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/charts/fluid/fluid/templates/webhook/webhook.yaml b/charts/fluid/fluid/templates/webhook/webhook.yaml index 994020bf8fd..6b9c48e7e1c 100644 --- a/charts/fluid/fluid/templates/webhook/webhook.yaml +++ b/charts/fluid/fluid/templates/webhook/webhook.yaml @@ -16,6 +16,8 @@ spec: labels: control-plane: fluid-webhook spec: + tolerations: + - operator: Exists {{- with .Values.image.imagePullSecrets }} imagePullSecrets: {{- toYaml . 
| nindent 8 }} diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index 1b29b5d31d6..431aca34ad2 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -26,6 +26,8 @@ csi: plugins: image: fluidcloudnative/fluid-csi:v0.9.0-085b23e kubelet: + kubeConfigFile: /etc/kubernetes/kubelet.conf + certDir: /var/lib/kubelet/pki rootDir: /var/lib/kubelet pruneFs: fuse.alluxio-fuse,fuse.jindofs-fuse,fuse.juicefs,fuse.goosefs-fuse,ossfs,alifuse.aliyun-alinas-efc diff --git a/cmd/csi/app/csi.go b/cmd/csi/app/csi.go index 0188f11a894..d5b553e7605 100644 --- a/cmd/csi/app/csi.go +++ b/cmd/csi/app/csi.go @@ -39,12 +39,13 @@ import ( ) var ( - endpoint string - nodeID string - metricsAddr string - pprofAddr string - pruneFs []string - prunePath string + endpoint string + nodeID string + metricsAddr string + pprofAddr string + pruneFs []string + prunePath string + kubeletKubeConfigPath string ) var scheme = runtime.NewScheme() @@ -81,6 +82,7 @@ func init() { startCmd.Flags().StringVarP(&prunePath, "prune-path", "", "/runtime-mnt", "Prune path to add in /etc/updatedb.conf") startCmd.Flags().StringVarP(&metricsAddr, "metrics-addr", "", ":8080", "The address the metrics endpoint binds to.") startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results") + startCmd.Flags().StringVarP(&kubeletKubeConfigPath, "kubelet-kube-config", "", "/etc/kubernetes/kubelet.conf", "The file path to kubelet kube config") utilfeature.DefaultMutableFeatureGate.AddFlag(startCmd.Flags()) startCmd.Flags().AddGoFlagSet(flag.CommandLine) } @@ -109,10 +111,11 @@ func handle() { } config := config.Config{ - NodeId: nodeID, - Endpoint: endpoint, - PruneFs: pruneFs, - PrunePath: prunePath, + NodeId: nodeID, + Endpoint: endpoint, + PruneFs: pruneFs, + PrunePath: prunePath, + KubeletConfigPath: kubeletKubeConfigPath, } if err = csi.SetupWithManager(mgr, config); err != nil { diff --git 
a/pkg/csi/config/config.go b/pkg/csi/config/config.go index 2b606872f6c..3be35acdaa2 100644 --- a/pkg/csi/config/config.go +++ b/pkg/csi/config/config.go @@ -17,8 +17,9 @@ limitations under the License. package config type Config struct { - NodeId string - Endpoint string - PruneFs []string - PrunePath string + NodeId string + Endpoint string + PruneFs []string + PrunePath string + KubeletConfigPath string } diff --git a/pkg/csi/plugins/driver.go b/pkg/csi/plugins/driver.go index f66c8b7f9b7..5681ea05c4f 100644 --- a/pkg/csi/plugins/driver.go +++ b/pkg/csi/plugins/driver.go @@ -23,6 +23,7 @@ import ( "path/filepath" "strings" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -38,15 +39,16 @@ const ( ) type driver struct { - client client.Client - apiReader client.Reader - csiDriver *csicommon.CSIDriver - nodeId, endpoint string + client client.Client + apiReader client.Reader + nodeAuthorizedClient *kubernetes.Clientset + csiDriver *csicommon.CSIDriver + nodeId, endpoint string } var _ manager.Runnable = &driver{} -func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader) *driver { +func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset) *driver { glog.Infof("Driver: %v version: %v", driverName, version) proto, addr := utils.SplitSchemaAddr(endpoint) @@ -68,11 +70,12 @@ func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.R csiDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}) return &driver{ - nodeId: nodeID, - endpoint: endpoint, - csiDriver: csiDriver, - client: client, - apiReader: apiReader, + nodeId: nodeID, + endpoint: endpoint, + csiDriver: csiDriver, + client: client, + nodeAuthorizedClient: nodeAuthorizedClient, + apiReader: apiReader, } } @@ -84,10 +87,11 @@ func (d 
*driver) newControllerServer() *controllerServer { func (d *driver) newNodeServer() *nodeServer { return &nodeServer{ - nodeId: d.nodeId, - DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver), - client: d.client, - apiReader: d.apiReader, + nodeId: d.nodeId, + DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver), + client: d.client, + apiReader: d.apiReader, + nodeAuthorizedClient: d.nodeAuthorizedClient, } } diff --git a/pkg/csi/plugins/nodeserver.go b/pkg/csi/plugins/nodeserver.go index fa5e94d7045..dabf800be66 100644 --- a/pkg/csi/plugins/nodeserver.go +++ b/pkg/csi/plugins/nodeserver.go @@ -17,6 +17,7 @@ limitations under the License. package plugins import ( + "encoding/json" "fmt" "os" "os/exec" @@ -31,9 +32,11 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/utils/mount" "sigs.k8s.io/controller-runtime/pkg/client" @@ -52,10 +55,11 @@ const ( type nodeServer struct { nodeId string *csicommon.DefaultNodeServer - client client.Client - apiReader client.Reader - mutex sync.Mutex - node *v1.Node + client client.Client + apiReader client.Reader + nodeAuthorizedClient *kubernetes.Clientset + mutex sync.Mutex + node *v1.Node } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { @@ -267,7 +271,8 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId) } - _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) + // _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) 
+ err = ns.patchNodeWithLabel(node, labelsToModify) if err != nil { glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err) return nil, errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId) @@ -315,7 +320,8 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, errors.Wrapf(err, "NodeStageVolume: can't get node %s", ns.nodeId) } - _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) + // _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) + err = ns.patchNodeWithLabel(node, labelsToModify) if err != nil { glog.Errorf("NodeStageVolume: error when patching labels on node %s: %v", ns.nodeId, err) return nil, errors.Wrapf(err, "NodeStageVolume: error when patching labels on node %s", ns.nodeId) @@ -373,14 +379,58 @@ func (ns *nodeServer) getNode() (node *v1.Node, err error) { } } - if node, err = kubeclient.GetNode(ns.apiReader, ns.nodeId); err != nil { + if node, err = ns.nodeAuthorizedClient.CoreV1().Nodes().Get(context.TODO(), ns.nodeId, metav1.GetOptions{}); err != nil { return nil, err } + + // if node, err = kubeclient.GetNode(ns.apiReader, ns.nodeId); err != nil { + // return nil, err + // } + glog.V(1).Infof("Got node %s from api server", node.Name) ns.node = node return ns.node, nil } +func (ns *nodeServer) patchNodeWithLabel(node *v1.Node, labelsToModify common.LabelsToModify) error { + labels := labelsToModify.GetLabels() + labelValuePair := map[string]interface{}{} + + for _, labelToModify := range labels { + operationType := labelToModify.GetOperationType() + labelToModifyKey := labelToModify.GetLabelKey() + labelToModifyValue := labelToModify.GetLabelValue() + + switch operationType { + case common.AddLabel, common.UpdateLabel: + labelValuePair[labelToModifyKey] = labelToModifyValue + case common.DeleteLabel: + labelValuePair[labelToModifyKey] = nil + default: + err := fmt.Errorf("fail to update 
the label due to the wrong operation: %s", operationType) + return err + } + } + + metadata := map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": labelValuePair, + }, + } + + patchByteData, err := json.Marshal(metadata) + if err != nil { + return err + } + + _, err = ns.nodeAuthorizedClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchByteData, metav1.PatchOptions{}) + if err != nil { + return err + } + + return nil +} + func checkMountInUse(volumeName string) (bool, error) { var inUse bool glog.Infof("Try to check if the volume %s is being used", volumeName) @@ -454,7 +504,8 @@ func (ns *nodeServer) prepareSessMgr(workDir string) error { return errors.Wrapf(err, "can't get node %s", ns.nodeId) } - _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) + // _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify) + err = ns.patchNodeWithLabel(node, labelsToModify) if err != nil { return errors.Wrapf(err, "error when patching labels on node %s", ns.nodeId) } diff --git a/pkg/csi/plugins/register.go b/pkg/csi/plugins/register.go index e15b09f0197..45406626b66 100644 --- a/pkg/csi/plugins/register.go +++ b/pkg/csi/plugins/register.go @@ -18,12 +18,18 @@ package plugins import ( "github.com/fluid-cloudnative/fluid/pkg/csi/config" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubelet" "sigs.k8s.io/controller-runtime/pkg/manager" ) // Register initializes the csi driver and registers it to the controller manager. 
func Register(mgr manager.Manager, cfg config.Config) error {
-	csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader())
+	client, err := kubelet.InitNodeAuthorizedClient(cfg.KubeletConfigPath)
+	if err != nil {
+		return err
+	}
+
+	csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client)
 
 	if err := mgr.Add(csiDriver); err != nil {
 		return err
diff --git a/pkg/utils/kubelet/node_auth_client.go b/pkg/utils/kubelet/node_auth_client.go
new file mode 100644
index 00000000000..a7aeb58f24e
--- /dev/null
+++ b/pkg/utils/kubelet/node_auth_client.go
@@ -0,0 +1,34 @@
+package kubelet
+
+import (
+	"github.com/pkg/errors"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// InitNodeAuthorizedClient initializes a node-authorized client from the kubelet's kube config.
+// This is currently an available workaround to implement a node-scoped daemonset.
+// See discussion https://github.com/kubernetes/enhancements/pull/944#issuecomment-490242290
+//
+// kubeletKubeConfigPath is the path to the kubelet's kubeconfig file and must not be
+// empty: clientcmd.BuildConfigFromFlags falls back to the in-cluster config when both
+// of its arguments are empty, which would silently yield a client that does not use
+// the kubelet's kube config. An error is returned if the path is empty, the config
+// cannot be built, or the clientset cannot be created.
+func InitNodeAuthorizedClient(kubeletKubeConfigPath string) (*kubernetes.Clientset, error) {
+	if kubeletKubeConfigPath == "" {
+		return nil, errors.New("kubelet kube config path must not be empty")
+	}
+
+	config, err := clientcmd.BuildConfigFromFlags("", kubeletKubeConfigPath)
+	if err != nil {
+		return nil, errors.Wrapf(err, "fail to build kubelet config from %q", kubeletKubeConfigPath)
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, errors.Wrap(err, "fail to build client-go client from kubelet kubeconfig")
+	}
+
+	return client, nil
+}