Skip to content

Commit

Permalink
implement new api
Browse files Browse the repository at this point in the history
  • Loading branch information
xacrimon committed Jun 23, 2021
1 parent e7aff34 commit 3ba6d42
Show file tree
Hide file tree
Showing 17 changed files with 899 additions and 363 deletions.
42 changes: 42 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,48 @@ func (c *Client) DeleteAllNodes(ctx context.Context, namespace string) error {
return trail.FromGRPC(err)
}

// StreamSessionEvents streams audit events from a given session recording.
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan events.AuditEvent) {
request := &proto.StreamSessionEventsRequest{
SessionID: sessionID,
}

ch := make(chan events.AuditEvent)

stream, err := c.grpc.StreamSessionEvents(ctx, request)
if err != nil {
close(ch)
return utils.NewErrContext(trace.Wrap(err)), ch
}

subCtx, cancel := context.WithCancel(ctx)

go func() {
for {
oneOf, _ := stream.Recv()
if err != nil {
if err != io.EOF {
cancel()
}

break
}

event, err := events.FromOneOf(*oneOf)
if err != nil {
cancel()
break
}

ch <- event
}

close(ch)
}()

return subCtx, ch
}

// SearchEvents allows searching for events with a full pagination support.
func (c *Client) SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]events.AuditEvent, string, error) {
request := &proto.GetEventsRequest{
Expand Down
974 changes: 612 additions & 362 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,12 @@ message IsMFARequiredRequest {
}
}

// StreamSessionEventsRequest is a request containing needed data to fetch a session recording.
message StreamSessionEventsRequest {
// SessionID is the ID for a given session in an UUIDv4 format.
string SessionID = 1;
}

// NodeLogin specifies an SSH node and OS login.
message NodeLogin {
// Node can be node's hostname or UUID.
Expand Down Expand Up @@ -1132,4 +1138,6 @@ service AuthService {
rpc GetEvents(GetEventsRequest) returns (Events);
// In-session request for audit events.
rpc GetSessionEvents(GetSessionEventsRequest) returns (Events);
// StreamSessionEvents streams audit events from a given session recording.
rpc StreamSessionEvents(StreamSessionEventsRequest) returns (stream events.OneOf);
}
50 changes: 50 additions & 0 deletions api/utils/errcontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2021 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import "time"

type ErrContext struct {
done <-chan struct{}
err error
}

func NewErrContext(err error) *ErrContext {
c := make(chan struct{})
close(c)

return &ErrContext{
done: c,
err: err,
}
}

func (e *ErrContext) Deadline() (deadline time.Time, ok bool) {
return time.Now(), true
}

func (e *ErrContext) Done() <-chan struct{} {
return e.done
}

func (e *ErrContext) Err() error {
return e.err
}

func (e *ErrContext) Value(key interface{}) interface{} {
return nil
}
2 changes: 1 addition & 1 deletion e
Submodule e updated from 3fed8e to 4c5a65
11 changes: 11 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -3067,6 +3067,17 @@ func (a *ServerWithRoles) SearchSessionEvents(fromUTC, toUTC time.Time, limit in
return events, lastKey, nil
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (a *ServerWithRoles) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
if err := a.action(apidefaults.Namespace, types.KindSession, types.VerbList); err != nil {
return apiutils.NewErrContext(trace.Wrap(err)), make(chan apievents.AuditEvent)
}

return a.alog.StreamSessionEvents(ctx, sessionID)
}

// NewAdminAuthServer returns auth server authorized as admin,
// used for auth server cached access
func NewAdminAuthServer(authServer *Server, sessions session.Service, alog events.IAuditLog) (ClientI, error) {
Expand Down
7 changes: 7 additions & 0 deletions lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,13 @@ func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int,
return retval, nil
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (c *Client) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return c.APIClient.StreamSessionEvents(ctx, sessionID)
}

// SearchEvents allows searching for audit events with pagination support.
func (c *Client) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]apievents.AuditEvent, string, error) {
events, lastKey, err := c.APIClient.SearchEvents(context.TODO(), fromUTC, toUTC, namespace, eventTypes, limit, startKey)
Expand Down
31 changes: 31 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2683,6 +2683,37 @@ func (g *GRPCServer) authenticate(ctx context.Context) (*grpcContext, error) {
}, nil
}

// Streams events from a session recording.
func (g *GRPCServer) StreamSessionEvents(req *proto.StreamSessionEventsRequest, stream proto.AuthService_StreamSessionEventsServer) error {
auth, err := g.authenticate(context.TODO())
if err != nil {
return trace.Wrap(err)
}

ctx, c := auth.ServerWithRoles.StreamSessionEvents(context.TODO(), req.SessionID)

for {
select {
case event, more := <-c:
if !more {
return nil
}

oneOf, err := apievents.ToOneOf(event)
if err != nil {
return trace.Wrap(err)
}

if err := stream.Send(oneOf); err != nil {
return trace.Wrap(err)
}
case <-ctx.Done():
close(c)
return trace.Wrap(ctx.Err())
}
}
}

// GetEvents searches for events on the backend and sends them back in a response.
func (g *GRPCServer) GetEvents(ctx context.Context, req *proto.GetEventsRequest) (*proto.Events, error) {
auth, err := g.authenticate(ctx)
Expand Down
5 changes: 5 additions & 0 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@ type IAuditLog interface {
// WaitForDelivery waits for resources to be released and outstanding requests to
// complete after calling Close method
WaitForDelivery(context.Context) error

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent)
}

// EventFields instance is attached to every logged event
Expand Down
78 changes: 78 additions & 0 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,78 @@ func (l *AuditLog) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, star
return l.localLog.SearchSessionEvents(fromUTC, toUTC, limit, startKey)
}

const chunkStreamSize = 64 * 1024

type chunkStream struct {
log *AuditLog
sessionID session.ID
buffer []byte
readUntil int
offset int
}

func (c *chunkStream) Read(p []byte) (n int, err error) {
if c.readUntil >= len(c.buffer) {
chunk, err := c.log.GetSessionChunk("default", c.sessionID, c.offset, chunkStreamSize)
if err != nil {
if trace.Unwrap(err) == io.EOF {
return 0, io.EOF
}

return 0, err
}

c.buffer = chunk
c.readUntil = 0
c.offset += len(c.buffer)
}

written := copy(p, c.buffer[c.readUntil:])
c.readUntil += written
return written, nil
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
l.log.Debugf("StreamSessionEvents(%v)", sessionID)

rawStream := &chunkStream{
log: l,
sessionID: session.ID(sessionID),
readUntil: chunkStreamSize,
offset: 0,
}

protoReader := NewProtoReader(rawStream)
c := make(chan apievents.AuditEvent)
ctx, cancel := context.WithCancel(ctx)

go func() {
for {
if ctx.Err() != nil {
close(c)
break
}

event, err := protoReader.Read(ctx)
if err != nil {
if err != io.EOF {
cancel()
}

close(c)
break
}

c <- event
}
}()

return ctx, c
}

// getLocalLog returns the local (file based) audit log.
func (l *AuditLog) getLocalLog() IAuditLog {
l.RLock()
Expand Down Expand Up @@ -1235,3 +1307,9 @@ func (a *closedLogger) WaitForDelivery(context.Context) error {
func (a *closedLogger) Close() error {
return trace.NotImplemented(loggerClosedMessage)
}

func (a *closedLogger) StreamSessionEvents(_ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
ctx, cancel := context.WithTimeout(context.Background(), 0)
cancel()
return ctx, make(chan apievents.AuditEvent)
}
3 changes: 3 additions & 0 deletions lib/events/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ func (d *DiscardAuditLog) UploadSessionRecording(SessionRecording) error {
func (d *DiscardAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return nil
}
func (d *DiscardAuditLog) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return ctx, make(chan apievents.AuditEvent)
}
8 changes: 8 additions & 0 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/dynamo"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -1412,3 +1413,10 @@ func convertError(err error) error {
return err
}
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (l *Log) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return apiutils.NewErrContext(trace.NotImplemented("not implemented")), make(chan apievents.AuditEvent)
}
8 changes: 8 additions & 0 deletions lib/events/filelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -599,6 +600,13 @@ func (l *FileLog) findInFile(path string, eventFilter []string) ([]EventFields,
return retval, nil
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (l *FileLog) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return apiutils.NewErrContext(trace.NotImplemented("not implemented")), make(chan apievents.AuditEvent)
}

type eventFile struct {
os.FileInfo
path string
Expand Down
8 changes: 8 additions & 0 deletions lib/events/firestoreevents/firestoreevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
apiutils "github.com/gravitational/teleport/api/utils"
firestorebk "github.com/gravitational/teleport/lib/backend/firestore"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -631,3 +632,10 @@ func (l *Log) purgeExpiredEvents() error {
}
}
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (l *Log) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return apiutils.NewErrContext(trace.NotImplemented("not implemented")), make(chan apievents.AuditEvent)
}
14 changes: 14 additions & 0 deletions lib/events/multilog.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,17 @@ func (m *MultiLog) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, star
}
return events, lastKey, err
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (m *MultiLog) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
for _, log := range m.loggers {
ctx, c := log.StreamSessionEvents(ctx, sessionID)
if trace.IsNotImplemented(ctx.Err()) {
return ctx, c
}
}

return ctx, make(chan apievents.AuditEvent)
}
8 changes: 8 additions & 0 deletions lib/events/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

Expand Down Expand Up @@ -130,3 +131,10 @@ func (w *WriterLog) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, sta
func (w *WriterLog) WaitForDelivery(context.Context) error {
return nil
}

// StreamSessionEvents streams all events from a given session recording. A subcontext
// is created from the supplied context and is cancelled if the parent context gets cancelled
// or the function encounters an error.
func (w *WriterLog) StreamSessionEvents(ctx context.Context, sessionID string) (context.Context, chan apievents.AuditEvent) {
return apiutils.NewErrContext(trace.NotImplemented("not implemented")), make(chan apievents.AuditEvent)
}
Loading

0 comments on commit 3ba6d42

Please sign in to comment.