Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve step logging #3722

Merged
merged 23 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@ package agent
import (
"io"
"sync"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"go.woodpecker-ci.org/woodpecker/v2/pipeline"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
const (
// Store not more than 1mb in a log-line as 4mb is the limit of a grpc message
// and log-lines needs to be parsed by the browsers later on.
maxLogLineLength = 1024 * 1024 // 1mb
)

func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
return func(step *backend.Step, rc io.Reader) error {
logLogger := logger.With().
logger := _logger.With().
Str("image", step.Image).
Str("workflowID", workflow.ID).
Logger()
Expand All @@ -40,14 +47,14 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
secrets = append(secrets, secret.Value)
}

logLogger.Debug().Msg("log stream opened")
logger.Debug().Msg("log stream opened")

logStream := rpc.NewLineWriter(r.client, step.UUID, secrets...)
if _, err := io.Copy(logStream, rc); err != nil {
log.Error().Err(err).Msg("copy limited logStream part")
logStream := log.NewLineWriter(r.client, step.UUID, secrets...)
if err := log.CopyLineByLine(logStream, rc, maxLogLineLength); err != nil {
logger.Error().Err(err).Msg("copy limited logStream part")
}

logLogger.Debug().Msg("log stream copied, close ...")
logger.Debug().Msg("log stream copied, close ...")
uploads.Done()

return nil
Expand Down
7 changes: 4 additions & 3 deletions cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/compiler"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/linter"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/matrix"
pipelineLog "go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
)

Expand Down Expand Up @@ -273,8 +274,8 @@ func convertPathForWindows(path string) string {
return filepath.ToSlash(path)
}

const maxLogLineLength = 1024 * 1024 // 1mb
qwerty287 marked this conversation as resolved.
Show resolved Hide resolved
var defaultLogger = pipeline.Logger(func(step *backendTypes.Step, rc io.Reader) error {
logStream := NewLineWriter(step.Name, step.UUID)
_, err := io.Copy(logStream, rc)
return err
logWriter := NewLineWriter(step.Name, step.UUID)
return pipelineLog.CopyLineByLine(logWriter, rc, maxLogLineLength)
})
44 changes: 14 additions & 30 deletions cli/exec/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,34 @@ package exec

import (
"fmt"
"io"
"os"
"strings"
"time"

"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

// LineWriter sends logs to the client.
type LineWriter struct {
stepName string
stepUUID string
num int
now time.Time
rep *strings.Replacer
lines []*rpc.LogEntry
stepName string
stepUUID string
num int
startTime time.Time
}

// NewLineWriter returns a new line reader.
func NewLineWriter(stepName, stepUUID string) *LineWriter {
func NewLineWriter(stepName, stepUUID string) io.WriteCloser {
return &LineWriter{
stepName: stepName,
stepUUID: stepUUID,
now: time.Now().UTC(),
stepName: stepName,
stepUUID: stepUUID,
startTime: time.Now().UTC(),
}
}

func (w *LineWriter) Write(p []byte) (n int, err error) {
data := string(p)
if w.rep != nil {
data = w.rep.Replace(data)
}

line := &rpc.LogEntry{
Data: data,
StepUUID: w.stepUUID,
Line: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: rpc.LogEntryStdout,
}

fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.now).Seconds()), data)

fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.startTime).Seconds()), p)
w.num++

w.lines = append(w.lines, line)
return len(p), nil
}

func (w *LineWriter) Close() error {
return nil
}
79 changes: 79 additions & 0 deletions pipeline/log/line_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 Woodpecker Authors
// Copyright 2011 Drone.IO Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"context"
"io"
"strings"
"sync"
"time"

"github.com/rs/zerolog/log"

"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/shared"
)

// LineWriter sends logs to the client.
type LineWriter struct {
sync.Mutex

peer rpc.Peer
stepUUID string
num int
startTime time.Time
replacer *strings.Replacer
}

// NewLineWriter returns a new line reader.
func NewLineWriter(peer rpc.Peer, stepUUID string, secret ...string) io.WriteCloser {
lw := &LineWriter{
peer: peer,
stepUUID: stepUUID,
startTime: time.Now().UTC(),
replacer: shared.NewSecretsReplacer(secret),
}
return lw
}

func (w *LineWriter) Write(p []byte) (n int, err error) {
data := string(p)
if w.replacer != nil {
data = w.replacer.Replace(data)
}
log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data)

line := &rpc.LogEntry{
Data: []byte(strings.TrimSuffix(data, "\n")), // remove trailing newline
StepUUID: w.stepUUID,
Time: int64(time.Since(w.startTime).Seconds()),
Type: rpc.LogEntryStdout,
Line: w.num,
}

w.num++

if err := w.peer.Log(context.Background(), line); err != nil {
return 0, err
}

return len(data), nil
}

func (w *LineWriter) Close() error {
return nil
}
58 changes: 58 additions & 0 deletions pipeline/log/line_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/mocks"
)

func TestLineWriter(t *testing.T) {
peer := mocks.NewPeer(t)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)

secrets := []string{"world"}
lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...)
defer lw.Close()

_, err := lw.Write([]byte("hello world\n"))
assert.NoError(t, err)
_, err = lw.Write([]byte("the previous line had no newline at the end"))
assert.NoError(t, err)

peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Line: 0,
Data: []byte("hello ********"),
})

peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Line: 1,
Data: []byte("the previous line had no newline at the end"),
})

peer.AssertExpectations(t)
}
62 changes: 62 additions & 0 deletions pipeline/log/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"bufio"
"errors"
"io"
)

func writeChunks(dst io.WriteCloser, data []byte, size int) error {
if len(data) <= size {
_, err := dst.Write(data)
return err
}

for len(data) > size {
if _, err := dst.Write(data[:size]); err != nil {
anbraten marked this conversation as resolved.
Show resolved Hide resolved
return err
}
data = data[size:]
}

if len(data) > 0 {
_, err := dst.Write(data)
return err
}

return nil
}

func CopyLineByLine(dst io.WriteCloser, src io.Reader, maxSize int) error {
r := bufio.NewReader(src)
defer dst.Close()

for {
// TODO: read til newline or maxSize directly
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
return writeChunks(dst, line, maxSize)
} else if err != nil {
return err
}

err = writeChunks(dst, line, maxSize)
if err != nil {
return err
}
}
}
Loading