Skip to content

Commit

Permalink
squashme: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed Feb 5, 2025
1 parent dce89f0 commit 3b96954
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 38 deletions.
11 changes: 9 additions & 2 deletions cmd/csi-rclone-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,22 @@ func handleNode() {
klog.Warningf("There was an error when trying to unmount old volumes: %e", err)
}
d := rclone.NewDriver(nodeID, endpoint)
err = d.RunNodeService()
ns, err := rclone.NewNodeServer(d.CSIDriver)
if err != nil {
panic(err)
}
d.WithNodeServer(ns)
err = d.Run()
if err != nil {
panic(err)
}
}

func handleController() {
d := rclone.NewDriver(nodeID, endpoint)
err := d.RunControllerService()
cs := rclone.NewControllerServer(d.CSIDriver)
d.WithControllerServer(cs)
err := d.Run()
if err != nil {
panic(err)
}
Expand Down
67 changes: 37 additions & 30 deletions pkg/rclone/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
)

type Driver struct {
csiDriver *csicommon.CSIDriver
CSIDriver *csicommon.CSIDriver
endpoint string

ns *nodeServer
cs *controllerServer
cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
Server csicommon.NonBlockingGRPCServer
server csicommon.NonBlockingGRPCServer
}

var (
Expand Down Expand Up @@ -51,64 +52,70 @@ func NewDriver(nodeID, endpoint string) *Driver {
d := &Driver{}
d.endpoint = endpoint

d.csiDriver = csicommon.NewCSIDriver(driverName, DriverVersion, nodeID)
d.csiDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
d.CSIDriver = csicommon.NewCSIDriver(driverName, DriverVersion, nodeID)
d.CSIDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
})
d.csiDriver.AddControllerServiceCapabilities(
d.CSIDriver.AddControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
})

return d
}

func (d *Driver) RunNodeService() error {
func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) {
kubeClient, err := kube.GetK8sClient()
if err != nil {
return err
return nil, err
}

rclonePort, err := getFreePort()
if err != nil {
return fmt.Errorf("Cannot get a free TCP port to run rclone")
return nil, fmt.Errorf("Cannot get a free TCP port to run rclone")
}
rcloneOps := NewRclone(kubeClient, rclonePort)

s := csicommon.NewNonBlockingGRPCServer()
ns := &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver),
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
mounter: &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
},
RcloneOps: rcloneOps,
}, nil
}

func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(csiDriver),
active_volumes: map[string]int64{},
mutex: sync.RWMutex{},
}
s.Start(
d.endpoint,
csicommon.NewDefaultIdentityServer(d.csiDriver),
nil,
ns,
)
d.Server = s
}

func (d *Driver) WithNodeServer(ns *nodeServer) *Driver {
d.ns = ns
return d
}

return rcloneOps.Run()
func (d *Driver) WithControllerServer(cs *controllerServer) *Driver {
d.cs = cs
return d
}

func (d *Driver) RunControllerService() error {
func (d *Driver) Run() error {
s := csicommon.NewNonBlockingGRPCServer()
s.Start(
d.endpoint,
csicommon.NewDefaultIdentityServer(d.csiDriver),
&controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d.csiDriver),
active_volumes: map[string]int64{},
mutex: sync.RWMutex{},
},
nil,
csicommon.NewDefaultIdentityServer(d.CSIDriver),
d.cs,
d.ns,
)
d.Server = s
d.server = s
if d.ns != nil && d.ns.RcloneOps != nil {
return d.ns.RcloneOps.Run()
}
s.Wait()
return nil
}
Expand All @@ -118,8 +125,8 @@ func (d *Driver) Stop() error {
if d.ns != nil && d.ns.RcloneOps != nil {
err = d.ns.RcloneOps.Cleanup()
}
if d.Server != nil {
d.Server.Stop()
if d.server != nil {
d.server.Stop()
}
return err
}
11 changes: 5 additions & 6 deletions test/sanity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ var _ = Describe("Sanity CSI checks", Ordered, func() {
Expect(err).ShouldNot(HaveOccurred())
os.Setenv("DRIVER_NAME", "csi-rclone")
driver = rclone.NewDriver("hostname", endpoint)
cs := rclone.NewControllerServer(driver.CSIDriver)
ns, err := rclone.NewNodeServer(driver.CSIDriver)
Expect(err).ShouldNot(HaveOccurred())
driver.WithControllerServer(cs).WithNodeServer(ns)
go func() {
defer GinkgoRecover()
err := driver.RunNodeService()
Expect(err).ShouldNot(HaveOccurred())
}()
go func() {
defer GinkgoRecover()
err := driver.RunControllerService()
err := driver.Run()
Expect(err).ShouldNot(HaveOccurred())
}()
_, err = utils.Connect(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down

0 comments on commit 3b96954

Please sign in to comment.