diff --git a/README.md b/README.md
index 0aae6c9..a2e5880 100644
--- a/README.md
+++ b/README.md
@@ -173,9 +173,9 @@ SELECT group_name, group_size from pods_provisional;
 ### TODO
 
 - [ ] Figure out how In-tree registry plugins (that are related to resources) should be run to inform fluxion
+  - we likely want to move the assume pod step outside of that schedule function, or ensure the pod passed in matches.
 - [ ] Optimize queries.
 - [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
-- [ ] Add back in the created at filter / sort to the queues (removed when was testing / debugging harder stuff)
 - [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
 - Testing:
   - [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go
index d8ab7f2..592f781 100644
--- a/kubernetes/pkg/fluxnetes/queries/queries.go
+++ b/kubernetes/pkg/fluxnetes/queries/queries.go
@@ -19,7 +19,7 @@ const (
 	// 1. Select groups for which the size >= the number of pods we've seen
 	// 2. Then get a representative pod to model the resources for the group
 	// TODO add back created by and then sort by it desc
-	SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;"
+	SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size order by created_at desc;"
 
 	// This currently will use one podspec (and all names) and we eventually want it to use all podspecs
 	SelectPodsQuery = `select name, podspec from pods_provisional where group_name = $1 and namespace = $2;`
@@ -32,13 +32,14 @@ const (
 	DeleteProvisionalPodsQuery = "delete from pods_provisional where group_name = $1 and namespace = $2;"
 
 	// TODO add created_at back
-	InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"
+	InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name, created_at) select '%s', '%s', '%s', %d, '%s', $1 where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"
 
 	// Enqueue queries
+	// TODO these need escaping (sql injection)
 	// 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here)
 	// 2. Groups are added to the groups_provisional, and this is where we can easily store a current count
 	// Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again)
-	InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
+	InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size, created_at) select '%s', '%s', '%d', '%d', '%s', '1', $1 WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
 	IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';"
 
 	// After allocate success, we update pending with the ID. We retrieve it to issue fluxion to cancel when it finishes
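The enqueue queries above are still assembled with fmt.Sprintf, which is what the new escaping TODO flags. A minimal sketch of a fully parameterized alternative with pgx, assuming the pods_provisional schema shown above (the query text, the enqueuePod helper, and its signature are illustrative, not the project's final form):

```go
package main

import (
	"context"
	"os"

	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical parameterized version of InsertIntoProvisionalQuery: every value,
// including created_at, is bound server-side, so no escaping is needed.
const insertIntoProvisional = `
insert into pods_provisional (podspec, namespace, name, duration, group_name, created_at)
select $1, $2, $3, $4, $5, $6
where not exists (
  select 1 from pods_provisional
  where group_name = $5 and namespace = $2 and name = $3
);`

// enqueuePod is an assumed helper showing the call pattern; pool.Exec sends
// the arguments out of band instead of interpolating them into the SQL string.
func enqueuePod(ctx context.Context, pool *pgxpool.Pool, podspec, namespace, name string,
	duration int64, groupName string, ts pgtype.Timestamptz) error {
	_, err := pool.Exec(ctx, insertIntoProvisional, podspec, namespace, name, duration, groupName, ts)
	return err
}

func main() {
	pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()
	_ = enqueuePod // wiring into the queue strategy omitted
}
```

Because the arguments never enter the SQL text, quoting stops being a concern, and the constant statement text can be prepared and cached by pgx.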
diff --git a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
index 603fea5..e6a06ca 100644
--- a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
+++ b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -8,6 +8,7 @@ import (
 	"strings"
 
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	corev1 "k8s.io/api/core/v1"
 
@@ -106,9 +107,9 @@ func (q *ProvisionalQueue) Enqueue(
 
 	// Insert or fall back if does not exists to doing nothing
 	// TODO add back timestamp, and optimize this function to minimize database exec calls
-	// ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
+	ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
 	query := fmt.Sprintf(queries.InsertIntoProvisionalQuery, string(podspec), pod.Namespace, pod.Name, group.Duration, group.Name, group.Name, pod.Namespace, pod.Name)
-	_, err = pool.Exec(context.Background(), query)
+	_, err = pool.Exec(context.Background(), query, ts)
 	if err != nil {
 		klog.Infof("Error inserting pod %s/%s into provisional queue", pod.Namespace, pod.Name)
 		return types.Unknown, err
@@ -120,11 +121,10 @@
 		return types.Unknown, err
 	}
 
-	// Next add to group provisional - will only add if does not exist, and if so, we make count 1 to
-	// avoid doing the increment call.
-	// TODO eventually need to insert timestamp here
+	// Next add to group provisional - will only add if it does not exist,
+	// and if so, we set the count to 1 to avoid the increment call
 	query = fmt.Sprintf(queries.InsertIntoGroupProvisional, group.Name, pod.Namespace, group.Size, group.Duration, string(podspec), group.Name, pod.Namespace)
-	_, err = pool.Exec(ctx, query)
+	_, err = pool.Exec(ctx, query, ts)
 	if err != nil {
 		klog.Infof("Error inserting group into provisional %s", err)
 		return types.Unknown, err
diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
index 3c69d3e..80a1109 100644
--- a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
+++ b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -136,7 +136,6 @@ func deleteObjects(ctx context.Context, podspec string) error {
 func deleteJob(ctx context.Context, namespace string, client kubernetes.Interface, owner metav1.OwnerReference) error {
 	job, err := client.BatchV1().Jobs(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
 	if err != nil {
-		klog.Infof("Error deleting job: %s", err)
 		return err
 	}
 	klog.Infof("Found job %s/%s", job.Namespace, job.Name)
@@ -165,7 +164,7 @@ func Cleanup(
 	groupName string,
 ) error {
 
-	klog.Infof("[CLEANUP-START] Cleanup (cancel) running for jobid %s", fluxID)
+	klog.Infof("[CLEANUP-START] Cleanup (cancel) running for jobid %d", fluxID)
 
 	// First attempt cleanup in the cluster, only if in Kubernetes
 	if inKubernetes {
@@ -183,7 +182,7 @@ func Cleanup(
 	if fluxID > -1 {
 		err = deleteFluxion(fluxID)
 		if err != nil {
-			klog.Infof("Error issuing cancel to fluxion for group %s", groupName)
+			klog.Infof("Error issuing cancel to fluxion for group '%s' and fluxID %d", groupName, fluxID)
 		}
 		return err
 	}
@@ -199,7 +198,7 @@ func Cleanup(
 	// TODO should we allow this to continue?
 	pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
 	if err != nil {
-		klog.Errorf("Issue creating new pool %s", err)
+		klog.Errorf("Issue creating new pool during cancel: %s", err)
 		return err
 	}
 	defer pool.Close()
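Cleanup still dials a fresh pgxpool on every cancel (the TODO right above the pgxpool.New call asks whether that should continue). One option, sketched here under the assumption that a process-wide pool is acceptable, is to create the pool lazily once and reuse it; getPool and the variable names are hypothetical:

```go
package main

import (
	"context"
	"os"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical shared pool, created at most once per process.
var (
	sharedPool *pgxpool.Pool
	poolErr    error
	poolOnce   sync.Once
)

// getPool lazily creates one pool from DATABASE_URL and reuses it for every
// cleanup/cancel, instead of dialing a new pool per call.
func getPool(ctx context.Context) (*pgxpool.Pool, error) {
	poolOnce.Do(func() {
		sharedPool, poolErr = pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	})
	return sharedPool, poolErr
}

func main() {
	pool, err := getPool(context.Background())
	if err != nil {
		panic(err) // in Cleanup this would be: klog.Errorf + return err
	}
	_ = pool // callers share the pool; Close() only at process shutdown
}
```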
diff --git a/kubernetes/pkg/scheduler/schedule_one.go b/kubernetes/pkg/scheduler/schedule_one.go
index 4d56550..35dae64 100644
--- a/kubernetes/pkg/scheduler/schedule_one.go
+++ b/kubernetes/pkg/scheduler/schedule_one.go
@@ -118,14 +118,10 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
 	// it is an explicit update to an object (TBA).
 	if enqueueStatus == types.GroupAlreadyInPending {
 		klog.Infof("Pod %s/%s has group already in pending queue, rejecting.", pod.Namespace, pod.Name)
-		// status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod group is actively in pending and cannot be changed")
-		// sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start)
 		deletePod = true
 
 	} else if enqueueStatus == types.PodInvalid {
 		klog.Infof("Pod %s/%s is invalid or erroneous, rejecting.", pod.Namespace, pod.Name)
-		// status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod is invalid or unable to be scheduled")
-		// sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start)
 		deletePod = true
 
 	} else if enqueueStatus == types.PodEnqueueSuccess {
diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go
index 34092d0..912b5ce 100644
--- a/kubernetes/pkg/scheduler/scheduler.go
+++ b/kubernetes/pkg/scheduler/scheduler.go
@@ -585,6 +585,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
 	// and saving that output to use later (down here)
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
+	_, queuedInfo, _ = sched.schedulingCycle(schedulingCycleCtx, state, fwk, queuedInfo, start, podsToActivate)
 
 	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
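Returning to the README's "Optimize queries" item and the Enqueue note about minimizing database exec calls: the insert-if-not-exists for groups_provisional plus the separate IncrementGroupProvisional could plausibly collapse into one statement. This sketch assumes a unique constraint on (group_name, namespace), which the schema may not have yet; the constant name and sample values are illustrative:

```go
package main

import (
	"context"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical single-statement replacement for InsertIntoGroupProvisional +
// IncrementGroupProvisional: new groups start at current_size = 1, existing
// groups are incremented, all in one round trip.
const upsertGroupProvisional = `
insert into groups_provisional
  (group_name, namespace, group_size, duration, podspec, current_size, created_at)
values ($1, $2, $3, $4, $5, 1, $6)
on conflict (group_name, namespace)
do update set current_size = groups_provisional.current_size + 1;`

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	ts := pgtype.Timestamptz{Time: time.Now(), Valid: true}
	// One Exec instead of an insert-if-not-exists followed by an increment.
	_, err = pool.Exec(ctx, upsertGroupProvisional,
		"group-a", "default", 4, int64(3600), "{}", ts)
	if err != nil {
		panic(err)
	}
}
```

If the constraint exists, this also closes the race window between the existence check and the insert when two pods of the same group enqueue concurrently.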