Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(log): remove byte-chunks log method #447

Merged
merged 1 commit into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (w *Worker) exec(index int) error {
Logger: logger,
Mock: w.Config.Mock,
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
LogStreamingTimeout: w.Config.Executor.LogStreamingTimeout,
EnforceTrustedRepos: w.Config.Executor.EnforceTrustedRepos,
Expand Down
1 change: 0 additions & 1 deletion cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func run(c *cli.Context) error {
// executor configuration
Executor: &executor.Setup{
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
EnforceTrustedRepos: c.Bool("executor.enforce-trusted-repos"),
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ services:
- vela
environment:
EXECUTOR_DRIVER: linux
EXECUTOR_LOG_METHOD: 'time-chunks'
QUEUE_DRIVER: redis
QUEUE_ADDR: 'redis://redis:6379'
VELA_BUILD_LIMIT: 1
Expand Down
2 changes: 0 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestExecutor_New(t *testing.T) {
_linux, err := linux.New(
linux.WithBuild(_build),
linux.WithHostname("localhost"),
linux.WithLogMethod("byte-chunks"),
linux.WithMaxLogSize(2097152),
linux.WithPipeline(_pipeline),
linux.WithRepo(_repo),
Expand Down Expand Up @@ -103,7 +102,6 @@ func TestExecutor_New(t *testing.T) {
Build: _build,
Client: _client,
Driver: constants.DriverLinux,
LogMethod: "byte-chunks",
MaxLogSize: 2097152,
Pipeline: _pipeline,
Repo: _repo,
Expand Down
7 changes: 0 additions & 7 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ var Flags = []cli.Flag{
Usage: "driver to be used for the executor",
Value: constants.DriverLinux,
},
&cli.StringFlag{
EnvVars: []string{"VELA_EXECUTOR_LOG_METHOD", "EXECUTOR_LOG_METHOD"},
FilePath: "/vela/executor/log_method",
Name: "executor.log_method",
Usage: "method used to publish logs to the server - options: (byte-chunks|time-chunks)",
Value: "byte-chunks",
},
&cli.UintFlag{
EnvVars: []string{"VELA_EXECUTOR_MAX_LOG_SIZE", "EXECUTOR_MAX_LOG_SIZE"},
FilePath: "/vela/executor/max_log_size",
Expand Down
2 changes: 0 additions & 2 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type (

// private fields
init *pipeline.Container
logMethod string
maxLogSize uint
logStreamingTimeout time.Duration
privilegedImages []string
Expand Down Expand Up @@ -72,7 +71,6 @@ func Equal(a, b *client) bool {
a.Hostname == b.Hostname &&
a.Version == b.Version &&
reflect.DeepEqual(a.init, b.init) &&
a.logMethod == b.logMethod &&
a.maxLogSize == b.maxLogSize &&
reflect.DeepEqual(a.privilegedImages, b.privilegedImages) &&
a.enforceTrustedRepos == b.enforceTrustedRepos &&
Expand Down
17 changes: 0 additions & 17 deletions executor/linux/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,6 @@ func WithBuild(b *library.Build) Opt {
}
}

// WithLogMethod sets the method used to publish logs in the executor client for Linux.
func WithLogMethod(method string) Opt {
return func(c *client) error {
c.Logger.Trace("configuring log streaming method in linux executor client")

// check if a method is provided
if len(method) == 0 {
return fmt.Errorf("empty log method provided")
}

// set the log method in the client
c.logMethod = method

return nil
}
}

// WithMaxLogSize sets the maximum log size (in bytes) in the executor client for Linux.
func WithMaxLogSize(size uint) Opt {
return func(c *client) error {
Expand Down
50 changes: 0 additions & 50 deletions executor/linux/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,6 @@ func TestLinux_Opt_WithBuild(t *testing.T) {
}
}

func TestLinux_Opt_WithLogMethod(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
logMethod string
}{
{
name: "byte-chunks",
failure: false,
logMethod: "byte-chunks",
},
{
name: "time-chunks",
failure: false,
logMethod: "time-chunks",
},
{
name: "empty",
failure: true,
logMethod: "",
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithLogMethod(test.logMethod),
)

if test.failure {
if err == nil {
t.Errorf("WithLogMethod should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("WithLogMethod returned err: %v", err)
}

if !reflect.DeepEqual(_engine.logMethod, test.logMethod) {
t.Errorf("WithLogMethod is %v, want %v", _engine.logMethod, test.logMethod)
}
})
}
}

func TestLinux_Opt_WithMaxLogSize(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
178 changes: 65 additions & 113 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,130 +224,82 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
// create new buffer for uploading logs
logs := new(bytes.Buffer)

switch c.logMethod {
case "time-chunks":
// create new channel for processing logs
done := make(chan bool)

go func() {
logger.Debug("polling logs for container")

// spawn "infinite" loop that will upload logs
// from the buffer until the channel is closed
for {
// sleep for "1s" before attempting to upload logs
time.Sleep(1 * time.Second)

// create a non-blocking select to check if the channel is closed
select {
// after repo timeout of idle (no response) end the stream
//
// this is a safety mechanism
case <-time.After(time.Duration(c.repo.GetTimeout()) * time.Minute):
logger.Tracef("repo timeout of %d exceeded", c.repo.GetTimeout())

return
// channel is closed
case <-done:
logger.Trace("channel closed for polling container logs")

// return out of the go routine
return
// channel is not closed
default:
// update the existing log with the new bytes if there is new data to add
if len(logs.Bytes()) > 0 {
logger.Trace(logs.String())

// update the existing log with the new bytes
//
// https://pkg.go.dev/github.com/go-vela/types/library?tab=doc#Log.AppendData
_log.AppendData(logs.Bytes())

logger.Debug("appending logs")
// send API call to append the logs for the service
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
_log, _, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Error(err)
}

// flush the buffer of logs
logs.Reset()
// create new channel for processing logs
done := make(chan bool)

go func() {
logger.Debug("polling logs for container")

// spawn "infinite" loop that will upload logs
// from the buffer until the channel is closed
for {
// sleep for "1s" before attempting to upload logs
time.Sleep(1 * time.Second)

// create a non-blocking select to check if the channel is closed
select {
// after repo timeout of idle (no response) end the stream
//
// this is a safety mechanism
case <-time.After(time.Duration(c.repo.GetTimeout()) * time.Minute):
logger.Tracef("repo timeout of %d exceeded", c.repo.GetTimeout())

return
// channel is closed
case <-done:
logger.Trace("channel closed for polling container logs")

// return out of the go routine
return
// channel is not closed
default:
// update the existing log with the new bytes if there is new data to add
if len(logs.Bytes()) > 0 {
logger.Trace(logs.String())

// update the existing log with the new bytes
//
// https://pkg.go.dev/github.com/go-vela/types/library?tab=doc#Log.AppendData
_log.AppendData(logs.Bytes())

logger.Debug("appending logs")
// send API call to append the logs for the service
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
_log, _, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Error(err)
}

// check whether we've reached the maximum log size
if c.maxLogSize > 0 && uint(len(_log.GetData())) >= c.maxLogSize {
logger.Trace("maximum log size reached")

return
}
// flush the buffer of logs
logs.Reset()
}
}
}()

// create new scanner from the container output
scanner := bufio.NewScanner(rc)

// scan entire container output
for scanner.Scan() {
// write all the logs from the scanner
logs.Write(append(scanner.Bytes(), []byte("\n")...))
}
// check whether we've reached the maximum log size
if c.maxLogSize > 0 && uint(len(_log.GetData())) >= c.maxLogSize {
logger.Trace("maximum log size reached")

logger.Info("finished streaming logs")

// close channel to stop processing logs
close(done)

return scanner.Err()
case "byte-chunks":
fallthrough
default:
// create new scanner from the container output
scanner := bufio.NewScanner(rc)

// scan entire container output
for scanner.Scan() {
// write all the logs from the scanner
logs.Write(append(scanner.Bytes(), []byte("\n")...))

// if we have at least 1000 bytes in our buffer
if logs.Len() > 1000 {
logger.Trace(logs.String())

// update the existing log with the new bytes
//
// https://pkg.go.dev/github.com/go-vela/types/library?tab=doc#Log.AppendData
_log.AppendData(logs.Bytes())

logger.Debug("appending logs")
// send API call to append the logs for the service
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
//nolint:contextcheck // ignore passing context
_log, _, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
return err
return
}

// flush the buffer of logs
logs.Reset()
}

// check whether we've reached the maximum log size
if c.maxLogSize > 0 && uint(len(_log.GetData())) >= c.maxLogSize {
logger.Trace("maximum log size reached")

break
}
}
}()

logger.Info("finished streaming logs")
// create new scanner from the container output
scanner := bufio.NewScanner(rc)

return scanner.Err()
// scan entire container output
for scanner.Scan() {
// write all the logs from the scanner
logs.Write(append(scanner.Bytes(), []byte("\n")...))
}

logger.Info("finished streaming logs")

// close channel to stop processing logs
close(done)

return scanner.Err()
}

// DestroyService cleans up services after execution.
Expand Down
Loading