From 0cbbc89afdd2726c7fb6fa653a89846b0c337ef8 Mon Sep 17 00:00:00 2001
From: Amarnath Valluri
Date: Fri, 8 Mar 2019 10:58:55 +0200
Subject: [PATCH] Persist driver state and reload it on restart.

Upon restart, the driver has to recognize the volumes it provisioned
during its last run. For that, the driver has to persist its state
(provisioned volume information). We have two options:
 1. Per-node state: Persist the driver state on each node, specific to
    that node. Upon driver start, it has to load the persisted
    information. Whenever a node registers, the Controller/Master has
    to collect the volume information stored on that node.
 2. Cluster-level state: Persist the consolidated cluster-level driver
    state, say to a ConfigMap on Kubernetes. This would have to be done
    on the Controller side, but it is not clear how to pass
    node-specific information to a node.

This change implements the first option, i.e., per-node state.

It defines a new interface, StateManager, that handles state
persistence regardless of whether the state is kept per node or
cluster-wide, and provides an implementation of StateManager that
persists the state to a file.

It also changes RegistryServer so that it informs its listeners when a
node controller registers. The ControllerServer on the master listens
for this event and collects the volume information from that node by
calling csi.ControllerServer.ListVolumes.

FIXES #25 - Clean handling of driver restart
---
 DEVELOPMENT.md                                |   2 +-
 .../pmem-csi-direct-testing.yaml              |   7 +
 deploy/kubernetes-1.13/pmem-csi-direct.yaml   |   7 +
 .../kubernetes-1.13/pmem-csi-lvm-testing.yaml |   7 +
 deploy/kubernetes-1.13/pmem-csi-lvm.yaml      |   7 +
 .../pmem-csi-direct-testing.yaml              |   7 +
 deploy/kubernetes-1.14/pmem-csi-direct.yaml   |   7 +
 .../kubernetes-1.14/pmem-csi-lvm-testing.yaml |   7 +
 deploy/kubernetes-1.14/pmem-csi-lvm.yaml      |   7 +
 deploy/kustomize/driver/pmem-csi.yaml         |   9 +-
 .../controllerserver-master.go                |  62 +++-
 pkg/pmem-csi-driver/controllerserver-node.go  | 157 +++++++--
 pkg/pmem-csi-driver/main.go                   |   4 +-
 pkg/pmem-csi-driver/nodeserver.go             |   4 +
 pkg/pmem-csi-driver/pmem-csi-driver.go        |  13 +-
 pkg/pmem-csi-driver/registryserver.go         |  28 +-
 pkg/pmem-state/pmem-state.go                  | 184 ++++++++++
 pkg/pmem-state/pmem-state_test.go             | 332 ++++++++++++++++++
 18 files changed, 800 insertions(+), 51 deletions(-)
 create mode 100644 pkg/pmem-state/pmem-state.go
 create mode 100644 pkg/pmem-state/pmem-state_test.go

diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index a3f6d3dc80..323962a2c0 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -147,7 +147,7 @@ argument name | meaning | type
 -mode string | driver run mode | string | controller, node |
 -nodeid string | node id | string | | nodeid
 -registryEndpoint string | endpoint to connect/listen registry server | string | | -
+-statePath | Directory path where to persist the state of the driver running on a node | string | absolute directory path on node | /var/lib/

 Environment variables
 ---------------------

diff --git a/deploy/kubernetes-1.13/pmem-csi-direct-testing.yaml b/deploy/kubernetes-1.13/pmem-csi-direct-testing.yaml
index cf60dcba20..86c06b6a7a 100644
--- a/deploy/kubernetes-1.13/pmem-csi-direct-testing.yaml
+++ b/deploy/kubernetes-1.13/pmem-csi-direct-testing.yaml
@@ -295,6 +295,7 @@ spec:
         - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
         - -certFile=/certs/$(KUBE_NODE_NAME).crt
         - -keyFile=/certs/$(KUBE_NODE_NAME).key
+        - -statePath=/var/lib/pmem-csi.intel.com
         - -v=5
         - -coverprofile=/var/lib/pmem-csi-coverage/pmem-csi-driver-node-*.out
         env:
@@ -331,6 +332,8 @@ spec:
           name: registry-cert
         - mountPath: /dev
          name:
dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /sys name: sys-dir - mountPath: /var/lib/pmem-csi-coverage @@ -371,6 +374,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.13/pmem-csi-direct.yaml b/deploy/kubernetes-1.13/pmem-csi-direct.yaml index 57c260e995..86314d7254 100644 --- a/deploy/kubernetes-1.13/pmem-csi-direct.yaml +++ b/deploy/kubernetes-1.13/pmem-csi-direct.yaml @@ -218,6 +218,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com env: - name: CSI_ENDPOINT value: unix:///csi/csi.sock @@ -252,6 +253,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /sys name: sys-dir - args: @@ -289,6 +292,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.13/pmem-csi-lvm-testing.yaml b/deploy/kubernetes-1.13/pmem-csi-lvm-testing.yaml index 09ba11065f..0df0a2172e 100644 --- a/deploy/kubernetes-1.13/pmem-csi-lvm-testing.yaml +++ b/deploy/kubernetes-1.13/pmem-csi-lvm-testing.yaml @@ -295,6 +295,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com - -v=5 - -coverprofile=/var/lib/pmem-csi-coverage/pmem-csi-driver-node-*.out env: @@ -331,6 +332,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /var/lib/pmem-csi-coverage name: coverage-dir - args: @@ -408,6 +411,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.13/pmem-csi-lvm.yaml b/deploy/kubernetes-1.13/pmem-csi-lvm.yaml index 678913f97b..402eb39d60 100644 --- a/deploy/kubernetes-1.13/pmem-csi-lvm.yaml +++ b/deploy/kubernetes-1.13/pmem-csi-lvm.yaml @@ -218,6 +218,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com env: - name: CSI_ENDPOINT value: unix:///csi/csi.sock @@ -252,6 +253,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - args: - -v=3 - --kubelet-registration-path=/var/lib/kubelet/plugins/pmem-csi.intel.com/csi.sock @@ -317,6 +320,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.14/pmem-csi-direct-testing.yaml b/deploy/kubernetes-1.14/pmem-csi-direct-testing.yaml index 8753aaa2f3..acda0267ad 100644 --- a/deploy/kubernetes-1.14/pmem-csi-direct-testing.yaml +++ 
b/deploy/kubernetes-1.14/pmem-csi-direct-testing.yaml @@ -315,6 +315,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com - -v=5 - -coverprofile=/var/lib/pmem-csi-coverage/pmem-csi-driver-node-*.out env: @@ -351,6 +352,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /sys name: sys-dir - mountPath: /var/lib/pmem-csi-coverage @@ -391,6 +394,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.14/pmem-csi-direct.yaml b/deploy/kubernetes-1.14/pmem-csi-direct.yaml index 70fcadaa2e..714370ebc3 100644 --- a/deploy/kubernetes-1.14/pmem-csi-direct.yaml +++ b/deploy/kubernetes-1.14/pmem-csi-direct.yaml @@ -238,6 +238,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com env: - name: CSI_ENDPOINT value: unix:///csi/csi.sock @@ -272,6 +273,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /sys name: sys-dir - args: @@ -309,6 +312,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.14/pmem-csi-lvm-testing.yaml b/deploy/kubernetes-1.14/pmem-csi-lvm-testing.yaml index 1ffbb19da2..4d867dac73 100644 --- a/deploy/kubernetes-1.14/pmem-csi-lvm-testing.yaml +++ b/deploy/kubernetes-1.14/pmem-csi-lvm-testing.yaml @@ -315,6 +315,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com - -v=5 - -coverprofile=/var/lib/pmem-csi-coverage/pmem-csi-driver-node-*.out env: @@ -351,6 +352,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - mountPath: /var/lib/pmem-csi-coverage name: coverage-dir - args: @@ -428,6 +431,10 @@ spec: - name: registry-cert secret: secretName: pmem-csi-node-secrets + - hostPath: + path: /var/lib/pmem-csi.intel.com + type: DirectoryOrCreate + name: pmem-state-dir - hostPath: path: /dev type: DirectoryOrCreate diff --git a/deploy/kubernetes-1.14/pmem-csi-lvm.yaml b/deploy/kubernetes-1.14/pmem-csi-lvm.yaml index ba0b3cf426..92aab29cc0 100644 --- a/deploy/kubernetes-1.14/pmem-csi-lvm.yaml +++ b/deploy/kubernetes-1.14/pmem-csi-lvm.yaml @@ -238,6 +238,7 @@ spec: - -caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - -certFile=/certs/$(KUBE_NODE_NAME).crt - -keyFile=/certs/$(KUBE_NODE_NAME).key + - -statePath=/var/lib/pmem-csi.intel.com env: - name: CSI_ENDPOINT value: unix:///csi/csi.sock @@ -272,6 +273,8 @@ spec: name: registry-cert - mountPath: /dev name: dev-dir + - mountPath: /var/lib/pmem-csi.intel.com + name: pmem-state-dir - args: - -v=3 - --kubelet-registration-path=/var/lib/kubelet/plugins/pmem-csi.intel.com/csi.sock @@ -337,6 +340,10 @@ spec: - name: registry-cert secret: secretName: 
pmem-csi-node-secrets
+      - hostPath:
+          path: /var/lib/pmem-csi.intel.com
+          type: DirectoryOrCreate
+        name: pmem-state-dir
       - hostPath:
           path: /dev
           type: DirectoryOrCreate
diff --git a/deploy/kustomize/driver/pmem-csi.yaml b/deploy/kustomize/driver/pmem-csi.yaml
index 8f7ea380f2..6371955bc0 100644
--- a/deploy/kustomize/driver/pmem-csi.yaml
+++ b/deploy/kustomize/driver/pmem-csi.yaml
@@ -113,7 +113,8 @@ spec:
           "-registryEndpoint=$(PMEM_CSI_CONTROLLER_PORT_10000_TCP)",
           "-caFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
           "-certFile=/certs/$(KUBE_NODE_NAME).crt",
-          "-keyFile=/certs/$(KUBE_NODE_NAME).key"
+          "-keyFile=/certs/$(KUBE_NODE_NAME).key",
+          "-statePath=/var/lib/pmem-csi.intel.com"
         ]
         # Passing /dev to container may cause container creation error because
         # termination-log is located on /dev/ by default, re-locate to /tmp
@@ -148,6 +149,8 @@ spec:
           mountPath: /certs/
         - name : dev-dir
           mountPath: /dev
+        - name: pmem-state-dir
+          mountPath: /var/lib/pmem-csi.intel.com
       - name: driver-registrar
         imagePullPolicy: Always
         image: quay.io/k8scsi/csi-node-driver-registrar:v1.X.Y
@@ -179,6 +182,10 @@ spec:
       - name: registry-cert
         secret:
           secretName: pmem-csi-node-secrets
+      - name: pmem-state-dir
+        hostPath:
+          path: /var/lib/pmem-csi.intel.com
+          type: DirectoryOrCreate
       - name: dev-dir
         hostPath:
           path: /dev
diff --git a/pkg/pmem-csi-driver/controllerserver-master.go b/pkg/pmem-csi-driver/controllerserver-master.go
index 2eccd6b23a..e7b4b5cdc2 100644
--- a/pkg/pmem-csi-driver/controllerserver-master.go
+++ b/pkg/pmem-csi-driver/controllerserver-master.go
@@ -53,8 +53,6 @@ type pmemVolume struct {
 	// IDs of the nodes where the volume is provisioned/attached.
 	// It is a single node for a regular volume, and more than one for a "cached" volume.
 	nodeIDs map[string]VolumeStatus
-	// VolumeType
-	volumeType PmemPersistencyModel
 }

 type masterController struct {
@@ -65,6 +63,7 @@ type masterController struct {

 var _ csi.ControllerServer = &masterController{}
 var _ PmemService = &masterController{}
+var _ RegistryListener = &masterController{}
 var volumeMutex = keymutex.NewHashed(-1)

 func NewMasterControllerServer(rs *registryServer) *masterController {
@@ -73,17 +72,62 @@ func NewMasterControllerServer(rs *registryServer) *masterController {
 		csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
 		csi.ControllerServiceCapability_RPC_GET_CAPACITY,
 	}
-	return &masterController{
+	cs := &masterController{
 		DefaultControllerServer: NewDefaultControllerServer(serverCaps),
 		rs:                      rs,
 		pmemVolumes:             map[string]*pmemVolume{},
 	}
+
+	rs.AddListener(cs)
+
+	return cs
 }

 func (cs *masterController) RegisterService(rpcServer *grpc.Server) {
 	csi.RegisterControllerServer(rpcServer, cs)
 }

+// OnNodeAdded retrieves the existing volumes on a newly added node.
+// It uses the csi.ControllerServer.ListVolumes() call to retrieve them.
+func (cs *masterController) OnNodeAdded(ctx context.Context, node NodeInfo) {
+	conn, err := cs.rs.ConnectToNodeController(node.NodeID)
+	if err != nil {
+		glog.Warningf("Failed to connect to node controller at %s on node %s: %s", node.Endpoint, node.NodeID, err.Error())
+		return
+	}
+
+	csiClient := csi.NewControllerClient(conn)
+	resp, err := csiClient.ListVolumes(ctx, &csi.ListVolumesRequest{})
+	if err != nil {
+		glog.Warningf("Failed to get volumes on node %s: %s", node.NodeID, err.Error())
+		// Without a response there is nothing to record.
+		return
+	}
+
+	glog.V(5).Infof("Found volumes at %s: %v", node.NodeID, resp.Entries)
+
+	for _, entry := range resp.Entries {
+		v := entry.GetVolume()
+		if v == nil { /* this should not happen */
+			continue
+		}
+		if vol, ok := cs.pmemVolumes[v.VolumeId]; ok && vol != nil {
+			// This is possibly a cache volume, so just add this node ID.
+			vol.nodeIDs[node.NodeID] = Created
+		} else {
+			cs.pmemVolumes[v.VolumeId] = &pmemVolume{
+				id:   v.VolumeId,
+				size: v.CapacityBytes,
+				name: v.VolumeContext["Name"],
+				nodeIDs: map[string]VolumeStatus{
+					node.NodeID: Created,
+				},
+			}
+		}
+	}
+}
+
+func (cs *masterController) OnNodeDeleted(ctx context.Context, node NodeInfo) {
+}
+
 func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	var vol *pmemVolume
 	chosenNodes := map[string]VolumeStatus{}
@@ -117,14 +161,13 @@ func (cs *masterController) CreateVol
 	id, _ := uuid.NewUUID() //nolint: gosec
 	volumeID := id.String()
 	inTopology := []*csi.Topology{}
-	volumeType := pmemPersistencyModelNone
 	cacheCount := uint64(1)

 	if req.Parameters == nil {
 		req.Parameters = map[string]string{}
 	} else {
 		if val, ok := req.Parameters[pmemParameterKeyPersistencyModel]; ok {
-			volumeType = PmemPersistencyModel(val)
+			volumeType := PmemPersistencyModel(val)
 			if volumeType == pmemPersistencyModelCache {
 				if val, ok := req.Parameters[pmemParameterKeyCacheSize]; ok {
 					c, err := strconv.ParseUint(val, 10, 64)
@@ -191,11 +234,10 @@ func (cs *masterController) CreateVol
 	glog.V(3).Infof("Chosen nodes: %v", chosenNodes)

 	vol = &pmemVolume{
-		id:         volumeID,
-		name:       req.Name,
-		size:       asked,
-		nodeIDs:    chosenNodes,
-		volumeType: volumeType,
+		id:      volumeID,
+		name:    req.Name,
+		size:    asked,
+		nodeIDs: chosenNodes,
 	}
 	cs.pmemVolumes[volumeID] = vol
 	glog.V(3).Infof("CreateVolume: Record new volume as %v", *vol)
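
The listener mechanism above is the glue between node registration and cache rebuilding on the master. The following stand-alone sketch illustrates that pattern in isolation; the types `registry`, `volumeCache`, and `listVolumesOnNode` are simplified stand-ins invented for this example, while the real driver uses the gRPC registry and `csi.ControllerClient.ListVolumes`:

```go
// listener_sketch.go - minimal, self-contained sketch of the registry
// listener pattern. All types here are simplified stand-ins, not the
// real pmem-csi interfaces.
package main

import "fmt"

type NodeInfo struct {
	NodeID   string
	Endpoint string
}

type RegistryListener interface {
	OnNodeAdded(node NodeInfo)
	OnNodeDeleted(node NodeInfo)
}

type registry struct {
	listeners map[RegistryListener]struct{}
}

func (r *registry) AddListener(l RegistryListener) {
	r.listeners[l] = struct{}{}
}

// register notifies every listener, mirroring RegisterController above.
func (r *registry) register(node NodeInfo) {
	for l := range r.listeners {
		l.OnNodeAdded(node)
	}
}

// volumeCache plays the role of masterController: it rebuilds its
// volume map whenever a node (re)appears.
type volumeCache struct {
	volumes map[string][]string // volume ID -> node IDs
}

func (c *volumeCache) OnNodeAdded(node NodeInfo) {
	// In the real driver this data comes from ListVolumes on the node.
	for _, id := range listVolumesOnNode(node) {
		c.volumes[id] = append(c.volumes[id], node.NodeID)
	}
}

func (c *volumeCache) OnNodeDeleted(node NodeInfo) {}

// listVolumesOnNode returns placeholder data instead of a CSI call.
func listVolumesOnNode(node NodeInfo) []string {
	return []string{"vol-1", "vol-2"}
}

func main() {
	r := &registry{listeners: map[RegistryListener]struct{}{}}
	c := &volumeCache{volumes: map[string][]string{}}
	r.AddListener(c)
	r.register(NodeInfo{NodeID: "node-1", Endpoint: "tcp://node-1:10001"})
	fmt.Println(c.volumes) // map[vol-1:[node-1] vol-2:[node-1]]
}
```

Because the master keeps no state of its own, replaying OnNodeAdded for every registering node is enough to rebuild the cluster-wide volume view after a master restart.
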
diff --git a/pkg/pmem-csi-driver/controllerserver-node.go b/pkg/pmem-csi-driver/controllerserver-node.go
index 8bf6449918..0fc3f88f2e 100644
--- a/pkg/pmem-csi-driver/controllerserver-node.go
+++ b/pkg/pmem-csi-driver/controllerserver-node.go
@@ -20,6 +20,7 @@ import (
 	"github.com/container-storage-interface/spec/lib/go/csi"

 	pmdmanager "github.com/intel/pmem-csi/pkg/pmem-device-manager"
+	pmemstate "github.com/intel/pmem-csi/pkg/pmem-state"
 	"k8s.io/utils/keymutex"
 )

@@ -32,17 +33,30 @@ const (
 )

 type nodeVolume struct {
-	ID     string
-	Name   string
-	Size   int64
-	Erase  bool
-	NsMode string
+	ID     string            `json:"id"`
+	Size   int64             `json:"size"`
+	Params map[string]string `json:"parameters"`
+}
+
+func (nv *nodeVolume) Copy() *nodeVolume {
+	nvCopy := &nodeVolume{
+		ID:     nv.ID,
+		Size:   nv.Size,
+		Params: map[string]string{},
+	}
+
+	for k, v := range nv.Params {
+		nvCopy.Params[k] = v
+	}
+
+	return nvCopy
 }

 type nodeControllerServer struct {
 	*DefaultControllerServer
 	nodeID      string
 	dm          pmdmanager.PmemDeviceManager
+	sm          pmemstate.StateManager
 	pmemVolumes map[string]*nodeVolume // map of reqID:nodeVolume
 }

@@ -51,18 +65,55 @@ var _ PmemService = &nodeControllerServer{}

 var nodeVolumeMutex = keymutex.NewHashed(-1)

-func NewNodeControllerServer(nodeID string, dm pmdmanager.PmemDeviceManager) *nodeControllerServer {
+func NewNodeControllerServer(nodeID string, dm pmdmanager.PmemDeviceManager, sm pmemstate.StateManager) *nodeControllerServer {
 	serverCaps := []csi.ControllerServiceCapability_RPC_Type{
 		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
 		csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
 		csi.ControllerServiceCapability_RPC_GET_CAPACITY,
 	}
-	return &nodeControllerServer{
+
+	ncs := &nodeControllerServer{
 		DefaultControllerServer: NewDefaultControllerServer(serverCaps),
 		nodeID:                  nodeID,
 		dm:                      dm,
+		sm:                      sm,
 		pmemVolumes:             map[string]*nodeVolume{},
 	}
+
+	// Restore provisioned volumes from the persisted state.
+	if sm != nil {
+		// Get the actual devices known to the DeviceManager.
+		devices, err := dm.ListDevices()
+		if err != nil {
+			// Without the device list we cannot tell which state entries
+			// are stale, so skip the restore instead of wiping the state.
+			glog.Warningf("Failed to list devices: %s", err.Error())
+			return ncs
+		}
+		cleanupList := []string{}
+		v := &nodeVolume{}
+		err = sm.GetAll(v, func(id string) bool {
+			// Check whether the device data stored in the StateManager is still valid.
+			for _, devInfo := range devices {
+				if devInfo.Name == id {
+					ncs.pmemVolumes[id] = v.Copy()
+					return true
+				}
+			}
+			// Not found in the DeviceManager's list, so mark it for cleanup.
+			cleanupList = append(cleanupList, id)

+			return true
+		})
+		if err != nil {
+			glog.Warningf("Failed to load state on node %s: %s", nodeID, err.Error())
+		}
+
+		for _, id := range cleanupList {
+			if err := sm.Delete(id); err != nil {
+				glog.Warningf("Failed to delete stale volume %s from state: %s", id, err.Error())
+			}
+		}
+	}
+
+	return ncs
 }

 func (cs *nodeControllerServer) RegisterService(rpcServer *grpc.Server) {
@@ -73,9 +124,10 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
 	var vol *nodeVolume
 	topology := []*csi.Topology{}
 	volumeID := ""
-	eraseafter := true
 	nsmode := pmemNamespaceModeFsdax

+	var resp *csi.CreateVolumeResponse
+
 	if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		glog.Errorf("invalid create volume req: %v", req)
 		return nil, err
@@ -89,28 +141,27 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
 		return nil, status.Error(codes.InvalidArgument, "Name missing in request")
 	}

-	// We recognize eraseafter=false/true, defaulting to true
-	if params := req.GetParameters(); params != nil {
-		if val, ok := params[pmemParameterKeyEraseAfter]; ok {
-			if bVal, err := strconv.ParseBool(val); err == nil {
-				eraseafter = bVal
-			} else {
-				glog.Warningf("Ignoring parameter %s:%s, reason: %s", pmemParameterKeyEraseAfter, val, err.Error())
-			}
-		}
+	params := req.GetParameters()
+	if params != nil {
 		if val, ok := params[pmemParameterKeyNamespaceMode]; ok {
 			if val == pmemNamespaceModeFsdax || val == pmemNamespaceModeSector {
 				nsmode = val
-			} else {
-				glog.Warningf("Ignoring parameter %s:%s, reason: unknown namespace mode", pmemParameterKeyNamespaceMode, val)
 			}
 		}
 		if val, ok := params["_id"]; ok {
 			/* use master controller provided volume uid */
 			volumeID = val
+
+			delete(params, "_id")
 		}
+	} else {
+		params = map[string]string{}
 	}

+	// Keep the volume name as part of the volume parameters; this persists
+	// the volume name and passes it to the Master via ListVolumes.
+ params["Name"] = req.Name + /* choose volume uid if not provided by master controller */ if volumeID == "" { id, _ := uuid.NewUUID() //nolint: gosec @@ -133,17 +184,30 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat } } else { glog.V(4).Infof("CreateVolume: Name: %v req.Required: %v req.Limit: %v", req.Name, asked, req.GetCapacityRange().GetLimitBytes()) - - if err := cs.dm.CreateDevice(volumeID, uint64(asked), nsmode); err != nil { - return nil, status.Errorf(codes.Internal, "CreateVolume: failed to create volume: %s", err.Error()) - } vol = &nodeVolume{ ID: volumeID, - Name: req.GetName(), Size: asked, - Erase: eraseafter, - NsMode: nsmode, + Params: params, + } + if cs.sm != nil { + // Persist new volume state + if err := cs.sm.Create(volumeID, vol); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer func(id string) { + // Incase of failure, remove volume from state + if resp == nil { + if err := cs.sm.Delete(id); err != nil { + glog.Warningf("Delete volume state for id '%s' failed with error: %s", id, err.Error()) + } + } + }(volumeID) } + + if err := cs.dm.CreateDevice(volumeID, uint64(asked), nsmode); err != nil { + return nil, status.Errorf(codes.Internal, "CreateVolume: failed to create volume: %s", err.Error()) + } + cs.pmemVolumes[volumeID] = vol glog.V(3).Infof("CreateVolume: Record new volume as %v", *vol) } @@ -154,13 +218,15 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat }, }) - return &csi.CreateVolumeResponse{ + resp = &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: vol.ID, CapacityBytes: vol.Size, AccessibleTopology: topology, }, - }, nil + } + + return resp, nil } func (cs *nodeControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { @@ -180,15 +246,35 @@ func (cs *nodeControllerServer) DeleteVolume(ctx context.Context, req *csi.Delet defer nodeVolumeMutex.UnlockKey(req.VolumeId) //nolint: errcheck glog.V(4).Infof("DeleteVolume: volumeID: %v", req.GetVolumeId()) - if vol := cs.getVolumeByID(req.GetVolumeId()); vol != nil { - if err := cs.dm.DeleteDevice(req.VolumeId, vol.Erase); err != nil { - return nil, status.Errorf(codes.Internal, "Failed to delete volume: %s", err.Error()) + eraseafter := true + if vol := cs.getVolumeByID(req.VolumeId); vol != nil { + // We recognize eraseafter=false/true, defaulting to true + if val, ok := vol.Params[pmemParameterKeyEraseAfter]; ok { + bVal, err := strconv.ParseBool(val) + if err != nil { + glog.Warningf("Ignoring invalid parameter %s:%s, reason: %s, falling back to default: %v", + pmemParameterKeyEraseAfter, val, err.Error(), eraseafter) + } else { + eraseafter = bVal + } } - delete(cs.pmemVolumes, vol.ID) - glog.V(4).Infof("DeleteVolume: volume %s deleted", req.GetVolumeId()) } else { - glog.V(3).Infof("Volume %s not created by this controller", req.GetVolumeId()) + glog.V(3).Infof("Volume %s not found in driver cache.", req.VolumeId) } + + if err := cs.dm.DeleteDevice(req.VolumeId, eraseafter); err != nil { + return nil, status.Errorf(codes.Internal, "Failed to delete volume: %s", err.Error()) + } + if cs.sm != nil { + if err := cs.sm.Delete(req.VolumeId); err != nil { + glog.Warning("Failed to remove volume from state: ", err) + } + } + + delete(cs.pmemVolumes, req.VolumeId) + + glog.V(4).Infof("DeleteVolume: volume %s deleted", req.GetVolumeId()) + return &csi.DeleteVolumeResponse{}, nil } @@ -235,6 +321,7 @@ func (cs *nodeControllerServer) 
ListVolumes(ctx context.Context, req *csi.ListVo Volume: &csi.Volume{ VolumeId: vol.ID, CapacityBytes: vol.Size, + VolumeContext: vol.Params, }, }) } @@ -280,7 +367,7 @@ func (cs *nodeControllerServer) getVolumeByID(volumeID string) *nodeVolume { func (cs *nodeControllerServer) getVolumeByName(volumeName string) *nodeVolume { for _, pmemVol := range cs.pmemVolumes { - if pmemVol.Name == volumeName { + if pmemVol.Params["Name"] == volumeName { return pmemVol } } diff --git a/pkg/pmem-csi-driver/main.go b/pkg/pmem-csi-driver/main.go index e0d995bc19..85fccc97d7 100644 --- a/pkg/pmem-csi-driver/main.go +++ b/pkg/pmem-csi-driver/main.go @@ -14,7 +14,7 @@ import ( "k8s.io/klog" "k8s.io/klog/glog" - "github.com/intel/pmem-csi/pkg/pmem-common" + pmemcommon "github.com/intel/pmem-csi/pkg/pmem-common" ) var ( @@ -33,6 +33,7 @@ var ( controllerEndpoint = flag.String("controllerEndpoint", "", "internal node controller endpoint") deviceManager = flag.String("deviceManager", "lvm", "device manager to use to manage pmem devices. supported types: 'lvm' or 'ndctl'") showVersion = flag.Bool("version", false, "Show release version and exit") + driverStatePath = flag.String("statePath", "", "Directory path where to persist the state of the driver running on a node, defaults to /var/lib/") version = "unknown" // Set version during build time ) @@ -63,6 +64,7 @@ func Main() int { ClientKeyFile: *clientKeyFile, ControllerEndpoint: *controllerEndpoint, DeviceManager: *deviceManager, + StateBasePath: *driverStatePath, Version: version, }) if err != nil { diff --git a/pkg/pmem-csi-driver/nodeserver.go b/pkg/pmem-csi-driver/nodeserver.go index d1e9e4305c..fec7cd8155 100644 --- a/pkg/pmem-csi-driver/nodeserver.go +++ b/pkg/pmem-csi-driver/nodeserver.go @@ -187,6 +187,10 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") } requestedFsType := req.GetVolumeCapability().GetMount().GetFsType() + if requestedFsType == "" { + // Default to ext4 filesystem + requestedFsType = "ext4" + } // Serialize by VolumeId volumeMutex.LockKey(req.GetVolumeId()) diff --git a/pkg/pmem-csi-driver/pmem-csi-driver.go b/pkg/pmem-csi-driver/pmem-csi-driver.go index 14a14c07fc..9c058fde85 100644 --- a/pkg/pmem-csi-driver/pmem-csi-driver.go +++ b/pkg/pmem-csi-driver/pmem-csi-driver.go @@ -19,6 +19,7 @@ import ( pmdmanager "github.com/intel/pmem-csi/pkg/pmem-device-manager" pmemgrpc "github.com/intel/pmem-csi/pkg/pmem-grpc" registry "github.com/intel/pmem-csi/pkg/pmem-registry" + pmemstate "github.com/intel/pmem-csi/pkg/pmem-state" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" @@ -72,6 +73,8 @@ type Config struct { ControllerEndpoint string //DeviceManager device manager to use DeviceManager string + //Directory where to persist the node driver state + StateBasePath string //Version driver release version Version string } @@ -104,6 +107,10 @@ func GetPMEMDriver(cfg Config) (*pmemDriver, error) { cfg.ControllerEndpoint = cfg.Endpoint } + if cfg.Mode == Node && cfg.StateBasePath == "" { + cfg.StateBasePath = "/var/lib/" + cfg.DriverName + } + peerName := "pmem-registry" if cfg.Mode == Controller { //When driver running in Controller mode, we connect to node controllers @@ -175,8 +182,12 @@ func (pmemd *pmemDriver) Run() error { if err != nil { return err } + sm, err := pmemstate.NewFileState(pmemd.cfg.StateBasePath) + if err != nil { + return err + } + cs := 
NewNodeControllerServer(pmemd.cfg.NodeID, dm, sm)
 	ns := NewNodeServer(pmemd.cfg.NodeID, dm)
-	cs := NewNodeControllerServer(pmemd.cfg.NodeID, dm)

 	if pmemd.cfg.Endpoint != pmemd.cfg.ControllerEndpoint {
 		if err := s.Start(pmemd.cfg.ControllerEndpoint, pmemd.serverTLSConfig, cs); err != nil {
diff --git a/pkg/pmem-csi-driver/registryserver.go b/pkg/pmem-csi-driver/registryserver.go
index c7fd8be2e4..ec1f88ac6a 100644
--- a/pkg/pmem-csi-driver/registryserver.go
+++ b/pkg/pmem-csi-driver/registryserver.go
@@ -13,9 +13,17 @@ import (
 	"k8s.io/klog/glog"
 )

+type RegistryListener interface {
+	// OnNodeAdded is called by the RegistryServer whenever a node controller registers.
+	OnNodeAdded(ctx context.Context, node NodeInfo)
+	// OnNodeDeleted is called by the RegistryServer whenever a node controller unregisters.
+	OnNodeDeleted(ctx context.Context, node NodeInfo)
+}
+
 type registryServer struct {
 	clientTLSConfig *tls.Config
 	nodeClients     map[string]NodeInfo
+	listeners       map[RegistryListener]struct{}
 }

 var _ PmemService = &registryServer{}
@@ -31,6 +39,7 @@ func NewRegistryServer(tlsConfig *tls.Config) *registryServer {
 	return &registryServer{
 		clientTLSConfig: tlsConfig,
 		nodeClients:     map[string]NodeInfo{},
+		listeners:       map[RegistryListener]struct{}{},
 	}
 }

@@ -57,6 +66,10 @@ func (rs *registryServer) ConnectToNodeController(nodeId string) (*grpc.ClientCo
 	return pmemgrpc.Connect(nodeInfo.Endpoint, rs.clientTLSConfig)
 }

+func (rs *registryServer) AddListener(l RegistryListener) {
+	rs.listeners[l] = struct{}{}
+}
+
 func (rs *registryServer) RegisterController(ctx context.Context, req *registry.RegisterControllerRequest) (*registry.RegisterControllerReply, error) {
 	if req.GetNodeId() == "" {
 		return nil, status.Error(codes.InvalidArgument, "Missing NodeId parameter")
@@ -67,11 +80,17 @@ func (rs *registryServer) RegisterController(ctx context.Context, req *registry.
 	}

 	glog.V(3).Infof("Registering node: %s, endpoint: %s", req.NodeId, req.Endpoint)
-	rs.nodeClients[req.NodeId] = NodeInfo{
+	node := NodeInfo{
 		NodeID:   req.NodeId,
 		Endpoint: req.Endpoint,
 	}

+	rs.nodeClients[req.NodeId] = node
+
+	for l := range rs.listeners {
+		l.OnNodeAdded(ctx, node)
+	}
+
 	return &registry.RegisterControllerReply{}, nil
 }

@@ -80,10 +99,15 @@ func (rs *registryServer) UnregisterController(ctx context.Context, req *registr
 		return nil, status.Error(codes.InvalidArgument, "Missing NodeId parameter")
 	}

-	if _, ok := rs.nodeClients[req.NodeId]; !ok {
+	node, ok := rs.nodeClients[req.NodeId]
+	if !ok {
 		return nil, status.Errorf(codes.NotFound, "No entry with id '%s' found in registry", req.NodeId)
 	}

+	for l := range rs.listeners {
+		l.OnNodeDeleted(ctx, node)
+	}
+
 	glog.V(3).Infof("Unregistering node: %s", req.NodeId)
 	delete(rs.nodeClients, req.NodeId)
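
The file-based StateManager that follows writes each state entry with O_SYNC and then fsyncs the state directory, so that both the file contents and the directory entry referencing them survive a crash. Below is a minimal, self-contained sketch of that durability pattern; the `writeDurably` helper and the file name are made up for illustration, and only standard-library calls are used:

```go
// durability_sketch.go - sketch of the write-then-fsync-directory pattern
// used by the file-based state manager below.
package main

import (
	"log"
	"os"
	"path/filepath"
)

// writeDurably creates a file with synchronous writes and then fsyncs the
// parent directory, so the new directory entry itself is durable as well.
func writeDurably(dir, name string, data []byte) error {
	file := filepath.Join(dir, name)
	fp, err := os.OpenFile(file, os.O_WRONLY|os.O_SYNC|os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		return err
	}
	if _, err := fp.Write(data); err != nil {
		fp.Close()      // best effort, write already failed
		os.Remove(file) // roll back the half-written entry
		return err
	}
	if err := fp.Close(); err != nil {
		return err
	}
	// Without this step the file contents are durable, but the directory
	// entry pointing at them may still be lost on a crash.
	dp, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer dp.Close()
	return dp.Sync()
}

func main() {
	if err := writeDurably(os.TempDir(), "vol-1.json", []byte(`{"id":"vol-1"}`)); err != nil {
		log.Fatal(err)
	}
}
```
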
diff --git a/pkg/pmem-state/pmem-state.go b/pkg/pmem-state/pmem-state.go
new file mode 100644
index 0000000000..5e14d6f457
--- /dev/null
+++ b/pkg/pmem-state/pmem-state.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2019 Intel Corporation.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+package pmemstate
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"sync"
+
+	"github.com/pkg/errors"
+	"k8s.io/klog/glog"
+)
+
+// GetAllFunc is the callback function used by StateManager.GetAll().
+// It is called with the ID of each entry found in the state.
+type GetAllFunc func(id string) bool
+
+// StateManager manages the driver's persistent state, i.e., volume information.
+type StateManager interface {
+	// Create creates an entry in the state with the given id and data.
+	Create(id string, data interface{}) error
+	// Delete deletes the entry with the given id from the state.
+	Delete(id string) error
+	// Get retrieves the entry's data into the location pointed to by dataPtr.
+	Get(id string, dataPtr interface{}) error
+	// GetAll iterates over all entries found in the state. For each entry,
+	// the data is decoded into dataPtr and then foreach is called with the
+	// entry's id. The caller has to copy the data if it needs to keep it,
+	// because dataPtr is reused for the next entry.
+	GetAll(dataPtr interface{}, foreach GetAllFunc) error
+}
+
+// fileState persists the state information to files in a directory.
+// It is meant to be used by nodes to persist their state.
+type fileState struct {
+	location string
+	// lock holds the read-write lock for file accesses
+	lock sync.RWMutex
+	// stateDirLock holds the lock on the state directory
+	stateDirLock sync.Mutex
+}
+
+var _ StateManager = &fileState{}
+
+// NewFileState instantiates the file state manager for the given directory.
+// It ensures that the provided directory exists.
+// Returns an error if the directory does not exist and cannot be created.
+func NewFileState(directory string) (StateManager, error) {
+	if err := ensureLocation(directory); err != nil {
+		return nil, err
+	}
+
+	return &fileState{
+		location: directory,
+	}, nil
+}
+
+// Create saves the volume metadata to a file named <id>.json
+func (fs *fileState) Create(id string, data interface{}) error {
+	fs.lock.Lock()
+	defer fs.lock.Unlock()
+
+	file := path.Join(fs.location, id+".json")
+	// Create a new file, using synchronous writes
+	fp, err := os.OpenFile(file, os.O_WRONLY|os.O_SYNC|os.O_CREATE|os.O_EXCL, 0600)
+	if err != nil {
+		return errors.Wrapf(err, "file-state: failed to create metadata storage file %s", file)
+	}
+
+	if err := json.NewEncoder(fp).Encode(data); err != nil {
+		// clean up the file entry before returning the error
+		fp.Close() //nolint: errcheck, gosec
+		if e := os.Remove(file); e != nil {
+			glog.Warningf("file-state: failed to remove file %s: %s", file, e.Error())
+		}
+		return errors.Wrap(err, "file-state: failed to encode metadata")
+	}
+
+	if err := fp.Close(); err != nil {
+		return errors.Wrapf(err, "file-state: failed to close metadata storage file %s", file)
+	}
+
+	return fs.syncStateDir()
+}
+
+// Delete deletes the metadata file saved for the given volume id
+func (fs *fileState) Delete(id string) error {
+	fs.lock.Lock()
+	defer fs.lock.Unlock()
+
+	file := path.Join(fs.location, id+".json")
+	if err := os.Remove(file); err != nil && !os.IsNotExist(err) {
+		return errors.Wrapf(err, "file-state: failed to delete file %s", file)
+	}
+
+	return fs.syncStateDir()
+}
+
+// Get retrieves the metadata for the given volume id into the location pointed to by dataPtr
+func (fs *fileState) Get(id string, dataPtr interface{}) error {
+	return fs.readFileData(path.Join(fs.location, id+".json"), dataPtr)
+}
+
+// GetAll retrieves the metadata of all volumes found in the fileState.location directory.
+// It reads all *.json files in that directory and decodes their contents.
+func (fs *fileState) GetAll(dataPtr interface{}, f GetAllFunc) error {
+	fs.stateDirLock.Lock()
+	files, err := ioutil.ReadDir(fs.location)
+	fs.stateDirLock.Unlock()
+	if err != nil {
+		return errors.Wrapf(err, "file-state: failed to read metadata from %s", fs.location)
+	}
+	for _, fileInfo := range files {
+		fileName := fileInfo.Name()
+		if !strings.HasSuffix(fileName, ".json") {
+			continue
+		}
+
+		file := path.Join(fs.location, fileName)
+		if err := fs.readFileData(file, dataPtr); err != nil {
+			return err
+		}
+
+		id := fileName[0 : len(fileName)-len(".json")]
+		if !f(id) {
+			return nil
+		}
+	}
+
+	return nil
+}
+
+func ensureLocation(directory string) error {
+	info, err := os.Stat(directory)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = os.Mkdir(directory, 0750)
+		}
+	} else if !info.IsDir() {
+		err = fmt.Errorf("state location %q must be a directory", directory)
+	}
+
+	return err
+}
+
+func (fs *fileState) readFileData(file string, dataPtr interface{}) error {
+	fs.lock.RLock()
+	defer fs.lock.RUnlock()
+
+	fp, err := os.OpenFile(file, os.O_RDONLY|os.O_SYNC, 0) //nolint: gosec
+	if err != nil {
+		return errors.Wrapf(err, "file-state: failed to open file %s", file)
+	}
+	defer fp.Close() //nolint: errcheck
+
+	if err := json.NewDecoder(fp).Decode(dataPtr); err != nil {
+		return errors.Wrapf(err, "file-state: failed to decode metadata from file %s", file)
+	}
+
+	return nil
+}
+
+func (fs *fileState) syncStateDir() error {
+	var rErr error
+	fs.stateDirLock.Lock()
+	defer fs.stateDirLock.Unlock()
+
+	if fp, err := os.Open(fs.location); err != nil {
+		rErr = errors.Wrap(err, "file-state: failed to open state directory for syncing")
+	} else if err := fp.Sync(); err != nil {
+		fp.Close() //nolint: errcheck
+		rErr = errors.Wrap(err, "file-state: fsync failed on state directory")
+	} else if err := fp.Close(); err != nil {
+		rErr = errors.Wrap(err, "file-state: failed to close state directory after sync")
+	}
+
+	return rErr
+}
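
For illustration, this is how a caller would use the StateManager API defined above. The `volume` record type, the IDs, and the `/tmp` path are made up for this example, and error handling is reduced to `log.Fatal`:

```go
// Example use of the file-based StateManager defined above.
package main

import (
	"fmt"
	"log"

	pmemstate "github.com/intel/pmem-csi/pkg/pmem-state"
)

// volume is a hypothetical record type; any JSON-serializable type works.
type volume struct {
	ID   string `json:"id"`
	Size int64  `json:"size"`
}

func main() {
	// NewFileState creates the directory if it does not exist yet.
	sm, err := pmemstate.NewFileState("/tmp/pmem-state-demo")
	if err != nil {
		log.Fatal(err)
	}

	// Persist one entry; this writes /tmp/pmem-state-demo/vol-1.json.
	if err := sm.Create("vol-1", volume{ID: "vol-1", Size: 4 << 20}); err != nil {
		log.Fatal(err)
	}

	// Read a single entry back.
	var v volume
	if err := sm.Get("vol-1", &v); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("restored: %+v\n", v)

	// Walk all entries; the callback must copy rec if it wants to keep it,
	// because the same pointer is reused for every entry.
	rec := volume{}
	err = sm.GetAll(&rec, func(id string) bool {
		fmt.Printf("entry %s: %+v\n", id, rec)
		return true // keep iterating
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := sm.Delete("vol-1"); err != nil {
		log.Fatal(err)
	}
}
```
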
"github.com/onsi/gomega" +) + +type testData struct { + Id string + Name string + Params map[string]string +} + +func (td *testData) IsEqual(other testData) bool { + if other.Id != td.Id { + return false + } + if other.Name != td.Name { + return false + } + + if len(td.Params) != len(other.Params) { + return false + } + + for k, v := range td.Params { + otherV, ok := other.Params[k] + if !ok || v != otherV { + return false + } + } + + return true +} + +func TestPmemState(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PMEM State Suite") +} + +var _ = Describe("pmem state", func() { + var stateDir string + var DontClean bool = false + + BeforeEach(func() { + var err error + stateDir, err = ioutil.TempDir("", "pmemstate-") + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + if !DontClean { + os.RemoveAll(stateDir) + } + }) + + Context("State API", func() { + It("new file state", func() { + data := testData{ + Id: "id1", + Name: "test-data", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + } + rData := testData{} + var err error + + _, err = pmemstate.NewFileState("") + Expect(err).To(HaveOccurred()) + + _, err = pmemstate.NewFileState("/unknown/base/direcotry/") + Expect(err).To(HaveOccurred()) + + file, err := ioutil.TempFile("", "pmemstate-file") + Expect(err).NotTo(HaveOccurred()) + _, err = pmemstate.NewFileState(file.Name()) + os.Remove(file.Name()) //nolint: errcheck + Expect(err).To(HaveOccurred()) + + Expect(stateDir).ShouldNot(BeNil()) + fs, err := pmemstate.NewFileState(stateDir) + Expect(err).NotTo(HaveOccurred()) + + err = fs.Create(data.Id, data) + Expect(err).NotTo(HaveOccurred()) + + err = fs.Get(data.Id, &rData) + Expect(err).NotTo(HaveOccurred()) + + Expect(data.IsEqual(rData)).To(Equal(true)) + }) + + It("multiple files", func() { + data := []testData{ + testData{ + Id: "one", + Name: "test-data1", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + testData{ + Id: "two", + Name: "test-data2", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + testData{ + Id: "three", + Name: "test-data3", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + } + var err error + + Expect(stateDir).ShouldNot(BeNil()) + fs, err := pmemstate.NewFileState(stateDir) + Expect(err).NotTo(HaveOccurred()) + + for _, d := range data { + err = fs.Create(d.Id, d) + Expect(err).NotTo(HaveOccurred()) + } + + for _, d := range data { + rData := testData{} + err = fs.Get(d.Id, &rData) + Expect(err).NotTo(HaveOccurred()) + Expect(d.IsEqual(rData)).To(Equal(true)) + } + + // Delete and GetAll tests + rData := testData{} + err = fs.GetAll(&rData, func(id string) bool { + found := false + for _, d := range data { + if d.Id == id { + found = true + Expect(d.IsEqual(rData)).To(Equal(true)) + + err = fs.Delete(id) + Expect(err).NotTo(HaveOccurred()) + break + } + } + Expect(found).To(Equal(true)) + return true + }) + Expect(err).NotTo(HaveOccurred()) + + // Should have left no file + err = fs.GetAll(&rData, func(id string) bool { + Expect(id).NotTo(HaveOccurred()) + return false + }) + Expect(err).NotTo(HaveOccurred()) + + }) + + It("read write files", func() { + data := []testData{ + testData{ + Id: "one", + Name: "test-data1", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + testData{ + Id: "two", + Name: "test-data2", + Params: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + testData{ + Id: "three", + Name: "test-data3", + Params: 
						"key1": "val1",
+						"key2": "val2",
+					},
+				},
+			}
+			var err error
+
+			Expect(stateDir).ShouldNot(BeNil())
+			fs, err := pmemstate.NewFileState(stateDir)
+			Expect(err).NotTo(HaveOccurred())
+
+			for _, d := range data {
+				err = fs.Create(d.Id, d)
+				Expect(err).NotTo(HaveOccurred())
+			}
+
+			for _, d := range data {
+				rData := testData{}
+				err = fs.Get(d.Id, &rData)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(d.IsEqual(rData)).To(Equal(true))
+			}
+		})
+
+		It("concurrent read writes", func() {
+			data := []testData{
+				testData{
+					Id:   "one",
+					Name: "test-data1",
+					Params: map[string]string{
+						"key1": "val1",
+						"key2": "val2",
+					},
+				},
+				testData{
+					Id:   "two",
+					Name: "test-data2",
+					Params: map[string]string{
+						"key1": "val1",
+						"key2": "val2",
+					},
+				},
+				testData{
+					Id:   "three",
+					Name: "test-data3",
+					Params: map[string]string{
+						"key1": "val1",
+						"key2": "val2",
+					},
+				},
+			}
+
+			Expect(stateDir).ShouldNot(BeNil())
+			fs, err := pmemstate.NewFileState(stateDir)
+			Expect(err).NotTo(HaveOccurred())
+
+			wg := sync.WaitGroup{}
+			for _, d := range data {
+				wg.Add(1)
+				// Writer
+				go func(fs_ pmemstate.StateManager, d_ testData) {
+					var createError error
+					fmt.Printf(">> %s\n", d_.Id)
+					createError = fs_.Create(d_.Id, d_)
+					Expect(createError).NotTo(HaveOccurred())
+					wg.Done()
+				}(fs, d)
+
+				wg.Add(1)
+				// Reader
+				go func(fs_ pmemstate.StateManager, d_ testData) {
+					var getErr error
+					rData := testData{}
+					fmt.Printf("<< %s\n", d_.Id)
+					i := 0
+					for i < 10 {
+						getErr = fs_.Get(d_.Id, &rData)
+						if getErr == nil {
+							break
+						}
+						if strings.HasSuffix(getErr.Error(), "no such file or directory") {
+							// Might not be ready yet, try again
+							i++
+							fmt.Printf("<< File '%s' is not ready yet, will retry (%d)\n", d_.Id, i)
+							time.Sleep(200 * time.Millisecond)
+						} else {
+							break
+						}
+					}
+					Expect(getErr).NotTo(HaveOccurred())
+					Expect(d_.IsEqual(rData)).To(Equal(true))
+					wg.Done()
+				}(fs, d)
+			}
+			wg.Wait()
+		})
+
+		It("corrupted file", func() {
+			data := testData{
+				Id:   "one",
+				Name: "test-data1",
+				Params: map[string]string{
+					"key1": "val1",
+					"key2": "val2",
+				},
+			}
+			var err error
+
+			Expect(stateDir).ShouldNot(BeNil())
+			fs, err := pmemstate.NewFileState(stateDir)
+			Expect(err).NotTo(HaveOccurred())
+
+			err = fs.Create(data.Id, data)
+			Expect(err).NotTo(HaveOccurred())
+
+			// truncate the file's data
+			file := path.Join(stateDir, data.Id+".json")
+			fInfo, err := os.Stat(file)
+			Expect(err).NotTo(HaveOccurred())
+			err = os.Truncate(file, fInfo.Size()-10)
+			Expect(err).NotTo(HaveOccurred())
+
+			rData := testData{}
+			err = fs.Get(data.Id, &rData)
+			Expect(err).To(HaveOccurred())
+		})
+	})
+})
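
To summarize the restart handling that NewNodeControllerServer performs: persisted entries that still have a backing device are restored into the in-memory cache, and everything else is treated as stale and removed from the state. A stand-alone sketch of that reconciliation logic, with hard-coded example data in place of the StateManager and DeviceManager:

```go
// reconcile_sketch.go - sketch of the restart reconciliation performed in
// NewNodeControllerServer. Inputs are hard-coded for illustration.
package main

import "fmt"

// reconcile splits the persisted volume IDs into those that still have a
// backing device (restored) and those that do not (stale, to be deleted
// from the state).
func reconcile(persisted []string, devices map[string]bool) (restored, stale []string) {
	for _, id := range persisted {
		if devices[id] {
			restored = append(restored, id) // device still exists: restore
		} else {
			stale = append(stale, id) // no backing device: clean up state
		}
	}
	return
}

func main() {
	persisted := []string{"vol-1", "vol-2", "vol-3"} // entries in the state dir
	devices := map[string]bool{"vol-1": true, "vol-3": true}

	restored, stale := reconcile(persisted, devices)
	fmt.Println("restored:", restored) // [vol-1 vol-3]
	fmt.Println("stale:", stale)       // [vol-2]
}
```
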