Commit 71a378e

CSI: fix struct copying errors (#9239)
The CSIVolume struct "denormalizes" allocations when it is first queried from the state store. The CSIVolumeByID method on the state store copies the volume before denormalizing so that we don't end up with unexpected changes. The copying had subtle bugs that meant Allocations (as well as Topologies and MountOptions) were not being copied when expected. This commit also ensures we never write allocations attached to volumes to the state store during claims.
Parent: b5d2a4b
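The underlying hazard is Go's value-copy semantics: assigning one struct value to another copies map headers and pointers, not their contents, so a "copy" made with `copy := *v` still aliases the original's maps. A minimal standalone sketch of that aliasing (illustrative only, not Nomad code):

package main

import "fmt"

// volume stands in for CSIVolume here; the same aliasing applies to any
// struct field that is a map, slice, or pointer.
type volume struct {
    ReadAllocs map[string]*int
}

func main() {
    n := 1
    orig := volume{ReadAllocs: map[string]*int{"a": &n}}

    copied := orig // struct copy: ReadAllocs still points at the same map
    copied.ReadAllocs["b"] = &n

    fmt.Println(len(orig.ReadAllocs)) // prints 2: the original saw the insert
}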

4 files changed (+148, -30 lines):

  nomad/csi_endpoint_test.go
  nomad/state/state_store.go
  nomad/structs/csi.go
  nomad/structs/csi_test.go

nomad/csi_endpoint_test.go

+4 lines

@@ -561,6 +561,10 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 
         if tc.expectedErrMsg == "" {
             require.NoError(t, err)
+            vol, err = state.CSIVolumeByID(nil, ns, volID)
+            require.NoError(t, err)
+            require.NotNil(t, vol)
+            require.Len(t, vol.ReadAllocs, 0)
         } else {
             require.Error(t, err)
             require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),

nomad/state/state_store.go

+22 lines

@@ -2072,6 +2072,17 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume
             v.ModifyIndex = index
         }
 
+        // Allocations are copy on write, so we want to keep the Allocation ID
+        // but we need to clear the pointer so that we don't store it when we
+        // write the volume to the state store. We'll get it from the db in
+        // denormalize.
+        for allocID := range v.ReadAllocs {
+            v.ReadAllocs[allocID] = nil
+        }
+        for allocID := range v.WriteAllocs {
+            v.WriteAllocs[allocID] = nil
+        }
+
         err = txn.Insert("csi_volumes", v)
         if err != nil {
             return fmt.Errorf("volume insert: %v", err)
@@ -2263,6 +2274,17 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim
 
     volume.ModifyIndex = index
 
+    // Allocations are copy on write, so we want to keep the Allocation ID
+    // but we need to clear the pointer so that we don't store it when we
+    // write the volume to the state store. We'll get it from the db in
+    // denormalize.
+    for allocID := range volume.ReadAllocs {
+        volume.ReadAllocs[allocID] = nil
+    }
+    for allocID := range volume.WriteAllocs {
+        volume.WriteAllocs[allocID] = nil
+    }
+
    if err = txn.Insert("csi_volumes", volume); err != nil {
        return fmt.Errorf("volume update failed: %s: %v", id, err)
    }
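The read path that re-attaches allocations is not part of this diff. A sketch of the denormalize step the comments refer to, with a hypothetical lookupAlloc standing in for the actual state-store query (an assumption, not Nomad's real helper):

// denormalizeSketch re-attaches live Allocation pointers at read time.
// The map keys (allocation IDs) survived the write above; only the
// pointer values were cleared before txn.Insert.
func denormalizeSketch(vol *structs.CSIVolume, lookupAlloc func(id string) *structs.Allocation) {
    for id := range vol.ReadAllocs {
        vol.ReadAllocs[id] = lookupAlloc(id)
    }
    for id := range vol.WriteAllocs {
        vol.WriteAllocs[id] = lookupAlloc(id)
    }
}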

nomad/structs/csi.go

+36 -30 lines

@@ -306,22 +306,14 @@ func NewCSIVolume(volumeID string, index uint64) *CSIVolume {
 }
 
 func (v *CSIVolume) newStructs() {
-    if v.Topologies == nil {
-        v.Topologies = []*CSITopology{}
-    }
-    if v.Context == nil {
-        v.Context = map[string]string{}
-    }
-    if v.Parameters == nil {
-        v.Parameters = map[string]string{}
-    }
-    if v.Secrets == nil {
-        v.Secrets = CSISecrets{}
-    }
+    v.Topologies = []*CSITopology{}
+    v.MountOptions = new(CSIMountOptions)
+    v.Secrets = CSISecrets{}
+    v.Parameters = map[string]string{}
+    v.Context = map[string]string{}
 
     v.ReadAllocs = map[string]*Allocation{}
     v.WriteAllocs = map[string]*Allocation{}
-
     v.ReadClaims = map[string]*CSIVolumeClaim{}
     v.WriteClaims = map[string]*CSIVolumeClaim{}
     v.PastClaims = map[string]*CSIVolumeClaim{}
@@ -386,7 +378,7 @@ func (v *CSIVolume) WriteSchedulable() bool {
 func (v *CSIVolume) WriteFreeClaims() bool {
     switch v.AccessMode {
     case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter:
-        return len(v.WriteAllocs) == 0
+        return len(v.WriteClaims) == 0
     case CSIVolumeAccessModeMultiNodeMultiWriter:
         // the CSI spec doesn't allow for setting a max number of writers.
         // we track node resource exhaustion through v.ResourceExhausted
@@ -405,25 +397,31 @@ func (v *CSIVolume) InUse() bool {
 
 // Copy returns a copy of the volume, which shares only the Topologies slice
 func (v *CSIVolume) Copy() *CSIVolume {
-    copy := *v
-    out := &copy
-    out.newStructs()
+    out := new(CSIVolume)
+    *out = *v
+    out.newStructs() // zero-out the non-primitive structs
+
+    for _, t := range v.Topologies {
+        out.Topologies = append(out.Topologies, t.Copy())
+    }
+    if v.MountOptions != nil {
+        *out.MountOptions = *v.MountOptions
+    }
+    for k, v := range v.Secrets {
+        out.Secrets[k] = v
+    }
     for k, v := range v.Parameters {
         out.Parameters[k] = v
     }
     for k, v := range v.Context {
         out.Context[k] = v
     }
-    for k, v := range v.Secrets {
-        out.Secrets[k] = v
-    }
 
-    for k, v := range v.ReadAllocs {
-        out.ReadAllocs[k] = v
+    for k, alloc := range v.ReadAllocs {
+        out.ReadAllocs[k] = alloc.Copy()
     }
-
-    for k, v := range v.WriteAllocs {
-        out.WriteAllocs[k] = v
+    for k, alloc := range v.WriteAllocs {
+        out.WriteAllocs[k] = alloc.Copy()
     }
 
     for k, v := range v.ReadClaims {
@@ -498,7 +496,7 @@ func (v *CSIVolume) ClaimWrite(claim *CSIVolumeClaim, alloc *Allocation) error {
     if !v.WriteFreeClaims() {
         // Check the blocking allocations to see if they belong to this job
         for _, a := range v.WriteAllocs {
-            if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID {
+            if a != nil && (a.Namespace != alloc.Namespace || a.JobID != alloc.JobID) {
                 return fmt.Errorf("volume max claim reached")
             }
         }
@@ -775,19 +773,19 @@ func (p *CSIPlugin) Copy() *CSIPlugin {
     out.newStructs()
 
     for k, v := range p.Controllers {
-        out.Controllers[k] = v
+        out.Controllers[k] = v.Copy()
     }
 
     for k, v := range p.Nodes {
-        out.Nodes[k] = v
+        out.Nodes[k] = v.Copy()
     }
 
     for k, v := range p.ControllerJobs {
-        out.ControllerJobs[k] = v
+        out.ControllerJobs[k] = v.Copy()
     }
 
     for k, v := range p.NodeJobs {
-        out.NodeJobs[k] = v
+        out.NodeJobs[k] = v.Copy()
    }
 
     return out
@@ -989,6 +987,14 @@ type JobDescription struct {
 // JobNamespacedDescriptions maps Job.ID to JobDescription
 type JobNamespacedDescriptions map[string]JobDescription
 
+func (j JobNamespacedDescriptions) Copy() JobNamespacedDescriptions {
+    copy := JobNamespacedDescriptions{}
+    for k, v := range j {
+        copy[k] = v
+    }
+    return copy
+}
+
 // JobDescriptions maps Namespace to a mapping of Job.ID to JobDescription
 type JobDescriptions map[string]JobNamespacedDescriptions
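Note that the new Copy() calls alloc.Copy() even when the map value is nil (the test below deliberately stores a nil under a2.ID), so it depends on Allocation.Copy tolerating a nil receiver. A sketch of that nil-safe pattern on a hypothetical type, not Nomad's actual implementation:

type claimSketch struct {
    AllocationID string
}

// Copy is nil-safe: a nil entry in the source map stays nil in the
// destination map instead of panicking on a nil dereference.
func (c *claimSketch) Copy() *claimSketch {
    if c == nil {
        return nil
    }
    out := *c // value copy; no reference-typed fields to share here
    return &out
}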

nomad/structs/csi_test.go

+86 lines

@@ -1,7 +1,9 @@
 package structs
 
 import (
+    "reflect"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/require"
 )
@@ -43,6 +45,90 @@ func TestCSIVolumeClaim(t *testing.T) {
     require.True(t, vol.WriteFreeClaims())
 }
 
+func TestVolume_Copy(t *testing.T) {
+
+    a1 := MockAlloc()
+    a2 := MockAlloc()
+    a3 := MockAlloc()
+    c1 := &CSIVolumeClaim{
+        AllocationID:   a1.ID,
+        NodeID:         a1.NodeID,
+        ExternalNodeID: "c1",
+        Mode:           CSIVolumeClaimRead,
+        State:          CSIVolumeClaimStateTaken,
+    }
+    c2 := &CSIVolumeClaim{
+        AllocationID:   a2.ID,
+        NodeID:         a2.NodeID,
+        ExternalNodeID: "c2",
+        Mode:           CSIVolumeClaimRead,
+        State:          CSIVolumeClaimStateNodeDetached,
+    }
+    c3 := &CSIVolumeClaim{
+        AllocationID:   a3.ID,
+        NodeID:         a3.NodeID,
+        ExternalNodeID: "c3",
+        Mode:           CSIVolumeClaimWrite,
+        State:          CSIVolumeClaimStateTaken,
+    }
+
+    v1 := &CSIVolume{
+        ID:             "vol1",
+        Name:           "vol1",
+        ExternalID:     "vol-abcdef",
+        Namespace:      "default",
+        Topologies:     []*CSITopology{{Segments: map[string]string{"AZ1": "123"}}},
+        AccessMode:     CSIVolumeAccessModeSingleNodeWriter,
+        AttachmentMode: CSIVolumeAttachmentModeBlockDevice,
+        MountOptions:   &CSIMountOptions{FSType: "ext4", MountFlags: []string{"ro", "noatime"}},
+        Secrets:        CSISecrets{"mysecret": "myvalue"},
+        Parameters:     map[string]string{"param1": "val1"},
+        Context:        map[string]string{"ctx1": "val1"},
+
+        ReadAllocs:  map[string]*Allocation{a1.ID: a1, a2.ID: nil},
+        WriteAllocs: map[string]*Allocation{a3.ID: a3},
+
+        ReadClaims:  map[string]*CSIVolumeClaim{a1.ID: c1, a2.ID: c2},
+        WriteClaims: map[string]*CSIVolumeClaim{a3.ID: c3},
+        PastClaims:  map[string]*CSIVolumeClaim{},
+
+        Schedulable:         true,
+        PluginID:            "moosefs",
+        Provider:            "n/a",
+        ProviderVersion:     "1.0",
+        ControllerRequired:  true,
+        ControllersHealthy:  2,
+        ControllersExpected: 2,
+        NodesHealthy:        4,
+        NodesExpected:       5,
+        ResourceExhausted:   time.Now(),
+    }
+
+    v2 := v1.Copy()
+    if !reflect.DeepEqual(v1, v2) {
+        t.Fatalf("Copy() returned an unequal Volume; got %#v; want %#v", v1, v2)
+    }
+
+    v1.ReadClaims[a1.ID].State = CSIVolumeClaimStateReadyToFree
+    v1.ReadAllocs[a2.ID] = a2
+    v1.WriteAllocs[a3.ID].ClientStatus = AllocClientStatusComplete
+    v1.MountOptions.FSType = "zfs"
+
+    if v2.ReadClaims[a1.ID].State == CSIVolumeClaimStateReadyToFree {
+        t.Fatalf("Volume.Copy() failed; changes to original ReadClaims seen in copy")
+    }
+    if v2.ReadAllocs[a2.ID] != nil {
+        t.Fatalf("Volume.Copy() failed; changes to original ReadAllocs seen in copy")
+    }
+    if v2.WriteAllocs[a3.ID].ClientStatus == AllocClientStatusComplete {
+        t.Fatalf("Volume.Copy() failed; changes to original WriteAllocs seen in copy")
+    }
+    if v2.MountOptions.FSType == "zfs" {
+        t.Fatalf("Volume.Copy() failed; changes to original MountOptions seen in copy")
+    }
+
+}
+
 func TestCSIPluginJobs(t *testing.T) {
     plug := NewCSIPlugin("foo", 1000)
     controller := &Job{
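To exercise just the new copy test from a Nomad checkout, a standard go test invocation scoped to the package and test name shown above should suffice:

go test ./nomad/structs/ -run TestVolume_Copy -v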
