Skip to content

Commit

Permalink
Revert "fix: Allow log streaming to take longer than build execution (#…
Browse files Browse the repository at this point in the history
…390)"

This reverts commit b50e19f.
  • Loading branch information
plyr4 authored Dec 12, 2022
1 parent c48b403 commit 990f62c
Show file tree
Hide file tree
Showing 13 changed files with 2 additions and 310 deletions.
12 changes: 0 additions & 12 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main

import (
"context"
"sync"
"time"

"github.com/go-vela/worker/executor"
Expand Down Expand Up @@ -76,7 +75,6 @@ func (w *Worker) exec(index int) error {
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
LogStreamingTimeout: w.Config.Executor.LogStreamingTimeout,
EnforceTrustedRepos: w.Config.Executor.EnforceTrustedRepos,
PrivilegedImages: w.Config.Runtime.PrivilegedImages,
Client: w.VelaClient,
Expand Down Expand Up @@ -110,13 +108,7 @@ func (w *Worker) exec(index int) error {
timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
var wg sync.WaitGroup

defer func() {
// if exec() exits before starting StreamBuild, this returns immediately.
wg.Wait()

logger.Info("destroying build")

// destroy the build with the executor (pass a background
Expand Down Expand Up @@ -145,12 +137,8 @@ func (w *Worker) exec(index int) error {
return nil
}

// add StreamBuild goroutine to WaitGroup
wg.Add(1)

// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
defer wg.Done()
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
Expand Down
1 change: 0 additions & 1 deletion cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func run(c *cli.Context) error {
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
EnforceTrustedRepos: c.Bool("executor.enforce-trusted-repos"),
},
// logger configuration
Expand Down
8 changes: 0 additions & 8 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package executor

import (
"time"

"github.com/go-vela/types/constants"

"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -39,12 +37,6 @@ var Flags = []cli.Flag{
Name: "executor.max_log_size",
Usage: "maximum log size (in bytes)",
},
&cli.DurationFlag{
EnvVars: []string{"WORKER_LOG_STREAMING_TIMEOUT", "VELA_LOG_STREAMING_TIMEOUT", "LOG_STREAMING_TIMEOUT"},
Name: "executor.log_streaming_timeout",
Usage: "maximum amount of time to wait for log streaming after build completes",
Value: 5 * time.Minute,
},
&cli.BoolFlag{
EnvVars: []string{"VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS", "EXECUTOR_ENFORCE_TRUSTED_REPOS"},
FilePath: "/vela/executor/enforce_trusted_repos",
Expand Down
12 changes: 2 additions & 10 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/internal/build"
context2 "github.com/go-vela/worker/internal/context"
"github.com/go-vela/worker/internal/image"
"github.com/go-vela/worker/internal/step"
)
Expand Down Expand Up @@ -567,15 +566,10 @@ func (c *client) ExecBuild(ctx context.Context) error {
// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// cancel streaming after a timeout once the build has finished
delayedCtx, cancelStreaming := context2.
WithDelayedCancelPropagation(ctx, c.logStreamingTimeout, "streaming", c.Logger)
defer cancelStreaming()

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(delayedCtx)
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")
Expand All @@ -585,8 +579,6 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Errorf("error in a stream request, %v", err)
}

cancelStreaming()

c.Logger.Info("all stream functions have returned")
}()

Expand Down Expand Up @@ -615,7 +607,7 @@ func (c *client) StreamBuild(ctx context.Context) error {

return nil
})
case <-delayedCtx.Done():
case <-ctx.Done():
c.Logger.Debug("streaming context canceled")
// build done or canceled
return nil
Expand Down
1 change: 0 additions & 1 deletion executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,6 @@ func TestLinux_StreamBuild(t *testing.T) {
WithPipeline(_pipeline),
WithRepo(_repo),
WithRuntime(_runtime),
WithLogStreamingTimeout(1*time.Second),
WithUser(_user),
WithVelaClient(_client),
withStreamRequests(streamRequests),
Expand Down
2 changes: 0 additions & 2 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package linux
import (
"reflect"
"sync"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -35,7 +34,6 @@ type (
init *pipeline.Container
logMethod string
maxLogSize uint
logStreamingTimeout time.Duration
privilegedImages []string
enforceTrustedRepos bool
build *library.Build
Expand Down
13 changes: 0 additions & 13 deletions executor/linux/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package linux

import (
"fmt"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -65,18 +64,6 @@ func WithMaxLogSize(size uint) Opt {
}
}

// WithLogStreamingTimeout sets the log streaming timeout in the executor client for Linux.
func WithLogStreamingTimeout(timeout time.Duration) Opt {
return func(c *client) error {
c.Logger.Trace("configuring log streaming timeout in linux executor client")

// set the maximum log size in the client
c.logStreamingTimeout = timeout

return nil
}
}

// WithPrivilegedImages sets the privileged images in the executor client for Linux.
func WithPrivilegedImages(images []string) Opt {
return func(c *client) error {
Expand Down
41 changes: 0 additions & 41 deletions executor/linux/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -162,46 +161,6 @@ func TestLinux_Opt_WithMaxLogSize(t *testing.T) {
}
}

func TestLinux_Opt_WithLogStreamingTimeout(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
logStreamingTimeout time.Duration
}{
{
name: "defined",
failure: false,
logStreamingTimeout: 1 * time.Second,
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithLogStreamingTimeout(test.logStreamingTimeout),
)

if test.failure {
if err == nil {
t.Errorf("WithLogStreamingTimeout should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("WithLogStreamingTimeout returned err: %v", err)
}

if !reflect.DeepEqual(_engine.logStreamingTimeout, test.logStreamingTimeout) {
t.Errorf("WithLogStreamingTimeout is %v, want %v", _engine.logStreamingTimeout, test.logStreamingTimeout)
}
})
}
}

func TestLinux_Opt_WithPrivilegedImages(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
5 changes: 0 additions & 5 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package executor
import (
"fmt"
"strings"
"time"

"github.com/go-vela/sdk-go/vela"

Expand Down Expand Up @@ -41,9 +40,6 @@ type Setup struct {
LogMethod string
// specifies the maximum log size
MaxLogSize uint
// specifies how long to wait after the build finishes
// for log streaming to complete
LogStreamingTimeout time.Duration
// specifies a list of privileged images to use
PrivilegedImages []string
// configuration for enforcing that only trusted repos may run privileged images
Expand Down Expand Up @@ -89,7 +85,6 @@ func (s *Setup) Linux() (Engine, error) {
linux.WithBuild(s.Build),
linux.WithLogMethod(s.LogMethod),
linux.WithMaxLogSize(s.MaxLogSize),
linux.WithLogStreamingTimeout(s.LogStreamingTimeout),
linux.WithPrivilegedImages(s.PrivilegedImages),
linux.WithEnforceTrustedRepos(s.EnforceTrustedRepos),
linux.WithHostname(s.Hostname),
Expand Down
2 changes: 0 additions & 2 deletions executor/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package executor
import (
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -80,7 +79,6 @@ func TestExecutor_Setup_Linux(t *testing.T) {
linux.WithBuild(_build),
linux.WithLogMethod("byte-chunks"),
linux.WithMaxLogSize(2097152),
linux.WithLogStreamingTimeout(1*time.Second),
linux.WithHostname("localhost"),
linux.WithPipeline(_pipeline),
linux.WithRepo(_repo),
Expand Down
49 changes: 0 additions & 49 deletions internal/context/context.go

This file was deleted.

Loading

0 comments on commit 990f62c

Please sign in to comment.