Merge pull request #11 from converged-computing/add-build-images
ci: add back build images workflow
vsoch authored Aug 2, 2024
2 parents 9a0a78e + 085f31b commit 4cb267f
Showing 9 changed files with 186 additions and 20 deletions.
121 changes: 121 additions & 0 deletions .github/workflows/build-deploy.yaml
@@ -0,0 +1,121 @@
name: fluxnetes build-and-deploy

on:
  pull_request: []
  release:
    types: [published]
  push:
    branches:
      - main

jobs:
  build-fluxnetes:
    permissions:
      packages: write
    env:
      container: ghcr.io/converged-computing/fluxnetes
    runs-on: ubuntu-latest
    name: build fluxnetes
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v4
        with:
          go-version: ^1.21.9

      - name: Build Containers
        run: |
          make prepare
          make build REGISTRY=ghcr.io/converged-computing SCHEDULER_IMAGE=fluxnetes
      - name: Tag Release Image
        if: (github.event_name == 'release')
        run: |
          tag=${GITHUB_REF#refs/tags/}
          echo "Tagging and releasing ${{ env.container }}:${tag}"
          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
      - name: GHCR Login
        if: (github.event_name != 'pull_request')
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Deploy Container
        if: (github.event_name != 'pull_request')
        run: docker push ${{ env.container }} --all-tags

  build-sidecar:
    permissions:
      packages: write
    env:
      container: ghcr.io/converged-computing/fluxnetes-sidecar
    runs-on: ubuntu-latest
    name: build sidecar
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v4
        with:
          go-version: ^1.21.9

      - name: Build Containers
        run: |
          make prepare
          make build-sidecar REGISTRY=ghcr.io/converged-computing SIDECAR_IMAGE=fluxnetes-sidecar
      - name: Tag Release Image
        if: (github.event_name == 'release')
        run: |
          tag=${GITHUB_REF#refs/tags/}
          echo "Tagging and releasing ${{ env.container }}:${tag}"
          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
      - name: GHCR Login
        if: (github.event_name != 'pull_request')
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Deploy Container
        if: (github.event_name != 'pull_request')
        run: docker push ${{ env.container }} --all-tags

  build-postgres:
    permissions:
      packages: write
    env:
      container: ghcr.io/converged-computing/fluxnetes-postgres
    runs-on: ubuntu-latest
    name: build postgres
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v4
        with:
          go-version: ^1.21.9

      - name: Build Container
        run: |
          make prepare
          make build-postgres REGISTRY=ghcr.io/converged-computing
      - name: Tag Release Image
        if: (github.event_name == 'release')
        run: |
          tag=${GITHUB_REF#refs/tags/}
          echo "Tagging and releasing ${{ env.container }}:${tag}"
          docker tag ${{ env.container }}:latest ${{ env.container }}:${tag}
      - name: GHCR Login
        if: (github.event_name != 'pull_request')
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Deploy Container
        if: (github.event_name != 'pull_request')
        run: docker push ${{ env.container }} --all-tags
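All three jobs follow the same pattern: build via the Makefile, tag on release, log in to GHCR, and push on non-pull-request events. For local debugging, roughly the same flow can be reproduced from a repository checkout. This is a sketch only; the example tag v0.1.0, the login step, and push access to ghcr.io/converged-computing are assumptions rather than anything the workflow requires:

    # Build the scheduler image the same way the build-fluxnetes job does
    make prepare
    make build REGISTRY=ghcr.io/converged-computing SCHEDULER_IMAGE=fluxnetes

    # Hypothetical release flow: tag and push (requires push rights to the registry)
    tag=v0.1.0   # example value; CI derives this from ${GITHUB_REF#refs/tags/}
    docker tag ghcr.io/converged-computing/fluxnetes:latest ghcr.io/converged-computing/fluxnetes:${tag}
    docker login ghcr.io
    docker push ghcr.io/converged-computing/fluxnetes --all-tags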
5 changes: 3 additions & 2 deletions Makefile
@@ -3,6 +3,7 @@ UPSTREAMS ?= ./upstreams

# Local repository directories
UPSTREAM_K8S ?= $(UPSTREAMS)/kubernetes
UPSTREAM_COMMIT ?= "20b216738a5e9671ddf4081ed97b5565e0b1ee01"

# Remote repositories
UPSTREAM_K8S_REPO ?= https://github.com/kubernetes/kubernetes
@@ -13,7 +14,7 @@ TAG ?= latest
ARCH ?= amd64

# These are passed to build the sidecar
REGISTRY ?= ghcr.io/flux-framework
REGISTRY ?= ghcr.io/converged-computing
SIDECAR_IMAGE ?= fluxnetes-sidecar:latest
POSTGRES_IMAGE ?= fluxnetes-postgres:latest
SCHEDULER_IMAGE ?= fluxnetes
@@ -26,7 +27,7 @@ upstreams:
	mkdir -p $(UPSTREAMS)

clone-k8s: upstreams
	if [ -d "$(UPSTREAM_K8S)" ]; then echo "Kubernetes upstream is cloned"; else ./hack/clone-k8s.sh $(UPSTREAM_K8S_REPO) $(UPSTREAM_K8S); fi
	if [ -d "$(UPSTREAM_K8S)" ]; then echo "Kubernetes upstream is cloned"; else ./hack/clone-k8s.sh $(UPSTREAM_K8S_REPO) $(UPSTREAM_K8S) $(UPSTREAM_COMMIT); fi

prepare: clone clone-k8s
	# Add fluxnetes as a new in-tree plugin
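Since every variable in the Makefile is assigned with ?=, the new UPSTREAM_COMMIT pin can be overridden on the command line without editing the file. A minimal sketch (the second invocation simply re-pins to the same commit shown above, as an illustration):

    # Clone kubernetes and check out the pinned commit (the default behavior)
    make clone-k8s

    # Override the pin for a one-off experiment
    make clone-k8s UPSTREAM_COMMIT=20b216738a5e9671ddf4081ed97b5565e0b1ee01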
2 changes: 1 addition & 1 deletion README.md
@@ -173,7 +173,6 @@ SELECT group_name, group_size from pods_provisional;
### TODO

- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
- Note that I've added a view I think will help with this - we need to regenerate it and do a join!
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
@@ -185,6 +184,7 @@ SELECT group_name, group_size from pods_provisional;
- [ ] create state diagram that shows how stuff works
- [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out
- add the label for the job timeout, default to one hour
- not clear how to orchestrate this if we need a parent object

Thinking:

5 changes: 4 additions & 1 deletion hack/clone-k8s.sh
@@ -2,11 +2,14 @@

UPSTREAM_K8S_REPO=${1}
UPSTREAM_K8S=${2}
COMMIT=${3}

# Important - the apimachinery runtime package has a change in interface
# that changes (and will break) after this version
echo "git clone --depth 1 ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}"
git clone --depth 1 ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}
git clone ${UPSTREAM_K8S_REPO} ${UPSTREAM_K8S}
cd ${UPSTREAM_K8S}
git checkout ${COMMIT}

# We need to add our custom module as a staging path
# sed -i '256 a k8s.io/podgroup-controller => ./staging/src/k8s.io/podgroup-controller' ${UPSTREAM_K8S}/go.mod
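Dropping --depth 1 is what makes `git checkout ${COMMIT}` possible, at the cost of a full clone of kubernetes/kubernetes. If clone time ever becomes a problem, a shallow fetch of only the pinned commit is a possible alternative; this sketch is not part of the change and assumes the remote allows fetching individual commits by SHA (github.com does, which is how actions/checkout pins commits):

    # Hypothetical shallow alternative to the full clone above
    mkdir -p ${UPSTREAM_K8S} && cd ${UPSTREAM_K8S}
    git init
    git remote add origin ${UPSTREAM_K8S_REPO}
    git fetch --depth 1 origin ${COMMIT}
    git checkout FETCH_HEAD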
13 changes: 9 additions & 4 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -13,12 +13,17 @@ const (
	DeleteReservationsQuery = "truncate reservations; delete from reservations;"
	GetReservationsQuery = "select (group_name, flux_id) from reservations;"

	// This could use improvement from someone good at SQL. We need to:
	// This query should achieve the following (but does not work)
	// 1. Select groups for which the size >= the number of pods we've seen
	// 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler)
	// 3. Delete all from the table
	// Ensure we are sorting by the timestamp when they were added (should be DESC I think)
	// Ensures we are sorting by the timestamp when they were added (should be DESC I think)
	RefreshGroupsQuery = "refresh materialized view groups_size;"
	SelectGroupsReadyQuery = "select * from pods_provisional join groups_size on pods_provisional.group_name = groups_size.group_name where group_size >= count order by created_at desc;"

	// 3. Then delete all from the table
	DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"

	// Note that this used to be done with two queries - these are no longer used
	SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;"
	SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
	DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
)
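SelectGroupsReadyQuery assumes a materialized view named groups_size that exposes a per-group pod count to compare against group_size; the view definition itself is not part of this diff. Purely as a hypothetical sketch, with the column name count inferred only from the `group_size >= count` condition above and the connection string taken from an assumed $DATABASE_URL:

    # Hypothetical definition of the groups_size view - not in this commit
    psql "$DATABASE_URL" <<'SQL'
    create materialized view if not exists groups_size as
      select group_name, count(*) as count
      from pods_provisional
      group by group_name;
    refresh materialized view groups_size;
    SQL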
1 change: 1 addition & 0 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -93,6 +93,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
	}

	// Validates reservation depth
	// TODO(vsoch) allow -1 to disable
	depth := strategy.GetReservationDepth()
	if depth < 0 {
		return nil, fmt.Errorf("Reservation depth of a strategy must be >= -1")
40 changes: 40 additions & 0 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -133,6 +133,46 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po
	return jobs, nil
}

// This was an attempt to combine into one query (does not work, still two!)
func (q *ProvisionalQueue) getGroupsReady(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, []string, error) {

	groupNames := []string{}

	// Refresh groups table
	_, err := pool.Query(ctx, queries.RefreshGroupsQuery)
	if err != nil {
		return nil, groupNames, err
	}

	// Now we need to collect all the pods that match that.
	rows, err := pool.Query(ctx, queries.SelectGroupsReadyQuery)
	if err != nil {
		return nil, groupNames, err
	}
	defer rows.Close()

	// Collect rows into map, and then slice of jobs
	// The map whittles down the groups into single entries
	// We will eventually not want to do that, assuming podspecs are different in a group
	jobs := []workers.JobArgs{}
	lookup := map[string]workers.JobArgs{}

	// Collect rows into single result
	models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])

	// TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker
	for _, model := range models {
		jobArgs := workers.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize}
		lookup[model.GroupName] = jobArgs
	}

	for _, jobArgs := range lookup {
		jobs = append(jobs, jobArgs)
		groupNames = append(groupNames, jobArgs.GroupName)
	}
	return jobs, groupNames, nil
}

// ReadyJobs returns jobs that are ready from the provisional table, also cleaning up
func (q *ProvisionalQueue) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) {

7 changes: 1 addition & 6 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -26,12 +26,7 @@ type CleanupWorker struct {
	river.WorkerDefaults[CleanupArgs]
}

// Work performs the AskFlux action. Cases include:
// Allocated: the job was successful and does not need to be re-queued. We return nil (completed)
// NotAllocated: the job cannot be allocated and needs to be requeued
// Not possible for some reason, likely needs a cancel
// Are there cases of scheduling out into the future further?
// See https://riverqueue.com/docs/snoozing-jobs
// Work performs the Cancel action
func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
	klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID)

12 changes: 6 additions & 6 deletions kubernetes/pkg/fluxnetes/strategy/workers/job.go
@@ -149,16 +149,16 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
	}
	defer rows.Close()

	// Kick off a cleaning job for when everything should be cancelled
	// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
	// if err != nil {
	// 	return fmt.Errorf("error getting client from context: %w", err)
	// }

	// Collect rows into single result
	// pgx.CollectRows(rows, pgx.RowTo[string])
	// klog.Infof("Values: %s", values)
	klog.Infof("[JOB-WORKER-COMPLETE] nodes allocated %s for group %s (flux job id %d)\n",
		nodeStr, job.Args.GroupName, job.Args.FluxJob)
	return nil
}

// If needed, to get a client from a worker (to submit more jobs)
// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
// if err != nil {
// 	return fmt.Errorf("error getting client from context: %w", err)
// }
