cleanup: basic functionality added #13

Merged · 2 commits · Aug 3, 2024
12 changes: 7 additions & 5 deletions README.md
@@ -176,19 +176,21 @@ SELECT group_name, group_size from pods_provisional;
- [ ] Restarting with postgres shouldn't hit CrashLoopBackOff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) as "start" in scheduler.go
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
- spam submission and test reservations (and cancel)
- Testing:
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
- [ ] spam submission and test reservations (and cancel)
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
- fcfs can work by only adding one job (first in provisional) to the worker queue at a time, and only when it's empty (see the sketch after this file's diff)
- [ ] create state diagram that shows how stuff works
- [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out
- add the label for the job timeout, default to one hour
- not clear how to orchestrate this if we need a parent object
- [x] add the label for the job timeout, default to one hour
- [x] cleanup job is triggered after duration
- [ ] issue cancel to fluxion and delete pods up to parent (how to do this)?
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.

Thinking:

- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.

## License

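Per the fcfs bullet above, a minimal sketch of that strategy: admit the oldest provisional job only when the worker queue is idle. `workerQueueEmpty`, `nextProvisionalJob`, and `enqueueWorkerJob` are hypothetical helpers, not code in this PR.

```go
// fcfs sketch: admit exactly one provisional job at a time.
// workerQueueEmpty, nextProvisionalJob, and enqueueWorkerJob are
// hypothetical helpers (not part of this PR).
func scheduleFCFS(ctx context.Context) error {
	empty, err := workerQueueEmpty(ctx)
	if err != nil || !empty {
		return err // wait until the in-flight job finishes
	}
	job, err := nextProvisionalJob(ctx) // oldest by created_at
	if err != nil || job == nil {
		return err // nothing provisional to admit
	}
	return enqueueWorkerJob(ctx, job) // first in, first out
}
```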
1 change: 1 addition & 0 deletions examples/job.yaml
@@ -7,6 +7,7 @@ spec:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.duration: "5"
spec:
schedulerName: fluxnetes
containers:
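The new `fluxnetes.duration` label carries the job's lifetime in seconds. A sketch of how a consumer might parse it; the actual helper that populates `group.Duration` is not shown in this diff.

```go
// Parse the fluxnetes.duration label (seconds) off a pod; zero means
// "use the default". The label key matches examples/job.yaml above.
duration := int32(0)
if raw, ok := pod.Labels["fluxnetes.duration"]; ok {
	value, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return fmt.Errorf("invalid fluxnetes.duration %q: %w", raw, err)
	}
	duration = int32(value)
}
```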
4 changes: 2 additions & 2 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -4,7 +4,7 @@ package queries
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, duration, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6, $7)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"

@@ -25,5 +25,5 @@ const (

// Note this used to be done with two queries - these are no longer used
SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
SelectGroupsQuery = "select group_name, group_size, podspec, duration from pods_provisional where group_name in ('%s');"
)
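The positional parameters passed with `InsertPodQuery` must mirror the column list, with the new `duration` fourth. A sketch of a matching call: the PR's `Enqueue` passes these via `pool.Query`, while `Exec` is shown here since the insert returns no rows, and the variable names are illustrative.

```go
// $1..$7 must line up with (podspec, namespace, name, duration,
// created_at, group_name, group_size) in InsertPodQuery.
_, err := pool.Exec(ctx, queries.InsertPodQuery,
	podspec,   // $1 podspec
	namespace, // $2 namespace
	name,      // $3 name
	duration,  // $4 duration (seconds; new in this PR)
	createdAt, // $5 created_at
	groupName, // $6 group_name
	groupSize, // $7 group_size
)
```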
3 changes: 2 additions & 1 deletion kubernetes/pkg/fluxnetes/queue.go
@@ -205,7 +205,8 @@ func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.
ts := metav1.MicroTime{}

// This query will fail if there are no rows (the podGroup is not known)
err := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName).Scan(&ts)
row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName)
err := row.Scan(&ts)
if err == nil {
klog.Info("Creation timestamp is", ts)
return ts, err
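As the comment above notes, `GetTimestampQuery` yields no rows for an unknown group, so `Scan` returns `pgx.ErrNoRows`. A sketch of telling that apart from a genuine failure; the fallback choice here is illustrative.

```go
row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName)
err := row.Scan(&ts)
if errors.Is(err, pgx.ErrNoRows) {
	// The pod group is not known yet: fall back to "now" rather
	// than treating the miss as an error (illustrative policy).
	return metav1.NewMicroTime(time.Now()), nil
}
```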
11 changes: 9 additions & 2 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -23,6 +23,7 @@ type JobModel struct {
GroupName string `db:"group_name"`
GroupSize int32 `db:"group_size"`
Podspec string `db:"podspec"`
Duration int32 `db:"duration"`
// CreatedAt time.Time `db:"created_at"`
}

@@ -63,7 +64,8 @@ func (q *ProvisionalQueue) Enqueue(
if err != nil {
return err
}
_, err = q.pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size)
_, err = q.pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace,
pod.Name, group.Duration, ts, group.Name, group.Size)

// Show the user a success or an error, we return it either way
if err != nil {
@@ -124,7 +126,12 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po

// TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker
for _, model := range models {
jobArgs := workers.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize}
jobArgs := workers.JobArgs{
GroupName: model.GroupName,
Podspec: model.Podspec,
GroupSize: model.GroupSize,
Duration: model.Duration,
}
lookup[model.GroupName] = jobArgs
}
for _, jobArgs := range lookup {
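The `models` loop above implies each row of `SelectGroupsQuery` is scanned into a `JobModel`; with pgx v5 that scan can be done generically via the `db` tags. A sketch, assuming `rows` came from executing `SelectGroupsQuery`:

```go
// Scan every SelectGroupsQuery row into JobModel by db tag:
// (group_name, group_size, podspec, duration).
models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])
if err != nil {
	return nil, err
}
```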
51 changes: 51 additions & 0 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -2,12 +2,17 @@ package workers

import (
"context"
"fmt"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"google.golang.org/grpc"

klog "k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc"

"github.com/riverqueue/river"
@@ -17,6 +22,9 @@ type CleanupArgs struct {
// We don't need to know this, but it's nice for the user to see
GroupName string `json:"groupName"`
FluxID int64 `json:"fluxid"`

// Do we need to cleanup Kubernetes too?
Kubernetes bool `json:"kubernetes"`
}

// The cleanup worker cleans up a reservation (issuing a cancel)
@@ -26,6 +34,49 @@ type CleanupWorker struct {
river.WorkerDefaults[CleanupArgs]
}

// SubmitCleanup submits a cleanup job N seconds into the future
func SubmitCleanup(
ctx context.Context,
pool *pgxpool.Pool,
seconds int32,
fluxID int64,
inKubernetes bool,
tags []string,
) error {

klog.Infof("SUBMIT CLEANUP starting for %d", fluxID)

client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
if err != nil {
return fmt.Errorf("error getting client from context: %w", err)
}
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

// Create scheduledAt time - N seconds from now
now := time.Now()
scheduledAt := now.Add(time.Second * time.Duration(seconds))

insertOpts := river.InsertOpts{
MaxAttempts: defaults.MaxAttempts,
Tags: tags,
Queue: "cancel_queue",
ScheduledAt: scheduledAt,
}
_, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes}, &insertOpts)
if err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
klog.Infof("SUBMIT CLEANUP ending for %d", fluxID)
return nil
}

// Work performs the Cancel action
func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID)
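The body of `Work` is collapsed in this view. A rough sketch of the cancel it performs, assuming the fluxion gRPC service exposes a `Cancel` RPC keyed by flux ID — the client constructor, request type, and address here are assumptions, not this PR's actual code:

```go
// Assumed names: NewFluxionServiceClient, CancelRequest, and the
// service address; the real fluxion-grpc proto may differ.
// Requires "google.golang.org/grpc/credentials/insecure".
conn, err := grpc.Dial("127.0.0.1:4242",
	grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	return err
}
defer conn.Close()
client := pb.NewFluxionServiceClient(conn)
_, err = client.Cancel(ctx, &pb.CancelRequest{FluxID: job.Args.FluxID})
return err
```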
14 changes: 9 additions & 5 deletions kubernetes/pkg/fluxnetes/strategy/workers/job.go
@@ -36,6 +36,7 @@ type JobArgs struct {
Podspec string `json:"podspec"`
GroupName string `json:"groupName"`
GroupSize int32 `json:"groupSize"`
Duration int32 `json:"duration"`

// If true, we are allowed to ask Fluxion for a reservation
Reservation bool `json:"reservation"`
@@ -115,9 +116,12 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
return err
}

// Flux job identifier (known to fluxion)
fluxID := response.GetFluxID()

// If it's reserved, we need to add the id to our reservation table
if response.Reserved {
rRows, err := pool.Query(fluxionCtx, queries.AddReservationQuery, job.Args.GroupName, response.GetFluxID())
rRows, err := pool.Query(fluxionCtx, queries.AddReservationQuery, job.Args.GroupName, fluxID)
if err != nil {
return err
}
@@ -150,10 +154,10 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
defer rows.Close()

// Kick off a cleaning job for when everything should be cancelled
// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
// if err != nil {
// return fmt.Errorf("error getting client from context: %w", err)
// }
err = SubmitCleanup(ctx, pool, job.Args.Duration, int64(fluxID), true, []string{})
if err != nil {
return err
}

// Collect rows into single result
// pgx.CollectRows(rows, pgx.RowTo[string])
16 changes: 13 additions & 3 deletions kubernetes/pkg/scheduler/scheduler.go
@@ -489,14 +489,24 @@ func (sched *Scheduler) Run(ctx context.Context) {
return
}

// This ensures we don't parse anything for the cleanup queue.
// The handler is added (it's next to work on) but how it will
// work is not yet implemented.
klog.Infof("Job Event Received: %s", event.Job.Kind)
if event.Job.Kind == "cleanup" {
continue
}
// TODO: possibly filter to queue name or similar
// if event.Queue.Name != river.DefaultQueue {continue}
// Note that job states are here:
// https://github.com/riverqueue/river/blob/master/riverdriver/riverpgxv5/migration/main/002_initial_schema.up.sql#L1-L9

// Parse event result into type
args := fluxnetes.JobResult{}
json.Unmarshal(event.Job.EncodedArgs[:], &args)

// We only care about job results to further process (not cleanup)
err = json.Unmarshal(event.Job.EncodedArgs[:], &args)
if err != nil {
continue
}
nodes := args.GetNodes()

if len(nodes) > 0 {
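For context, the loop above drains river's subscription channel. A simplified sketch of that pattern with river's `Subscribe` API, skipping cleanup events the same way:

```go
// Subscribe to completed jobs; skip cleanup jobs, which carry no
// scheduling result to parse (mirrors the loop above).
events, cancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer cancel()
for event := range events {
	if event.Job.Kind == "cleanup" {
		continue
	}
	args := fluxnetes.JobResult{}
	if err := json.Unmarshal(event.Job.EncodedArgs, &args); err != nil {
		continue
	}
	// ...process args.GetNodes() as in Run above
}
```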
1 change: 1 addition & 0 deletions src/build/postgres/create-tables.sql
@@ -2,6 +2,7 @@ CREATE TABLE pods_provisional (
podspec TEXT NOT NULL,
namespace TEXT NOT NULL,
name TEXT NOT NULL,
duration INTEGER NOT NULL,
created_at timestamptz NOT NULL default NOW(),
group_name TEXT NOT NULL,
group_size INTEGER NOT NULL
27 changes: 19 additions & 8 deletions src/build/scheduler/Dockerfile
@@ -28,11 +28,22 @@ COPY fluxnetes Makefile /go/src/fluxnetes/

RUN go mod tidy && \
go mod vendor && \
make server FLUX_SCHED_ROOT=/opt/flux-sched && \
mkdir -p /home/data/jobspecs /home/data/jgf && chmod -R ugo+rwx /home/data && \
cp /go/src/fluxnetes/bin/server /bin/fluxion-service

# TODO minimize build, can we copy over libraries?
# FROM ubuntu:jammy
# COPY --from=builder /opt/flux-sched /opt/flux-sched
# COPY --from=builder /go/src/fluxnetes/bin/server /bin/fluxion-service
make server FLUX_SCHED_ROOT=/opt/flux-sched

# minimize build!
FROM ubuntu:jammy
COPY --from=builder /go/src/fluxnetes/bin/server /bin/fluxion-service
COPY --from=builder /usr/lib/flux/ /usr/lib/flux
COPY --from=builder /usr/lib/libflux* /usr/lib/

RUN apt-get update && apt-get -qq install -y --no-install-recommends \
libboost-graph-dev \
libboost-system-dev \
libboost-filesystem-dev \
libboost-regex-dev \
libyaml-cpp-dev \
libjansson-dev \
hwloc && \
apt-get clean && \
mkdir -p /home/data/jobspecs /home/data/jgf && chmod -R ugo+rwx /home/data
ENV LD_LIBRARY_PATH=/usr/local/lib:/usr/lib:/usr/lib/flux