Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream output for custom workflows #2261

Merged
merged 24 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f7cd127
Start threading job output to RunStepRunner
ascandella May 17, 2022
0200c62
Strip ANSI
ascandella May 17, 2022
22388bc
Fix lint
ascandella May 17, 2022
efb38a8
Use waitgroup to avoid test flakiness
ascandella May 17, 2022
831a8f6
Move waitgroup higher
ascandella May 17, 2022
589b43c
Add ANSI test and use strings.Builder
ascandella May 17, 2022
51cfc39
Fix lint
ascandella May 17, 2022
bdae433
Use errors.Wrap per style guide
ascandella May 17, 2022
f1fda9d
Create ShellCommandRunner to encapsulate streaming
ascandella May 17, 2022
a6567b5
WIP: shell command runner
ascandella May 17, 2022
ad6a941
Update signatures to propagate error finding version
ascandella May 17, 2022
4e894c4
Fix log output
ascandella May 17, 2022
d87edd8
Fix error checking
ascandella May 17, 2022
2b5e5e9
Merge branch 'master' into stream-all-output
ascandella May 17, 2022
c9bc5df
Fix accidental whitespace stripping
ascandella May 17, 2022
427c5de
Remove unused struct field
ascandella May 17, 2022
7fe7e3e
Fix error checking in terraform client
ascandella May 17, 2022
d39b3b5
Add unit tests to verify command output handler was called
ascandella May 17, 2022
795ac51
Remove err from async interface
ascandella May 18, 2022
58dd23e
Remove duplicative log now that shell command runner does it
ascandella May 18, 2022
dddd74e
Hide output in stream for env/multienv
ascandella May 18, 2022
86c5edd
Add comment explaining goroutines
ascandella May 18, 2022
bdcbcd0
Use printf for better macOS compatibility
ascandella May 18, 2022
45aacf3
Merge branch 'master' into stream-all-output
ascandella Jun 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,8 +973,9 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
TerraformExecutor: terraformClient,
},
RunStepRunner: &runtime.RunStepRunner{
TerraformExecutor: terraformClient,
DefaultTFVersion: defaultTFVersion,
TerraformExecutor: terraformClient,
DefaultTFVersion: defaultTFVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
},
WorkingDir: workingDir,
Webhooks: &mockWebhookSender{},
Expand Down
10 changes: 5 additions & 5 deletions server/core/runtime/apply_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
. "github.com/petergtz/pegomock"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/core/runtime"
"github.com/runatlantis/atlantis/server/core/terraform"
runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/command"
Expand Down Expand Up @@ -371,11 +371,11 @@ type remoteApplyMock struct {
}

// RunCommandAsync fakes out running terraform async.
func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) {
func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) {
r.CalledArgs = args

in := make(chan string)
out := make(chan terraform.Line)
out := make(chan runtimemodels.Line)

// We use a wait group to ensure our sending and receiving routines have
// completed.
Expand All @@ -398,10 +398,10 @@ func (r *remoteApplyMock) RunCommandAsync(ctx command.ProjectContext, path strin
// Asynchronously send the lines we're supposed to.
go func() {
for _, line := range strings.Split(r.LinesToSend, "\n") {
out <- terraform.Line{Line: line}
out <- runtimemodels.Line{Line: line}
}
if r.Err != nil {
out <- terraform.Line{Err: r.Err}
out <- runtimemodels.Line{Err: r.Err}
}
close(out)
wg.Done()
Expand Down
4 changes: 3 additions & 1 deletion server/core/runtime/env_step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ func (r *EnvStepRunner) Run(ctx command.ProjectContext, command string, value st
if value != "" {
return value, nil
}
res, err := r.RunStepRunner.Run(ctx, command, path, envs)
// Pass `false` for streamOutput because this isn't interesting to the user reading the build logs
// in the web UI.
res, err := r.RunStepRunner.Run(ctx, command, path, envs, false)
// Trim newline from res to support running `echo env_value` which has
// a newline. We don't recommend users run echo -n env_value to remove the
// newline because -n doesn't work in the sh shell which is what we use
Expand Down
7 changes: 5 additions & 2 deletions server/core/runtime/env_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/models"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"

. "github.com/petergtz/pegomock"
Expand Down Expand Up @@ -40,9 +41,11 @@ func TestEnvStepRunner_Run(t *testing.T) {
tfClient := mocks.NewMockClient()
tfVersion, err := version.NewVersion("0.12.0")
Ok(t, err)
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()
runStepRunner := runtime.RunStepRunner{
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
}
envRunner := runtime.EnvStepRunner{
RunStepRunner: &runStepRunner,
Expand Down
161 changes: 161 additions & 0 deletions server/core/runtime/models/shell_command_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package models
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a particularly good place for this, but noticed it was where the other exec functionality was put. Not totally sure on the naming convention of this project, but it couldn't be in runtime because of import cycles with the terraform package.


import (
"bufio"
"io"
"os/exec"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/jobs"
)

// Setting the buffer size to 10mb
const BufioScannerBufferSize = 10 * 1024 * 1024

// Line represents a line that was output from a shell command.
type Line struct {
// Line is the contents of the line (without the newline).
Line string
// Err is set if there was an error.
Err error
}

// ShellCommandRunner runs a command via `exec.Command` and streams output to the
// `ProjectCommandOutputHandler`.
type ShellCommandRunner struct {
command string
workingDir string
outputHandler jobs.ProjectCommandOutputHandler
streamOutput bool
cmd *exec.Cmd
}

func NewShellCommandRunner(command string, environ []string, workingDir string, streamOutput bool, outputHandler jobs.ProjectCommandOutputHandler) *ShellCommandRunner {
cmd := exec.Command("sh", "-c", command) // #nosec
cmd.Env = environ
cmd.Dir = workingDir

return &ShellCommandRunner{
command: command,
workingDir: workingDir,
outputHandler: outputHandler,
streamOutput: streamOutput,
cmd: cmd,
}
}

func (s *ShellCommandRunner) Run(ctx command.ProjectContext) (string, error) {
_, outCh := s.RunCommandAsync(ctx)

outbuf := new(strings.Builder)
var err error
for line := range outCh {
if line.Err != nil {
err = line.Err
break
}
outbuf.WriteString(line.Line)
outbuf.WriteString("\n")
}

// sanitize output by stripping out any ansi characters.
output := ansi.Strip(outbuf.String())
return output, err
}

// RunCommandAsync runs terraform with args. It immediately returns an
// input and output channel. Callers can use the output channel to
// get the realtime output from the command.
// Callers can use the input channel to pass stdin input to the command.
// If any error is passed on the out channel, there will be no
// further output (so callers are free to exit).
func (s *ShellCommandRunner) RunCommandAsync(ctx command.ProjectContext) (chan<- string, <-chan Line) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was extracted as-is from terraform_client.go

outCh := make(chan Line)
inCh := make(chan string)

// We start a goroutine to do our work asynchronously and then immediately
// return our channels.
go func() {
// Ensure we close our channels when we exit.
defer func() {
close(outCh)
close(inCh)
}()

stdout, _ := s.cmd.StdoutPipe()
stderr, _ := s.cmd.StderrPipe()
stdin, _ := s.cmd.StdinPipe()

ctx.Log.Debug("starting %q in %q", s.command, s.workingDir)
err := s.cmd.Start()
if err != nil {
err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir)
ctx.Log.Err(err.Error())
outCh <- Line{Err: err}
return
}

// If we get anything on inCh, write it to stdin.
// This function will exit when inCh is closed which we do in our defer.
go func() {
for line := range inCh {
ctx.Log.Debug("writing %q to remote command's stdin", line)
_, err := io.WriteString(stdin, line)
if err != nil {
ctx.Log.Err(errors.Wrapf(err, "writing %q to process", line).Error())
}
}
}()

wg := new(sync.WaitGroup)
wg.Add(2)
// Asynchronously copy from stdout/err to outCh.
go func() {
scanner := bufio.NewScanner(stdout)
buf := []byte{}
scanner.Buffer(buf, BufioScannerBufferSize)

for scanner.Scan() {
message := scanner.Text()
outCh <- Line{Line: message}
if s.streamOutput {
s.outputHandler.Send(ctx, message, false)
}
}
wg.Done()
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
message := scanner.Text()
outCh <- Line{Line: message}
if s.streamOutput {
s.outputHandler.Send(ctx, message, false)
}
}
wg.Done()
}()

// Wait for our copying to complete. This *must* be done before
// calling cmd.Wait(). (see https://github.com/golang/go/issues/19685)
wg.Wait()

// Wait for the command to complete.
err = s.cmd.Wait()

// We're done now. Send an error if there was one.
if err != nil {
err = errors.Wrapf(err, "running %q in %q", s.command, s.workingDir)
ctx.Log.Err(err.Error())
outCh <- Line{Err: err}
} else {
ctx.Log.Info("successfully ran %q in %q", s.command, s.workingDir)
}
}()

return inCh, outCh
}
75 changes: 75 additions & 0 deletions server/core/runtime/models/shell_command_runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package models_test

import (
"fmt"
"os"
"strings"
"testing"

. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/core/runtime/mocks/matchers"
"github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)

func TestShellCommandRunner_Run(t *testing.T) {
cases := []struct {
Command string
ExpLines []string
Environ map[string]string
}{
{
Command: "echo $HELLO",
Environ: map[string]string{
"HELLO": "world",
},
ExpLines: []string{"world"},
},
{
Command: ">&2 echo this is an error",
ExpLines: []string{"this is an error"},
},
}

for _, c := range cases {
t.Run(c.Command, func(t *testing.T) {
RegisterMockTestingT(t)
ctx := command.ProjectContext{
Log: logging.NewNoopLogger(t),
Workspace: "default",
RepoRelDir: ".",
}
projectCmdOutputHandler := mocks.NewMockProjectCommandOutputHandler()

cwd, err := os.Getwd()
Ok(t, err)
environ := []string{}
for k, v := range c.Environ {
environ = append(environ, fmt.Sprintf("%s=%s", k, v))
}
expectedOutput := fmt.Sprintf("%s\n", strings.Join(c.ExpLines, "\n"))

// Run once with streaming enabled
runner := models.NewShellCommandRunner(c.Command, environ, cwd, true, projectCmdOutputHandler)
output, err := runner.Run(ctx)
Ok(t, err)
Equals(t, expectedOutput, output)
for _, line := range c.ExpLines {
projectCmdOutputHandler.VerifyWasCalledOnce().Send(ctx, line, false)
}

// And again with streaming disabled. Everything should be the same except the
// command output handler should not have received anything

projectCmdOutputHandler = mocks.NewMockProjectCommandOutputHandler()
runner = models.NewShellCommandRunner(c.Command, environ, cwd, false, projectCmdOutputHandler)
output, err = runner.Run(ctx)
Ok(t, err)
Equals(t, expectedOutput, output)
projectCmdOutputHandler.VerifyWasCalled(Never()).Send(matchers.AnyModelsProjectCommandContext(), AnyString(), EqBool(false))
})
}
}
2 changes: 1 addition & 1 deletion server/core/runtime/multienv_step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type MultiEnvStepRunner struct {
// Run runs the multienv step command.
// The command must return a json string containing the array of name-value pairs that are being added as extra environment variables
func (r *MultiEnvStepRunner) Run(ctx command.ProjectContext, command string, path string, envs map[string]string) (string, error) {
res, err := r.RunStepRunner.Run(ctx, command, path, envs)
res, err := r.RunStepRunner.Run(ctx, command, path, envs, false)
if err == nil {
envVars := strings.Split(res, ",")
if len(envVars) > 0 {
Expand Down
7 changes: 5 additions & 2 deletions server/core/runtime/multienv_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/command"
"github.com/runatlantis/atlantis/server/events/models"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)
Expand All @@ -31,9 +32,11 @@ func TestMultiEnvStepRunner_Run(t *testing.T) {
tfClient := mocks.NewMockClient()
tfVersion, err := version.NewVersion("0.12.0")
Ok(t, err)
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()
runStepRunner := runtime.RunStepRunner{
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
TerraformExecutor: tfClient,
DefaultTFVersion: tfVersion,
ProjectCmdOutputHandler: projectCmdOutputHandler,
}
multiEnvStepRunner := runtime.MultiEnvStepRunner{
RunStepRunner: &runStepRunner,
Expand Down
8 changes: 4 additions & 4 deletions server/core/runtime/plan_step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"testing"

"github.com/hashicorp/go-version"
"github.com/runatlantis/atlantis/server/core/terraform"
"github.com/runatlantis/atlantis/server/events/command"
mocks2 "github.com/runatlantis/atlantis/server/events/mocks"

. "github.com/petergtz/pegomock"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/core/runtime"
runtimemodels "github.com/runatlantis/atlantis/server/core/runtime/models"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
matchers2 "github.com/runatlantis/atlantis/server/core/terraform/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/mocks/matchers"
Expand Down Expand Up @@ -885,13 +885,13 @@ type remotePlanMock struct {
CalledArgs []string
}

func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan terraform.Line) {
func (r *remotePlanMock) RunCommandAsync(ctx command.ProjectContext, path string, args []string, envs map[string]string, v *version.Version, workspace string) (chan<- string, <-chan runtimemodels.Line) {
r.CalledArgs = args
in := make(chan string)
out := make(chan terraform.Line)
out := make(chan runtimemodels.Line)
go func() {
for _, line := range strings.Split(r.LinesToSend, "\n") {
out <- terraform.Line{Line: line}
out <- runtimemodels.Line{Line: line}
}
close(out)
close(in)
Expand Down
Loading