Skip to content

Commit

Permalink
Remove concept of worker
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed May 4, 2024
1 parent 2f751ad commit 868c613
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 108 deletions.
26 changes: 11 additions & 15 deletions worker2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Engine struct {
execUid uint64
wg sync.WaitGroup
defaultScheduler Scheduler
workers []*Worker
liveExecs []*Execution
m sync.RWMutex
eventsCh chan Event
hooks []Hook
Expand All @@ -34,11 +34,11 @@ func (e *Engine) SetDefaultScheduler(s Scheduler) {
e.defaultScheduler = s
}

func (e *Engine) GetWorkers() []*Worker {
func (e *Engine) GetLiveExecutions() []*Execution {
e.m.Lock()
defer e.m.Unlock()

return e.workers[:]
return e.liveExecs[:]
}

func (e *Engine) loop() {
Expand All @@ -62,6 +62,11 @@ func (e *Engine) handle(event Event) {
e.finalize(event.Execution, ExecStateSucceeded, nil)
}
e.runHooks(event, event.Execution)
case EventSuspended:
go func() {
<-event.Bag.WaitResume()
e.queue(event.Execution)
}()
default:
if event, ok := event.(WithExecution); ok {
defer e.runHooks(event, event.getExecution())
Expand Down Expand Up @@ -257,24 +262,15 @@ func (e *Engine) start(exec *Execution) {
e.m.Lock()
defer e.m.Unlock()

w := &Worker{
ctx: exec.Dep.GetCtx(),
exec: exec,
queue: func() {
e.queue(exec)
},
}
e.workers = append(e.workers, w)
e.liveExecs = append(e.liveExecs, exec)

go func() {
w.Run()
exec.Run()

e.m.Lock()
defer e.m.Unlock()

e.workers = ads.Filter(e.workers, func(worker *Worker) bool {
return worker != w
})
e.liveExecs = ads.Remove(e.liveExecs, exec)
}()

e.runHooks(EventStarted{Execution: exec}, exec)
Expand Down
18 changes: 13 additions & 5 deletions worker2/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestStatus(t *testing.T) {
<-emittedCh

var emittedStatus status.Statuser
for _, worker := range e.GetWorkers() {
for _, worker := range e.GetLiveExecutions() {
emittedStatus = worker.status
if emittedStatus != nil {
break
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestSuspend(t *testing.T) {
assert.Equal(t, "end_wait", <-logCh)
assert.Equal(t, "leave", <-logCh)

<-e.Wait()
<-a.Wait()
close(eventCh)

events := make([]string, 0)
Expand Down Expand Up @@ -702,15 +702,23 @@ func TestSuspendStress(t *testing.T) {
go e.Run()
defer e.Stop()

doneCh := make(chan struct{})
defer close(doneCh)

go func() {
for {
time.Sleep(time.Second)
t.Log(done)
select {
case <-doneCh:
return
case <-time.After(time.Second):
}

t.Log(atomic.LoadInt64(&done))
}
}()

wg.Wait()
time.Sleep(time.Second) // todo figure out why things are trying to send events after the pool is stopped
<-e.Wait()
}

func TestSuspendLimit(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions worker2/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (e EventReady) getExecution() *Execution {
type EventSuspended struct {
At time.Time
Execution *Execution
Bag *SuspendBag
}

func (e EventSuspended) getExecution() *Execution {
Expand Down
46 changes: 35 additions & 11 deletions worker2/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/dlsniper/debugger"
"github.com/hephbuild/heph/status"
"runtime/debug"
"strconv"
"sync"
Expand Down Expand Up @@ -74,6 +75,8 @@ type Execution struct {
StartedAt time.Time
QueuedAt time.Time

status status.Statuser

debugString string
}

Expand All @@ -93,15 +96,7 @@ func (e *Execution) GetOutput() Value {
return e.outStore.Get()
}

type ErrSuspended struct {
Bag *SuspendBag
}

func (e ErrSuspended) Error() string {
return "suspended"
}

func (e *Execution) Run(ctx context.Context) error {
func (e *Execution) Run() {
e.m.Lock()
if e.errCh == nil {
e.errCh = make(chan error)
Expand All @@ -114,6 +109,10 @@ func (e *Execution) Run(ctx context.Context) error {
e.StartedAt = time.Now()

go func() {
ctx := e.Dep.GetCtx()
ctx = contextWithExecution(ctx, e)
ctx = status.ContextWithHandler(ctx, e)

err := e.run(ctx)
e.errCh <- err
}()
Expand All @@ -125,10 +124,19 @@ func (e *Execution) Run(ctx context.Context) error {

select {
case sb := <-e.WaitSuspend():
e.scheduler.Done(e.Dep)

e.State = ExecStateSuspended
return ErrSuspended{Bag: sb}

e.eventsCh <- EventSuspended{Execution: e, Bag: sb}
case err := <-e.errCh:
return err
e.scheduler.Done(e.Dep)

e.eventsCh <- EventCompleted{
Execution: e,
Output: e.outStore.Get(),
Error: err,
}
}
}

Expand Down Expand Up @@ -221,3 +229,19 @@ func (e *Execution) ResumeAck() {
func (e *Execution) WaitSuspend() chan *SuspendBag {
return e.suspendCh
}

func (e *Execution) Status(status status.Statuser) {
e.status = status
}

func (e *Execution) GetStatus() status.Statuser {
s := e.status
if s == nil {
s = status.String("")
}
return s
}

func (e *Execution) Interactive() bool {
return true
}
9 changes: 2 additions & 7 deletions worker2/poolui/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,7 @@ func (m *Model) updateMsg(final bool) UpdateMessage {
}

var workers []workerEntry
for _, w := range m.pool.GetWorkers() {
exec := w.Execution()
if exec == nil {
continue
}

for _, exec := range m.pool.GetLiveExecutions() {
if _, ok := exec.Dep.(*worker2.Group); ok {
continue
}
Expand All @@ -98,7 +93,7 @@ func (m *Model) updateMsg(final bool) UpdateMessage {
}

workers = append(workers, workerEntry{
status: w.GetStatus(),
status: exec.GetStatus(),
duration: duration,
exec: exec,
})
Expand Down
19 changes: 7 additions & 12 deletions worker2/poolwait/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func logUI(name string, deps worker2.Dep, pool *worker2.Engine, interval time.Du

printWorkersStatus := func() {
statusm := map[string]struct{}{}
for _, job := range pool.GetWorkers() {
if job.Execution().State != worker2.ExecStateSuspended {
for _, exec := range pool.GetLiveExecutions() {
if exec.State != worker2.ExecStateSuspended {
continue
}

duration := time.Since(job.Execution().StartedAt)
duration := time.Since(exec.StartedAt)

status := job.GetStatus().String(log.Renderer())
status := exec.GetStatus().String(log.Renderer())
if status == "" {
status = "Suspended..."
}
Expand All @@ -71,20 +71,15 @@ func logUI(name string, deps worker2.Dep, pool *worker2.Engine, interval time.Du
if len(statusm) > 0 {
fmt.Fprintf(os.Stderr, "===\n")
}
for _, w := range pool.GetWorkers() {
j := w.Execution()
if j == nil {
continue
}

duration := time.Since(j.StartedAt)
for _, exec := range pool.GetLiveExecutions() {
duration := time.Since(exec.StartedAt)

if duration < 5*time.Second {
// Skip printing short jobs
continue
}

status := w.GetStatus().String(log.Renderer())
status := exec.GetStatus().String(log.Renderer())
if status == "" {
status = "Waiting..."
}
Expand Down
58 changes: 0 additions & 58 deletions worker2/worker.go

This file was deleted.

0 comments on commit 868c613

Please sign in to comment.