Commit: add owner cleanup

In the case that a duration is set (the pod or pod
template activeDeadlineSeconds), kick off a cleanup
job to remove the pod from Kubernetes and the scheduler.
Other deletions will need to be handled with events,
which I will do next! I also changed my mind about
the default duration of 3600 seconds - I think that can be
dangerous for services or other objects that should
not be expected to just end. We need to trust the
users creating the abstractions. Also, pods that
are associated with jobs clean themselves up, so
we do not want to interfere with that.

Signed-off-by: vsoch <[email protected]>
vsoch committed Aug 4, 2024
1 parent 24ed8d2 commit 3411e3a
Showing 13 changed files with 267 additions and 52 deletions.
6 changes: 6 additions & 0 deletions README.md
@@ -187,10 +187,16 @@ SELECT group_name, group_size from pods_provisional;
- [x] cleanup job is triggered after duration
- [ ] issue cancel to fluxion and delete pods up to parent (how to do this)?
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and let the owner do whatever its default is (create another pod, etc.); see the deletion sketch after this list.
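
One possible shape for owner-aware cleanup (a sketch under assumptions, not this plugin's implementation; the package and helper name `cleanupPod` are illustrative): delete the owning Job when there is one, with foreground propagation so garbage collection (which honors `blockOwnerDeletion`) removes the child pods, and only delete the pod directly when it has no owner.

```go
package cleanup

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// cleanupPod removes a pod, deferring to its owning Job when present so that
// we do not interfere with pods that jobs already clean up themselves.
func cleanupPod(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error {
	for _, ref := range pod.OwnerReferences {
		if ref.Kind == "Job" {
			// Deleting the Job with foreground propagation lets the garbage
			// collector remove the child pods, honoring blockOwnerDeletion.
			policy := metav1.DeletePropagationForeground
			return clientset.BatchV1().Jobs(pod.Namespace).Delete(ctx, ref.Name,
				metav1.DeleteOptions{PropagationPolicy: &policy})
		}
	}
	// No owner: delete the pod directly.
	policy := metav1.DeletePropagationBackground
	return clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name,
		metav1.DeleteOptions{PropagationPolicy: &policy})
}
```

This is also why this commit adds `delete` on `jobs` in the batch API group to the scheduler RBAC.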

Thinking:

- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time, otherwise we wait for a pod event from the informer (see the river sketch below)
- Need to figure out how to handle explicit pod deletion - the callback reports a job execution error we need to look into
- We could allow scheduling jobs in the future, although I'm not sure about that use case (add a label to do this)
- What should we do if a pod is updated and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
- Would it be more efficient to retrieve the podspec from Kubernetes instead of putting it into the database?
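
A rough sketch of the "cleanup job is triggered after duration" flow (illustrative only, not the repository's exact code; `CleanupArgs`, `CleanupWorker`, and `scheduleCleanup` are assumed names): when a group has a positive duration, insert a river job scheduled that many seconds in the future, and have the worker perform the cancel and delete.

```go
package cleanup

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// CleanupArgs identifies the pod (group) to clean up. Hypothetical job type.
type CleanupArgs struct {
	Namespace string `json:"namespace"`
	Name      string `json:"name"`
	GroupName string `json:"groupName"`
}

func (CleanupArgs) Kind() string { return "cleanup" }

// CleanupWorker would issue the cancel to fluxion and delete the Kubernetes objects.
type CleanupWorker struct {
	river.WorkerDefaults[CleanupArgs]
}

func (w *CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
	// cancel in fluxion + delete the pod (or its owner) would go here
	return nil
}

// scheduleCleanup inserts a cleanup job that runs after the group duration.
func scheduleCleanup(ctx context.Context, client *river.Client[pgx.Tx], args CleanupArgs, seconds int64) error {
	if seconds <= 0 {
		// No duration means no cleanup job; services can run forever.
		return nil
	}
	_, err := client.Insert(ctx, args, &river.InsertOpts{
		ScheduledAt: time.Now().Add(time.Duration(seconds) * time.Second),
	})
	return err
}
```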

## License

23 changes: 20 additions & 3 deletions chart/templates/rbac.yaml
@@ -1,7 +1,7 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
rules:
- apiGroups: [""]
resources: ["namespaces"]
@@ -26,6 +26,9 @@ rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["delete", "get", "list", "watch", "update"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["delete", "get", "list", "watch", "update"]
@@ -80,15 +83,29 @@ rules:
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
subjects:
- kind: ServiceAccount
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: {{ .Values.scheduler.name }}
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
8 changes: 6 additions & 2 deletions docs/README.md
@@ -8,10 +8,14 @@ After install (see the [README](../README.md)) you can create any abstraction (p
|------|-------------|---------|
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |
| "fluxnetes.duration" | Duration of the job (seconds) | 3600 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single
pod becomes a single member group. If you set the duration to 0, it will be unlimited. If you set a negative value, you'll get an error.
pod becomes a single member group.

### Duration

Any pod object (or PodSpec template) can accept an `activeDeadlineSeconds`, and this is how you should set your job's time limit. Note that if you don't set this value, there will
be no duration (the pod or object can run forever), which is what services and other long-running objects need.
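
For reference, a minimal sketch of how the duration is derived on the scheduler side (mirroring the logic in `GetPodGroupDuration`; the package and helper name here are illustrative): a positive `activeDeadlineSeconds` becomes the group duration, anything else means unlimited.

```go
package fluxnetes

import corev1 "k8s.io/api/core/v1"

// durationFor returns the group duration in seconds, or 0 for "no limit".
func durationFor(pod *corev1.Pod) int64 {
	if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > 0 {
		return *pod.Spec.ActiveDeadlineSeconds
	}
	return 0
}
```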

## Design

15 changes: 15 additions & 0 deletions examples/job-deadline.yaml
@@ -0,0 +1,15 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job-deadline
spec:
template:
spec:
schedulerName: fluxnetes
activeDeadlineSeconds: 5
containers:
- name: job
image: busybox
command: [sleep, "10"]
restartPolicy: Never
backoffLimit: 4
1 change: 0 additions & 1 deletion examples/job.yaml
@@ -7,7 +7,6 @@ spec:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.duration: "5"
spec:
schedulerName: fluxnetes
containers:
14 changes: 14 additions & 0 deletions examples/pod.yaml
@@ -0,0 +1,14 @@
apiVersion: v1
kind: Pod
metadata:
name: pod
labels:
fluxnetes.group-name: pod
# fluxnetes.duration: "5"
spec:
activeDeadlineSeconds: 5
schedulerName: fluxnetes
containers:
- name: fruit
image: busybox
command: [sleep, "10"]
51 changes: 51 additions & 0 deletions kubernetes/pkg/fluxnetes/events.go
@@ -0,0 +1,51 @@
package fluxnetes

import (
corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

// UpdatePodEvent is called on an update; the old and new objects are presented
func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {

pod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)

// a pod is updated, get the group. TODO: how to handle change in group name?
// groupName := groups.GetPodGroupName(oldPod)
switch pod.Status.Phase {
case corev1.PodPending:
klog.Infof("Received update event 'Pending' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodRunning:
klog.Infof("Received update event 'Running' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received update event 'Succeeded' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodFailed:
klog.Infof("Received update event 'Failed' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodUnknown:
klog.Infof("Received update event 'Unknown' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
default:
klog.Infof("Received unknown update event %s for pod %s/%s", newPod.Status.Phase, pod.Status.Phase, pod.Namespace, pod.Name)
}
}

// DeletePodEvent handles the delete event
func (q *Queue) DeletePodEvent(podObj interface{}) {
pod := podObj.(*corev1.Pod)

switch pod.Status.Phase {
case corev1.PodPending:
klog.Infof("Received delete event 'Pending' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodRunning:
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodFailed:
klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodUnknown:
klog.Infof("Received delete event 'Unknown' for pod %s/%s", pod.Namespace, pod.Name)
default:
klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
}

}
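
// Note: these handlers are registered on the shared pod informer in
// Queue.GetInformer (queue.go in this change), via UpdateFunc and DeleteFunc,
// so they run as the informer receives pod update and delete events.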
54 changes: 34 additions & 20 deletions kubernetes/pkg/fluxnetes/group/group.go
@@ -1,14 +1,18 @@
package group

import (
"context"
"fmt"
"time"

"strconv"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
)

@@ -18,9 +22,7 @@ type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime

// Duration in seconds
Duration int32
Duration int64
}

// getPodGroupName returns the pod group name
@@ -61,28 +63,40 @@ func GetPodGroupSize(pod *corev1.Pod) (int32, error) {
return int32(size), nil
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to an hour (3600 seconds)
func GetPodGroupDuration(pod *corev1.Pod) (int32, error) {
// AddDeadline patches in a default pod.Spec.ActiveDeadlineSeconds when it is set but not positive.
func AddDeadline(ctx context.Context, pod *corev1.Pod) error {

// Do we have a group size? This will be parsed as a string, likely
duration, ok := pod.Labels[labels.PodGroupDurationLabel]
if !ok {
duration = "3600"
pod.Labels[labels.PodGroupDurationLabel] = duration
// Cut out early if it is nil - will be added later
if pod.Spec.ActiveDeadlineSeconds == nil {
return nil
}

// We need the group size to be an integer now!
jobDuration, err := strconv.ParseInt(duration, 10, 32)
// Also cut out early with no error if one is set
if *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return nil
}
payload := `{"spec": {"activeDeadlineSeconds": 3600}}`
config, err := rest.InClusterConfig()
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return defaults.DefaultDuration, err
return err
}
_, err = clientset.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.MergePatchType, []byte(payload), metav1.PatchOptions{})
return err
}

// GetPodGroupDuration gets the runtime of a job in seconds
// If no activeDeadlineSeconds is set we return 0 (unlimited)
func GetPodGroupDuration(pod *corev1.Pod) (int64, error) {

// The duration cannot be negative
if jobDuration < 0 {
return 0, fmt.Errorf("%s label must be >= 0", labels.PodGroupDurationLabel)
// It is already set
if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return *pod.Spec.ActiveDeadlineSeconds, nil
}
return int32(jobDuration), nil
// We can't enforce that everything has a duration; lots of services should not.
return 0, nil
}

// GetPodCreationTimestamp
3 changes: 0 additions & 3 deletions kubernetes/pkg/fluxnetes/labels/labels.go
@@ -10,9 +10,6 @@ const (
// We use the same label to be consistent
PodGroupLabel = "fluxnetes.group-name"
PodGroupSizeLabel = "fluxnetes.group-size"

// How long should the group run, in seconds (before cancel)
PodGroupDurationLabel = "fluxnetes.duration"
)

// GetPodGroupLabel get pod group name from pod labels
29 changes: 24 additions & 5 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -12,11 +12,12 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/util/slogutil"
"k8s.io/client-go/tools/cache"

"k8s.io/kubernetes/pkg/scheduler/framework"
groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
@@ -33,6 +34,7 @@ type Queue struct {
riverClient *river.Client[pgx.Tx]
EventChannel *QueueEvent
Strategy strategies.QueueStrategy
Handle framework.Handle

// IMPORTANT: subscriptions need to use same context
// that client submit them uses
@@ -55,7 +57,7 @@ type QueueEvent struct {
}

// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context) (*Queue, error) {
func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {
dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
return nil, err
@@ -105,6 +107,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
Strategy: strategy,
Context: ctx,
ReservationDepth: depth,
Handle: handle,
}
queue.setupEvents()
return &queue, nil
@@ -139,6 +142,21 @@ func (q *Queue) setupEvents() {
q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
}

// GetInformer returns the pod informer to run as a go routine
func (q *Queue) GetInformer() cache.SharedIndexInformer {

// Performance improvement when retrieving the list of objects by namespace; otherwise we'll log an 'index not exist' warning.
podsInformer := q.Handle.SharedInformerFactory().Core().V1().Pods().Informer()
podsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

// Event handlers to call on update/delete for cleanup
podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: q.UpdatePodEvent,
DeleteFunc: q.DeletePodEvent,
})
return podsInformer
}
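
// Usage sketch (illustrative, not necessarily how the plugin wires it): the
// caller is expected to start the returned informer in the background, e.g.
//
//	informer := queue.GetInformer()
//	go informer.Run(ctx.Done())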

// Enqueue a new job to the provisional queue
// 1. Assemble (discover or define) the group
// 2. Add to provisional table
@@ -150,13 +168,14 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error {
if err != nil {
return err
}
duration, err := groups.GetPodGroupDuration(pod)

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
if err != nil {
return err
}

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
duration, err := groups.GetPodGroupDuration(pod)
if err != nil {
return err
}