cleanup: basic functionality added (#13)
* cleanup: basic functionality added

This changeset adds support for a duration that drives cleanup: a duration
in seconds can be provided as a label, and that value is recorded with the
group and used to kick off a cleanup job after allocation. The cleanup itself
is not performed yet, since we will need to walk up to a parent-level
abstraction (deleting the pod alone is often not sufficient) and issue a
cancel to fluxion; that will come next. I am also converting the fluxion
service container build to be multi-stage to hopefully make it smaller.

Signed-off-by: vsoch <[email protected]>
vsoch authored Aug 3, 2024
1 parent 2edd3a6 commit 24ed8d2
Showing 10 changed files with 114 additions and 26 deletions.
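For context before the diffs: the duration flows from a pod label into the provisional table and then into the cleanup job. A minimal sketch of how the fluxnetes.duration label might be parsed; the helper name and its location are assumptions for illustration (the actual parsing is not part of this diff), though the one-hour default matches the README note about the job timeout label.

package fluxnetes

import (
	"strconv"

	corev1 "k8s.io/api/core/v1"
)

// getDuration is a hypothetical helper: it reads the fluxnetes.duration
// label (seconds, given as a string) from a pod and falls back to a
// one-hour default when the label is missing or malformed.
func getDuration(pod *corev1.Pod) int32 {
	value, ok := pod.Labels["fluxnetes.duration"]
	if !ok || value == "" {
		return 3600
	}
	seconds, err := strconv.ParseInt(value, 10, 32)
	if err != nil {
		return 3600
	}
	return int32(seconds)
}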
12 changes: 7 additions & 5 deletions README.md
@@ -176,19 +176,21 @@ SELECT group_name, group_size from pods_provisional;
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
- spam submission and test reservations (and cancel)
- Testing:
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
- [ ] spam submission and test reservations (and cancel)
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
- fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol.
- [ ] create state diagram that shows how stuff works
- [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out
- add the label for the job timeout, default to one hour
- not clear how to orchestrate this if we need a parent object
- [x] add the label for the job timeout, default to one hour
- [x] cleanup job is triggered after duration
- [ ] issue cancel to fluxion and delete pods up to parent (how to do this)?
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.

Thinking:

- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.

## License

1 change: 1 addition & 0 deletions examples/job.yaml
@@ -7,6 +7,7 @@ spec:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.duration: "5"
spec:
schedulerName: fluxnetes
containers:
4 changes: 2 additions & 2 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -4,7 +4,7 @@ package queries
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, duration, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6, $7)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"

@@ -25,5 +25,5 @@ const (

// Note this used to be done with two queries - these are no longer used
SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
SelectGroupsQuery = "select group_name, group_size, podspec, duration from pods_provisional where group_name in ('%s');"
)
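SelectGroupsQuery uses a '%s' placeholder rather than a bind parameter, so the caller is expected to interpolate the quoted group-name list before running it. A short sketch of that usage, under the assumption that groupNames is a []string gathered from the provisional table (the variable and function names are illustrative, not from this diff):

// selectGroups builds the IN ('a','b','c') list by joining the group names,
// mirroring the in ('%s') placeholder in SelectGroupsQuery, then runs it.
// Assumes imports: context, fmt, strings, github.com/jackc/pgx/v5, pgxpool.
func selectGroups(ctx context.Context, pool *pgxpool.Pool, groupNames []string) (pgx.Rows, error) {
	query := fmt.Sprintf(queries.SelectGroupsQuery, strings.Join(groupNames, "','"))
	return pool.Query(ctx, query)
}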
3 changes: 2 additions & 1 deletion kubernetes/pkg/fluxnetes/queue.go
@@ -205,7 +205,8 @@ func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.
ts := metav1.MicroTime{}

// This query will fail if there are no rows (the podGroup is not known)
err := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName).Scan(&ts)
row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName)
err := row.Scan(&ts)
if err == nil {
klog.Info("Creation timestamp is", ts)
return ts, err
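As the comment above notes, Scan fails when the group is not yet in the table. The fallback handling is below the fold of this diff; a sketch of one way a caller could distinguish the empty result from a real failure, assuming pgx/v5's sentinel error (not copied from the repository):

// pgx.ErrNoRows is what Scan returns on an empty result, so errors.Is lets
// us treat "group not seen yet" differently from a genuine database error.
if err != nil {
	if errors.Is(err, pgx.ErrNoRows) {
		return metav1.NowMicro(), nil
	}
	return ts, err
}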
11 changes: 9 additions & 2 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -23,6 +23,7 @@ type JobModel struct {
GroupName string `db:"group_name"`
GroupSize int32 `db:"group_size"`
Podspec string `db:"podspec"`
Duration int32 `db:"duration"`
// CreatedAt time.Time `db:"created_at"`
}

@@ -63,7 +64,8 @@ func (q *ProvisionalQueue) Enqueue(
if err != nil {
return err
}
_, err = q.pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size)
_, err = q.pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace,
pod.Name, group.Duration, ts, group.Name, group.Size)

// Show the user a success or an error, we return it either way
if err != nil {
@@ -124,7 +126,12 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po

// TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker
for _, model := range models {
jobArgs := workers.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize}
jobArgs := workers.JobArgs{
GroupName: model.GroupName,
Podspec: model.Podspec,
GroupSize: model.GroupSize,
Duration: model.Duration,
}
lookup[model.GroupName] = jobArgs
}
for _, jobArgs := range lookup {
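The loop above iterates over models, but this hunk elides how those rows are scanned. With pgx v5 struct scanning and the db tags on JobModel, that step might look like the following sketch (the rows are assumed to come from SelectGroupsQuery, and the return shape is a guess since the function signature is truncated in this view):

// RowToStructByName matches columns to the db tags on JobModel
// (group_name, group_size, podspec, duration), so the new duration
// column is picked up without extra scan code.
models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])
if err != nil {
	return nil, err
}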
51 changes: 51 additions & 0 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -2,12 +2,17 @@ package workers

import (
"context"
"fmt"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"google.golang.org/grpc"

klog "k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc"

"github.com/riverqueue/river"
@@ -17,6 +22,9 @@ type CleanupArgs struct {
// We don't need to know this, but it's nice for the user to see
GroupName string `json:"groupName"`
FluxID int64 `json:"fluxid"`

// Do we need to cleanup Kubernetes too?
Kubernetes bool `json:"kubernetes"`
}

// The cleanup worker cleans up a reservation (issuing cancel)
@@ -26,6 +34,49 @@ type CleanupWorker struct {
river.WorkerDefaults[CleanupArgs]
}

// SubmitCleanup submits a cleanup job N seconds into the future
func SubmitCleanup(
ctx context.Context,
pool *pgxpool.Pool,
seconds int32,
fluxID int64,
inKubernetes bool,
tags []string,
) error {

klog.Infof("SUBMIT CLEANUP starting for %d", fluxID)

client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
if err != nil {
return fmt.Errorf("error getting client from context: %w", err)
}
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

// Create scheduledAt time - N seconds from now
now := time.Now()
scheduledAt := now.Add(time.Second * time.Duration(seconds))

insertOpts := river.InsertOpts{
MaxAttempts: defaults.MaxAttempts,
Tags: tags,
Queue: "cancel_queue",
ScheduledAt: scheduledAt,
}
_, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes}, &insertOpts)
if err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
klog.Infof("SUBMIT CLEANUP ending for %d", fluxID)
return nil
}

// Work performs the Cancel action
func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %d", job.Args.FluxID)
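SubmitCleanup inserts into a dedicated cancel_queue, so that queue and this worker have to be registered with the River client at startup. A sketch of what that wiring could look like; the location, MaxWorkers values, and variable names are assumptions rather than part of this diff:

// Register both workers and make sure River polls the cancel_queue.
// Assumes imports github.com/riverqueue/river and
// github.com/riverqueue/river/riverdriver/riverpgxv5.
workers := river.NewWorkers()
river.AddWorker(workers, &JobWorker{})
river.AddWorker(workers, &CleanupWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
	Workers: workers,
	Queues: map[string]river.QueueConfig{
		river.QueueDefault: {MaxWorkers: 10},
		"cancel_queue":     {MaxWorkers: 10},
	},
})
// riverClient would then typically be started with riverClient.Start(ctx).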
14 changes: 9 additions & 5 deletions kubernetes/pkg/fluxnetes/strategy/workers/job.go
@@ -36,6 +36,7 @@ type JobArgs struct {
Podspec string `json:"podspec"`
GroupName string `json:"groupName"`
GroupSize int32 `json:"groupSize"`
Duration int32 `json:"duration"`

// If true, we are allowed to ask Fluxion for a reservation
Reservation bool `json:"reservation"`
@@ -115,9 +116,12 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
return err
}

// Flux job identifier (known to fluxion)
fluxID := response.GetFluxID()

// If it's reserved, we need to add the id to our reservation table
if response.Reserved {
rRows, err := pool.Query(fluxionCtx, queries.AddReservationQuery, job.Args.GroupName, response.GetFluxID())
rRows, err := pool.Query(fluxionCtx, queries.AddReservationQuery, job.Args.GroupName, fluxID)
if err != nil {
return err
}
@@ -150,10 +154,10 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
defer rows.Close()

// Kick off a cleanup job for when everything should be cancelled
// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
// if err != nil {
// return fmt.Errorf("error getting client from context: %w", err)
// }
err = SubmitCleanup(ctx, pool, job.Args.Duration, int64(fluxID), true, []string{})
if err != nil {
return err
}

// Collect rows into single result
// pgx.CollectRows(rows, pgx.RowTo[string])
16 changes: 13 additions & 3 deletions kubernetes/pkg/scheduler/scheduler.go
@@ -489,14 +489,24 @@ func (sched *Scheduler) Run(ctx context.Context) {
return
}

// This ensures we don't parse anything for the cleanup queue.
// The function is added (to work on next), but how it will work
// is not yet implemented.
klog.Infof("Job Event Received: %s", event.Job.Kind)
if event.Job.Kind == "cleanup" {
continue
}
// TODO: possibly filter to queue name or similar
// if event.Queue.Name != river.DefaultQueue {continue}
// Note that job states are here:
// https://github.com/riverqueue/river/blob/master/riverdriver/riverpgxv5/migration/main/002_initial_schema.up.sql#L1-L9

// Parse event result into type
args := fluxnetes.JobResult{}
json.Unmarshal(event.Job.EncodedArgs[:], &args)

// We only care about job results to further process (not cleanup)
err = json.Unmarshal(event.Job.EncodedArgs[:], &args)
if err != nil {
continue
}
nodes := args.GetNodes()

if len(nodes) > 0 {
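The event.Job.Kind == "cleanup" check relies on River deriving a job's Kind from its args type. The Kind method for CleanupArgs sits below the fold of cleanup.go in this view, but for the filter to work it presumably looks like this sketch (not copied from the diff):

// Kind satisfies river.JobArgs; River stores the string with the job and
// surfaces it on job events, which is what the scheduler filters on above.
func (args CleanupArgs) Kind() string { return "cleanup" }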
1 change: 1 addition & 0 deletions src/build/postgres/create-tables.sql
@@ -2,6 +2,7 @@ CREATE TABLE pods_provisional (
podspec TEXT NOT NULL,
namespace TEXT NOT NULL,
name TEXT NOT NULL,
duration INTEGER NOT NULL,
created_at timestamptz NOT NULL default NOW(),
group_name TEXT NOT NULL,
group_size INTEGER NOT NULL
27 changes: 19 additions & 8 deletions src/build/scheduler/Dockerfile
@@ -28,11 +28,22 @@ COPY fluxnetes Makefile /go/src/fluxnetes/

RUN go mod tidy && \
go mod vendor && \
make server FLUX_SCHED_ROOT=/opt/flux-sched && \
mkdir -p /home/data/jobspecs /home/data/jgf && chmod -R ugo+rwx /home/data && \
cp /go/src/fluxnetes/bin/server /bin/fluxion-service

# TODO minimize build, can we copy over libraries?
# FROM ubuntu:jammy
# COPY --from=builder /opt/flux-sched /opt/flux-sched
# COPY --from=builder /go/src/fluxnetes/bin/server /bin/fluxion-service
make server FLUX_SCHED_ROOT=/opt/flux-sched

# minimize build!
FROM ubuntu:jammy
COPY --from=builder /go/src/fluxnetes/bin/server /bin/fluxion-service
COPY --from=builder /usr/lib/flux/ /usr/lib/flux
COPY --from=builder /usr/lib/libflux* /usr/lib/

RUN apt-get update && apt-get -qq install -y --no-install-recommends \
libboost-graph-dev \
libboost-system-dev \
libboost-filesystem-dev \
libboost-regex-dev \
libyaml-cpp-dev \
libjansson-dev \
hwloc && \
apt-get clean && \
mkdir -p /home/data/jobspecs /home/data/jgf && chmod -R ugo+rwx /home/data
ENV LD_LIBRARY_PATH=/usr/local/lib:/usr/lib:/usr/lib/flux
