Run a single uploader service per process
Prior to this change, each individual service (proxy, app, SSH, db, etc.)
would spin up its own uploader service. If you run multiple Teleport
services in the same process, you end up with multiple uploaders all
watching the same directory, which can result in duplicate upload events
in the audit log.

Additionally, desktop access had (mistakenly) failed to set up this
service, so desktop sessions would only be uploaded if you happened
to also run some other service in the same process that does spin up
the uploader.

Solve these issues by centralizing the uploader service so that it
runs once per process, and individual Teleport services no longer need
to decide whether the uploader should run.

Fixes #12549
zmb3 committed Jul 22, 2022
1 parent 13d68af commit 5b8f459
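
The fix boils down to one change in process startup: instead of every enabled service setting up its own uploader, the process registers the uploader exactly once (the "common.upload" registration in lib/service/service.go below). The following is a minimal, self-contained sketch of that pattern; process, registerFunc, and initUploader are hypothetical stand-ins for TeleportProcess, RegisterFunc, and initUploaderService, not Teleport's actual code.

// Minimal, self-contained sketch (not Teleport's actual code) of the pattern this
// commit moves to. process, registerFunc, and initUploader stand in for
// TeleportProcess, RegisterFunc, and initUploaderService.
package main

import (
	"fmt"
	"sync"
)

// process stands in for TeleportProcess; it owns background tasks that should run
// once per process, no matter which services are enabled.
type process struct {
	wg sync.WaitGroup
}

// registerFunc runs a named background task, mirroring the RegisterFunc call that
// the diff adds in NewTeleport.
func (p *process) registerFunc(name string, fn func() error) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		if err := fn(); err != nil {
			fmt.Printf("%s exited with error: %v\n", name, err)
		}
	}()
}

// initUploader stands in for initUploaderService: scan one directory of completed
// session recordings and upload them to the auth server.
func initUploader() error {
	fmt.Println("uploader scanning data/log/upload/streaming/default/")
	return nil
}

func main() {
	p := &process{}

	// Before this change, each enabled service (proxy, app, SSH, db, kube) called the
	// uploader setup from its own init function, so one process could end up with
	// several uploaders watching the same directory and duplicating audit events.
	// Now the uploader is registered exactly once, regardless of which services run.
	p.registerFunc("common.upload", initUploader)

	p.wg.Wait()
}

Because the registration now happens once at process construction rather than in each service's init function, a service that forgets to set up the uploader (as desktop access did) or several services running together can no longer skip or duplicate it.
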
Showing 3 changed files with 39 additions and 79 deletions.

lib/service/db.go (15 changes: 0 additions & 15 deletions)
@@ -21,7 +21,6 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/filesessions"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/services"
@@ -85,20 +84,6 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
}
}

// Start uploader that will scan a path on disk and upload completed
// sessions to the auth server.
uploaderCfg := filesessions.UploaderConfig{
Streamer: accessPoint,
AuditLog: conn.Client,
}
completerCfg := events.UploadCompleterConfig{
SessionTracker: conn.Client,
}
err = process.initUploaderService(uploaderCfg, completerCfg)
if err != nil {
return trace.Wrap(err)
}

// Create database resources from databases defined in the static configuration.
var databases types.Databases
for _, db := range process.Config.Databases.Databases {

lib/service/kubernetes.go (14 changes: 0 additions & 14 deletions)
@@ -28,7 +28,6 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/filesessions"
kubeproxy "github.com/gravitational/teleport/lib/kube/proxy"
"github.com/gravitational/teleport/lib/labels"
"github.com/gravitational/teleport/lib/reversetunnel"
@@ -84,19 +83,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
return trace.Wrap(err)
}

// Start uploader that will scan a path on disk and upload completed
// sessions to the Auth Server.
uploaderCfg := filesessions.UploaderConfig{
Streamer: accessPoint,
AuditLog: conn.Client,
}
completerCfg := events.UploadCompleterConfig{
SessionTracker: conn.Client,
}
if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil {
return trace.Wrap(err)
}

proxyGetter := reversetunnel.NewConnectedProxyGetter()

// This service can run in 2 modes:

lib/service/service.go (89 changes: 39 additions & 50 deletions)
@@ -1074,6 +1074,11 @@ func NewTeleport(cfg *Config, opts ...NewTeleportOption) (*TeleportProcess, erro

process.RegisterFunc("common.rotate", process.periodicSyncRotationState)

// run one upload completer per-process
// even in sync recording modes, since the recording mode can be changed
// at any time with dynamic configuration
process.RegisterFunc("common.upload", process.initUploaderService)

if !serviceStarted {
return nil, trace.BadParameter("all services failed to start")
}
@@ -2283,22 +2288,6 @@ func (process *TeleportProcess) initSSH() error {
return trace.Wrap(err)
}

// init uploader service for recording SSH node, if proxy is not
// enabled on this node, because proxy stars uploader service as well
if !cfg.Proxy.Enabled {
uploaderCfg := filesessions.UploaderConfig{
Streamer: authClient,
AuditLog: conn.Client,
}
completerCfg := events.UploadCompleterConfig{
SessionTracker: conn.Client,
GracePeriod: defaults.UploadGracePeriod,
}
if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil {
return trace.Wrap(err)
}
}

if !conn.UseTunnel() {
listener, err := process.importOrCreateListener(ListenerNodeSSH, cfg.SSH.Addr.Addr)
if err != nil {
@@ -2420,12 +2409,29 @@ func (process *TeleportProcess) registerWithAuthServer(role types.SystemRole, ev
})
}

// initUploadService starts a file-based uploader that scans the local streaming logs directory
// initUploaderService starts a file-based uploader that scans the local streaming logs directory
// (data/log/upload/streaming/default/)
func (process *TeleportProcess) initUploaderService(uploaderCfg filesessions.UploaderConfig, completerCfg events.UploadCompleterConfig) error {
func (process *TeleportProcess) initUploaderService() error {
log := process.log.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentAuditLog, process.id),
})

eventC := make(chan Event, 1)
process.WaitForEvent(process.ExitContext(), TeleportReadyEvent, eventC)
select {
case <-eventC:
log.Infof("starting upload completer service")
case <-process.GracefulExitContext().Done():
return nil
}

connectors := process.getConnectors()
if len(connectors) == 0 {
return trace.BadParameter("no connectors found")
}

conn := connectors[0]

// create folder for uploads
uid, gid, err := adminCreds()
if err != nil {
@@ -2453,9 +2459,14 @@ func (process *TeleportProcess) initUploaderService(uploaderCfg filesessions.Upl
}
}

uploaderCfg.ScanDir = filepath.Join(path...)
uploaderCfg.EventsC = process.Config.UploadEventsC
fileUploader, err := filesessions.NewUploader(uploaderCfg)
uploadsDir := filepath.Join(path...)

fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
AuditLog: conn.Client,
Streamer: conn.Client,
ScanDir: uploadsDir,
EventsC: process.Config.UploadEventsC,
})
if err != nil {
return trace.Wrap(err)
}
@@ -2478,16 +2489,17 @@ func (process *TeleportProcess) initUploaderService(uploaderCfg filesessions.Upl
// upload completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them. It will be closed once
// the uploader context is closed.
handler, err := filesessions.NewHandler(filesessions.Config{
Directory: filepath.Join(path...),
})
handler, err := filesessions.NewHandler(filesessions.Config{Directory: uploadsDir})
if err != nil {
return trace.Wrap(err)
}

completerCfg.Uploader = handler
completerCfg.AuditLog = uploaderCfg.AuditLog
uploadCompleter, err := events.NewUploadCompleter(completerCfg)
uploadCompleter, err := events.NewUploadCompleter(events.UploadCompleterConfig{
GracePeriod: defaults.UploadGracePeriod,
Uploader: handler,
AuditLog: conn.Client,
SessionTracker: conn.Client,
})
if err != nil {
return trace.Wrap(err)
}
@@ -3812,16 +3824,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
log.Infof("Exited.")
})

uploaderCfg := filesessions.UploaderConfig{
Streamer: accessPoint,
AuditLog: conn.Client,
}
completerCfg := events.UploadCompleterConfig{
SessionTracker: conn.Client,
}
if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil {
return trace.Wrap(err)
}
return nil
}

@@ -4132,19 +4134,6 @@ func (process *TeleportProcess) initApps() {
log.Debugf("Application service dependencies have started, continuing.")
}

// Start uploader that will scan a path on disk and upload completed
// sessions to the Auth Server.
uploaderCfg := filesessions.UploaderConfig{
Streamer: accessPoint,
AuditLog: conn.Client,
}
completerCfg := events.UploadCompleterConfig{
SessionTracker: conn.Client,
}
if err := process.initUploaderService(uploaderCfg, completerCfg); err != nil {
return trace.Wrap(err)
}

// Start header dumping debugging application if requested.
if process.Config.Apps.DebugApp {
debugCh := make(chan Event)
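
As the service.go hunks above show, initUploaderService no longer takes per-service configuration: it blocks until the process signals readiness (or begins a graceful exit), then wires the first connector's auth client into both the file uploader and the upload completer. Below is a standalone sketch of that wait-then-start shape, again with hypothetical stand-in names rather than Teleport's API.

// Standalone sketch (hypothetical types, not Teleport's API) of the wait-then-start
// shape used by the new initUploaderService: block until the process is ready or is
// shutting down, then start uploading with the process-wide auth client.
package main

import (
	"context"
	"fmt"
	"time"
)

// event stands in for the process event delivered on TeleportReadyEvent.
type event struct{ name string }

// waitThenStartUploader mirrors the structure added in the diff: select on the ready
// event and the graceful-exit context, and only start the uploader in the ready case.
func waitThenStartUploader(ctx context.Context, ready <-chan event) error {
	select {
	case ev := <-ready:
		fmt.Printf("%s received, starting upload completer service\n", ev.name)
	case <-ctx.Done():
		// The process is exiting before it ever became ready; nothing to upload yet.
		return nil
	}

	// At this point the real code takes the first registered connector's client and
	// passes it to filesessions.NewUploader and events.NewUploadCompleter.
	fmt.Println("uploader and upload completer running")
	return nil
}

func main() {
	ready := make(chan event, 1)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ready <- event{name: "TeleportReadyEvent"}
	if err := waitThenStartUploader(ctx, ready); err != nil {
		fmt.Println("uploader error:", err)
	}
}

Waiting on the ready event means the uploader only starts after the process's services have come up, which is why the real code can expect at least one connector to be registered (and returns an error if none is found).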
