Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save agent-id for tasks and add endpoint to get agent tasks #1631

Merged
merged 15 commits into from
Mar 21, 2023
Merged
25 changes: 25 additions & 0 deletions server/api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gorilla/securecookie"

"github.com/woodpecker-ci/woodpecker/server"
"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/router/middleware/session"
"github.com/woodpecker-ci/woodpecker/server/store"
Expand Down Expand Up @@ -51,6 +52,30 @@ func GetAgent(c *gin.Context) {
c.JSON(http.StatusOK, agent)
}

func GetAgentTasks(c *gin.Context) {
agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}

agent, err := store.FromContext(c).AgentFind(agentID)
if err != nil {
c.String(http.StatusNotFound, "Cannot find agent. %s", err)
return
}

tasks := []*model.Task{}
info := server.Config.Services.Queue.Info(c)
for _, task := range info.Running {
if task.AgentID == agent.ID {
tasks = append(tasks, task)
}
}

c.JSON(http.StatusOK, tasks)
}

func PatchAgent(c *gin.Context) {
_store := store.FromContext(c)

Expand Down
21 changes: 6 additions & 15 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er
return nil, nil
}

task, err := s.queue.Poll(c, fn)
task, err := s.queue.Poll(c, agent.ID, fn)
if err != nil {
return nil, err
} else if task == nil {
Expand Down Expand Up @@ -131,14 +131,6 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
return err
}

metadata, ok := grpcMetadata.FromIncomingContext(c)
if ok {
hostname, ok := metadata["hostname"]
if ok && len(hostname) != 0 {
step.Machine = hostname[0]
}
}

repo, err := s.store.GetRepo(currentPipeline.RepoID)
if err != nil {
log.Error().Msgf("error: cannot find repo with id %d: %s", currentPipeline.RepoID, err)
Expand Down Expand Up @@ -258,13 +250,12 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err)
return err
}
metadata, ok := grpcMetadata.FromIncomingContext(c)
if ok {
hostname, ok := metadata["hostname"]
if ok && len(hostname) != 0 {
step.Machine = hostname[0]
}

agent, err := s.getAgentFromContext(c)
if err != nil {
return err
}
step.AgentID = agent.ID

currentPipeline, err := s.store.GetPipeline(step.PipelineID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/model/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type StepStore interface {
// swagger:model step
type Step struct {
ID int64 `json:"id" xorm:"pk autoincr 'step_id'"`
PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"`
PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"`
PID int `json:"pid" xorm:"UNIQUE(s) 'step_pid'"`
PPID int `json:"ppid" xorm:"step_ppid"`
PGID int `json:"pgid" xorm:"step_pgid"`
Expand All @@ -42,7 +42,7 @@ type Step struct {
ExitCode int `json:"exit_code" xorm:"step_exit_code"`
Started int64 `json:"start_time,omitempty" xorm:"step_started"`
Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"`
Machine string `json:"machine,omitempty" xorm:"step_machine"`
AgentID int64 `json:"agent_id,omitempty" xorm:"step_agent_id"`
Platform string `json:"platform,omitempty" xorm:"step_platform"`
Environ map[string]string `json:"environ,omitempty" xorm:"json 'step_environ'"`
Children []*Step `json:"children,omitempty" xorm:"-"`
Expand Down
9 changes: 1 addition & 8 deletions server/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ type TaskStore interface {
TaskDelete(string) error
}

type TaskStatusValue string

const (
TaskStatusSkipped TaskStatusValue = "skipped"
TaskStatusSuccess TaskStatusValue = "success"
TaskStatusFailure TaskStatusValue = "failure"
)

// Task defines scheduled pipeline Task.
type Task struct {
ID string `json:"id" xorm:"PK UNIQUE 'task_id'"`
Expand All @@ -42,6 +34,7 @@ type Task struct {
Dependencies []string `json:"dependencies" xorm:"json 'task_dependencies'"`
RunOn []string `json:"run_on" xorm:"json 'task_run_on'"`
DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'task_dep_status'"`
AgentID int64 `json:"agent_id" xorm:"'agent_id'"`
}

// TableName return database table name for xorm
Expand Down
5 changes: 4 additions & 1 deletion server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type entry struct {
}

type worker struct {
agentID int64
filter FilterFn
channel chan *model.Task
}
Expand Down Expand Up @@ -82,9 +83,10 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
}

// Poll retrieves and removes the head of this queue.
func (q *fifo) Poll(c context.Context, f FilterFn) (*model.Task, error) {
func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
q.Lock()
w := &worker{
agentID: agentID,
channel: make(chan *model.Task, 1),
filter: f,
}
Expand Down Expand Up @@ -254,6 +256,7 @@ func (q *fifo) process() {
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*model.Task)
task.AgentID = worker.agentID
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
Expand Down
44 changes: 22 additions & 22 deletions server/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFifo(t *testing.T) {
return
}

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestFifoExpire(t *testing.T) {
return
}

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
Expand All @@ -84,7 +84,7 @@ func TestFifoWait(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.Push(noContext, want))

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
Expand Down Expand Up @@ -137,15 +137,15 @@ func TestFifoDependencies(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1}))

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}

assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))

got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
Expand Down Expand Up @@ -173,15 +173,15 @@ func TestFifoErrors(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}

assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))

got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
Expand All @@ -192,7 +192,7 @@ func TestFifoErrors(t *testing.T) {
return
}

got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestFifoErrors2(t *testing.T) {
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

for i := 0; i < 2; i++ {
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task1 && got != task2 {
t.Errorf("expect task1 or task2 returned from queue as task3 depends on them")
return
Expand All @@ -237,7 +237,7 @@ func TestFifoErrors2(t *testing.T) {
}
}

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func(i int) {
for {
fmt.Printf("Worker %d started\n", i)
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
obtainedWorkCh <- got
}
}(i)
Expand All @@ -299,7 +299,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
obtainedWorkCh <- got
}
}()
Expand All @@ -313,7 +313,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
obtainedWorkCh <- got
}
}()
Expand Down Expand Up @@ -359,14 +359,14 @@ func TestFifoTransitiveErrors(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))

got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
Expand All @@ -377,7 +377,7 @@ func TestFifoTransitiveErrors(t *testing.T) {
}
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped))

got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestFifoCancel(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled")))
assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled")))
Expand All @@ -430,7 +430,7 @@ func TestFifoPause(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
wg.Done()
}()

Expand All @@ -450,7 +450,7 @@ func TestFifoPause(t *testing.T) {
q.Pause()
assert.NoError(t, q.Push(noContext, task1))
q.Resume()
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
}

func TestFifoPauseResume(t *testing.T) {
Expand All @@ -463,7 +463,7 @@ func TestFifoPauseResume(t *testing.T) {
assert.NoError(t, q.Push(noContext, task1))
q.Resume()

_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
}

func TestWaitingVsPending(t *testing.T) {
Expand All @@ -487,15 +487,15 @@ func TestWaitingVsPending(t *testing.T) {
q := New(context.Background()).(*fifo)
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })

info := q.Info(noContext)
if info.Stats.WaitingOnDeps != 2 {
t.Errorf("2 should wait on deps")
}

assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
got, err := q.Poll(noContext, func(*model.Task) bool { return true })
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
assert.NoError(t, err)
assert.EqualValues(t, task2, got)

Expand Down
4 changes: 2 additions & 2 deletions server/queue/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) err
}

// Poll retrieves and removes a task head of this queue.
func (q *persistentQueue) Poll(c context.Context, f FilterFn) (*model.Task, error) {
task, err := q.Queue.Poll(c, f)
func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
task, err := q.Queue.Poll(c, agentID, f)
if task != nil {
log.Debug().Msgf("pull queue item: %s: remove from backup", task.ID)
if derr := q.store.TaskDelete(task.ID); derr != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Queue interface {
PushAtOnce(c context.Context, tasks []*model.Task) error

// Poll retrieves and removes a task head of this queue.
Poll(c context.Context, f FilterFn) (*model.Task, error)
Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)

// Extend extends the deadline for a task.
Extend(c context.Context, id string) error
Expand Down
1 change: 1 addition & 0 deletions server/router/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func apiRoutes(e *gin.Engine) {
agentBase.GET("", api.GetAgents)
agentBase.POST("", api.PostAgent)
agentBase.GET("/:agent", api.GetAgent)
agentBase.GET("/:agent/tasks", api.GetAgentTasks)
agentBase.PATCH("/:agent", api.PatchAgent)
agentBase.DELETE("/:agent", api.DeleteAgent)
}
Expand Down
4 changes: 2 additions & 2 deletions server/store/datastore/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestStepFind(t *testing.T) {
State: model.StatusSuccess,
Error: "pc load letter",
ExitCode: 255,
Machine: "localhost",
AgentID: 1,
Platform: "linux/amd64",
Environ: map[string]string{"GOLANG": "tip"},
},
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestStepUpdate(t *testing.T) {
State: "pending",
Error: "pc load letter",
ExitCode: 255,
Machine: "localhost",
AgentID: 1,
Platform: "linux/amd64",
Environ: map[string]string{"GOLANG": "tip"},
}
Expand Down
Loading