Skip to content

Commit

Permalink
Workers v2 (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee authored Apr 28, 2024
1 parent 1a16d93 commit 57c1fdc
Show file tree
Hide file tree
Showing 94 changed files with 4,201 additions and 1,865 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Set up Go
uses: magnetikonline/action-golang-cache@v4
with:
go-version: 1.21.4
go-version: ~1.22
cache-key-suffix: -build

- name: Cache
Expand Down Expand Up @@ -59,6 +59,12 @@ jobs:
echo $VERSION
echo "version=$VERSION" >> $GITHUB_OUTPUT
# - name: Setup tmate session
# if: ${{ failure() }}
# run: |
# curl -L https://github.com/tmate-io/tmate/releases/download/2.4.0/tmate-2.4.0-static-linux-amd64.tar.xz | tar -xJ --strip-components 1
# ./tmate -F

upload_gcs:
name: Upload binaries
needs: [ build ]
Expand Down Expand Up @@ -125,7 +131,7 @@ jobs:
- name: Set up Go
uses: magnetikonline/action-golang-cache@v4
with:
go-version: ~1.21
go-version: ~1.22
cache-key-suffix: -test

- uses: actions/download-artifact@v3
Expand Down Expand Up @@ -170,9 +176,3 @@ jobs:
- name: Cleanup .heph
run: |
rm -rf .heph/cache/test
# - name: Setup tmate session
# if: ${{ failure() }}
# run: |
# curl -L https://github.com/tmate-io/tmate/releases/download/2.4.0/tmate-2.4.0-static-linux-amd64.tar.xz | tar -xJ --strip-components 1
# ./tmate -F
2 changes: 1 addition & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ load("//backend/node", "yarn_toolchain")

go_toolchain(
name = "go",
version = "1.21.4",
version = "1.22.2",
architectures = [
"darwin_amd64",
"darwin_arm64",
Expand Down
52 changes: 35 additions & 17 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hephbuild/heph/buildfiles"
"github.com/hephbuild/heph/config"
"github.com/hephbuild/heph/exprs"
"github.com/hephbuild/heph/gitstatus"
"github.com/hephbuild/heph/graph"
"github.com/hephbuild/heph/hbuiltin"
"github.com/hephbuild/heph/hroot"
Expand All @@ -23,7 +24,8 @@ import (
"github.com/hephbuild/heph/targetrun"
"github.com/hephbuild/heph/upgrade"
"github.com/hephbuild/heph/utils/finalizers"
"github.com/hephbuild/heph/worker"
"github.com/hephbuild/heph/worker2"
"github.com/pbnjay/memory"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -64,7 +66,7 @@ type BootOpts struct {
Summary bool
JaegerEndpoint string
DisableCloudTelemetry bool
Pool *worker.Pool
Pool *worker2.Engine

PostBootBase func(bs BaseBootstrap) error

Expand Down Expand Up @@ -133,13 +135,22 @@ type Bootstrap struct {
Observability *observability.Observability
Cloud Cloud
Summary *obsummary.Summary
Pool *worker.Pool
Pool *worker2.Engine
Packages *packages.Registry
BuildFiles *buildfiles.State
Graph *graph.State
PlatformProviders []platform.PlatformProvider
}

func DefaultScheduler(cpu int) *worker2.ResourceScheduler {
return worker2.NewResourceScheduler(map[string]float64{
"cpu": float64(cpu),
"memory": float64(memory.TotalMemory()),
}, map[string]float64{
"cpu": float64(1),
})
}

func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) {
bs := Bootstrap{}

Expand Down Expand Up @@ -182,7 +193,9 @@ func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) {

pool := opts.Pool
if pool == nil {
pool = worker.NewPool(opts.Workers)
pool = worker2.NewEngine()
pool.SetDefaultScheduler(DefaultScheduler(opts.Workers))
go pool.Run()
}
bs.Pool = pool

Expand Down Expand Up @@ -294,21 +307,26 @@ func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, err
}

e := scheduler.New(scheduler.Scheduler{
Cwd: bs.Cwd,
Root: bs.Root,
Config: bs.Config,
Observability: bs.Observability,
GetFlowID: getFlowId,
LocalCache: localCache,
RemoteCache: remoteCache,
Packages: bs.Packages,
BuildFilesState: bs.BuildFiles,
Graph: bs.Graph,
Pool: bs.Pool,
Finalizers: fins,
Runner: runner,
Cwd: bs.Cwd,
Root: bs.Root,
Config: bs.Config,
Observability: bs.Observability,
GetFlowID: getFlowId,
LocalCache: localCache,
RemoteCache: remoteCache,
Packages: bs.Packages,
BuildFilesState: bs.BuildFiles,
Graph: bs.Graph,
Pool: bs.Pool,
BackgroundTracker: worker2.NewRunningTracker(),
Finalizers: fins,
Runner: runner,
})

if bs.Config.Engine.GitCacheHints {
e.GitStatus = gitstatus.New(bs.Root.Root.Abs())
}

bs.Finalizers.RegisterWithErr(func(err error) {
fins.Run(err)
})
Expand Down
3 changes: 3 additions & 0 deletions bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/hephbuild/heph/hroot"
"github.com/hephbuild/heph/log/log"
"os"
"time"
)

func BuildConfig(root *hroot.State, profiles []string) (*config.Config, error) {
Expand All @@ -16,6 +17,8 @@ func BuildConfig(root *hroot.State, profiles []string) (*config.Config, error) {
cfg.CacheHistory = 3
cfg.Engine.GC = true
cfg.Engine.CacheHints = true
cfg.Engine.GitCacheHints = false
cfg.ProgressInterval = time.Second
cfg.Engine.ParallelCaching = true
cfg.Engine.SmartGen = true
cfg.CacheOrder = config.CacheOrderLatency
Expand Down
25 changes: 13 additions & 12 deletions bootstrap/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"github.com/hephbuild/heph/log/log"
"github.com/hephbuild/heph/targetrun"
"github.com/hephbuild/heph/worker"
"github.com/hephbuild/heph/worker2"
"go.uber.org/multierr"
"io"
"os"
Expand Down Expand Up @@ -42,7 +42,7 @@ func printErrTargetFailed(err error) bool {
}

func PrintHumanError(err error) {
errs := multierr.Errors(worker.CollectRootErrors(err))
errs := worker2.CollectRootErrors(err)
skippedCount := 0
skipSpacing := true

Expand All @@ -57,18 +57,19 @@ func PrintHumanError(err error) {
for _, err := range errs {
if printErrTargetFailed(err) {
// Printed !
continue
}

var jerr worker2.Error
if errors.As(err, &jerr) && jerr.Skipped() {
skippedCount++
skipSpacing = true
log.Debugf("skipped: %v", jerr)
} else {
var jerr worker.JobError
if errors.As(err, &jerr) && jerr.Skipped() {
skippedCount++
for _, err := range multierr.Errors(err) {
skipSpacing = true
log.Debugf("skipped: %v", jerr)
} else {
for _, err := range multierr.Errors(err) {
skipSpacing = true
separate()
log.Error(err)
}
separate()
log.Error(err)
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions bootstrap/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package bootstrap

import (
"errors"
"github.com/hephbuild/heph/worker2"
"github.com/stretchr/testify/assert"
"testing"
)

func TestCollectRootErrors(t *testing.T) {
err := worker2.Error{
ID: 2,
Name: "a2",
State: worker2.ExecStateSkipped,
Err: worker2.Error{
ID: 1,
Name: "a1",
State: worker2.ExecStateFailed,
Err: errors.New("sad beep bop"),
},
}

errs := worker2.CollectRootErrors(err)

assert.EqualValues(t, []error{
worker2.Error{
ID: 1,
Name: "a1",
State: worker2.ExecStateFailed,
Err: errors.New("sad beep bop"),
},
}, errs)
}
4 changes: 2 additions & 2 deletions bootstrap/rrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/hephbuild/heph/targetrun"
"github.com/hephbuild/heph/utils/ads"
"github.com/hephbuild/heph/utils/sets"
"github.com/hephbuild/heph/worker/poolwait"
"github.com/hephbuild/heph/worker2/poolwait"
)

var errHasExprDep = errors.New("has expr, bailing out")
Expand Down Expand Up @@ -125,7 +125,7 @@ func RunGen(ctx context.Context, e *scheduler.Scheduler, plain bool, filterFacto
return err
}

err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain)
err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain, e.Config.ProgressInterval)
if err != nil {
return err
}
Expand Down
31 changes: 12 additions & 19 deletions bootstrap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/dlsniper/debugger"
"github.com/hephbuild/heph/log/log"
"github.com/hephbuild/heph/sandbox"
"github.com/hephbuild/heph/scheduler"
"github.com/hephbuild/heph/specs"
"github.com/hephbuild/heph/targetrun"
"github.com/hephbuild/heph/worker"
"github.com/hephbuild/heph/worker/poolwait"
"github.com/hephbuild/heph/worker2/poolwait"
"os"
"os/exec"
)
Expand All @@ -37,6 +37,12 @@ func Run(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests, ru
}

func RunMode(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests, runopts RunOpts, inlineSingle bool, mode string, iocfg sandbox.IOConfig) error {
debugger.SetLabels(func() []string {
return []string{
"where", "RunMode",
}
})

for i := range rrs {
rrs[i].Mode = mode
}
Expand All @@ -60,32 +66,19 @@ func RunMode(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests
inlineRR = &rrs[0]
}

// fgDeps will include deps created inside the scheduled jobs to be waited for in the foreground
// The DoneSem() must be called after all the tdeps have finished
ctx, fgDeps := poolwait.ContextWithForegroundWaitGroup(ctx)
fgDeps.AddSem()

var skip []specs.Specer
if inlineRR != nil {
skip = []specs.Specer{inlineRR.Target}
}
tdepsMap, err := e.ScheduleTargetRRsWithDeps(ctx, rrs, skip)
tdepsMap, tracker, err := e.ScheduleTargetRRsWithDeps(ctx, rrs, skip)
if err != nil {
fgDeps.DoneSem()
return err
}

tdeps := tdepsMap.All()
go func() {
<-tdeps.Done()
fgDeps.DoneSem()
}()

runDeps := &worker.WaitGroup{}
runDeps.AddChild(tdeps)
runDeps.AddChild(fgDeps)
tdeps.AddDep(tracker.Group())

err = poolwait.Wait(ctx, "Run", e.Pool, runDeps, runopts.Plain)
err = poolwait.Wait(ctx, "Run", e.Pool, tdeps, runopts.Plain, e.Config.ProgressInterval)
if err != nil {
return err
}
Expand Down Expand Up @@ -119,7 +112,7 @@ func RunMode(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests
iocfg.Stdout = os.Stderr
}

err = e.RunWithSpan(ctx, *inlineRR, iocfg)
err = e.RunWithSpan(ctx, *inlineRR, iocfg, nil)
if err != nil {
var eerr *exec.ExitError
if errors.As(err, &eerr) {
Expand Down
Loading

0 comments on commit 57c1fdc

Please sign in to comment.