Skip to content

Commit

Permalink
Wire event suspended
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed Mar 17, 2024
1 parent 7ad3f61 commit 3ec87a1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
35 changes: 24 additions & 11 deletions worker2/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,29 @@ func TestExecProducerConsumer(t *testing.T) {

func TestSuspend(t *testing.T) {
t.Parallel()
eventCh := make(chan string)
event := func(s string) {
logCh := make(chan string)
log := func(s string) {
fmt.Println(s)
eventCh <- s
logCh <- s
}
resumeCh := make(chan struct{})
resumeAckCh := make(chan struct{})
eventCh := make(chan Event, 1000)
a := &Action{
Hooks: []Hook{
func(event Event) {
eventCh <- event
},
},
Do: func(ctx context.Context, ds InStore, os OutStore) error {
event("enter")
log("enter")
Wait(ctx, func() {
event("start_wait")
log("start_wait")
<-resumeCh
resumeAckCh <- struct{}{}
event("end_wait")
log("end_wait")
})
event("leave")
log("leave")
return nil
},
}
Expand All @@ -436,12 +442,19 @@ func TestSuspend(t *testing.T) {

e.Schedule(a)

assert.Equal(t, "enter", <-eventCh)
assert.Equal(t, "start_wait", <-eventCh)
assert.Equal(t, "enter", <-logCh)
assert.Equal(t, "start_wait", <-logCh)
close(resumeCh)
<-resumeAckCh
assert.Equal(t, "end_wait", <-eventCh)
assert.Equal(t, "leave", <-eventCh)
assert.Equal(t, "end_wait", <-logCh)
assert.Equal(t, "leave", <-logCh)

e.Wait()
close(eventCh)

events := make([]string, 0)
for event := range eventCh {
events = append(events, fmt.Sprintf("%T", event))
}
assert.EqualValues(t, []string{"worker2.EventScheduled", "worker2.EventReady", "worker2.EventStarted", "worker2.EventSuspended", "worker2.EventReady", "worker2.EventStarted", "worker2.EventCompleted"}, events)
}
8 changes: 8 additions & 0 deletions worker2/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,11 @@ type EventReady struct {
func (e EventReady) getExecution() *Execution {
return e.Execution
}

type EventSuspended struct {
Execution *Execution
}

func (e EventSuspended) getExecution() *Execution {
return e.Execution
}
7 changes: 7 additions & 0 deletions worker2/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,16 @@ func (g *GoroutineWorker) Start(e *Execution) error {
g.state = WorkerStateIdle
g.m.Unlock()
if errors.Is(err, ErrSuspended) {
e.eventsCh <- EventSuspended{Execution: e}

go func() {
select {
case <-ctx.Done():
e.eventsCh <- EventCompleted{
Execution: e,
Output: e.outStore.Get(),
Error: ctx.Err(),
}
case <-e.resumeCh:
e.eventsCh <- EventReady{Execution: e}
}
Expand Down

0 comments on commit 3ec87a1

Please sign in to comment.