Skip to content

Commit

Permalink
Merge pull request #17 from converged-computing/add-created-at
Browse files Browse the repository at this point in the history
sort: add back created at to sort group selection
  • Loading branch information
vsoch authored Aug 9, 2024
2 parents d6437af + f8078fa commit f925ffa
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ SELECT group_name, group_size from pods_provisional;
### TODO

- [ ] Figure out how In-tree registry plugins (that are related to resources) should be run to inform fluxion
- we likely want to move assume pod outside of that schedule function, or ensure pod passed matches.
- [ ] Optimize queries.
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] Add back in the created at filter / sort to the queues (removed when was testing / debugging harder stuff)
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
- Testing:
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
Expand Down
7 changes: 4 additions & 3 deletions kubernetes/pkg/fluxnetes/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
// 1. Select groups for which the size >= the number of pods we've seen
// 2. Then get a representative pod to model the resources for the group
	// 3. Sort by created_at (descending) so group selection order is deterministic
SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;"
SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size order by created_at desc;"

// This currently will use one podspec (and all names) and we eventually want it to use all podspecs
SelectPodsQuery = `select name, podspec from pods_provisional where group_name = $1 and namespace = $2;`
Expand All @@ -32,13 +32,14 @@ const (
DeleteProvisionalPodsQuery = "delete from pods_provisional where group_name = $1 and namespace = $2;"

	// created_at is bound as query parameter $1 at execution time
InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"
InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name, created_at) select '%s', '%s', '%s', %d, '%s', $1 where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"

// Enqueue queries
// TODO these need escaping (sql injection)
// 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here)
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current count
// Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again)
InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size, created_at) select '%s', '%s', '%d', '%d', '%s', '1', $1 WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';"

// After allocate success, we update pending with the ID. We retrieve it to issue fluxion to cancel when it finishes
Expand Down
12 changes: 6 additions & 6 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -106,9 +107,9 @@ func (q *ProvisionalQueue) Enqueue(

// Insert or fall back if does not exists to doing nothing
	// TODO optimize this function to minimize database exec calls
ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
query := fmt.Sprintf(queries.InsertIntoProvisionalQuery, string(podspec), pod.Namespace, pod.Name, group.Duration, group.Name, group.Name, pod.Namespace, pod.Name)
_, err = pool.Exec(context.Background(), query)
_, err = pool.Exec(context.Background(), query, ts)
if err != nil {
klog.Infof("Error inserting pod %s/%s into provisional queue", pod.Namespace, pod.Name)
return types.Unknown, err
Expand All @@ -120,11 +121,10 @@ func (q *ProvisionalQueue) Enqueue(
return types.Unknown, err
}

// Next add to group provisional - will only add if does not exist, and if so, we make count 1 to
// avoid doing the increment call.
// TODO eventually need to insert timestamp here
	// Next add to group provisional - will only add if it does not exist,
	// and if so, we set the count to 1 to avoid a separate increment call
query = fmt.Sprintf(queries.InsertIntoGroupProvisional, group.Name, pod.Namespace, group.Size, group.Duration, string(podspec), group.Name, pod.Namespace)
_, err = pool.Exec(ctx, query)
_, err = pool.Exec(ctx, query, ts)
if err != nil {
klog.Infof("Error inserting group into provisional %s", err)
return types.Unknown, err
Expand Down
7 changes: 3 additions & 4 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func deleteObjects(ctx context.Context, podspec string) error {
func deleteJob(ctx context.Context, namespace string, client kubernetes.Interface, owner metav1.OwnerReference) error {
job, err := client.BatchV1().Jobs(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
if err != nil {
klog.Infof("Error deleting job: %s", err)
return err
}
klog.Infof("Found job %s/%s", job.Namespace, job.Name)
Expand Down Expand Up @@ -165,7 +164,7 @@ func Cleanup(
groupName string,
) error {

klog.Infof("[CLEANUP-START] Cleanup (cancel) running for jobid %s", fluxID)
klog.Infof("[CLEANUP-START] Cleanup (cancel) running for jobid %d", fluxID)

// First attempt cleanup in the cluster, only if in Kubernetes
if inKubernetes {
Expand All @@ -183,7 +182,7 @@ func Cleanup(
if fluxID > -1 {
err = deleteFluxion(fluxID)
if err != nil {
klog.Infof("Error issuing cancel to fluxion for group %s", groupName)
klog.Infof("Error issuing cancel to fluxion for group '%s' and fluxID %d", groupName, fluxID)
}
return err
}
Expand All @@ -199,7 +198,7 @@ func Cleanup(
// TODO should we allow this to continue?
pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
klog.Errorf("Issue creating new pool during cancel: %s", err)
return err
}
defer pool.Close()
Expand Down
4 changes: 0 additions & 4 deletions kubernetes/pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,10 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
// it is an explicit update to an object (TBA).
if enqueueStatus == types.GroupAlreadyInPending {
klog.Infof("Pod %s/%s has group already in pending queue, rejecting.", pod.Namespace, pod.Name)
// status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod group is actively in pending and cannot be changed")
// sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start)
deletePod = true

} else if enqueueStatus == types.PodInvalid {
klog.Infof("Pod %s/%s is invalid or erroneous, rejecting.", pod.Namespace, pod.Name)
// status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod is invalid or unable to be scheduled")
// sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start)
deletePod = true

} else if enqueueStatus == types.PodEnqueueSuccess {
Expand Down
1 change: 1 addition & 0 deletions kubernetes/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
// and saving that output to use later (down here)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

_, queuedInfo, _ = sched.schedulingCycle(schedulingCycleCtx, state, fwk, queuedInfo, start, podsToActivate)

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
Expand Down

0 comments on commit f925ffa

Please sign in to comment.