From 46377f8693b49272be9cb5babebdaa825c79030e Mon Sep 17 00:00:00 2001 From: Petr Reichl Date: Tue, 1 Oct 2024 19:46:18 +0800 Subject: [PATCH] Tailing logs from CloudWatch for executed process --- pkg/ecs/exec.go | 16 ++++++++++------ pkg/ecs/logs.go | 36 ++++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/pkg/ecs/exec.go b/pkg/ecs/exec.go index 7875849..5f50406 100644 --- a/pkg/ecs/exec.go +++ b/pkg/ecs/exec.go @@ -122,6 +122,8 @@ func (s *Service) Execute(cmd []string, wait bool, dockerImageTag string) { fmt.Printf("The task definition %s is used", taskDef) } + fmt.Println() + output, err := svc.RunTask(ctx, &ecs.RunTaskInput{ Cluster: &s.Cluster, TaskDefinition: &taskDef, @@ -146,16 +148,18 @@ func (s *Service) Execute(cmd []string, wait bool, dockerImageTag string) { } executedTask := output.Tasks[0] + var lastTimestamp *int64 = nil + + fmt.Printf("Task %s executed", *executedTask.TaskArn) + fmt.Println() - log.Printf("Task %s executed", *executedTask.TaskArn) if wait { for { + logsOutput, _ := s.printProcessLogs(ctx, tdef.LogGroup, tdef.LogStreamPrefix, *executedTask.TaskArn, tdef.Name, lastTimestamp) + lastTimestamp = &logsOutput.lastEventTimestamp + success, err := s.wait(ctx, svc, *executedTask.TaskArn) if err != nil { - logsErr := s.printProcessLogs(ctx, tdef.LogGroup, tdef.LogStreamPrefix, *executedTask.TaskArn, tdef.Name) - if logsErr != nil { - log.Printf("Failed to fetch events from CloudWatch %v", logsErr) - } log.Fatalln(err) } @@ -163,7 +167,7 @@ func (s *Service) Execute(cmd []string, wait bool, dockerImageTag string) { break } - time.Sleep(6 * time.Second) + time.Sleep(5 * time.Second) } fmt.Printf("task %s finished", *executedTask.TaskArn) diff --git a/pkg/ecs/logs.go b/pkg/ecs/logs.go index 61eafe6..88041dc 100644 --- a/pkg/ecs/logs.go +++ b/pkg/ecs/logs.go @@ -3,39 +3,59 @@ package ecs import ( "context" "fmt" - "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" ) +type printProcessLogsOutput struct { + lastEventTimestamp int64 +} + func (s *Service) printProcessLogs( ctx context.Context, logGroupname string, logStreamPrefix string, taskArn string, - name string) error { - log.Printf("Loading logs for %s: %s", logGroupname, taskArn) + name string, + startTime *int64) (printProcessLogsOutput, error) { cfg, err := s.initCfg() if err != nil { - return fmt.Errorf("failed to initialize AWS configuration. (%w)", err) + return printProcessLogsOutput{}, fmt.Errorf("failed to initialize AWS configuration. (%w)", err) } processID := extractProcessID(taskArn) client := cloudwatchlogs.NewFromConfig(cfg) - output, err := client.FilterLogEvents(ctx, &cloudwatchlogs.FilterLogEventsInput{ + input := &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroupname), LogStreamNames: []string{fmt.Sprintf("%s/%s/%s", logStreamPrefix, name, processID)}, - }) + } + + if startTime != nil { + input.StartTime = startTime + } + + output, err := client.FilterLogEvents(ctx, input) if err != nil { - return fmt.Errorf("failed to filter log events (%w)", err) + return printProcessLogsOutput{}, fmt.Errorf("failed to filter log events (%w)", err) } for _, event := range output.Events { fmt.Println(*event.LogStreamName, *event.Message) } - return nil + var lastEventTimestamp int64 + if len(output.Events) > 0 { + lastEventTimestamp = *output.Events[len(output.Events)-1].Timestamp + 1 + } else if startTime != nil { + lastEventTimestamp = *startTime + } else { + lastEventTimestamp = 0 + } + + return printProcessLogsOutput{ + lastEventTimestamp: lastEventTimestamp, + }, nil }