Skip to content

Commit

Permalink
Tailing logs from CloudWatch for executed process
Browse files Browse the repository at this point in the history
  • Loading branch information
meap committed Oct 1, 2024
1 parent ae150df commit 46377f8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
16 changes: 10 additions & 6 deletions pkg/ecs/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (s *Service) Execute(cmd []string, wait bool, dockerImageTag string) {
fmt.Printf("The task definition %s is used", taskDef)
}

fmt.Println()

output, err := svc.RunTask(ctx, &ecs.RunTaskInput{
Cluster: &s.Cluster,
TaskDefinition: &taskDef,
Expand All @@ -146,24 +148,26 @@ func (s *Service) Execute(cmd []string, wait bool, dockerImageTag string) {
}

executedTask := output.Tasks[0]
var lastTimestamp *int64 = nil

fmt.Printf("Task %s executed", *executedTask.TaskArn)
fmt.Println()

log.Printf("Task %s executed", *executedTask.TaskArn)
if wait {
for {
logsOutput, _ := s.printProcessLogs(ctx, tdef.LogGroup, tdef.LogStreamPrefix, *executedTask.TaskArn, tdef.Name, lastTimestamp)
lastTimestamp = &logsOutput.lastEventTimestamp

success, err := s.wait(ctx, svc, *executedTask.TaskArn)
if err != nil {
logsErr := s.printProcessLogs(ctx, tdef.LogGroup, tdef.LogStreamPrefix, *executedTask.TaskArn, tdef.Name)
if logsErr != nil {
log.Printf("Failed to fetch events from CloudWatch %v", logsErr)
}
log.Fatalln(err)
}

if success {
break
}

time.Sleep(6 * time.Second)
time.Sleep(5 * time.Second)
}

fmt.Printf("task %s finished", *executedTask.TaskArn)
Expand Down
36 changes: 28 additions & 8 deletions pkg/ecs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,59 @@ package ecs
import (
"context"
"fmt"
"log"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
)

type printProcessLogsOutput struct {
lastEventTimestamp int64
}

func (s *Service) printProcessLogs(
ctx context.Context, logGroupname string,
logStreamPrefix string,
taskArn string,
name string) error {
log.Printf("Loading logs for %s: %s", logGroupname, taskArn)
name string,
startTime *int64) (printProcessLogsOutput, error) {

cfg, err := s.initCfg()
if err != nil {
return fmt.Errorf("failed to initialize AWS configuration. (%w)", err)
return printProcessLogsOutput{}, fmt.Errorf("failed to initialize AWS configuration. (%w)", err)
}

processID := extractProcessID(taskArn)
client := cloudwatchlogs.NewFromConfig(cfg)

output, err := client.FilterLogEvents(ctx, &cloudwatchlogs.FilterLogEventsInput{
input := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String(logGroupname),
LogStreamNames: []string{fmt.Sprintf("%s/%s/%s", logStreamPrefix, name, processID)},
})
}

if startTime != nil {
input.StartTime = startTime
}

output, err := client.FilterLogEvents(ctx, input)

if err != nil {
return fmt.Errorf("failed to filter log events (%w)", err)
return printProcessLogsOutput{}, fmt.Errorf("failed to filter log events (%w)", err)
}

for _, event := range output.Events {
fmt.Println(*event.LogStreamName, *event.Message)
}

return nil
var lastEventTimestamp int64
if len(output.Events) > 0 {
lastEventTimestamp = *output.Events[len(output.Events)-1].Timestamp + 1
} else if startTime != nil {
lastEventTimestamp = *startTime
} else {
lastEventTimestamp = 0
}

return printProcessLogsOutput{
lastEventTimestamp: lastEventTimestamp,
}, nil
}

0 comments on commit 46377f8

Please sign in to comment.