Skip to content

Commit

Permalink
Fixes configuration with multiple event backends
Browse files Browse the repository at this point in the history
This commit fixes #4598

Config with multiple event backends was crashing on 4.4:

```yaml
  storage:
    audit_events_uri: ['dynamodb://streaming', 'stdout://', 'dynamodb://streaming2']
```
  • Loading branch information
klizhentas committed Oct 20, 2020
1 parent e0ddeb0 commit 46eb4ba
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 6 deletions.
33 changes: 33 additions & 0 deletions lib/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package events

import (
"context"
"fmt"
"io"
"time"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -168,6 +170,37 @@ func (*DiscardEmitter) ResumeAuditStream(ctx context.Context, sid session.ID, up
return &DiscardStream{}, nil
}

// NewWriterEmitter returns a new instance of emitter writing to writer
func NewWriterEmitter(w io.WriteCloser) *WriterEmitter {
return &WriterEmitter{
w: w,
WriterLog: NewWriterLog(w),
}
}

// WriterEmitter is an emitter that emits all events
// to the external writer
type WriterEmitter struct {
w io.WriteCloser
*WriterLog
}

// Close releases connection and resources associated with log if any
func (w *WriterEmitter) Close() error {
return w.w.Close()
}

// EmitAuditEvent writes the event to the writer
func (w *WriterEmitter) EmitAuditEvent(ctx context.Context, event AuditEvent) error {
// line is the text to be logged
line, err := utils.FastMarshal(event)
if err != nil {
return trace.Wrap(err)
}
_, err = fmt.Fprintln(w.w, string(line))
return trace.ConvertSystemError(err)
}

// NewLoggingEmitter returns an emitter that logs all events to the console
// with the info level
func NewLoggingEmitter() *LoggingEmitter {
Expand Down
23 changes: 23 additions & 0 deletions lib/events/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package events

import (
"bufio"
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -107,3 +110,23 @@ func TestProtoStreamer(t *testing.T) {
})
}
}

// TestWriterEmitter tests writer emitter
func TestWriterEmitter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

events := GenerateTestSession(SessionParams{PrintEvents: 0})
buf := &bytes.Buffer{}
emitter := NewWriterEmitter(utils.NopWriteCloser(buf))

for _, event := range events {
err := emitter.EmitAuditEvent(ctx, event)
assert.NoError(t, err)
}

scanner := bufio.NewScanner(buf)
for i := 0; scanner.Scan(); i++ {
assert.Contains(t, scanner.Text(), events[i].GetCode())
}
}
18 changes: 14 additions & 4 deletions lib/events/multilog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,27 @@ import (
)

// NewMultiLog returns a new instance of a multi logger
func NewMultiLog(loggers ...IAuditLog) *MultiLog {
return &MultiLog{
loggers: loggers,
func NewMultiLog(loggers ...IAuditLog) (*MultiLog, error) {
emitters := make([]Emitter, len(loggers))
for i := range loggers {
emitter, ok := loggers[i].(Emitter)
if !ok {
return nil, trace.BadParameter("expected emitter, got %T", loggers[i])
}
emitters[i] = emitter
}
return &MultiLog{
MultiEmitter: NewMultiEmitter(emitters...),
loggers: loggers,
}, nil
}

// MultiLog is a logger that fan outs write operations
// to all loggers, and performs all read and search operations
// on the first logger that implements the operation
type MultiLog struct {
loggers []IAuditLog
*MultiEmitter
}

// WaitForDelivery waits for resources to be released and outstanding requests to
Expand All @@ -45,7 +55,7 @@ func (m *MultiLog) WaitForDelivery(ctx context.Context) error {
return nil
}

// Closer releases connections and resources associated with logs if any
// Close releases connections and resources associated with logs if any
func (m *MultiLog) Close() error {
var errors []error
for _, log := range m.loggers {
Expand Down
4 changes: 2 additions & 2 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func initExternalLog(auditConfig services.AuditConfig) (events.IAuditLog, error)
}
loggers = append(loggers, logger)
case teleport.SchemeStdout:
logger := events.NewWriterLog(utils.NopWriteCloser(os.Stdout))
logger := events.NewWriterEmitter(utils.NopWriteCloser(os.Stdout))
loggers = append(loggers, logger)
default:
return nil, trace.BadParameter(
Expand All @@ -922,7 +922,7 @@ func initExternalLog(auditConfig services.AuditConfig) (events.IAuditLog, error)
}

if len(loggers) > 1 {
return events.NewMultiLog(loggers...), nil
return events.NewMultiLog(loggers...)
}

return loggers[0], nil
Expand Down

0 comments on commit 46eb4ba

Please sign in to comment.