diff --git a/.github/workflows/build-deploy.yaml b/.github/workflows/build-deploy.yaml
new file mode 100644
index 0000000..e44696c
--- /dev/null
+++ b/.github/workflows/build-deploy.yaml
@@ -0,0 +1,121 @@
+name: fluxnetes build-and-deploy
+
+on:
+  pull_request: []
+  release:
+    types: [published]
+  push:
+    branches:
+      - main
+
+jobs:
+  build-fluxnetes:
+    permissions:
+      packages: write
+    env:
+      container: ghcr.io/converged-computing/fluxnetes
+    runs-on: ubuntu-latest
+    name: build fluxnetes
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-go@v4
+        with:
+          go-version: ^1.21.9
+
+      - name: Build Containers
+        run: |
+          make prepare
+          make build REGISTRY=ghcr.io/converged-computing SCHEDULER_IMAGE=fluxnetes
+
+      - name: Tag Release Image
+        if: (github.event_name == 'release')
+        run: |
+          tag=${GITHUB_REF#refs/tags/}
+          echo "Tagging and releasing ${{ env.container }}:${tag}"
+          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
+
+      - name: GHCR Login
+        if: (github.event_name != 'pull_request')
+        uses: docker/login-action@v2
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Deploy Container
+        if: (github.event_name != 'pull_request')
+        run: docker push ${{ env.container }} --all-tags
+
+  build-sidecar:
+    permissions:
+      packages: write
+    env:
+      container: ghcr.io/converged-computing/fluxnetes-sidecar
+    runs-on: ubuntu-latest
+    name: build sidecar
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-go@v4
+        with:
+          go-version: ^1.21.9
+
+      - name: Build Containers
+        run: |
+          make prepare
+          make build-sidecar REGISTRY=ghcr.io/converged-computing SIDECAR_IMAGE=fluxnetes-sidecar
+
+      - name: Tag Release Image
+        if: (github.event_name == 'release')
+        run: |
+          tag=${GITHUB_REF#refs/tags/}
+          echo "Tagging and releasing ${{ env.container }}:${tag}"
+          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
+
+      - name: GHCR Login
+        if: (github.event_name != 'pull_request')
+        uses: docker/login-action@v2
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Deploy Container
+        if: (github.event_name != 'pull_request')
+        run: docker push ${{ env.container }} --all-tags
+
+  build-postgres:
+    permissions:
+      packages: write
+    env:
+      container: ghcr.io/converged-computing/fluxnetes-postgres
+    runs-on: ubuntu-latest
+    name: build postgres
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-go@v4
+        with:
+          go-version: ^1.21.9
+
+      - name: Build Container
+        run: |
+          make prepare
+          make build-postgres REGISTRY=ghcr.io/converged-computing
+
+      - name: Tag Release Image
+        if: (github.event_name == 'release')
+        run: |
+          tag=${GITHUB_REF#refs/tags/}
+          echo "Tagging and releasing ${{ env.container }}:${tag}"
+          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
+
+      - name: GHCR Login
+        if: (github.event_name != 'pull_request')
+        uses: docker/login-action@v2
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Deploy Container
+        if: (github.event_name != 'pull_request')
+        run: docker push ${{ env.container }} --all-tags
\ No newline at end of file
diff --git a/Makefile b/Makefile
index e12df4a..75740a8 100644
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,7 @@ UPSTREAMS ?= ./upstreams
 # Local repository directories
 UPSTREAM_K8S ?= $(UPSTREAMS)/kubernetes
+UPSTREAM_COMMIT ?= "20b216738a5e9671ddf4081ed97b5565e0b1ee01"

 # Remote repositories
 UPSTREAM_K8S_REPO ?= https://github.com/kubernetes/kubernetes
@@ -13,7 +14,7 @@ TAG ?= latest
 ARCH ?= amd64

 # These are passed to build the sidecar
-REGISTRY ?= ghcr.io/flux-framework
+REGISTRY ?= ghcr.io/converged-computing
 SIDECAR_IMAGE ?= fluxnetes-sidecar:latest
 POSTGRES_IMAGE ?= fluxnetes-postgres:latest
 SCHEDULER_IMAGE ?= fluxnetes
@@ -26,7 +27,7 @@ upstreams:
 	mkdir -p $(UPSTREAMS)

 clone-k8s: upstreams
-	if [ -d "$(UPSTREAM_K8S)" ]; then echo "Kubernetes upstream is cloned"; else ./hack/clone-k8s.sh $(UPSTREAM_K8S_REPO) $(UPSTREAM_K8S); fi
+	if [ -d "$(UPSTREAM_K8S)" ]; then echo "Kubernetes upstream is cloned"; else ./hack/clone-k8s.sh $(UPSTREAM_K8S_REPO) $(UPSTREAM_K8S) $(UPSTREAM_COMMIT); fi

 prepare: clone clone-k8s
 	# Add fluxnetes as a new in-tree plugin
diff --git a/README.md b/README.md
index a4ab558..eaa2f80 100644
--- a/README.md
+++ b/README.md
@@ -173,7 +173,6 @@ SELECT group_name, group_size from pods_provisional;
 ### TODO

 - [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
-  - Note that I've added a view I think will help with this - we need to regenerate it and do a join!
 - [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
 - [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
 - [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
@@ -185,6 +184,7 @@ SELECT group_name, group_size from pods_provisional;
 - [ ] create state diagram that shows how stuff works
 - [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out
   - add the label for the job timeout, default to one hour
+  - not clear how to orchestrate this if we need a parent object

 Thinking:
diff --git a/hack/clone-k8s.sh b/hack/clone-k8s.sh
index 870ca56..1b342e7 100755
--- a/hack/clone-k8s.sh
+++ b/hack/clone-k8s.sh
@@ -2,11 +2,14 @@

 UPSTREAM_K8S_REPO=${1}
 UPSTREAM_K8S=${2}
+COMMIT=${3}

 # Important - the apimachinery runtime package has a change in interface
 # that changes (and will break) after this version
 echo "git clone --depth 1 ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}"
-git clone --depth 1 ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}
+git clone ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}
+cd ${UPSTREAM_K8S}
+git checkout ${COMMIT}

 # We need to add our custom module as a staging path
 # sed -i '256 a k8s.io/podgroup-controller => ./staging/src/k8s.io/podgroup-controller' ${UPSTREAM_K8S}/go.mod
diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go
index ad3eec8..f153ea8 100644
--- a/kubernetes/pkg/fluxnetes/queries/queries.go
+++ b/kubernetes/pkg/fluxnetes/queries/queries.go
@@ -13,12 +13,17 @@ const (
 	DeleteReservationsQuery = "truncate reservations; delete from reservations;"
 	GetReservationsQuery    = "select (group_name, flux_id) from reservations;"

-	// This could use improvement from someone good at SQL. We need to:
+	// This query should achieve the following (but does not work)
 	// 1. Select groups for which the size >= the number of pods we've seen
 	// 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler)
-	// 3. Delete all from the table
-	// Ensure we are sorting by the timestamp when they were added (should be DESC I think)
+	// Ensures we are sorting by the timestamp when they were added (should be DESC I think)
+	RefreshGroupsQuery     = "refresh materialized view groups_size;"
+	SelectGroupsReadyQuery = "select * from pods_provisional join groups_size on pods_provisional.group_name = groups_size.group_name where group_size >= count order by created_at desc;"
+
+	// 3. Then delete all from the table
+	DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
+
+	// Note that this used to be done with two queries - these are no longer used
 	SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;"
 	SelectGroupsQuery       = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
-	DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
 )
diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go
index 8134af6..a7f72b7 100644
--- a/kubernetes/pkg/fluxnetes/queue.go
+++ b/kubernetes/pkg/fluxnetes/queue.go
@@ -93,6 +93,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
 	}

 	// Validates reservation depth
+	// TODO(vsoch) allow -1 to disable
 	depth := strategy.GetReservationDepth()
 	if depth < 0 {
 		return nil, fmt.Errorf("Reservation depth of a strategy must be >= -1")
diff --git a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
index 1386b5e..7bfbbcf 100644
--- a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
+++ b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -133,6 +133,46 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po
 	return jobs, nil
 }

+// This was an attempt to combine into one query (does not work, still two!)
+func (q *ProvisionalQueue) getGroupsReady(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, []string, error) {
+
+	groupNames := []string{}
+
+	// Refresh groups table
+	_, err := pool.Query(ctx, queries.RefreshGroupsQuery)
+	if err != nil {
+		return nil, groupNames, err
+	}
+
+	// Now we need to collect all the pods that match that.
+	rows, err := pool.Query(ctx, queries.SelectGroupsReadyQuery)
+	if err != nil {
+		return nil, groupNames, err
+	}
+	defer rows.Close()
+
+	// Collect rows into map, and then slice of jobs
+	// The map whittles down the groups into single entries
+	// We will eventually not want to do that, assuming podspecs are different in a group
+	jobs := []workers.JobArgs{}
+	lookup := map[string]workers.JobArgs{}
+
+	// Collect rows into single result
+	models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])
+
+	// TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker
+	for _, model := range models {
+		jobArgs := workers.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize}
+		lookup[model.GroupName] = jobArgs
+	}
+
+	for _, jobArgs := range lookup {
+		jobs = append(jobs, jobArgs)
+		groupNames = append(groupNames, jobArgs.GroupName)
+	}
+	return jobs, groupNames, nil
+}
+
 // ReadyJobs returns jobs that are ready from the provisional table, also cleaning up
 func (q *ProvisionalQueue) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) {
diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
index a9ddfba..e6189fb 100644
--- a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
+++ b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -26,12 +26,7 @@ type CleanupWorker struct {
 	river.WorkerDefaults[CleanupArgs]
 }

-// Work performs the AskFlux action. Cases include:
-// Allocated: the job was successful and does not need to be re-queued. We return nil (completed)
-// NotAllocated: the job cannot be allocated and needs to be requeued
-// Not possible for some reason, likely needs a cancel
-// Are there cases of scheduling out into the future further?
-// See https://riverqueue.com/docs/snoozing-jobs
+// Work performs the Cancel action
 func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
 	klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID)
diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/job.go b/kubernetes/pkg/fluxnetes/strategy/workers/job.go
index ef351c2..3f777fe 100644
--- a/kubernetes/pkg/fluxnetes/strategy/workers/job.go
+++ b/kubernetes/pkg/fluxnetes/strategy/workers/job.go
@@ -149,6 +149,12 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 	}
 	defer rows.Close()

+	// Kick off a cleaning job for when everything should be cancelled
+	// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
+	// if err != nil {
+	// 	return fmt.Errorf("error getting client from context: %w", err)
+	// }
+
 	// Collect rows into single result
 	// pgx.CollectRows(rows, pgx.RowTo[string])
 	// klog.Infof("Values: %s", values)
@@ -156,9 +162,3 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 		nodeStr, job.Args.GroupName, job.Args.FluxJob)
 	return nil
 }
-
-// If needed, to get a client from a worker (to submit more jobs)
-// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
-// if err != nil {
-// 	return fmt.Errorf("error getting client from context: %w", err)
-// }
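
A note on the README TODO about moving pods from provisional into the worker queue with fewer round trips: the refresh/select/delete trio in getGroupsReady could in principle collapse into a single statement. The sketch below is not part of this patch; the names readyGroupsSQL, readyGroup, and selectAndDeleteReadyGroups are invented, and it assumes the pods_provisional columns used by the queries above (group_name, group_size, podspec, created_at). It mirrors the group_size >= count(*) condition from SelectGroupsAtSizeQuery and returns the deleted rows, so the groups_size materialized view would not be needed; ordering by created_at, if still wanted, could be applied to the returned slice.

```go
package provisional

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// readyGroupsSQL (hypothetical) selects groups that have reached their size
// and removes their rows from the provisional table in one round trip,
// returning what was removed so it can be handed to the worker queue.
const readyGroupsSQL = `
with ready as (
  select group_name
  from pods_provisional
  group by group_name, group_size, created_at
  having group_size >= count(*)
)
delete from pods_provisional
using ready
where pods_provisional.group_name = ready.group_name
returning pods_provisional.group_name, pods_provisional.group_size, pods_provisional.podspec;`

// readyGroup matches the columns returned above.
type readyGroup struct {
	GroupName string `db:"group_name"`
	GroupSize int64  `db:"group_size"`
	Podspec   string `db:"podspec"`
}

// selectAndDeleteReadyGroups runs the statement and collects the returned rows.
func selectAndDeleteReadyGroups(ctx context.Context, pool *pgxpool.Pool) ([]readyGroup, error) {
	rows, err := pool.Query(ctx, readyGroupsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return pgx.CollectRows(rows, pgx.RowToStructByName[readyGroup])
}
```

Since this deletes as it selects, running it inside a transaction together with the subsequent worker-queue insert would avoid losing groups if the enqueue fails.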
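The commented-out block added to job.go, together with the README TODO about submitting a cancel job when a group's time runs out, suggests scheduling the cleanup worker from inside the job worker. A minimal sketch of that follows, assuming River's documented InsertOpts.ScheduledAt; the cancelArgs type, its flux_id field, the "cancel" kind, and the one hour default are placeholders for whatever CleanupArgs actually carries, not the repo's real definitions.

```go
package workers

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// cancelArgs is a hypothetical payload; the repo's CleanupArgs (whose FluxID
// the CleanupWorker reads) would be used in its place, and the kind must
// match whatever the cleanup worker is registered under.
type cancelArgs struct {
	FluxID int64 `json:"flux_id"`
}

func (cancelArgs) Kind() string { return "cancel" }

// scheduleCancel enqueues a cleanup (cancel) job from inside another worker,
// scheduled to fire after the group's timeout (the README suggests a label
// with a one hour default).
func scheduleCancel(ctx context.Context, fluxID int64, timeout time.Duration) error {
	client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
	if err != nil {
		return fmt.Errorf("error getting client from context: %w", err)
	}
	_, err = client.Insert(ctx, cancelArgs{FluxID: fluxID}, &river.InsertOpts{
		ScheduledAt: time.Now().Add(timeout),
	})
	return err
}
```

Where the cancel should commit atomically with the worker's own database writes, client.InsertTx with an open transaction is the alternative.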
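For the TODO about CrashLoopBackOff when Postgres is not yet ready, one option (again only a sketch, not from this patch; waitForPostgres, the attempt count, and the two second delay are invented) is to retry the pool connection at startup instead of exiting on the first failure:

```go
package fluxnetes

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// waitForPostgres retries the database connection with a fixed backoff so the
// scheduler does not exit (and CrashLoopBackOff) while Postgres is still starting.
func waitForPostgres(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
	var err error
	for attempt := 0; attempt < 30; attempt++ {
		var pool *pgxpool.Pool
		pool, err = pgxpool.New(ctx, databaseURL)
		if err == nil {
			if err = pool.Ping(ctx); err == nil {
				return pool, nil
			}
			pool.Close()
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
	return nil, fmt.Errorf("postgres never became ready: %w", err)
}
```

A Kubernetes-native alternative would be an initContainer that polls the database with pg_isready before the scheduler container starts.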