
Commit 68754e7

Support draining daemonset pods that use sriov devices
With this commit we also take care of removing DaemonSet-owned pods that use SR-IOV devices. We only do this when a drain is requested; we do not do it for reboot requests.

Signed-off-by: Sebastian Sch <[email protected]>
1 parent 9ddf872 commit 68754e7

File tree: 4 files changed, +261 −26 lines

controllers/drain_controller_test.go (+1 −1)
@@ -326,7 +326,7 @@ func expectNodeStateAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState,
         g.Expect(utils.ObjectHasAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, expectedAnnotationValue)).
             To(BeTrue(),
                 "Node[%s] annotation[%s] == '%s'. Expected '%s'", nodeState.Name, constants.NodeDrainAnnotation, nodeState.GetLabels()[constants.NodeStateDrainAnnotationCurrent], expectedAnnotationValue)
-    }, "20s", "1s").Should(Succeed())
+    }, "200s", "1s").Should(Succeed())
 }

 func expectNumberOfDrainingNodes(numbOfDrain int, nodesState ...*sriovnetworkv1.SriovNetworkNodeState) {
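The only change in this file is the timeout of a Gomega Eventually assertion, raised from "20s" to "200s"; the second string, "1s", is the polling interval. For readers unfamiliar with that call shape, a minimal sketch of the pattern (the package and helper names are illustrative, not from this repository):

```go
package example

import (
	. "github.com/onsi/gomega"
)

// checkEventually illustrates the Eventually(actual, timeout, interval) pattern
// used by the test: re-run the assertion every "1s" until it succeeds, and fail
// the test if it still has not succeeded after "200s".
func checkEventually(condition func() bool) {
	Eventually(func(g Gomega) {
		g.Expect(condition()).To(BeTrue())
	}, "200s", "1s").Should(Succeed())
}
```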

pkg/drain/drainer.go (+83 −23)
@@ -7,6 +7,7 @@ import (
     "time"

     corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
     "k8s.io/kubectl/pkg/drain"
@@ -68,29 +69,35 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai
         return false, nil
     }

-    drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
     backoff := wait.Backoff{
         Steps:    5,
-        Duration: 10 * time.Second,
+        Duration: 5 * time.Second,
         Factor:   2,
     }
     var lastErr error
-
     reqLogger.Info("drainNode(): Start draining")
-    if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
+    if err = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
+        drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
         err := drain.RunCordonOrUncordon(drainHelper, node, true)
         if err != nil {
             lastErr = err
             reqLogger.Info("drainNode(): Cordon failed, retrying", "error", err)
             return false, nil
         }
         err = drain.RunNodeDrain(drainHelper, node.Name)
-        if err == nil {
-            return true, nil
+        if err != nil {
+            lastErr = err
+            reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
+            return false, nil
         }
-        lastErr = err
-        reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
-        return false, nil
+
+        err = d.removeDaemonSetsFromNode(ctx, node.Name)
+        if err != nil {
+            lastErr = err
+            return false, nil
+        }
+
+        return true, nil
     }); err != nil {
         if wait.Interrupted(err) {
             reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr)
@@ -131,6 +138,28 @@ func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (boo
     return completed, nil
 }

+// removeDaemonSetsFromNode go over all the remain pods and search for DaemonSets that have SR-IOV devices to remove them
+// we can't use the drain from core kubernetes as it doesn't support removing pods that are part of a DaemonSets
+func (d *Drainer) removeDaemonSetsFromNode(ctx context.Context, nodeName string) error {
+    reqLogger := log.FromContext(ctx)
+    reqLogger.Info("drainNode(): remove DaemonSets using sriov devices from node", "nodeName", nodeName)
+
+    podList, err := d.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)})
+    if err != nil {
+        reqLogger.Info("drainNode(): Failed to list pods, retrying", "error", err)
+        return err
+    }
+
+    // remove pods that are owned by a DaemonSet and use SR-IOV devices
+    dsPodsList := getDsPodsToRemove(podList)
+    drainHelper := createDrainHelper(d.kubeClient, ctx, true)
+    err = drainHelper.DeleteOrEvictPods(dsPodsList)
+    if err != nil {
+        reqLogger.Error(err, "failed to delete or evict pods from node", "nodeName", nodeName)
+    }
+    return err
+}
+
 // createDrainHelper function to create a drain helper
 // if fullDrain is false we only remove pods that have the resourcePrefix
 // if not we remove all the pods in the node
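removeDaemonSetsFromNode lists every pod scheduled to the node with a spec.nodeName field selector, filters it down to DaemonSet-owned pods that request SR-IOV resources, and evicts only those. A small sketch of that listing step is below; it builds the same selector with fields.OneTermEqualSelector instead of fmt.Sprintf (an equivalent construction, not what the diff uses, and the function name is illustrative):

```go
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// listPodsOnNode returns every pod, in any namespace, scheduled to nodeName.
func listPodsOnNode(ctx context.Context, c kubernetes.Interface, nodeName string) error {
	// Equivalent to FieldSelector: "spec.nodeName=<nodeName>".
	selector := fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
	pods, err := c.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: selector})
	if err != nil {
		return err
	}
	for _, p := range pods.Items {
		fmt.Printf("%s/%s\n", p.Namespace, p.Name)
	}
	return nil
}
```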
@@ -150,25 +179,21 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful
             }
             log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
         },
-        Ctx:    ctx,
-        Out:    writer{logger.Info},
-        ErrOut: writer{func(msg string, kv ...interface{}) { logger.Error(nil, msg, kv...) }},
+        Ctx: ctx,
+        Out: writer{logger.Info},
+        ErrOut: writer{func(msg string, kv ...interface{}) {
+            logger.Error(nil, msg, kv...)
+        }},
     }

     // when we just want to drain and not reboot we can only remove the pods using sriov devices
     if !fullDrain {
         deleteFunction := func(p corev1.Pod) drain.PodDeleteStatus {
-            for _, c := range p.Spec.Containers {
-                if c.Resources.Requests != nil {
-                    for r := range c.Resources.Requests {
-                        if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
-                            return drain.PodDeleteStatus{
-                                Delete:  true,
-                                Reason:  "pod contain SR-IOV device",
-                                Message: "SR-IOV network operator draining the node",
-                            }
-                        }
-                    }
+            if podHasSRIOVDevice(&p) {
+                return drain.PodDeleteStatus{
+                    Delete:  true,
+                    Reason:  "pod contains SR-IOV device",
+                    Message: "SR-IOV network operator draining the node",
                 }
             }
             return drain.PodDeleteStatus{Delete: false}
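The inline resource-request loops are replaced by a call to the new podHasSRIOVDevice helper; the surrounding deleteFunction is a pod filter that is presumably wired into the helper's AdditionalFilters (the assignment itself is outside this hunk). Kubectl's built-in drain filters always skip DaemonSet-managed pods, which is exactly why the commit adds the separate removeDaemonSetsFromNode path. A hedged sketch of how such a filter plugs into drain.Helper; the helper settings and the "openshift.io" prefix are illustrative assumptions, not the operator's actual configuration:

```go
package example

import (
	"context"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubectl/pkg/drain"
)

// resourcePrefix stands in for vars.ResourcePrefix; "openshift.io" is assumed here.
const resourcePrefix = "openshift.io"

// newSriovDrainHelper sketches a drain.Helper whose AdditionalFilters mark for
// deletion only the pods that request an SR-IOV resource.
func newSriovDrainHelper(ctx context.Context, c kubernetes.Interface) *drain.Helper {
	sriovOnly := func(p corev1.Pod) drain.PodDeleteStatus {
		for _, container := range p.Spec.Containers {
			for name := range container.Resources.Requests {
				if strings.HasPrefix(name.String(), resourcePrefix) {
					return drain.PodDeleteStatus{
						Delete:  true,
						Reason:  "pod contains SR-IOV device",
						Message: "SR-IOV network operator draining the node",
					}
				}
			}
		}
		return drain.PodDeleteStatus{Delete: false}
	}

	return &drain.Helper{
		Ctx:    ctx,
		Client: c,
		Force:  true,
		// The built-in DaemonSet filter still skips DaemonSet pods, even with
		// AdditionalFilters, hence the separate eviction path in this commit.
		IgnoreAllDaemonSets: true,
		DeleteEmptyDirData:  true,
		GracePeriodSeconds:  -1,
		Timeout:             90 * time.Second,
		AdditionalFilters:   []drain.PodFilter{sriovOnly},
	}
}
```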
@@ -179,3 +204,38 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful

     return drainer
 }
+
+func podHasSRIOVDevice(p *corev1.Pod) bool {
+    for _, c := range p.Spec.Containers {
+        if c.Resources.Requests != nil {
+            for r := range c.Resources.Requests {
+                if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
+                    return true
+                }
+            }
+        }
+    }
+
+    return false
+}
+
+func podsHasDSOwner(p *corev1.Pod) bool {
+    for _, o := range p.OwnerReferences {
+        if o.Kind == "DaemonSet" {
+            return true
+        }
+    }
+
+    return false
+}
+
+func getDsPodsToRemove(pl *corev1.PodList) []corev1.Pod {
+    podsToRemove := []corev1.Pod{}
+    for _, pod := range pl.Items {
+        if podsHasDSOwner(&pod) && podHasSRIOVDevice(&pod) {
+            podsToRemove = append(podsToRemove, pod)
+        }
+    }
+
+    return podsToRemove
+}
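Together these helpers form a single predicate: a pod is evicted only if it has a DaemonSet owner reference and at least one container requests a resource carrying the operator's prefix. A hedged sketch of a unit test that could sit next to drainer.go in package drain; the pod names, the "openshift.io" prefix, and the test itself are illustrative and not part of the commit:

```go
package drain

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetDsPodsToRemove(t *testing.T) {
	// DaemonSet-owned pod requesting an SR-IOV resource: should be selected.
	// Assumes vars.ResourcePrefix resolves to "openshift.io" in this environment.
	dsPod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "sriov-ds-pod",
			OwnerReferences: []metav1.OwnerReference{{Kind: "DaemonSet", Name: "workload-ds"}},
		},
		Spec: corev1.PodSpec{Containers: []corev1.Container{{
			Name: "app",
			Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{
				"openshift.io/intelnics": resource.MustParse("1"),
			}},
		}}},
	}
	// Pod with no DaemonSet owner and no SR-IOV request: should be ignored.
	plainPod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "plain-pod"},
		Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}}},
	}

	got := getDsPodsToRemove(&corev1.PodList{Items: []corev1.Pod{dsPod, plainPod}})
	if len(got) != 1 || got[0].Name != "sriov-ds-pod" {
		t.Fatalf("expected only the DaemonSet pod with an SR-IOV request, got %v", got)
	}
}
```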

test/conformance/tests/test_policy_configuration.go (+1 −0)
@@ -448,6 +448,7 @@ var _ = Describe("[sriov] operator", Ordered, func() {
             createTestPod(nodeToTest, []string{sriovNetworkName})
         })
     })
+
     Context("PF shutdown", func() {
         // 29398
         It("Should be able to create pods successfully if PF is down.Pods are able to communicate with each other on the same node", func() {
